Kubernetes Informer - The Reflector

Mar 7, 2021 · Kubernetes · Golang

What is an Informer?

The Informer (also called SharedInformer) is a module inside Kubernetes controllers. It is the bridge between a controller's reconcile loop and events from the Kubernetes apiserver (that is, changes to Kubernetes API data in etcd). When we create, update, or delete a Kubernetes API object through the apiserver, the Informer inside that resource's controller notices the event immediately and triggers reconciliation.

Let's start with a rough look at the source: https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/shared_informer.go#L168-L174

// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface {
    SharedInformer
    // AddIndexers add indexers to the informer before it starts.
    AddIndexers(indexers Indexers) error
    GetIndexer() Indexer
}

https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/shared_informer.go#L257-L301

type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller

    processor             *sharedProcessor
    cacheMutationDetector MutationDetector

    listerWatcher ListerWatcher

    // objectType is an example object of the type this informer is
    // expected to handle.  Only the type needs to be right, except
    // that when that is `unstructured.Unstructured` the object's
    // `"apiVersion"` and `"kind"` must also be right.
    objectType runtime.Object

    // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    // shouldResync to check if any of our listeners need a resync.
    resyncCheckPeriod time.Duration
    // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
    // value).
    defaultEventHandlerResyncPeriod time.Duration
    // clock allows for testability
    clock clock.Clock

    started, stopped bool
    startedLock      sync.Mutex

    // blockDeltas gives a way to stop all event distribution so that a late event handler
    // can safely join the shared informer.
    blockDeltas sync.Mutex
}

The embedded SharedInformer interface is mostly comments, so I won't paste its source here: https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/shared_informer.go#L34-L166

How do you use an Informer?

It takes only a handful of lines to use the client-go Informer package in your own project (typically a custom controller):

import (
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
    // handle the error
}
stopCh := make(chan struct{})
defer close(stopCh)

sharedInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Minute)
informer := sharedInformerFactory.Core().V1().Pods().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        // TODO
    },
    UpdateFunc: func(oldObj, newObj interface{}) {
        // TODO
    },
    DeleteFunc: func(obj interface{}) {
        // TODO
    },
})
go sharedInformerFactory.Start(stopCh)
  1. Instantiate a ClientSet for talking to the apiserver
  2. Instantiate a SharedInformerFactory (the code above uses a one-minute resync period)
  3. Get the Informer for the concrete Pod resource from the factory
  4. Register callbacks on the Informer for add, update, and delete events
  5. Start the Informer in a separate goroutine; stopCh tells the Informer to exit gracefully before the process shuts down (see the sync note just below)
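
One thing this minimal example glosses over: Start only kicks off the List & Watch machinery, so before reading anything from the local cache you would normally block until the initial sync completes. A small, commonly used addition to the example above:

// Wait for the initial List to land in the local cache before using it.
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
    // stopCh was closed before the cache finished syncing
    return
}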

Digging into the Informer

The Informer is made up of three parts:

  1. Reflector: the Informer uses a Reflector to connect to the Kubernetes apiserver, List & Watch changes to Kubernetes resource objects, and push each change as a "delta" into the DeltaFIFO queue
  2. DeltaFIFO queue: the Informer pops deltas off this queue and creates, updates, or deletes the corresponding entries in the local cache (the Local Store)
  3. Indexer: stores the resource objects carried by the deltas into the local cache and builds indexes over them; this cache stays in sync with the data in etcd (eventually consistent, trailing etcd only by watch latency). Controllers read only from this indexed local cache, which takes load off the apiserver and etcd (see the Lister sketch below).
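
To make point 3 concrete: once the cache has synced, a controller reads objects through a Lister backed by that Indexer rather than calling the apiserver. A minimal sketch, reusing sharedInformerFactory from the usage example (labels comes from k8s.io/apimachinery/pkg/labels):

// List Pods from the informer's local cache (the Indexer), not from the apiserver.
podLister := sharedInformerFactory.Core().V1().Pods().Lister()
pods, err := podLister.Pods("default").List(labels.Everything())
if err != nil {
    // handle the error
}
for _, pod := range pods {
    fmt.Println(pod.Name)
}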

Reflector

When the Informer starts (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/controller.go#L127-L132) it instantiates a Reflector. Let's first locate the Reflector definition https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/reflector.go#L48-L98 and its entry point, Run https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/reflector.go#L171-L182

func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    wait.BackoffUntil(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.backoffManager, true, stopCh)
    klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

Run simply wraps ListAndWatch in wait.BackoffUntil, restarting it with backoff until stopCh is closed; the real work happens in the ListAndWatch method https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/reflector.go#L207-L441

// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {

    if err := func() error {
        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
        defer initTrace.LogIfLong(10 * time.Second)
        var list runtime.Object
        var paginatedResult bool
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    panicCh <- r
                }
            }()
            // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                return r.listerWatcher.List(opts)
            }))
            // a lot of code here

            list, paginatedResult, err = pager.List(context.Background(), options)
            if isExpiredError(err) || isTooLargeResourceVersionError(err) {
                r.setIsLastSyncResourceVersionUnavailable(true)
                // Retry immediately if the resource version used to list is unavailable.
                // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
                // continuation pages, but the pager might not be enabled, the full list might fail because the
                // resource version it is listing at is expired or the cache may not yet be synced to the provided
                // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
                // the reflector makes forward progress.
                list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
            }
            close(listCh)
        }()

        // a lot of code here

        r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
        initTrace.Step("Objects listed")
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
        }
        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
        }
        initTrace.Step("Objects extracted")
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
        }
        initTrace.Step("SyncWith done")
        r.setLastSyncResourceVersion(resourceVersion)
        initTrace.Step("Resource version updated")
        return nil
    }(); err != nil {
        return err
    }

    // watch part
}

Where does Config.ListerWatcher in the controller struct come from? When the SharedIndexInformer is instantiated (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/shared_informer.go#L193), a ListerWatcher is passed in. The code generated for every Kubernetes API type includes a NewFilteredXXXInformer function, which calls NewSharedIndexInformer; that is where the ListWatch object gets initialized.

// NewFilteredPodInformer constructs a new informer for Pod type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).List(context.TODO(), options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        indexers,
    )
}
  1. First, r.listerWatcher.List fetches all the resource data
    • This invokes the ListFunc that was passed in when the resource's NewFilteredXXXInformer initialized its ListWatch; the ListFunc uses the ClientSet to ask the apiserver for the full list of that resource
  2. listMetaInterface.GetResourceVersion reads the resource version
    • Every resource object carries a ResourceVersion, which changes whenever the object changes in Kubernetes
  3. meta.ExtractList converts the list result into a slice of resource objects
  4. r.syncWith stores the resource objects and the resource version into the DeltaFIFO queue, replacing whatever is already there (shown in the snippet after this list)
  5. r.setLastSyncResourceVersion records the latest resource version
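
For completeness, syncWith from step 4 is a thin wrapper that hands the full list to the store's Replace method (from the same reflector.go):

// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    return r.store.Replace(found, resourceVersion)
}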

ListAndWatch therefore boils down to fetching the latest version of every matching Kubernetes resource object through the apiserver's List API, then using the Watch API to monitor those objects for subsequent changes.
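
Stripped of the retries, pagination, and error handling above, the same list-then-watch protocol looks roughly like this when driven directly through the clientset (an illustration, not client-go internals; ctx and kubeClient are assumed):

// 1. LIST: take a snapshot and remember the resourceVersion it was taken at.
list, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
    return err
}

// 2. WATCH: stream every change that happens after that snapshot.
w, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).Watch(ctx, metav1.ListOptions{
    ResourceVersion: list.ResourceVersion,
})
if err != nil {
    return err
}
defer w.Stop()
for event := range w.ResultChan() {
    fmt.Println(event.Type) // ADDED / MODIFIED / DELETED / BOOKMARK / ERROR
}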

Now for the Watch half of the story https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/reflector.go#L207-L411

// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    // list
    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
            // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
            // Reflector doesn't assume bookmarks are returned at all (if the server do not support
            // watch bookmarks, it will ignore this field).
            AllowWatchBookmarks: true,
        }

        // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
        start := r.clock.Now()
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            switch {
            case isExpiredError(err):
                // Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
                // has a semantic that it returns data at least as fresh as provided RV.
                // So first try to LIST with setting RV to resource version of last observed object.
                klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
            case err == io.EOF:
                // watch closed normally
            case err == io.ErrUnexpectedEOF:
                klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
            default:
                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
            }
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case wait and resend watch request.
            if utilnet.IsConnectionRefused(err) {
                time.Sleep(time.Second)
                continue
            }
            return nil
        }

        if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                switch {
                case isExpiredError(err):
                    // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
                    // has a semantic that it returns data at least as fresh as provided RV.
                    // So first try to LIST with setting RV to resource version of last observed object.
                    klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                }
            }
            return nil
        }
    }
}

r.listerWatcher.Watch, unsurprisingly, calls back into the WatchFunc that was passed in when ListWatch was initialized in NewFilteredXXXInformer.

Again taking the Pod API resource as the example https://github.com/kubernetes/client-go/blob/v0.18.6/informers/core/v1/pod.go#L55-L78

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            // ListFunc code elided here
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        indexers,
    )
}

Next, the implementation of client.CoreV1().Pods(namespace).Watch https://github.com/kubernetes/client-go/blob/v0.18.6/kubernetes/typed/core/v1/pod.go#L100-L113

// Watch returns a watch.Interface that watches the requested pods.
func (c *pods) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
    var timeout time.Duration
    if opts.TimeoutSeconds != nil {
        timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
    }
    opts.Watch = true
    return c.client.Get().
        Namespace(c.ns).
        Resource("pods").
        VersionedParams(&opts, scheme.ParameterCodec).
        Timeout(timeout).
        Watch(ctx)
}

https://github.com/kubernetes/client-go/blob/v0.18.6/rest/request.go#L625-L690

// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {

    url := r.URL().String()
    req, err := http.NewRequest(r.verb, url, r.body)
    if err != nil {
        return nil, err
    }
    req = req.WithContext(ctx)
    req.Header = r.headers
    client := r.c.Client
    if client == nil {
        client = http.DefaultClient
    }
    r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
    resp, err := client.Do(req)
    // a lot of code here
    if resp.StatusCode != http.StatusOK {
        defer resp.Body.Close()
        if result := r.transformResponse(resp, req); result.err != nil {
            return nil, result.err
        }
        return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
    }

    contentType := resp.Header.Get("Content-Type")
    mediaType, params, err := mime.ParseMediaType(contentType)
    if err != nil {
        klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
    }
    objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
    if err != nil {
        return nil, err
    }

    frameReader := framer.NewFrameReader(resp.Body)
    watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)

    return watch.NewStreamWatcher(
        restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
        // use 500 to indicate that the cause of the error is unknown - other error codes
        // are more specific to HTTP interactions, and set a reason
        errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
    ), nil
}

This is fairly typical Go HTTP client code, but the apiserver's initial response carries Transfer-Encoding: chunked in its headers, signalling chunked transfer encoding: the client keeps the TCP connection open and waits for the next chunk. That is how the apiserver continuously pushes changes to the watched Kubernetes resource objects down to the client:

HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked

{"type":"ADDED", "object":{"kind":"Pod","apiVersion":"v1",...}}
{"type":"ADDED", "object":{"kind":"Pod","apiVersion":"v1",...}}
{"type":"MODIFIED", "object":{"kind":"Pod","apiVersion":"v1",...}}

When the watch receives data, i.e. a resource object has changed, the event has to be handled https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/reflector.go#L422-L498:

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    eventCount := 0

    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                break loop
            }
            if event.Type == watch.Error {
                return apierrors.FromObject(event.Object)
            }
            if r.expectedType != nil {
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                    continue
                }
            }
            if r.expectedGVK != nil {
                if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
                    continue
                }
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            case watch.Bookmark:
                // A `Bookmark` means watch has synced here, just update the resourceVersion
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    watchDuration := r.clock.Since(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
    return nil
}

The watchHandler method switches on the type of change (add, update, or delete) and updates the DeltaFIFO queue accordingly. But why is r.store here a DeltaFIFO queue?

https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/reflector.go#L66

type Reflector struct {
    // The destination to sync up with the watch source
    store Store
}

Because the store field in the Reflector definition is a Store interface, and when the Informer starts running this store is set to a DeltaFIFO queue object https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/shared_informer.go#L336-L378

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
        KnownObjects:          s.indexer,
        EmitDeltaTypeReplaced: true,
    })

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()
    // a lot of code here
}
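
The fifo created here reaches the Reflector through Config.Queue: when the controller runs, it hands the queue to NewReflector as its store. DeltaFIFO qualifies because it implements the Store interface (abridged from https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/store.go):

type Store interface {
    Add(obj interface{}) error
    Update(obj interface{}) error
    Delete(obj interface{}) error
    List() []interface{}
    ListKeys() []string
    Get(obj interface{}) (item interface{}, exists bool, err error)
    GetByKey(key string) (item interface{}, exists bool, err error)
    // Replace deletes the store's contents and replaces them with the given list.
    Replace([]interface{}, string) error
    Resync() error
}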

So the Reflector's main job is to List & Watch resource objects of a given type and push the resulting "delta" events into a DeltaFIFO queue.
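
To tie it all together, here is a minimal hand-wired sketch of the same machinery: a Reflector feeding a DeltaFIFO that we pop ourselves, mirroring what sharedIndexInformer.Run and the controller's processLoop do internally (kubeClient and stopCh are assumed from the usage example; imports: corev1 "k8s.io/api/core/v1", metav1 "k8s.io/apimachinery/pkg/apis/meta/v1", "k8s.io/apimachinery/pkg/fields", "k8s.io/client-go/tools/cache"):

indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{KnownObjects: indexer})
lw := cache.NewListWatchFromClient(
    kubeClient.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything(),
)
reflector := cache.NewReflector(lw, &corev1.Pod{}, fifo, 0)
go reflector.Run(stopCh)

// Pop deltas the way a controller's processLoop would.
for {
    if _, err := fifo.Pop(func(obj interface{}) error {
        for _, d := range obj.(cache.Deltas) {
            fmt.Printf("%s %T\n", d.Type, d.Object) // e.g. "Sync *v1.Pod"
        }
        return nil
    }); err != nil {
        break // the queue was closed
    }
}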