Kubernetes Informer - DeltaFIFO Queue

Mar 31, 2021 22:30 · 3125 words · 7 minute read Kubernetes Golang

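As mentioned in the Reflector article, when the objects being listed and watched change (added, deleted, or modified), the Kubernetes apiserver pushes those changes to the Informer instance, and the Reflector module pushes the resulting "delta" events into a DeltaFIFO queue.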

The DeltaFIFO queue

When the Informer starts running, the store field of its Reflector is set to the DeltaFIFO queue object https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/shared_informer.go#L336-L378

The DeltaFIFO struct is defined https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L158-L192 as follows:

type DeltaFIFO struct {
    // lock/cond protects access to 'items' and 'queue'.
    lock sync.RWMutex
    cond sync.Cond

    // We depend on the property that items in the set are in
    // the queue and vice versa, and that all Deltas in this
    // map have at least one Delta.
    items map[string]Deltas
    queue []string

    // a lot of code here
}

Each entry stored in the DeltaFIFO queue is a Delta. Here is its definition https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L675-L683

// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta

// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
//     state of the object before it was deleted.
type Delta struct {
    Type   DeltaType
    Object interface{}
}

DeltaType is the kind of the Delta, one of Added, Updated, Deleted, Replaced, and Sync https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L656-L673, i.e. what kind of change happened to the Kubernetes resource object.
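To make the shape of these types concrete, here is a standalone sketch that mirrors DeltaType, Delta, and Deltas together with the Oldest/Newest helpers, without importing client-go. The constant string values follow client-go v0.18.6 as far as I know; treat this as a simplified copy for illustration, not the library itself.

```go
package main

import "fmt"

// DeltaType mirrors client-go's string-based change kind.
type DeltaType string

const (
	Added    DeltaType = "Added"
	Updated  DeltaType = "Updated"
	Deleted  DeltaType = "Deleted"
	Replaced DeltaType = "Replaced"
	Sync     DeltaType = "Sync"
)

// Delta records one change and the object state after it.
type Delta struct {
	Type   DeltaType
	Object interface{}
}

// Deltas is ordered oldest (index 0) to newest (last index).
type Deltas []Delta

// Oldest returns the first recorded change, or nil if there is none.
func (d Deltas) Oldest() *Delta {
	if len(d) > 0 {
		return &d[0]
	}
	return nil
}

// Newest returns the most recent change, or nil if there is none.
func (d Deltas) Newest() *Delta {
	if n := len(d); n > 0 {
		return &d[n-1]
	}
	return nil
}

func main() {
	d := Deltas{
		{Type: Added, Object: "pod-a v1"},
		{Type: Updated, Object: "pod-a v2"},
	}
	fmt.Println(d.Oldest().Type, d.Newest().Object) // Added pod-a v2
}
```

KeyOf below relies on exactly this Newest() helper to derive a key from a Deltas value.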

Since it is a queue, it naturally comes with the usual queue machinery:

DeltaFIFO's queue field (the so-called WorkQueue) stores the keys of resource objects, which are computed by the KeyOf method https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L213-L226

// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
// DeletedFinalStateUnknown objects.
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
    if d, ok := obj.(Deltas); ok {
        if len(d) == 0 {
            return "", KeyError{obj, ErrZeroLengthDeltasObject}
        }
        obj = d.Newest().Object
    }
    if d, ok := obj.(DeletedFinalStateUnknown); ok {
        return d.Key, nil
    }
    return f.keyFunc(obj)
}
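KeyOf ultimately delegates to f.keyFunc. As far as I can tell, the shared informer in this version passes cache.MetaNamespaceKeyFunc to NewDeltaFIFO, which yields `namespace/name` for namespaced objects and just `name` for cluster-scoped ones. The sketch below reimplements that convention on a hypothetical `objectMeta` struct so it runs without client-go; the real function accepts anything exposing metav1.Object and also handles cache.ExplicitKey.

```go
package main

import "fmt"

// objectMeta is a hypothetical stand-in for metav1.ObjectMeta.
type objectMeta struct {
	Namespace string
	Name      string
}

// metaNamespaceKey reproduces the MetaNamespaceKeyFunc key convention.
func metaNamespaceKey(obj interface{}) (string, error) {
	m, ok := obj.(objectMeta)
	if !ok {
		return "", fmt.Errorf("object has no meta: %T", obj)
	}
	if m.Namespace != "" {
		return m.Namespace + "/" + m.Name, nil
	}
	return m.Name, nil // cluster-scoped objects have no namespace prefix
}

func main() {
	k1, _ := metaNamespaceKey(objectMeta{Namespace: "default", Name: "nginx"})
	k2, _ := metaNamespaceKey(objectMeta{Name: "node-1"})
	fmt.Println(k1, k2) // default/nginx node-1
}
```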

items (the so-called LocalStore) is a map from each object's key to its list of Deltas.
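The invariant between queue and items (every key in queue has a non-empty entry in items, and vice versa) is maintained on the write path, in DeltaFIFO's queueActionLocked. The simplified sketch below shows the idea: each change is appended to the object's Deltas, but its key is pushed onto queue only the first time, so later changes to the same object coalesce under one queue slot. `miniFIFO` and `queueAction` are hypothetical names; the real code additionally deduplicates consecutive Deleted deltas and guards everything with the lock/cond pair.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

// miniFIFO is a hypothetical, single-threaded reduction of DeltaFIFO.
type miniFIFO struct {
	items map[string]Deltas // key -> accumulated changes
	queue []string          // keys in FIFO order, each at most once
}

func newMiniFIFO() *miniFIFO {
	return &miniFIFO{items: map[string]Deltas{}}
}

// queueAction appends a delta and enqueues the key only if it is new.
func (f *miniFIFO) queueAction(t DeltaType, key string, obj interface{}) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], Delta{Type: t, Object: obj})
}

func main() {
	f := newMiniFIFO()
	f.queueAction(Added, "default/a", "a v1")
	f.queueAction(Updated, "default/a", "a v2")
	f.queueAction(Added, "default/b", "b v1")
	fmt.Println(f.queue)                   // [default/a default/b]
	fmt.Println(len(f.items["default/a"])) // 2
}
```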

The Reflector is the data producer for the DeltaFIFO queue; its watch handler feeds each event into the store https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/reflector.go#L464

switch event.Type {
case watch.Added:
    err := r.store.Add(event.Object)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
    }
case watch.Modified:
    err := r.store.Update(event.Object)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
    }
case watch.Deleted:
    // TODO: Will any consumers need access to the "last known
    // state", which is passed in event.Object? If so, may need
    // to change this.
    err := r.store.Delete(event.Object)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
    }
case watch.Bookmark:
    // A `Bookmark` means watch has synced here, just update the resourceVersion
default:
    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
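Put differently, each watch event type becomes one store call, and DeltaFIFO's Add, Update, and Delete record Added, Updated, and Deleted deltas respectively, while Bookmark events never reach the queue. The hypothetical lookup function below just makes that mapping explicit; the event type strings are meant to match k8s.io/apimachinery's watch.EventType values.

```go
package main

import "fmt"

// EventType mirrors apimachinery's watch.EventType string values.
type EventType string

const (
	EventAdded    EventType = "ADDED"
	EventModified EventType = "MODIFIED"
	EventDeleted  EventType = "DELETED"
	EventBookmark EventType = "BOOKMARK"
)

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

// deltaTypeFor reports which DeltaType the Reflector's store call ends up
// recording in DeltaFIFO, or false when nothing is enqueued.
func deltaTypeFor(e EventType) (DeltaType, bool) {
	switch e {
	case EventAdded:
		return Added, true // r.store.Add
	case EventModified:
		return Updated, true // r.store.Update
	case EventDeleted:
		return Deleted, true // r.store.Delete
	default:
		return "", false // Bookmark (and unknown types): nothing enqueued
	}
}

func main() {
	for _, e := range []EventType{EventAdded, EventModified, EventDeleted, EventBookmark} {
		t, ok := deltaTypeFor(e)
		fmt.Println(e, t, ok)
	}
}
```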

The consumer of the DeltaFIFO queue is the Informer, which keeps popping "deltas" off the queue. Here is the Pop method https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L455-L501

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.IsClosed() {
                return nil, ErrFIFOClosed
            }

            f.cond.Wait()
        }
        id := f.queue[0]
        f.queue = f.queue[1:]
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        delete(f.items, id)
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}

Pop takes a callback of type PopProcessFunc as its argument https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/fifo.go#L26-L28

// PopProcessFunc is passed to Pop() method of Queue interface.
// It is supposed to process the accumulator popped from the queue.
type PopProcessFunc func(interface{}) error

The process callback is what handles the dequeued "delta" object.

The Informer's HandleDeltas method is exactly such a process callback https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/shared_informer.go#L494-L537

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Replaced, Added, Updated:
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }

                isSync := false
                switch {
                case d.Type == Sync:
                    // Sync events are only propagated to listeners that requested resync
                    isSync = true
                case d.Type == Replaced:
                    if accessor, err := meta.Accessor(d.Object); err == nil {
                        if oldAccessor, err := meta.Accessor(old); err == nil {
                            // Replaced events that didn't change resourceVersion are treated as resync events
                            // and only propagated to listeners that requested resync
                            isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                        }
                    }
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, false)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

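For each change, the Informer updates the LocalStore through the Indexer and then distributes an add, update, or delete notification to the registered listeners.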

When the Informer starts, the HandleDeltas callback is assigned to the Process field of Config https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/shared_informer.go#L352

cfg := &Config{
    Queue:            fifo,
    ListerWatcher:    s.listerWatcher,
    ObjectType:       s.objectType,
    FullResyncPeriod: s.resyncCheckPeriod,
    RetryOnError:     false,
    ShouldResync:     s.processor.shouldResync,

    Process: s.HandleDeltas,
}

So when is this PopProcessFunc called, i.e. when does Pop actually happen? In controller.go https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/controller.go#L168-L184

// ...
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == ErrFIFOClosed {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

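It is simply an infinite loop that keeps popping "deltas" off the DeltaFIFO queue. Because Pop blocks while the queue is empty, the loop does not spin; it only returns once the queue is closed and Pop reports ErrFIFOClosed.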

From the code above, we can roughly sketch where DeltaFIFO sits within the Informer:

Below is the diagram from the official Kubernetes sample-controller project showing how client-go and a custom controller interact:

Why this design

Why doesn't the Reflector update the LocalStore directly from List & Watch, instead of going through DeltaFIFO and having the Informer pop from it?

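First, the queue makes the producer and the consumer asynchronous: the producer, i.e. the Reflector, only enqueues the changes it observes on Kubernetes resource objects and does not care what happens next, so it is not blocked by slow processing.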

The DeltaFIFO source comments list the use cases it is meant to solve:

  • You want to process every object change (delta) at most once.

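    This requirement narrows the data structure down to something that supports Pop, such as a queue or a stack, and for a producer/consumer relationship a FIFO queue fits.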

  • When you process an object, you want to see everything that’s happened to it since you last processed it.

    A watched object may have changed several times since we last processed it, and we want to see every one of those changes.

  • You want to process the deletion of some of the objects.

    Deletion is a change too: the object goes from existing to not existing.

  • You might want to periodically reprocess objects.

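    We want to periodically force a resync, replaying the objects in the local cache so that they are processed again and the cache stays valid.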

Resync

Finally, let's look at the operation that forcibly refreshes the DeltaFIFO from the LocalStore https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L592-L638

// Resync adds, with a Sync type of Delta, every object listed by
// `f.knownObjects` whose key is not already queued for processing.
// If `f.knownObjects` is `nil` then Resync does nothing.
func (f *DeltaFIFO) Resync() error {
    f.lock.Lock()
    defer f.lock.Unlock()

    if f.knownObjects == nil {
        return nil
    }

    keys := f.knownObjects.ListKeys()
    for _, k := range keys {
        if err := f.syncKeyLocked(k); err != nil {
            return err
        }
    }
    return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
    obj, exists, err := f.knownObjects.GetByKey(key)
    if err != nil {
        klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
        return nil
    } else if !exists {
        klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
        return nil
    }

    // If we are doing Resync() and there is already an event queued for that object,
    // we ignore the Resync for it. This is to avoid the race, in which the resync
    // comes with the previous value of object (since queueing an event for the object
    // doesn't trigger changing the underlying store <knownObjects>.
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    if len(f.items[id]) > 0 {
        return nil
    }

    if err := f.queueActionLocked(Sync, obj); err != nil {
        return fmt.Errorf("couldn't queue object: %v", err)
    }
    return nil
}

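DeltaFIFO's Resync method walks the keys in the Indexer (f.knownObjects), looks up the corresponding object in the LocalStore, and re-enqueues it as a Delta of type Sync, skipping any key that already has deltas waiting in items.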
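The skip rule is the subtle part: a key that still has deltas pending gets no Sync delta, because the copy in knownObjects may be older than what is already queued. The sketch below models Resync with plain maps; the `fifo` type and its fields are hypothetical simplifications, and keys are sorted only to make the example deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

type Delta struct {
	Type   string
	Object string
}

// fifo is a hypothetical reduction of DeltaFIFO plus its knownObjects.
type fifo struct {
	items        map[string][]Delta
	queue        []string
	knownObjects map[string]string // stand-in for the Indexer: key -> object
}

// resync enqueues a Sync delta for every known key with nothing pending.
func (f *fifo) resync() {
	keys := make([]string, 0, len(f.knownObjects))
	for k := range f.knownObjects {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order for the example only
	for _, k := range keys {
		if len(f.items[k]) > 0 {
			continue // an event is already queued; a stale Sync could race with it
		}
		f.queue = append(f.queue, k)
		f.items[k] = append(f.items[k], Delta{Type: "Sync", Object: f.knownObjects[k]})
	}
}

func main() {
	f := &fifo{
		items:        map[string][]Delta{"default/a": {{Type: "Updated", Object: "a v3"}}},
		queue:        []string{"default/a"},
		knownObjects: map[string]string{"default/a": "a v2", "default/b": "b v1"},
	}
	f.resync()
	fmt.Println(f.queue)                      // [default/a default/b]
	fmt.Println(f.items["default/b"][0].Type) // Sync
	fmt.Println(len(f.items["default/a"]))    // 1
}
```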

Deltas re-enqueued by Resync are popped by the Informer like any others, but they are handled slightly differently https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/shared_informer.go#L494-L537

if err := s.indexer.Update(d.Object); err != nil {
    return err
}

isSync := false
switch {
case d.Type == Sync:
    // Sync events are only propagated to listeners that requested resync
    isSync = true
case d.Type == Replaced:
    if accessor, err := meta.Accessor(d.Object); err == nil {
        if oldAccessor, err := meta.Accessor(old); err == nil {
            // Replaced events that didn't change resourceVersion are treated as resync events
            // and only propagated to listeners that requested resync
            isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
        }
    }
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)

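For a Sync delta, isSync is set to true and an updateNotification is distributed only to listeners that requested resync, where it triggers the OnUpdate callback. OnUpdate in turn calls the UpdateFunc we define when writing a custom controller https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/controller.go#L222-L227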

// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
    if r.UpdateFunc != nil {
        r.UpdateFunc(oldObj, newObj)
    }
}
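What isSync actually changes is who receives the notification. In sharedProcessor.distribute (shared_informer.go, same version), a sync notification goes only to syncingListeners, roughly the handlers whose resync period is due, while any other notification goes to every listener. The sketch below models just that routing rule with hypothetical `listener` and `processor` types; buffering, goroutines, and resync-period bookkeeping are left out.

```go
package main

import "fmt"

type notification string

// listener is a hypothetical stand-in for processorListener.
type listener struct {
	name     string
	received []notification
}

func (l *listener) add(n notification) { l.received = append(l.received, n) }

type processor struct {
	listeners        []*listener // every registered handler
	syncingListeners []*listener // handlers whose resync period is due
}

// distribute mirrors the routing rule of sharedProcessor.distribute.
func (p *processor) distribute(n notification, isSync bool) {
	targets := p.listeners
	if isSync {
		targets = p.syncingListeners
	}
	for _, l := range targets {
		l.add(n)
	}
}

func main() {
	withResync := &listener{name: "with-resync"}
	noResync := &listener{name: "no-resync"}
	p := &processor{
		listeners:        []*listener{withResync, noResync},
		syncingListeners: []*listener{withResync},
	}
	p.distribute("update default/a (real change)", false)
	p.distribute("update default/a (resync)", true)
	fmt.Println(len(withResync.received), len(noResync.received)) // 2 1
}
```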
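
A custom controller registers its UpdateFunc, together with AddFunc and DeleteFunc, through AddEventHandler, for example on a Pod informer (kubeClient is assumed to be a kubernetes.Interface):

sharedInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Minute)
informer := sharedInformerFactory.Core().V1().Pods().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        // TODO: handle a newly added Pod
    },
    UpdateFunc: func(oldObj, newObj interface{}) {
        // TODO: also called on every resync (here, every minute)
    },
    DeleteFunc: func(obj interface{}) {
        // TODO: obj may be a cache.DeletedFinalStateUnknown
    },
})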

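This means our UpdateFunc has to compare the resource versions of the old and new objects:

UpdateFunc: func(oldObj, newObj interface{}) {
    oldResource := oldObj.(*CustomResource)
    newResource := newObj.(*CustomResource)
    // A periodic resync delivers the same object as both old and new,
    // so an unchanged ResourceVersion means there is nothing new to do.
    if oldResource.ResourceVersion == newResource.ResourceVersion {
        return
    }
    // Handle the real change here.
},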

If the version has not changed, there is nothing worth processing.
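Because a resync hands UpdateFunc the same cached object as both old and new, the resourceVersion check is what filters it out. Below is a runnable reduction of that check with a hypothetical `CustomResource` that holds only the fields it needs (a real CRD type would embed metav1.ObjectMeta and get ResourceVersion from there), plus a checked type assertion instead of a panicking one.

```go
package main

import "fmt"

// CustomResource is a hypothetical type; real CRD types embed
// metav1.ObjectMeta, which provides ResourceVersion.
type CustomResource struct {
	Name            string
	ResourceVersion string
}

// shouldHandleUpdate reports whether an update carries a real change.
func shouldHandleUpdate(oldObj, newObj interface{}) bool {
	oldRes, ok1 := oldObj.(*CustomResource)
	newRes, ok2 := newObj.(*CustomResource)
	if !ok1 || !ok2 {
		return false // unexpected type: ignore rather than panic
	}
	return oldRes.ResourceVersion != newRes.ResourceVersion
}

func main() {
	v1 := &CustomResource{Name: "demo", ResourceVersion: "100"}
	v2 := &CustomResource{Name: "demo", ResourceVersion: "101"}
	fmt.Println(shouldHandleUpdate(v1, v1)) // false: periodic resync
	fmt.Println(shouldHandleUpdate(v1, v2)) // true: real change
}
```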