

Informer 本身逻辑主要聚合在 cache 包中,cache 包设计了以下几个核心接口或类:

  • ListerWatcher 是对原始接口的封装,包括 List 和 Watch 两个方法,是数据的源头。
  • Store 是缓存用的对象存储,包括对对象进行增删改查的基本方法,比如Add, Update, Delete, List, Get 等等,Store 有 Queue 和 Indexer 两个扩展接口。
  • Queue 扩展自 Store,在 Store 的基础上增加了 Pop 等几个方法,用于实现先进先出 (FIFO) 队列。在 cache 包中 Queue 被设计用于缓存变更 (Delta),变更就是引发状态改变的事件 (Event)。通过 Queue 将事件产生和事件处理解耦。
  • Reflector 是连接上述 ListerWatcher 和 Queue 的桥梁,将原始数据转换为统一的队列,方便控制器处理。
  • Indexer 也扩展自 Store,在 Store 的基础上增加了多索引设计,即允许对对象采用多种方式建立索引。在 cache 包中 Store 的作用之一是用于保存当前的资源状态,Indexer 继承了 Store 的这部分功能,在重新同步 (resync) 过程中起重要作用。
  • Controller 接口是对某一组控制逻辑的封装,最主要的一个方法是 Run(stopCh <-chan struct{}),一般设计为无限的循环,不断消费队列,直到 stopCh 被关闭。
  • Informer指的是一类专门用于实现 Informer 机制的 Controller。
  • SharedInformer 是对 Controller 的再次封装,目的是实现多个处理程序对同一个资源的事件响应,从 AddEventHandler 这个方法就可以大致明白 SharedInformer 和 Informer(一类专门用于实现 Informer 机制的 Controller) 的区别。
  • SharedIndexInformer 和 SharedInformer 作用是一样的,唯一区别在于 SharedIndexInformer 使用了 Indexer。




// Lister is any object that knows how to perform an initial list.
type Lister interface {
    // List should return a list type object; the Items field will be extracted, and the
    // ResourceVersion field will be used to start the watch in the right place.
    List(options metav1.ListOptions) (runtime.Object, error)

// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
    // Watch should begin a watch at the specified version.
    Watch(options metav1.ListOptions) (watch.Interface, error)

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {

可以看到ListerWatcher接口是一个组合接口,包含了Lister和Watcher这两个接口,分别包含List和Watch方法,这两个方法都是与资源类型无关的,操作的都是 runtime.Object 对象,在使用的时候一般需要进行反射或转换为具体的类型。


type ListWatch struct {
    ListFunc  ListFunc
    WatchFunc WatchFunc
    // DisableChunking requests no chunking for this list watcher.
    DisableChunking bool

// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)


package main

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

func mustClientset() kubernetes.Interface {
    kubeconfig := os.Getenv("KUBECONFIG")

    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)

    clientset, err := kubernetes.NewForConfig(config)

    return clientset

// newConfigMapsListerWatcher 用于创建 tmp namespace 下 configmaps 资源的 ListerWatcher 实例
func newConfigMapsListerWatcher() cache.ListerWatcher {
    clientset := mustClientset()              // 创建clientset
    client := clientset.CoreV1().RESTClient() // 客户端,请求器
    resource := "configmaps"                  // GET 请求参数之一
    namespace := "tmp"                        // GET 请求参数之一
    selector := fields.Everything()           // GET 请求参数之一
    lw := cache.NewListWatchFromClient(client, resource, namespace, selector)
    return lw

func main() {
    //clientset := mustClientset()
    //namespaces, err := clientset.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
    //for _, namespace := range namespaces.Items {
    //  fmt.Println(namespace.Name)

    fmt.Println("----- 1-list-watcher -----")

    lw := newConfigMapsListerWatcher()

    // list 的类型为 runtime.Object, 需要经过反射或类型转换才能使用,
    // 传入的 ListOptions 中的 FieldSelector 始终会被替换为前面的 selector
    list, err := lw.List(metav1.ListOptions{})

    // meta 包封装了一些处理 runtime.Object 对象的方法,屏蔽了反射和类型转换的过程,
    // 提取出的 items 类型为 []runtime.Object
    items, err := meta.ExtractList(list)

    fmt.Println("Initial list:")

    for _, item := range items {
        configMap, ok := item.(*corev1.ConfigMap)
        if !ok {

        // 如果只关注 meta 信息,也可以不进行类型转换,而是使用 meta.Accessor 方法
        // accessor, err := meta.Accessor(item)
        // magicconch.Must(err)
        // fmt.Println(accessor.GetName())

    listMetaInterface, err := meta.ListAccessor(list)

    // resourceVersion 在同步过程中非常重要,看下面它在 Watch 接口中的使用
    resourceVersion := listMetaInterface.GetResourceVersion()

    // w 的类型为 watch.Interface,提供 ResultChan 方法读取事件,
    // 和 List 一样,传入的 ListOptions 中的 FieldSelector 始终会被替换为前面的 selector,
    // ResourceVersion 是 Watch 时非常重要的参数,
    // 它代表一次客户端与服务器进行交互时对应的资源版本,
    // 结合另一个参数 ResourceVersionMatch,表示本次请求对 ResourceVersion 的筛选,
    // 比如以下请求表示:获取版本新于 resourceVersion 的事件。
    // 在考虑连接中断和定期重新同步 (resync) 的情况下,
    // 对 ResourceVersion 的管理就变得更为复杂,我们先不考虑这些情况。
    w, err := lw.Watch(metav1.ListOptions{
        ResourceVersion: resourceVersion,

    stopCh := make(chan os.Signal)
    signal.Notify(stopCh, os.Interrupt)

    fmt.Println("Start watching...")

    for {
        select {
        case <-stopCh:
            break loop
        case event, ok := <-w.ResultChan():
            if !ok {
                fmt.Println("Broken channel")
                break loop
            configMap, ok := event.Object.(*corev1.ConfigMap)
            if !ok {
            fmt.Printf("%s: %s\n", event.Type, configMap.Name)


----- 1-list-watcher -----
Initial list:
Start watching...
ADDED: demo



// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
    optionsModifier := func(options *metav1.ListOptions) {
        options.FieldSelector = fieldSelector.String()
    return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)

// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
    listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
        return c.Get().
            VersionedParams(&options, metav1.ParameterCodec).
    watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
        options.Watch = true
        return c.Get().
            VersionedParams(&options, metav1.ParameterCodec).
    return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}








// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
    // We specifically don't want to rate limit watches, so we
    // don't use r.rateLimiter here.
    if r.err != nil {
        return nil, r.err

    client := r.c.Client
    if client == nil {
        client = http.DefaultClient

    isErrRetryableFunc := func(request *http.Request, err error) bool {
        // The watch stream mechanism handles many common partial data errors, so closed
        // connections can be retried in many cases.
        if net.IsProbableEOF(err) || net.IsTimeout(err) {
            return true
        return false
    var retryAfter *RetryAfter
    url := r.URL().String()
    for {
        req, err := r.newHTTPRequest(ctx)
        if err != nil {
            return nil, err

        if retryAfter != nil {
            // We are retrying the request that we already send to apiserver
            // at least once before.
            // This request should also be throttled with the client-internal rate limiter.
            if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
                return nil, err
            retryAfter = nil

        resp, err := client.Do(req)
        updateURLMetrics(ctx, r, resp, err)
        if r.c.base != nil {
            if err != nil {
                r.backoff.UpdateBackoff(r.c.base, err, 0)
            } else {
                r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
        if err == nil && resp.StatusCode == http.StatusOK {
            return r.newStreamWatcher(resp)

        done, transformErr := func() (bool, error) {
            defer readAndCloseResponseBody(resp)

            var retry bool
            retryAfter, retry = r.retry.NextRetry(req, resp, err, isErrRetryableFunc)
            if retry {
                err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
                if err == nil {
                    return false, nil
                klog.V(4).Infof("Could not retry request - %v", err)

            if resp == nil {
                // the server must have sent us an error in 'err'
                return true, nil
            if result := r.transformResponse(resp, req); result.err != nil {
                return true, result.err
            return true, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
        if done {
            if isErrRetryableFunc(req, err) {
                return watch.NewEmptyWatch(), nil
            if err == nil {
                // if the server sent us an HTTP Response object,
                // we need to return the error object from that.
                err = transformErr
            return nil, err


// Interface can be implemented by anything that knows how to watch and report changes.
type Interface interface {
    // Stops watching. Will close the channel returned by ResultChan(). Releases
    // any resources used by the watch.

    // Returns a chan which will receive all the events. If an error occurs
    // or Stop() is called, the implementation will close this channel and
    // release any resources used by the watch.
    ResultChan() <-chan Event



  • 以上对事件响应是同步的,如果执行复杂的操作会引起阻塞,需要引入队列。
  • 以上代码缺少重新连接和重新同步机制,有可能出现数据不一致问题。


Store 是缓存用的对象存储,包括对对象进行增删改查的基本方法,我们可以通过这个接口定义来看

type Store interface {

    // Add adds the given object to the accumulator associated with the given object's key
    Add(obj interface{}) error

    // Update updates the given object in the accumulator associated with the given object's key
    Update(obj interface{}) error

    // Delete deletes the given object from the accumulator associated with the given object's key
    Delete(obj interface{}) error

    // List returns a list of all the currently non-empty accumulators
    List() []interface{}

    // ListKeys returns a list of all the keys currently associated with non-empty accumulators
    ListKeys() []string

    // Get returns the accumulator associated with the given object's key
    Get(obj interface{}) (item interface{}, exists bool, err error)

    // GetByKey returns the accumulator associated with the given key
    GetByKey(key string) (item interface{}, exists bool, err error)

    // Replace will delete the contents of the store, using instead the
    // given list. Store takes ownership of the list, you should not reference
    // it after calling this function.
    Replace([]interface{}, string) error

    // Resync is meaningless in the terms appearing here but has
    // meaning in some implementations that have non-trivial
    // additional behavior (e.g., DeltaFIFO).
    Resync() error


type cache struct {
    // cacheStorage bears the burden of thread safety for the cache
    cacheStorage ThreadSafeStore
    // keyFunc is used to make the key for objects stored in and retrieved from items, and
    // should be deterministic.
    keyFunc KeyFunc


type ThreadSafeStore interface {
    Add(key string, obj interface{})
    Update(key string, obj interface{})
    Delete(key string)
    Get(key string) (item interface{}, exists bool)
    List() []interface{}
    ListKeys() []string
    Replace(map[string]interface{}, string)
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexKey string) ([]string, error)
    ListIndexFuncValues(name string) []string
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.  If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
    // Resync is a no-op and is deprecated
    Resync() error


type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}

    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices




Queue 扩展自 Store,在 Store 的基础上增加了 Pop 等几个方法,用于实现先进先出 (FIFO) 队列。

type Queue interface {

    // Pop blocks until there is at least one key to process or the
    // Queue is closed.  In the latter case Pop returns with an error.
    // In the former case Pop atomically picks one key to process,
    // removes that (key, accumulator) association from the Store, and
    // processes the accumulator.  Pop returns the accumulator that
    // was processed and the result of processing.  The PopProcessFunc
    // may return an ErrRequeue{inner} and in this case Pop will (a)
    // return that (key, accumulator) association to the Queue as part
    // of the atomic processing and (b) return the inner error from
    // Pop.
    Pop(PopProcessFunc) (interface{}, error)

    // AddIfNotPresent puts the given accumulator into the Queue (in
    // association with the accumulator's key) if and only if that key
    // is not already associated with a non-empty accumulator.
    AddIfNotPresent(interface{}) error

    // HasSynced returns true if the first batch of keys have all been
    // popped.  The first batch of keys are those of the first Replace
    // operation if that happened before any Add, AddIfNotPresent,
    // Update, or Delete; otherwise the first batch is empty.
    HasSynced() bool

    // Close the queue



type DeltaFIFO struct {
    // lock/cond protects access to 'items' and 'queue'.
    lock sync.RWMutex
    cond sync.Cond

    // `items` maps a key to a Deltas.
    // Each such Deltas has at least one Delta.
    items map[string]Deltas

    // `queue` maintains FIFO order of keys for consumption in Pop().
    // There are no duplicates in `queue`.
    // A key is in `queue` if and only if it is in `items`.
    queue []string

    // populated is true if the first batch of items inserted by Replace() has been populated
    // or Delete/Add/Update/AddIfNotPresent was called first.
    populated bool
    // initialPopulationCount is the number of items inserted by the first call of Replace()
    initialPopulationCount int

    // keyFunc is used to make the key used for queued item
    // insertion and retrieval, and should be deterministic.
    keyFunc KeyFunc

    // knownObjects list keys that are "known" --- affecting Delete(),
    // Replace(), and Resync()
    knownObjects KeyListerGetter

    // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
    // Currently, not used to gate any of CRED operations.
    closed bool

    // emitDeltaTypeReplaced is whether to emit the Replaced or Sync
    // DeltaType when Replace() is called (to preserve backwards compat).
    emitDeltaTypeReplaced bool


Reflector 被设计来解决listwatcher中提到的两个问题。我们直接重实例使用开始看起

package main

import (
    corev1 "k8s.io/api/core/v1"

// newStore 用于创建一个 cache.Store 对象,作为当前资源状态的对象存储
func newStore() cache.Store {
    return cache.NewStore(cache.MetaNamespaceKeyFunc)

// newQueue 用于创建一个 cache.Queue 对象,这里实现为 FIFO 先进先出队列,
// 注意在初始化时 store 作为 KnownObjects 参数传入其中,
// 因为在重新同步 (resync) 操作中 Reflector 需要知道当前的资源状态,
// 另外在计算变更 (Delta) 时,也需要对比当前的资源状态。
// 这个 KnownObjects 对队列,以及对 Reflector 都是只读的,用户需要自己维护好 store 的状态。
func newQueue(store cache.Store) cache.Queue {
    return cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
        KnownObjects:          store,
        EmitDeltaTypeReplaced: true,

// newConfigMapsReflector 用于创建一个 cache.Reflector 对象,
// 当 Reflector 开始运行 (Run) 后,队列中就会推入新收到的事件。
func newConfigMapsReflector(queue cache.Queue) *cache.Reflector {
    lw := newConfigMapsListerWatcher() // 前面有说明
    // 第 2 个参数是 expectedType, 用此参数限制进入队列的事件,
    // 当然在 List 和 Watch 操作时返回的数据就只有一种类型,这个参数只起校验的作用;
    // 第 4 个参数是 resyncPeriod,
    // 这里传了 0,表示从不重新同步(除非连接超时或者中断),
    // 如果传了非 0 值,会定期进行全量同步,避免累积和服务器的不一致,
    // 重新同步过程中会产生 SYNC 类型的事件。
    return cache.NewReflector(lw, &corev1.ConfigMap{}, queue, 0)

func main() {
    fmt.Println("----- 2-reflector -----")

    store := newStore()
    queue := newQueue(store)
    reflector := newConfigMapsReflector(queue)

    stopCh := make(chan struct{})
    defer close(stopCh)

    // reflector 开始运行后,队列中就会推入新收到的事件
    go reflector.Run(stopCh)

    // 注意处理事件过程中维护好 store 状态,包括 Add, Update, Delete 操作,
    // 否则会出现不同步问题,在 Informer 当中这些逻辑都已经被封装好了,但目前我们还需要关心一下。
    processObj := func(obj interface{}) error {
        // 最先收到的事件会被最先处理
        for _, d := range obj.(cache.Deltas) {
            switch d.Type {
            case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
                if _, exists, err := store.Get(d.Object); err == nil && exists {
                    if err := store.Update(d.Object); err != nil {
                        return err
                } else {
                    if err := store.Add(d.Object); err != nil {
                        return err
            case cache.Deleted:
                if err := store.Delete(d.Object); err != nil {
                    return err
            configMap, ok := d.Object.(*corev1.ConfigMap)
            if !ok {
                return fmt.Errorf("not config: %T", d.Object)
            fmt.Printf("%s: %s\n", d.Type, configMap.Name)
        return nil

    fmt.Println("Start syncing...")

    // 持续运行直到 stopCh 关闭
    wait.Until(func() {
        for {
            _, err := queue.Pop(processObj)
    }, time.Second, stopCh)


----- 2-reflector -----
Start syncing...
Replaced: demo1
Replaced: demo
Deleted: demo
Added: demo





// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
    // name identifies this reflector. By default it will be a file:line if possible.
    name string

    // The name of the type we expect to place in the store. The name
    // will be the stringification of expectedGVK if provided, and the
    // stringification of expectedType otherwise. It is for display
    // only, and should not be used for parsing or comparison.
    expectedTypeName string
    // An example object of the type we expect to place in the store.
    // Only the type needs to be right, except that when that is
    // `unstructured.Unstructured` the object's `"apiVersion"` and
    // `"kind"` must also be right.
    expectedType reflect.Type
    // The GVK of the object we expect to place in the store if unstructured.
    expectedGVK *schema.GroupVersionKind
    // The destination to sync up with the watch source
    store Store
    // listerWatcher is used to perform lists and watches.
    listerWatcher ListerWatcher

    // backoff manages backoff of ListWatch
    backoffManager wait.BackoffManager
    // initConnBackoffManager manages backoff the initial connection with the Watch calll of ListAndWatch.
    initConnBackoffManager wait.BackoffManager

    resyncPeriod time.Duration
    // ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
    ShouldResync func() bool
    // clock allows tests to manipulate time
    clock clock.Clock
    // paginatedResult defines whether pagination should be forced for list calls.
    // It is set based on the result of the initial list call.
    paginatedResult bool
    // lastSyncResourceVersion is the resource version token last
    // observed when doing a sync with the underlying store
    // it is thread safe, but not synchronized with the underlying store
    lastSyncResourceVersion string
    // isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
    // lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
    isLastSyncResourceVersionUnavailable bool
    // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
    lastSyncResourceVersionMutex sync.RWMutex
    // WatchListPageSize is the requested chunk size of initial and resync watch lists.
    // If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
    // (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
    // it will turn off pagination to allow serving them from watch cache.
    // NOTE: It should be used carefully as paginated lists are always served directly from
    // etcd, which is significantly less efficient and may lead to serious performance and
    // scalability problems.
    WatchListPageSize int64
    // Called whenever the ListAndWatch drops the connection with an error.
    watchErrorHandler WatchErrorHandler



func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
    var resourceVersion string

    options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

    if err := func() error {
        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
        defer initTrace.LogIfLong(10 * time.Second)
        var list runtime.Object
        var paginatedResult bool
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    panicCh <- r
            // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                return r.listerWatcher.List(opts)
            switch {
            case r.WatchListPageSize != 0:
                pager.PageSize = r.WatchListPageSize
            case r.paginatedResult:
                // We got a paginated result initially. Assume this resource and server honor
                // paging requests (i.e. watch cache is probably disabled) and leave the default
                // pager size set.
            case options.ResourceVersion != "" && options.ResourceVersion != "0":
                // User didn't explicitly request pagination.
                // With ResourceVersion != "", we have a possibility to list from watch cache,
                // but we do that (for ResourceVersion != "0") only if Limit is unset.
                // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
                // switch off pagination to force listing from watch cache (if enabled).
                // With the existing semantic of RV (result is at least as fresh as provided RV),
                // this is correct and doesn't lead to going back in time.
                // We also don't turn off pagination for ResourceVersion="0", since watch cache
                // is ignoring Limit in that case anyway, and if watch cache is not enabled
                // we don't introduce regression.
                pager.PageSize = 0

            list, paginatedResult, err = pager.List(context.Background(), options)
            if isExpiredError(err) || isTooLargeResourceVersionError(err) {
                // Retry immediately if the resource version used to list is unavailable.
                // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
                // continuation pages, but the pager might not be enabled, the full list might fail because the
                // resource version it is listing at is expired or the cache may not yet be synced to the provided
                // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
                // the reflector makes forward progress.
                list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
        select {
        case <-stopCh:
            return nil
        case r := <-panicCh:
        case <-listCh:
        if err != nil {
            return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)

        // We check if the list was paginated and if so set the paginatedResult based on that.
        // However, we want to do that only for the initial list (which is the only case
        // when we set ResourceVersion="0"). The reasoning behind it is that later, in some
        // situations we may force listing directly from etcd (by setting ResourceVersion="")
        // which will return paginated result, even if watch cache is enabled. However, in
        // that case, we still want to prefer sending requests to watch cache if possible.
        // Paginated result returned for request with ResourceVersion="0" mean that watch
        // cache is disabled and there are a lot of objects of a given type. In such case,
        // there is no need to prefer listing from watch cache.
        if options.ResourceVersion == "0" && paginatedResult {
            r.paginatedResult = true

        r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
        initTrace.Step("Objects listed")
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("unable to understand list result %#v: %v", list, err)

        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
        initTrace.Step("Objects extracted")
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("unable to sync list result: %v", err)
        initTrace.Step("SyncWith done")
        initTrace.Step("Resource version updated")
        return nil
    }(); err != nil {
        return err

    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
            case <-cancelCh:
            if r.ShouldResync == nil || r.ShouldResync() {
                klog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
            resyncCh, cleanup = r.resyncChan()

    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
            // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
            // Reflector doesn't assume bookmarks are returned at all (if the server do not support
            // watch bookmarks, it will ignore this field).
            AllowWatchBookmarks: true,

        // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
        start := r.clock.Now()

        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case begin exponentially backing off and resend watch request.
            // Do the same for "429" errors.
            if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
            return err

        if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                switch {
                case isExpiredError(err):
                    // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
                    // has a semantic that it returns data at least as fresh as provided RV.
                    // So first try to LIST with setting RV to resource version of last observed object.
                    klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
                case apierrors.IsTooManyRequests(err):
                    klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
            return nil



  • 调用listerWatcher.List方法,获取资源下的所有对象的数据,这个方法会通过api调用到apiServer获取资源列表;
  • 调用listMetaInterface.GetResourceVersion获取资源版本号;
  • 调用meta.ExtractList方法将资源数据转换成资源对象列表;
  • 将资源对象列表中的资源对象和资源版本号存储至DeltaFIFO队列中;
  • 最后调用setLastSyncResourceVersion方法更新资源版本号;



  • 循环调用clientset客户端api与apiServer建立长连接,监控指定资源的变更,如果监控到有资源变更,那么会调用watchHandler处理资源的变更事件。


// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    eventCount := 0

    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()

    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                break loop
            if event.Type == watch.Error {
                return apierrors.FromObject(event.Object)
            if r.expectedType != nil {
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
            if r.expectedGVK != nil {
                if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
            case watch.Bookmark:
                // A `Bookmark` means watch has synced here, just update the resourceVersion
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            *resourceVersion = newResourceVersion
            if rvu, ok := r.store.(ResourceVersionUpdater); ok {

    watchDuration := r.clock.Since(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
    return nil

watchHandler方法会根据传入的资源类型调用不同的方法转换成不同的Delta然后存入到DeltaFIFO队列中。避免了阻塞问题,但对队列的消费仍然是同步的,需要再实现多 worker 才能提高效率,这一部分实现不包含在 cache 包中

Reflector 是保证 Informer 可靠性的核心组件,在丢失事件,收到异常事件,处理事件失败等多种异常情况下需要考虑的细节很多,感兴趣可以深入阅读源码的实现细节。



package main

import (
    corev1 "k8s.io/api/core/v1"

func newController() cache.Controller {
    fmt.Println("----- 3-controller -----")

    lw := newConfigMapsListerWatcher()
    store := newStore()
    queue := newQueue(store)
    cfg := &cache.Config{
        Queue:            queue,
        ListerWatcher:    lw,
        ObjectType:       &corev1.ConfigMap{},
        FullResyncPeriod: 0,
        RetryOnError:     false,
        Process: func(obj interface{}) error {
            for _, d := range obj.(cache.Deltas) {
                switch d.Type {
                case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
                    if _, exists, err := store.Get(d.Object); err == nil && exists {
                        if err := store.Update(d.Object); err != nil {
                            return err
                    } else {
                        if err := store.Add(d.Object); err != nil {
                            return err
                case cache.Deleted:
                    if err := store.Delete(d.Object); err != nil {
                        return err
                configMap, ok := d.Object.(*corev1.ConfigMap)
                if !ok {
                    return fmt.Errorf("not config: %T", d.Object)
                fmt.Printf("%s: %s\n", d.Type, configMap.Name)
            return nil
    return cache.New(cfg)

func main() {
    controller := newController()

    stopCh := make(chan struct{})
    defer close(stopCh)

    fmt.Println("Start syncing....")

    go controller.Run(stopCh)





// Config contains all the settings for one of these low-level controllers.
type Config struct {
    // The queue for your objects - has to be a DeltaFIFO due to
    // assumptions in the implementation. Your Process() function
    // should accept the output of this Queue's Pop() method.

    // Something that can list and watch your objects.

    // Something that can process a popped Deltas.
    Process ProcessFunc

    // ObjectType is an example object of the type this controller is
    // expected to handle.  Only the type needs to be right, except
    // that when that is `unstructured.Unstructured` the object's
    // `"apiVersion"` and `"kind"` must also be right.
    ObjectType runtime.Object

    // FullResyncPeriod is the period at which ShouldResync is considered.
    FullResyncPeriod time.Duration

    // ShouldResync is periodically used by the reflector to determine
    // whether to Resync the Queue. If ShouldResync is `nil` or
    // returns true, it means the reflector should proceed with the
    // resync.
    ShouldResync ShouldResyncFunc

    // If true, when Process() returns an error, re-enqueue the object.
    // TODO: add interface to let you inject a delay/backoff or drop
    //       the object completely if desired. Pass the object in
    //       question to this interface as a parameter.  This is probably moot
    //       now that this functionality appears at a higher level.
    RetryOnError bool

    // Called whenever the ListAndWatch drops the connection with an error.
    WatchErrorHandler WatchErrorHandler

    // WatchListPageSize is the requested chunk size of initial and relist watch lists.
    WatchListPageSize int64



type Controller interface {
    // Run does two things.  One is to construct and run a Reflector
    // to pump objects/notifications from the Config's ListerWatcher
    // to the Config's Queue and possibly invoke the occasional Resync
    // on that Queue.  The other is to repeatedly Pop from the Queue
    // and process with the Config's ProcessFunc.  Both of these
    // continue until `stopCh` is closed.
    Run(stopCh <-chan struct{})

    // HasSynced delegates to the Config's Queue
    HasSynced() bool

    // LastSyncResourceVersion delegates to the Reflector when there
    // is one, otherwise returns the empty string
    LastSyncResourceVersion() string


func New(c *Config) Controller {
    ctlr := &controller{
        config: *c,
        clock:  &clock.RealClock{},
    return ctlr

type controller struct {
    config         Config
    reflector      *Reflector
    reflectorMutex sync.RWMutex
    clock          clock.Clock


func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
    r := NewReflector(
    r.ShouldResync = c.config.ShouldResync
    r.WatchListPageSize = c.config.WatchListPageSize
    r.clock = c.clock
    if c.config.WatchErrorHandler != nil {
        r.watchErrorHandler = c.config.WatchErrorHandler

    c.reflector = r

    var wg wait.Group

    wg.StartWithChannel(stopCh, r.Run)

    wait.Until(c.processLoop, time.Second, stopCh)



Informer指的是一类专门用于实现 Informer 机制的 Controller,其实也是对controller的封装,主要封装了转化的逻辑(obj 由 cache.Deltas 变成了 *corev1.ConfigMap)。


package main

import (
    corev1 "k8s.io/api/core/v1"

func main() {
    fmt.Println("----- 4-informer -----")

    lw := newConfigMapsListerWatcher()
    // 第一个返回的参数是 cache.Store,这里暂时用不到所以直接丢弃
    _, controller := cache.NewInformer(lw, &corev1.ConfigMap{}, 0, cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            configMap, ok := obj.(*corev1.ConfigMap)
            if !ok {
            fmt.Printf("created: %s\n", configMap.Name)
        UpdateFunc: func(old, new interface{}) {
            configMap, ok := old.(*corev1.ConfigMap)
            if !ok {
            fmt.Printf("updated: %s\n", configMap.Name)
        DeleteFunc: func(obj interface{}) {
            configMap, ok := obj.(*corev1.ConfigMap)
            if !ok {
            fmt.Printf("deleted: %s\n", configMap.Name)

    stopCh := make(chan struct{})
    defer close(stopCh)

    fmt.Println("Start syncing....")

    go controller.Run(stopCh)



----- 4-informer -----
Start syncing....
created: demo
created: demo1
deleted: demo
created: demo


1、创建Indexer, Controller两个接口

func NewInformer(
    lw ListerWatcher,
    objType runtime.Object,
    resyncPeriod time.Duration,
    h ResourceEventHandler,
) (Store, Controller) {
    // This will hold the client state, as we know it.
    clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

    return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)


func newInformer(
    lw ListerWatcher,
    objType runtime.Object,
    resyncPeriod time.Duration,
    h ResourceEventHandler,
    clientState Store,
) Controller {
    // This will hold incoming changes. Note how we pass clientState in as a
    // KeyLister, that way resync operations will result in the correct set
    // of update/delete deltas.
    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
        KnownObjects:          clientState,
        EmitDeltaTypeReplaced: true,

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    lw,
        ObjectType:       objType,
        FullResyncPeriod: resyncPeriod,
        RetryOnError:     false,

        Process: func(obj interface{}) error {
            // from oldest to newest
            for _, d := range obj.(Deltas) {
                switch d.Type {
                case Sync, Replaced, Added, Updated:
                    if old, exists, err := clientState.Get(d.Object); err == nil && exists {
                        if err := clientState.Update(d.Object); err != nil {
                            return err
                        h.OnUpdate(old, d.Object)
                    } else {
                        if err := clientState.Add(d.Object); err != nil {
                            return err
                case Deleted:
                    if err := clientState.Delete(d.Object); err != nil {
                        return err
            return nil
    return New(cfg)




在上面的 Informer 设计当中,处理程序作为一个参数传入 NewInformer,如果有另一个处理程序需要处理相同资源,需要另外创建一个 Informer 对象,而队列是不能复用的,因为队列不支持两个消费者同时消费,为了解决这个问题,cache 包中又设计了 SharedInformer,顾名思义,就是多个处理程序共享的 Informer

package main

import (
    corev1 "k8s.io/api/core/v1"

func main() {
    fmt.Println("----- 5-shared-informer -----")

    lw := newConfigMapsListerWatcher()
    sharedInformer := cache.NewSharedInformer(lw, &corev1.ConfigMap{}, 0)
    // 添加一个处理程序
        AddFunc: func(obj interface{}) {
            configMap, ok := obj.(*corev1.ConfigMap)
            if !ok {
            fmt.Printf("created, printing namespace: %s\n", configMap.Namespace)
    // 添加另一个处理程序
        AddFunc: func(obj interface{}) {
            configMap, ok := obj.(*corev1.ConfigMap)
            if !ok {
            fmt.Printf("created, printing name: %s\n", configMap.Name)

    stopCh := make(chan struct{})
    defer close(stopCh)

    fmt.Println("Start syncing....")

    go sharedInformer.Run(stopCh)



----- 5-shared-informer -----
Start syncing....
created, printing namespace: tmp
created, printing namespace: tmp
created, printing name: demo
created, printing name: demo1



type SharedInformer interface {
    // AddEventHandler adds an event handler to the shared informer using the shared informer's resync
    // period.  Events to a single handler are delivered sequentially, but there is no coordination
    // between different handlers.
    AddEventHandler(handler ResourceEventHandler)
    // AddEventHandlerWithResyncPeriod adds an event handler to the
    // shared informer with the requested resync period; zero means
    // this handler does not care about resyncs.  The resync operation
    // consists of delivering to the handler an update notification
    // for every object in the informer's local cache; it does not add
    // any interactions with the authoritative storage.  Some
    // informers do no resyncs at all, not even for handlers added
    // with a non-zero resyncPeriod.  For an informer that does
    // resyncs, and for each handler that requests resyncs, that
    // informer develops a nominal resync period that is no shorter
    // than the requested period but may be longer.  The actual time
    // between any two resyncs may be longer than the nominal period
    // because the implementation takes time to do work and there may
    // be competing load and scheduling noise.
    AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
    // GetStore returns the informer's local cache as a Store.
    GetStore() Store
    // GetController is deprecated, it does nothing useful
    GetController() Controller
    // Run starts and runs the shared informer, returning after it stops.
    // The informer will be stopped when stopCh is closed.
    Run(stopCh <-chan struct{})
    // HasSynced returns true if the shared informer's store has been
    // informed by at least one full LIST of the authoritative state
    // of the informer's object collection.  This is unrelated to "resync".
    HasSynced() bool
    // LastSyncResourceVersion is the resource version observed when last synced with the underlying
    // store. The value returned is not synchronized with access to the underlying store and is not
    // thread-safe.
    LastSyncResourceVersion() string

    // The WatchErrorHandler is called whenever ListAndWatch drops the
    // connection with an error. After calling this handler, the informer
    // will backoff and retry.
    // The default implementation looks at the error type and tries to log
    // the error message at an appropriate level.
    // There's only one handler, so if you call this multiple times, last one
    // wins; calling after the informer has been started returns an error.
    // The handler is intended for visibility, not to e.g. pause the consumers.
    // The handler should return quickly - any expensive processing should be
    // offloaded.
    SetWatchErrorHandler(handler WatchErrorHandler) error


// NewSharedInformer creates a new instance for the listwatcher.
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
    return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})

func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        processor:                       &sharedProcessor{clock: realClock},
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        listerWatcher:                   lw,
        objectType:                      exampleObject,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
        clock:                           realClock,
    return sharedIndexInformer


type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller

    processor             *sharedProcessor
    cacheMutationDetector MutationDetector

    listerWatcher ListerWatcher

    // objectType is an example object of the type this informer is
    // expected to handle.  Only the type needs to be right, except
    // that when that is `unstructured.Unstructured` the object's
    // `"apiVersion"` and `"kind"` must also be right.
    objectType runtime.Object

    // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    // shouldResync to check if any of our listeners need a resync.
    resyncCheckPeriod time.Duration
    // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
    // value).
    defaultEventHandlerResyncPeriod time.Duration
    // clock allows for testability
    clock clock.Clock

    started, stopped bool
    startedLock      sync.Mutex

    // blockDeltas gives a way to stop all event distribution so that a late event handler
    // can safely join the shared informer.
    blockDeltas sync.Mutex

    // Called whenever the ListAndWatch drops the connection with an error.
    watchErrorHandler WatchErrorHandler


type SharedIndexInformer interface {
    // AddIndexers add indexers to the informer before it starts.
    AddIndexers(indexers Indexers) error
    GetIndexer() Indexer



func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash()

fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
    KnownObjects:          s.indexer,
    EmitDeltaTypeReplaced: true,

cfg := &Config{
    Queue:            fifo,
    ListerWatcher:    s.listerWatcher,
    ObjectType:       s.objectType,
    FullResyncPeriod: s.resyncCheckPeriod,
    RetryOnError:     false,
    ShouldResync:     s.processor.shouldResync,

    Process:           s.HandleDeltas,
    WatchErrorHandler: s.watchErrorHandler,

func() {
    defer s.startedLock.Unlock()

    s.controller = New(cfg)
    s.controller.(*controller).clock = s.clock
    s.started = true

// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait()              // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)

defer func() {
    defer s.startedLock.Unlock()
    s.stopped = true // Don't want any new listeners




在前面使用的 Reflector, Informer 和 SharedInformer 中,在实现事件响应的同时,我们还维护了一份当前最新的资源状态,就是代码中用到的 Store。在 Reflector 演示代码中 Store 由我们自己构建并传入 NewReflector,在 Informer 演示代码中 Store 对象由 NewInformer 返回,在 SharedInformer 演示代码中 Store 对象也可以通过 GetStore 方法获取。 这份最新的资源状态是非常有用的,在很多控制逻辑中,除了事件本身涉及的资源外,还需要关心其他资源的状态,就需要用到 Store 对象。 但是,Store 对象只实现了基本的增删改查功能,就 “查” 而言,只实现了 Get, GetByKey, List, ListKeys 4 个方法,对象的 “key” 是用构建 Store 时指定的 KeyFunc 计算得出的,绝大多数情况下使用的都是 cache.MetaNamespaceKeyFunc 这个方法


  • 根据 namespace 加 name 查找某一个资源;
  • 列出所有资源。


  • 根据 MetaNamespace 之外的某个字段查找资源,比如 label, annotation, status 等等;
  • 根据条件列出部分资源,比如某个 namespace 下的资源,就没有办法通过 Store 做到,只能用 ListerWatcher 直接调用接口,没有本地缓存。

Indexer 就是设计来解决这个问题,功能如其名,就是用来建立索引,我们还是通过实例。

package main

import (
    corev1 "k8s.io/api/core/v1"

func main() {
    fmt.Println("----- 6-indexer -----")

    lw := newConfigMapsListerWatcher()
    indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
    // 仅演示用,只关心 indexer,不处理事件,所以传一个空的 HandlerFunc,
    // 实际使用中一般不会这样做
    indexer, informer := cache.NewIndexerInformer(
        lw, &corev1.ConfigMap{}, 0, cache.ResourceEventHandlerFuncs{}, indexers)

    stopCh := make(chan struct{})
    defer close(stopCh)

    fmt.Println("Start syncing....")

    go informer.Run(stopCh)

    // 在 informer 首次同步完成后再操作
    if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
        panic("timed out waiting for caches to sync")

    // 获取 cache.NamespaceIndex 索引下,索引值为 "tmp" 中的所有键
    keys, err := indexer.IndexKeys(cache.NamespaceIndex, "tmp")
    for _, k := range keys {


----- 6-indexer -----
Start syncing....


1、cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} 是 informers 包中默认的索引,通过这个索引可以根据 namespace 列出资源



  • New
  • NewInformer
  • NewIndexerInformer
  • NewSharedInformer
  • NewSharedIndexInformer

它们有着不同程度的抽象和封装,NewSharedIndexInformer 是其中抽象程度最低,封装程度最高的一个,但即使是 NewSharedIndexInformer,也没有封装具体的资源类型,需要接收 ListerWatcher 和 Indexers 作为参数,我们需要先构建 ListWatcher 和 Indexers。

这样的封装明显还不够好,informers 包采用了工厂模式:为每种内置的资源类型创建对应的 Informer 工厂类,要使用某种资源 Informer 的时候直接使用工厂类构建,其实着就是我们最常用的方式,详细的讲解我们在上一篇k8s-informer使用已经说过。

