Kubernetes Informer: The DeltaFIFO Queue
Mar 31, 2021 22:30
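As covered in the Reflector post, when the objects being List & Watched change (created, updated, or deleted), the Kubernetes apiserver pushes those changes to the Informer instance over the watch connection, and the Reflector module pushes each "delta" event into a DeltaFIFO queue.
The DeltaFIFO Queue
When the Informer starts running, the store field of the Reflector object is set to the DeltaFIFO queue object (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/shared_informer.go#L336-L378).
The DeltaFIFO struct is defined as follows (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L158-L192):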
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond
	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string
	// a lot of code here
}
The objects stored in a DeltaFIFO queue are called Deltas. Here is their definition (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L675-L683):
// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta
// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
	Type   DeltaType
	Object interface{}
}
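DeltaType is the type of a Delta, one of Added, Updated, Deleted, Replaced, or Sync (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L656-L673). It records what kind of change happened to the Kubernetes resource object.
Since this is a queue, it naturally has the usual queue methods: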
- Add https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L236-L243
- Update https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L245-L251
- Delete https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L253-L287
- Pop https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L455-L501
- List https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L393-L400
The queue field of DeltaFIFO (what this post calls the WorkQueue) stores the keys of resource objects in arrival order. Each key is computed by the KeyOf method (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L213-L226):
// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
// DeletedFinalStateUnknown objects.
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
	if d, ok := obj.(Deltas); ok {
		if len(d) == 0 {
			return "", KeyError{obj, ErrZeroLengthDeltasObject}
		}
		obj = d.Newest().Object
	}
	if d, ok := obj.(DeletedFinalStateUnknown); ok {
		return d.Key, nil
	}
	return f.keyFunc(obj)
}
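To make the key lookup concrete, here is a minimal sketch that uses only the standard library. It assumes the key function follows the common "namespace/name" convention; the object and tombstone types and the namespaceKey and keyOf functions are hypothetical stand-ins for illustration, not client-go APIs.

```go
package main

import "fmt"

// object is a hypothetical stand-in for a Kubernetes object's metadata.
type object struct {
	Namespace string
	Name      string
}

// tombstone stands in for DeletedFinalStateUnknown: it carries the key of an
// object whose deletion was missed, plus its last known state.
type tombstone struct {
	Key string
	Obj interface{}
}

// namespaceKey builds "<namespace>/<name>", or just "<name>" when the
// object has no namespace (an assumption about the configured keyFunc).
func namespaceKey(obj interface{}) (string, error) {
	o, ok := obj.(object)
	if !ok {
		return "", fmt.Errorf("cannot derive a key from %T", obj)
	}
	if o.Namespace == "" {
		return o.Name, nil
	}
	return o.Namespace + "/" + o.Name, nil
}

// keyOf prefers the key recorded in a tombstone and otherwise falls back to
// the regular key function, mirroring the order of checks in KeyOf.
func keyOf(obj interface{}) (string, error) {
	if t, ok := obj.(tombstone); ok {
		return t.Key, nil
	}
	return namespaceKey(obj)
}

func main() {
	for _, obj := range []interface{}{
		object{Namespace: "default", Name: "web"},
		object{Name: "node-1"},
		tombstone{Key: "default/gone"},
	} {
		key, err := keyOf(obj)
		fmt.Println(key, err)
	}
}
```

The tombstone branch matters because a deleted object may no longer be available to derive a key from, so the key has to travel with the tombstone itself.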
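The items field, in turn, is a map from each key to the list of pending Deltas for that object. This post loosely calls it the LocalStore of the queue; note that it is separate from the Indexer cache that the Informer maintains.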
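The relationship between queue and items can be sketched in a few lines. This is a simplified illustration, not the client-go implementation: miniFIFO, newMiniFIFO, and enqueue are hypothetical names, and details such as locking and the deduplication of consecutive deletions are left out.

```go
package main

import "fmt"

// DeltaType, Delta and Deltas mirror the shape of the types quoted above;
// everything else in this file is a simplified, hypothetical stand-in.
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

// miniFIFO keeps the two fields discussed above: queue holds keys in
// arrival order, items holds the accumulated deltas for each key.
type miniFIFO struct {
	items map[string]Deltas
	queue []string
}

func newMiniFIFO() *miniFIFO {
	return &miniFIFO{items: map[string]Deltas{}}
}

// enqueue appends a delta under key. The key enters queue only the first
// time it is seen, so queue and items always contain the same set of keys.
func (f *miniFIFO) enqueue(key string, t DeltaType, obj interface{}) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], Delta{Type: t, Object: obj})
}

func main() {
	f := newMiniFIFO()
	f.enqueue("default/web", Added, "web v1")
	f.enqueue("default/db", Added, "db v1")
	f.enqueue("default/web", Updated, "web v2")

	fmt.Println(f.queue)                     // [default/web default/db]
	fmt.Println(len(f.items["default/web"])) // 2
}
```

A second change to default/web does not add a second queue entry. It only lengthens that key's Deltas list, which is what later lets a consumer see every change to an object in a single Pop.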
The Reflector is the producer for the DeltaFIFO queue (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/reflector.go#L464):
switch event.Type {
case watch.Added:
	err := r.store.Add(event.Object)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
	}
case watch.Modified:
	err := r.store.Update(event.Object)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
	}
case watch.Deleted:
	// TODO: Will any consumers need access to the "last known
	// state", which is passed in event.Object? If so, may need
	// to change this.
	err := r.store.Delete(event.Object)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
	}
case watch.Bookmark:
	// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
	utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
The consumer of the DeltaFIFO queue is the Informer, which keeps taking "deltas" off the queue with the Pop method (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L455-L501):
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.IsClosed() {
				return nil, ErrFIFOClosed
			}
			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// Item may have been deleted subsequently.
			continue
		}
		delete(f.items, id)
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}
Pop takes a callback of type PopProcessFunc as its argument (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/fifo.go#L26-L28):
// PopProcessFunc is passed to Pop() method of Queue interface.
// It is supposed to process the accumulator popped from the queue.
type PopProcessFunc func(interface{}) error
The process callback is what handles the dequeued "delta" object.
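The blocking behaviour of Pop is easier to see in isolation. The following is a rough, standard-library-only sketch of a queue that blocks on a sync.Cond until an item arrives or the queue is closed. The blockingQueue type, its methods, and errClosed are hypothetical names for illustration, and requeueing on error is deliberately left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("queue is closed")

// blockingQueue is a hypothetical, much-reduced queue in the spirit of
// DeltaFIFO: keys in FIFO order plus the accumulated changes per key.
type blockingQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	keys   []string
	items  map[string][]string
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{items: map[string][]string{}}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add records a change for key and wakes up any goroutine blocked in Pop.
func (q *blockingQueue) Add(key, change string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, ok := q.items[key]; !ok {
		q.keys = append(q.keys, key)
	}
	q.items[key] = append(q.items[key], change)
	q.cond.Broadcast()
}

// Close marks the queue closed and wakes up all waiters so they can return.
func (q *blockingQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// Pop blocks while the queue is empty, then hands the oldest key and all of
// its accumulated changes to process while still holding the lock.
func (q *blockingQueue) Pop(process func(key string, changes []string) error) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.keys) == 0 {
		if q.closed {
			return errClosed
		}
		q.cond.Wait() // releases the lock while waiting, reacquires on wake-up
	}
	key := q.keys[0]
	q.keys = q.keys[1:]
	changes := q.items[key]
	delete(q.items, key)
	return process(key, changes)
}

func main() {
	q := newBlockingQueue()
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			err := q.Pop(func(key string, changes []string) error {
				fmt.Println(key, changes)
				return nil
			})
			if err == errClosed {
				return
			}
		}
	}()
	q.Add("default/web", "Added")
	q.Add("default/web", "Updated")
	q.Close()
	<-done
}
```

Depending on scheduling, the consumer may see both changes in one Pop or in two separate ones. Either way each change is delivered exactly once in this sketch, and pending items are drained before the closed queue makes Pop return.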
The HandleDeltas method of the Informer is exactly such a process callback (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/shared_informer.go#L494-L537):
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()
	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
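As resources change, the Informer updates the LocalStore through the Indexer and then distributes add, update, or delete notifications to the registered listeners.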
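The core decision in HandleDeltas, whether a delta becomes an add, an update, or a delete notification, depends on what is already in the local cache rather than on the delta type alone. A stripped-down sketch of that logic, with a plain map standing in for the Indexer and strings standing in for notifications (all names here are hypothetical), might look like this:

```go
package main

import "fmt"

// delta is a hypothetical, flattened version of a Delta: the key is carried
// explicitly so that no key function is needed in this sketch.
type delta struct {
	Type  string // "Added", "Updated", "Deleted", "Sync" or "Replaced"
	Key   string
	Value string
}

// applyDeltas applies deltas, oldest first, to index (a stand-in for the
// Indexer) and returns the notifications that would be sent to listeners.
func applyDeltas(index map[string]string, deltas []delta) []string {
	var events []string
	for _, d := range deltas {
		switch d.Type {
		case "Deleted":
			delete(index, d.Key)
			events = append(events, "delete "+d.Key)
		default:
			// Existence in the cache, not the delta type, decides between
			// an update and an add notification.
			_, exists := index[d.Key]
			index[d.Key] = d.Value
			if exists {
				events = append(events, "update "+d.Key)
			} else {
				events = append(events, "add "+d.Key)
			}
		}
	}
	return events
}

func main() {
	index := map[string]string{}
	events := applyDeltas(index, []delta{
		{Type: "Updated", Key: "default/web", Value: "v1"}, // not cached yet
		{Type: "Updated", Key: "default/web", Value: "v2"},
		{Type: "Deleted", Key: "default/web"},
	})
	fmt.Println(events) // [add default/web update default/web delete default/web]
	fmt.Println(len(index)) // 0
}
```

The first Updated delta is reported as an add because the key is not yet in the cache, which matches the else branch of the code above.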
When the Informer starts, the HandleDeltas method is assigned to the Process field of Config (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/shared_informer.go#L352):
cfg := &Config{
	Queue:            fifo,
	ListerWatcher:    s.listerWatcher,
	ObjectType:       s.objectType,
	FullResyncPeriod: s.resyncCheckPeriod,
	RetryOnError:     false,
	ShouldResync:     s.processor.shouldResync,
	Process:          s.HandleDeltas,
}
So when is this PopProcessFunc invoked, that is, when does the Pop actually happen? The answer is in controller.go (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/controller.go#L168-L184):
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}
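It is simply an infinite loop that keeps popping "deltas" off the DeltaFIFO queue.
The code above is enough to sketch roughly where DeltaFIFO sits within the Informer as a whole: the Reflector produces into it, and the controller's processLoop pops from it and hands each Deltas to HandleDeltas.
The official Kubernetes sample-controller project also provides a diagram of how client-go and a custom controller interact.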
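Why It Is Designed This Way
Why not let the Reflector update the LocalStore directly from List & Watch, instead of having the Informer pop from a DeltaFIFO?
First, a queue makes the producer and the consumer asynchronous: the producer, the Reflector, only has to enqueue the Kubernetes resource changes it observes and need not care about what happens to them afterwards.
The DeltaFIFO source code states that it is meant for these use cases: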
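- You want to process every object change (delta) at most once.
  This requirement narrows the data structure down to something with Pop semantics, such as a queue or a stack, and a FIFO queue fits the producer/consumer relationship.
- When you process an object, you want to see everything that's happened to it since you last processed it.
  A watched object may have gone through several changes between two rounds of processing, and each of those changes needs to be seen.
- You want to process the deletion of some of the objects.
  Deletion is also a change: from existing to not existing.
- You might want to periodically reprocess objects.
  Objects are periodically re-enqueued from the local cache (a resync) so that handlers get another chance to process them, even when no change has arrived.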
Resync
Finally, let's look at the operation that forcibly re-enqueues the contents of the LocalStore into the DeltaFIFO (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/delta_fifo.go#L592-L638):
// Resync adds, with a Sync type of Delta, every object listed by
// `f.knownObjects` whose key is not already queued for processing.
// If `f.knownObjects` is `nil` then Resync does nothing.
func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()
	if f.knownObjects == nil {
		return nil
	}
	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}
func (f *DeltaFIFO) syncKeyLocked(key string) error {
	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}
	// If we are doing Resync() and there is already an event queued for that object,
	// we ignore the Resync for it. This is to avoid the race, in which the resync
	// comes with the previous value of object (since queueing an event for the object
	// doesn't trigger changing the underlying store <knownObjects>.
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	if len(f.items[id]) > 0 {
		return nil
	}
	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}
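The Resync method of DeltaFIFO iterates over the keys in the Indexer (knownObjects), fetches the corresponding object from the LocalStore, and re-enqueues it with the Delta type Sync. Keys that already have pending Deltas in the queue are skipped, so a resync never overrides a newer queued change with a stale cached value.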
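The "skip keys that already have pending deltas" rule is the subtle part of Resync. A minimal sketch of it, with plain maps standing in for both the queue and knownObjects, could look as follows. The fifo type and its enqueue and resync methods are hypothetical, and the keys are sorted only to make the demo deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// delta is a hypothetical, simplified Delta with string payloads.
type delta struct {
	Type   string
	Object string
}

// fifo pairs the pending deltas with known, a stand-in for knownObjects
// (the Indexer cache keyed the same way as the queue).
type fifo struct {
	items map[string][]delta
	queue []string
	known map[string]string
}

func (f *fifo) enqueue(key, typ, obj string) {
	if _, ok := f.items[key]; !ok {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], delta{Type: typ, Object: obj})
}

// resync enqueues a Sync delta for every known object, except those that
// already have deltas pending: the cached value could be older than the
// queued change, so replaying it would be a step backwards.
func (f *fifo) resync() {
	keys := make([]string, 0, len(f.known))
	for k := range f.known {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order for the demo only
	for _, k := range keys {
		if len(f.items[k]) > 0 {
			continue
		}
		f.enqueue(k, "Sync", f.known[k])
	}
}

func main() {
	f := &fifo{
		items: map[string][]delta{},
		known: map[string]string{"default/db": "db v1", "default/web": "web v1"},
	}
	f.enqueue("default/web", "Updated", "web v2") // newer than the cache
	f.resync()

	fmt.Println(f.queue)                // [default/web default/db]
	fmt.Println(f.items["default/web"]) // [{Updated web v2}]
	fmt.Println(f.items["default/db"])  // [{Sync db v1}]
}
```

Only default/db receives a Sync delta. default/web keeps its single pending Updated delta, because the cache still holds the older "web v1" value.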
Deltas re-enqueued by Resync are popped by the Informer like any others, but they are handled slightly differently (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/shared_informer.go#L494-L537):
if err := s.indexer.Update(d.Object); err != nil {
	return err
}
isSync := false
switch {
case d.Type == Sync:
	// Sync events are only propagated to listeners that requested resync
	isSync = true
case d.Type == Replaced:
	if accessor, err := meta.Accessor(d.Object); err == nil {
		if oldAccessor, err := meta.Accessor(old); err == nil {
			// Replaced events that didn't change resourceVersion are treated as resync events
			// and only propagated to listeners that requested resync
			isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
		}
	}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
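For a Sync delta, isSync is set to true, so the update notification is distributed only to the listeners that requested resync, and their OnUpdate callback is triggered. OnUpdate in turn calls the UpdateFunc we define when writing a custom controller (https://github.com/kubernetes/client-go/blob/v0.18.6/tools/cache/controller.go#L222-L227):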
// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if r.UpdateFunc != nil {
		r.UpdateFunc(oldObj, newObj)
	}
}
The handlers themselves are registered on the Informer like this:
sharedInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Minute)
informer := sharedInformerFactory.Core().V1().Pods().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		// TODO
	},
	UpdateFunc: func(oldObj, newObj interface{}) {
		// TODO
	},
	DeleteFunc: func(obj interface{}) {
		// TODO
	},
})
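Because a resync delivers an update even when nothing has changed, our custom UpdateFunc should compare the resource versions of the old and new objects: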
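UpdateFunc: func(oldObj, newObj interface{}) {
	// CustomResource is a placeholder for your own resource type.
	oldResource := oldObj.(*CustomResource)
	newResource := newObj.(*CustomResource)
	if oldResource.ResourceVersion == newResource.ResourceVersion {
		// Same version: this is a periodic resync, nothing has changed.
		return
	}
},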
If the version has not changed, there is nothing to process.
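As a self-contained illustration of this filter, the sketch below uses a hypothetical CustomResource struct and a hypothetical onUpdate helper that takes the reconcile step as a parameter. It relies only on the standard library and, unlike the snippet above, uses checked type assertions so that an unexpected object type is ignored instead of causing a panic.

```go
package main

import "fmt"

// CustomResource is a hypothetical resource type; only the field needed for
// the comparison is modelled here.
type CustomResource struct {
	Name            string
	ResourceVersion string
}

// onUpdate has the shape of an UpdateFunc plus an injected reconcile step.
// It drops updates whose resource version did not change, which is what a
// periodic resync produces.
func onUpdate(oldObj, newObj interface{}, reconcile func(*CustomResource)) {
	oldRes, ok := oldObj.(*CustomResource)
	if !ok {
		return // unexpected type: ignore rather than panic
	}
	newRes, ok := newObj.(*CustomResource)
	if !ok {
		return
	}
	if oldRes.ResourceVersion == newRes.ResourceVersion {
		return // resync of an unchanged object
	}
	reconcile(newRes)
}

func main() {
	calls := 0
	reconcile := func(r *CustomResource) { calls++ }

	v1 := &CustomResource{Name: "demo", ResourceVersion: "100"}
	v2 := &CustomResource{Name: "demo", ResourceVersion: "101"}

	onUpdate(v1, v1, reconcile) // resync: skipped
	onUpdate(v1, v2, reconcile) // real change: reconciled

	fmt.Println("reconcile calls:", calls) // reconcile calls: 1
}
```

Whether to skip resyncs entirely is a design choice: a controller that relies on periodic resyncs to retry failed work would want to handle them instead of returning early.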