How the KubeVirt Domain Informer Works

Jun 27, 2024 20:00 · 2560 words · 6 minute read KubeVirt Kubernetes Golang

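In KubeVirt, the Domain struct exists only inside virt-handler and virt-launcher. Unlike VirtualMachine, VirtualMachineInstance and similar resources, it is not defined as a CRD and stored as CRs in Kubernetes' etcd. Yet virt-handler's VirtualMachineController has a domainInformer that delivers events about libvirt Domains, and the data source of that informer is not kube-apiserver.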

func NewController(
    recorder record.EventRecorder,
    clientset kubecli.KubevirtClient,
    host string,
    migrationIpAddress string,
    virtShareDir string,
    virtPrivateDir string,
    kubeletPodsDir string,
    vmiSourceInformer cache.SharedIndexInformer,
    vmiTargetInformer cache.SharedIndexInformer,
    domainInformer cache.SharedInformer,
    // a lot of parameters here
) (*VirtualMachineController, error) {

    queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "virt-handler-vm")

    c := &VirtualMachineController{
        Queue:                       queue,
        recorder:                    recorder,
        clientset:                   clientset,
        host:                        host,
        migrationIpAddress:          migrationIpAddress,
        virtShareDir:                virtShareDir,
        vmiSourceInformer:           vmiSourceInformer,
        vmiTargetInformer:           vmiTargetInformer,
        domainInformer:              domainInformer,
    }

    _, err = domainInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.addDomainFunc,
        DeleteFunc: c.deleteDomainFunc,
        UpdateFunc: c.updateDomainFunc,
    })
    if err != nil {
        return nil, err
    }
    // a lot of code here
}

This article explains how this special informer is implemented, starting from the place where virt-handler creates it:

https://github.com/kubevirt/kubevirt/blob/096a1a7044ee1e39dc7e2c2b80b3fb28992f9ba5/cmd/virt-handler/virt-handler.go#L265-L269

func (app *virtHandlerApp) Run() {
    // a lot of code here
    domainSharedInformer, err := virtcache.NewSharedInformer(app.VirtShareDir, int(app.WatchdogTimeoutDuration.Seconds()), recorder, vmiSourceInformer.GetStore(), time.Duration(app.domainResyncPeriodSeconds)*time.Second)
    if err != nil {
        panic(err)
    }
    // a lot of code here
}
1. List & Watch

A normal informer uses kube-apiserver as its data source. Take the VirtualMachineInstance informer as an example: https://github.com/kubevirt/kubevirt/blob/bb7eef6380f3c1b2bb80e4d8369ffe3d347d36d1/pkg/controller/virtinformers.go#L433-L438

func (f *kubeInformerFactory) VMI() cache.SharedIndexInformer {
    return f.getInformer("vmiInformer", func() cache.SharedIndexInformer {
        lw := cache.NewListWatchFromClient(f.restClient, "virtualmachineinstances", k8sv1.NamespaceAll, fields.Everything())
        return cache.NewSharedIndexInformer(lw, &kubev1.VirtualMachineInstance{}, f.defaultResync, GetVMIInformerIndexers())
    })
}
  1. Create a ListWatch object. client-go provides this struct, which implements the ListerWatcher interface. The argument f.restClient is a RESTClient connected to kube-apiserver, obtained from the clientset:

    app.restClient = app.clientSet.RestClient()
    
    // a lot of code here
    
    app.informerFactory = controller.NewKubeInformerFactory(app.restClient, app.clientSet, nil, app.kubevirtNamespace)
    
  2. Use the ListWatch object to initialize a SharedIndexInformer.
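To make the List & Watch contract concrete, here is a minimal, standard-library-only sketch of what an informer does with any ListerWatcher: call List once to fill a local store, then apply each event from the Watch stream. The types `Object`, `Event`, `ListerWatcher` and the functions `runInformer` and `staticLW` are simplified stand-ins invented for illustration, not client-go's real API; the real reflector also tracks resourceVersion, re-lists on errors and feeds changes through a DeltaFIFO.

```go
package main

import "fmt"

// EventType is a simplified stand-in for watch.EventType.
type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
)

// Object is a hypothetical minimal object: a namespace/name key plus a payload.
type Object struct {
	Key   string
	Value string
}

// Event is a simplified stand-in for watch.Event.
type Event struct {
	Type   EventType
	Object Object
}

// ListerWatcher is a simplified stand-in for cache.ListerWatcher:
// List returns the initial snapshot, Watch returns a stream of changes.
type ListerWatcher interface {
	List() ([]Object, error)
	Watch() (<-chan Event, error)
}

// runInformer lists once, then applies watch events to the store until the
// event channel is closed, and returns the final store contents.
func runInformer(lw ListerWatcher) (map[string]string, error) {
	store := map[string]string{}
	items, err := lw.List()
	if err != nil {
		return nil, err
	}
	for _, o := range items {
		store[o.Key] = o.Value
	}
	events, err := lw.Watch()
	if err != nil {
		return nil, err
	}
	for ev := range events {
		switch ev.Type {
		case Added, Modified:
			store[ev.Object.Key] = ev.Object.Value
		case Deleted:
			delete(store, ev.Object.Key)
		}
	}
	return store, nil
}

// staticLW is a toy source: a fixed snapshot and a pre-filled, closed event stream.
type staticLW struct {
	initial []Object
	events  []Event
}

func (s staticLW) List() ([]Object, error) { return s.initial, nil }

func (s staticLW) Watch() (<-chan Event, error) {
	ch := make(chan Event, len(s.events))
	for _, e := range s.events {
		ch <- e
	}
	close(ch)
	return ch, nil
}

func main() {
	lw := staticLW{
		initial: []Object{{"ns/a", "v1"}},
		events: []Event{
			{Modified, Object{"ns/a", "v2"}},
			{Added, Object{"ns/b", "v1"}},
			{Deleted, Object{"ns/a", ""}},
		},
	}
	store, err := runInformer(lw)
	fmt.Println(store, err)
}
```

The informer only depends on the interface, which is exactly why KubeVirt can plug in a data source other than kube-apiserver.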

Next, let's look at the code that initializes domainInformer:

https://github.com/kubevirt/kubevirt/blob/25b37338917f1a7bfbe3fc07d87672ef1e39389c/pkg/virt-handler/cache/cache.go#L595-L599

func NewSharedInformer(virtShareDir string, watchdogTimeout int, recorder record.EventRecorder, vmiStore cache.Store, resyncPeriod time.Duration) (cache.SharedInformer, error) {
    lw := newListWatchFromNotify(virtShareDir, watchdogTimeout, recorder, vmiStore, resyncPeriod)
    informer := cache.NewSharedInformer(lw, &api.Domain{}, 0)
    return informer, nil
}

这里 ListWatcher 接口由 KubeVirt 自行实现而非 client-go 包提供

https://github.com/kubevirt/kubevirt/blob/25b37338917f1a7bfbe3fc07d87672ef1e39389c/pkg/virt-handler/cache/cache.go#L65-L79

type DomainWatcher struct {
    lock                     sync.Mutex
    wg                       sync.WaitGroup
    stopChan                 chan struct{}
    eventChan                chan watch.Event
    backgroundWatcherStarted bool
    virtShareDir             string
    watchdogTimeout          int
    recorder                 record.EventRecorder
    vmiStore                 cache.Store
    resyncPeriod             time.Duration

    watchDogLock        sync.Mutex
    unresponsiveSockets map[string]int64
}

The ListerWatcher interface is defined as follows:

https://github.com/kubevirt/kubevirt/blob/5199eb4252f1490f9d982abfd7a8892f2aeffe62/vendor/k8s.io/client-go/tools/cache/listwatch.go#L29-L46

// Lister is any object that knows how to perform an initial list.
type Lister interface {
    // List should return a list type object; the Items field will be extracted, and the
    // ResourceVersion field will be used to start the watch in the right place.
    List(options metav1.ListOptions) (runtime.Object, error)
}

// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
    // Watch should begin a watch at the specified version.
    Watch(options metav1.ListOptions) (watch.Interface, error)
}

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
    Lister
    Watcher
}

DomainWatcher implements both the Lister and the Watcher interface.
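A ListerWatcher is just a Go interface, so any type with the right methods can back an informer. The sketch below shows a watcher whose List reads from a local function instead of kube-apiserver and whose Watch returns the watcher itself, since the same type also provides Stop and ResultChan (DomainWatcher has both methods, as shown later). `Domain`, `Event`, `localWatcher` and its `source` field are hypothetical, trimmed-down stand-ins, and `ListOptions` is dropped for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// Domain is a hypothetical, trimmed-down stand-in for api.Domain.
type Domain struct {
	Namespace, Name, Status string
}

// Event is a simplified stand-in for watch.Event.
type Event struct {
	Type   string
	Object Domain
}

// Interface mirrors the two-method watch.Interface.
type Interface interface {
	Stop()
	ResultChan() <-chan Event
}

// Lister, Watcher and ListerWatcher mirror client-go's shapes, minus ListOptions.
type Lister interface{ List() ([]Domain, error) }
type Watcher interface{ Watch() (Interface, error) }
type ListerWatcher interface {
	Lister
	Watcher
}

// localWatcher takes its data from a local function rather than kube-apiserver
// and doubles as its own watch stream.
type localWatcher struct {
	source    func() []Domain // hypothetical local data source
	eventChan chan Event
	stopOnce  sync.Once
}

// Compile-time checks that localWatcher satisfies both interfaces.
var (
	_ ListerWatcher = (*localWatcher)(nil)
	_ Interface     = (*localWatcher)(nil)
)

func (w *localWatcher) List() ([]Domain, error)   { return w.source(), nil }
func (w *localWatcher) Watch() (Interface, error) { return w, nil }
func (w *localWatcher) ResultChan() <-chan Event  { return w.eventChan }

// Stop closes the result channel exactly once.
func (w *localWatcher) Stop() { w.stopOnce.Do(func() { close(w.eventChan) }) }

func main() {
	w := &localWatcher{
		source:    func() []Domain { return []Domain{{"ns-demo", "ecs-rocky8-10", "Running"}} },
		eventChan: make(chan Event, 1),
	}
	items, err := w.List()
	fmt.Println(items, err)
}
```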

Lister

The Lister part is relatively simple: it only needs a List method that returns a list of Domain objects:

https://github.com/kubevirt/kubevirt/blob/25b37338917f1a7bfbe3fc07d87672ef1e39389c/pkg/virt-handler/cache/cache.go#L551-L572

func (d *DomainWatcher) List(_ k8sv1.ListOptions) (runtime.Object, error) {

    log.Log.V(3).Info("Synchronizing domains")
    err := d.startBackground()
    if err != nil {
        return nil, err
    }

    domains, err := d.listAllKnownDomains()
    if err != nil {
        return nil, err
    }

    list := api.DomainList{
        Items: []api.Domain{},
    }

    for _, domain := range domains {
        list.Items = append(list.Items, *domain)
    }
    return &list, nil
}

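Here DomainWatcher calls listAllKnownDomains to obtain the Domains associated with all VMIs on this node. A running VM corresponds to one VMI, which corresponds to one virt-launcher Pod, which in turn corresponds to one libvirt Domain: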

VM - VMI - virt-launcher Pod - libvirt Domain

func (d *DomainWatcher) listAllKnownDomains() ([]*api.Domain, error) {
    var domains []*api.Domain

    socketFiles, err := listSockets()
    if err != nil {
        return nil, err
    }
    for _, socketFile := range socketFiles {

        exists, err := diskutils.FileExists(socketFile)
        if err != nil {
            log.Log.Reason(err).Error("failed access cmd client socket")
            continue
        }

        if !exists {
            record, recordExists := findGhostRecordBySocket(socketFile)
            if recordExists {
                domain := api.NewMinimalDomainWithNS(record.Namespace, record.Name)
                domain.ObjectMeta.UID = record.UID
                now := k8sv1.Now()
                domain.ObjectMeta.DeletionTimestamp = &now
                log.Log.Object(domain).Warning("detected stale domain from ghost record")
                domains = append(domains, domain)
            }
            continue
        }

        log.Log.V(3).Infof("List domains from sock %s", socketFile)
        client, err := cmdclient.NewClient(socketFile)
        if err != nil {
            log.Log.Reason(err).Error("failed to connect to cmd client socket")
            // Ignore failure to connect to client.
            // These are all local connections via unix socket.
            // A failure to connect means there's nothing on the other
            // end listening.
            continue
        }
        defer client.Close()

        domain, exists, err := client.GetDomain()
        if err != nil {
            log.Log.Reason(err).Error("failed to list domains on cmd client socket")
            // Failure to get domain list means that client
            // was unable to contact libvirt. As soon as the connection
            // is restored on the client's end, a domain notification will
            // be sent.
            continue
        }
        if exists == true {
            domains = append(domains, domain)
        }
    }
    return domains, nil
}

Background: virt-launcher runs a cmd server that it exposes to virt-handler through a Unix domain socket. Because the virt-handler DaemonSet runs in privileged mode and mounts the host path /var/lib/kubelet/pods, it can see virt-launcher's socket. For example:

$ kubectl get vmi ecs-rocky8-10 -n ns-demo
NAME            AGE   PHASE     IP              NODENAME   READY
ecs-rocky8-10   11d   Running   192.168.100.4   node173    True

$ kubectl get po -n ns-demo | grep ecs-rocky8-10
virt-launcher-ecs-rocky8-10-bxknl            1/1     Running   0               11d

$ kubectl get po virt-launcher-ecs-rocky8-10-bxknl -n ns-demo -o jsonpath='{.metadata.uid}'
69e41c74-7e47-45f7-a4e4-9c193efcf5f8

$ ll /var/lib/kubelet/pods/69e41c74-7e47-45f7-a4e4-9c193efcf5f8/volumes/kubernetes.io~empty-dir/
total 8
drwxrwsrwx  2 root qemu    6 Jun 16 09:09 container-disks
drwxrwsrwx  5 root qemu   67 Jun 17 11:32 ephemeral-disks
drwxrwsrwx  2 root qemu    6 Jun 16 09:09 hotplug-disks
drwxrwsrwx  5 root qemu 4096 Jun 27 10:51 libvirt-runtime
drwxrwsrwx 10 root qemu 4096 Jun 17 11:32 private
drwxrwsrwx  5 root qemu   96 Jun 20 10:25 public
drwxrwsrwx  2 root qemu   27 Jun 17 11:32 sockets
drwxrwsrwx  2 root qemu    6 Jun 16 09:09 vhostuser-sockets

$ stat /var/lib/kubelet/pods/69e41c74-7e47-45f7-a4e4-9c193efcf5f8/volumes/kubernetes.io~empty-dir/sockets/launcher-sock
  File: /var/lib/kubelet/pods/69e41c74-7e47-45f7-a4e4-9c193efcf5f8/volumes/kubernetes.io~empty-dir/sockets/launcher-sock
  Size: 0          Blocks: 0          IO Block: 4096   socket
Device: fd00h/64768d Inode: 3267213     Links: 1
Access: (0755/srwxr-xr-x)  Uid: (  107/    qemu)   Gid: (  107/    qemu)
Access: 2024-06-26 11:32:45.034447042 +0800
Modify: 2024-06-17 11:32:21.331893299 +0800
Change: 2024-06-17 11:32:21.342893055 +0800
 Birth: 2024-06-17 11:32:21.331893299 +0800

# virt-handler-r64rw runs on the same node as the virt-launcher Pod
$ kubectl exec -it virt-handler-r64rw -n kubevirt -- ls -al /var/lib/kubelet/pods/69e41c74-7e47-45f7-a4e4-9c193efcf5f8/volumes/kubernetes.io~empty-dir/sockets/launcher-sock
Defaulted container "virt-handler" out of: virt-handler, virt-launcher (init)
srwxr-xr-x 1 qemu qemu 0 Jun 17 03:32 /var/lib/kubelet/pods/69e41c74-7e47-45f7-a4e4-9c193efcf5f8/volumes/kubernetes.io~empty-dir/sockets/launcher-sock

Back to listAllKnownDomains:

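  1. First, collect the launcher-sock of every virt-launcher on the same node. If a socket file no longer exists but a ghost record for it does, a minimal Domain with its DeletionTimestamp set is synthesized instead, so the informer still learns that the domain is gone.

  2. client, err := cmdclient.NewClient(socketFile) creates a cmd client, which opens launcher-sock and connects to the cmd server on the virt-launcher side. If the connection fails, the socket is skipped, because nothing is listening on the other end.

  3. The GetDomain API queries the libvirt Domain associated with that virt-launcher; the client-side implementation is VirtLauncherClient. Errors are logged and skipped as well.

  4. The client sends the request to virt-launcher, and virt-launcher's cmd server then queries the Domain through the libvirt API:

    https://github.com/kubevirt/kubevirt/blob/566fa27585757640d1afdf928ff6b377bda89659/pkg/virt-launcher/virtwrap/cmd-server/server.go#L400-L434

  5. All Domains found on this node are returned.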

Watcher

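The Watcher part is more involved: the object returned by Watch must implement watch.Interface, that is, the two methods Stop() and ResultChan() <-chan Event: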

https://github.com/kubevirt/kubevirt/blob/7ef8f068a67b92a3008352b6bc9993810e43d140/vendor/k8s.io/apimachinery/pkg/watch/watch.go#L28-L38

// Interface can be implemented by anything that knows how to watch and report changes.
type Interface interface {
    // Stop stops watching. Will close the channel returned by ResultChan(). Releases
    // any resources used by the watch.
    Stop()

    // ResultChan returns a chan which will receive all the events. If an error occurs
    // or Stop() is called, the implementation will close this channel and
    // release any resources used by the watch.
    ResultChan() <-chan Event
}

1. Stop

Stop is simple to implement:

https://github.com/kubevirt/kubevirt/blob/25b37338917f1a7bfbe3fc07d87672ef1e39389c/pkg/virt-handler/cache/cache.go#L578-L589

func (d *DomainWatcher) Stop() {
    d.lock.Lock()
    defer d.lock.Unlock()

    if d.backgroundWatcherStarted == false {
        return
    }
    close(d.stopChan)
    d.wg.Wait()
    d.backgroundWatcherStarted = false
    close(d.eventChan)
}

It closes stopChan, waits for the background goroutines to exit (d.wg.Wait()), and then closes eventChan. That is all.
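The order inside Stop matters: closing eventChan while a producer goroutine might still send on it would panic, so producers are signalled first, then awaited, and only then is the result channel closed. This minimal sketch reproduces that ordering with a single hypothetical producer; `watcher`, `start` and the event keys are illustrative, not KubeVirt code.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a simplified stand-in for watch.Event.
type Event struct{ Type, Key string }

// watcher mirrors the fields DomainWatcher.Stop relies on.
type watcher struct {
	mu        sync.Mutex
	wg        sync.WaitGroup
	stopChan  chan struct{}
	eventChan chan Event
	started   bool
}

func newWatcher() *watcher {
	return &watcher{stopChan: make(chan struct{}), eventChan: make(chan Event)}
}

// start launches one producer that keeps emitting until stopChan is closed.
func (w *watcher) start() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.started = true
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for i := 0; ; i++ {
			select {
			case <-w.stopChan:
				return
			case w.eventChan <- Event{Type: "MODIFIED", Key: fmt.Sprintf("ns/vm-%d", i)}:
			}
		}
	}()
}

func (w *watcher) ResultChan() <-chan Event { return w.eventChan }

// Stop follows the same order as DomainWatcher.Stop.
func (w *watcher) Stop() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.started {
		return
	}
	close(w.stopChan) // 1. tell producers to exit
	w.wg.Wait()       // 2. after this, nobody can send on eventChan
	w.started = false
	close(w.eventChan) // 3. now safe; consumers see the channel close
}

func main() {
	w := newWatcher()
	w.start()
	for i := 0; i < 3; i++ {
		fmt.Println(<-w.ResultChan())
	}
	w.Stop()
	_, ok := <-w.ResultChan()
	fmt.Println("open after Stop:", ok)
}
```

The producer selects on stopChan as well as on the send, so it cannot block forever on eventChan while Stop waits for it.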

2. ResultChan

Let's look at DomainWatcher's implementation of ResultChan() <-chan Event and where its data comes from:

https://github.com/kubevirt/kubevirt/blob/25b37338917f1a7bfbe3fc07d87672ef1e39389c/pkg/virt-handler/cache/cache.go#L591-L593

func (d *DomainWatcher) ResultChan() <-chan watch.Event {
    return d.eventChan
}

It simply returns a watch.Event channel, so something must be producing messages on it.

When the Domain Informer calls DomainWatcher's List method, startBackground launches a goroutine:

https://github.com/kubevirt/kubevirt/blob/25b37338917f1a7bfbe3fc07d87672ef1e39389c/pkg/virt-handler/cache/cache.go#L303-L357

func (d *DomainWatcher) startBackground() error {
    // a lot of code here
    go func() {
        defer d.wg.Done()

        resyncTicker := time.NewTicker(d.resyncPeriod)
        resyncTickerChan := resyncTicker.C
        defer resyncTicker.Stop()

        // Divide the watchdogTimeout by 3 for our ticker.
        // This ensures we always have at least 2 response failures
        // in a row before we mark the socket as unavailable (which results in shutdown of VMI)
        expiredWatchdogTicker := time.NewTicker(time.Duration((d.watchdogTimeout/3)+1) * time.Second)
        defer expiredWatchdogTicker.Stop()

        expiredWatchdogTickerChan := expiredWatchdogTicker.C

        srvErr := make(chan error)
        go func() {
            defer close(srvErr)
            err := notifyserver.RunServer(d.virtShareDir, d.stopChan, d.eventChan, d.recorder, d.vmiStore)
            srvErr <- err
        }()

        for {
            select {
            case <-resyncTickerChan:
                d.handleResync()
            case <-expiredWatchdogTickerChan:
                d.handleStaleWatchdogFiles()
                d.handleStaleSocketConnections()
            case err := <-srvErr:
                if err != nil {
                    log.Log.Reason(err).Errorf("Unexpected err encountered with Domain Notify aggregation server")
                }

                // server exited so this goroutine is done.
                return
            }
        }
    }()

    d.backgroundWatcherStarted = true
    return nil
}

It contains a ticker that calls handleResync once every resyncPeriod (a second ticker drives the watchdog and stale-socket checks):

https://github.com/kubevirt/kubevirt/blob/25b37338917f1a7bfbe3fc07d87672ef1e39389c/pkg/virt-handler/cache/cache.go#L377-L410

func (d *DomainWatcher) handleResync() {
    socketFiles, err := listSockets()
    if err != nil {
        log.Log.Reason(err).Error("failed to list sockets")
        return
    }

    log.Log.Infof("resyncing virt-launcher domains")
    for _, socket := range socketFiles {
        client, err := cmdclient.NewClient(socket)
        if err != nil {
            log.Log.Reason(err).Error("failed to connect to cmd client socket during resync")
            // Ignore failure to connect to client.
            // These are all local connections via unix socket.
            // A failure to connect means there's nothing on the other
            // end listening.
            continue
        }
        defer client.Close()

        domain, exists, err := client.GetDomain()
        if err != nil {
            // this resync is best effort only.
            log.Log.Reason(err).Errorf("unable to retrieve domain at socket %s during resync", socket)
            continue
        } else if !exists {
            // nothing to sync if it doesn't exist
            continue
        }

        d.eventChan <- watch.Event{Type: watch.Modified, Object: domain}
    }
}

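Periodically, it uses a cmd client to connect to the launcher-sock of every virt-launcher on this node, queries its libvirt Domain, and sends a watch.Event to eventChan. The event type is always watch.Modified, and the event carries the Domain that was found.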

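That explains it: on the resync path, DomainWatcher simply polls all libvirt Domains on the node at a fixed interval and sends the Domain Informer a Modified event each time, even though in most cases the Domain's state has not changed. This differs from informers backed by kube-apiserver, whose Watch API pushes real object changes. handleResync is not the only producer, though: startBackground also passes d.eventChan to notifyserver.RunServer, so the notify server, which virt-launcher reports domain events to, writes into the same channel.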

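After virt-launcher starts the Domain through the libvirt API (that is, the VM boots), DomainWatcher eventually delivers the running Domain object to the Domain Informer. The informer then invokes the handler that VirtualMachineController registered (addDomainFunc if its store does not yet hold that Domain, otherwise updateDomainFunc), which enqueues the Domain's namespace/name key, identical to the VM/VMI key, into the workqueue.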
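Which callback fires is decided by the informer's local store, not by the event type alone. The sketch below is a simplified model of how client-go's shared informer dispatches added or updated objects (its processDeltas routine calls OnUpdate when the object is already in the indexer and OnAdd otherwise). `handlers` mimics the shape of cache.ResourceEventHandlerFuncs; `dispatch` and the string event types are illustrative assumptions.

```go
package main

import "fmt"

// Domain is a simplified stand-in for api.Domain.
type Domain struct{ Key, Status string }

// handlers mimics the shape of cache.ResourceEventHandlerFuncs.
type handlers struct {
	AddFunc    func(obj Domain)
	UpdateFunc func(oldObj, newObj Domain)
	DeleteFunc func(obj Domain)
}

// dispatch applies one event to the store and calls the matching handler:
// a known object triggers UpdateFunc, an unknown one triggers AddFunc.
func dispatch(store map[string]Domain, eventType string, obj Domain, h handlers) {
	switch eventType {
	case "ADDED", "MODIFIED":
		old, known := store[obj.Key]
		store[obj.Key] = obj
		if known && h.UpdateFunc != nil {
			h.UpdateFunc(old, obj)
		} else if !known && h.AddFunc != nil {
			h.AddFunc(obj)
		}
	case "DELETED":
		delete(store, obj.Key)
		if h.DeleteFunc != nil {
			h.DeleteFunc(obj)
		}
	}
}

func main() {
	store := map[string]Domain{}
	var queue []string
	h := handlers{
		AddFunc:    func(d Domain) { queue = append(queue, "add "+d.Key) },
		UpdateFunc: func(_, d Domain) { queue = append(queue, "update "+d.Key) },
	}
	// Two identical periodic MODIFIED events: the first is an add, the second an update.
	dispatch(store, "MODIFIED", Domain{"ns-demo/ecs-rocky8-10", "Running"}, h)
	dispatch(store, "MODIFIED", Domain{"ns-demo/ecs-rocky8-10", "Running"}, h)
	fmt.Println(queue)
}
```

The informer does not compare old and new objects, so every resync reaches updateDomainFunc; deciding whether anything changed is left to the controller.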

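Next, in a worker goroutine, the execute method calls updateVMIStatus, which compares the current Domain with the VMI and moves the VMI to the Running phase. The phase is derived from the Domain's status and reason: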

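    // Excerpt from the phase calculation used by updateVMIStatus; the enclosing function and its fallback return are omitted.
    switch domain.Status.Status {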
    case api.Shutoff, api.Crashed:
        switch domain.Status.Reason {
        case api.ReasonCrashed, api.ReasonPanicked:
            return v1.Failed, nil
        case api.ReasonDestroyed:
            // When ACPI is available, the domain was tried to be shutdown,
            // and destroyed means that the domain was destroyed after the graceperiod expired.
            // Without ACPI a destroyed domain is ok.
            if isACPIEnabled(vmi, domain) {
                return v1.Failed, nil
            }
            return v1.Succeeded, nil
        case api.ReasonShutdown, api.ReasonSaved, api.ReasonFromSnapshot:
            return v1.Succeeded, nil
        case api.ReasonMigrated:
            // if the domain migrated, we no longer know the phase.
            return vmi.Status.Phase, nil
        }
    case api.Running, api.Paused, api.Blocked, api.PMSuspended:
        return v1.Running, nil
    }
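The excerpt is effectively a decision table from (domain status, reason) to VMI phase. The sketch below restates it as a self-contained function using plain strings as hypothetical stand-ins for the api.* and v1.* constants (their real string values may differ). `phaseFor` returns ok=false where the excerpt does not decide, leaving the outcome to the enclosing function's omitted fallback.

```go
package main

import "fmt"

// phaseFor restates the excerpt's mapping with stand-in strings.
// ok=false means the excerpt does not decide the phase.
func phaseFor(status, reason, currentPhase string, acpiEnabled bool) (phase string, ok bool) {
	switch status {
	case "Shutoff", "Crashed":
		switch reason {
		case "Crashed", "Panicked":
			return "Failed", true
		case "Destroyed":
			// With ACPI, a destroyed domain means graceful shutdown timed out.
			if acpiEnabled {
				return "Failed", true
			}
			return "Succeeded", true
		case "Shutdown", "Saved", "FromSnapshot":
			return "Succeeded", true
		case "Migrated":
			// After migration the source no longer knows the phase.
			return currentPhase, true
		}
	case "Running", "Paused", "Blocked", "PMSuspended":
		return "Running", true
	}
	return "", false
}

func main() {
	fmt.Println(phaseFor("Running", "", "Scheduled", false))
	fmt.Println(phaseFor("Shutoff", "Destroyed", "Running", true))
	fmt.Println(phaseFor("Shutoff", "Shutdown", "Running", true))
}
```

Note that Paused and Blocked domains also map to Running, so a paused VM keeps the Running phase.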