How the KubeVirt Domain Informer Works
Jun 27, 2024 20:00 · 2560 words · 6 minute read
In the KubeVirt project, the Domain struct exists only inside virt-handler and virt-launcher. Unlike VirtualMachine and VirtualMachineInstance, it is not defined as a CRD and stored as CRs in Kubernetes' etcd. Yet the VirtualMachineController in virt-handler has a domainInformer that delivers events about libvirt Domains, and its data source is not kube-apiserver:
func NewController(
	recorder record.EventRecorder,
	clientset kubecli.KubevirtClient,
	host string,
	migrationIpAddress string,
	virtShareDir string,
	virtPrivateDir string,
	kubeletPodsDir string,
	vmiSourceInformer cache.SharedIndexInformer,
	vmiTargetInformer cache.SharedIndexInformer,
	domainInformer cache.SharedInformer,
	// a lot of parameters here
) (*VirtualMachineController, error) {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "virt-handler-vm")
	c := &VirtualMachineController{
		Queue:              queue,
		recorder:           recorder,
		clientset:          clientset,
		host:               host,
		migrationIpAddress: migrationIpAddress,
		virtShareDir:       virtShareDir,
		vmiSourceInformer:  vmiSourceInformer,
		vmiTargetInformer:  vmiTargetInformer,
		domainInformer:     domainInformer,
	}
	// a lot of code here (elided; err is declared earlier)
	_, err = domainInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addDomainFunc,
		DeleteFunc: c.deleteDomainFunc,
		UpdateFunc: c.updateDomainFunc,
	})
	if err != nil {
		return nil, err
	}
	// a lot of code here
}
This post explains how this special Informer is implemented. It is created in virt-handler's Run method:
func (app *virtHandlerApp) Run() {
	// a lot of code here
	domainSharedInformer, err := virtcache.NewSharedInformer(app.VirtShareDir, int(app.WatchdogTimeoutDuration.Seconds()), recorder, vmiSourceInformer.GetStore(), time.Duration(app.domainResyncPeriodSeconds)*time.Second)
	if err != nil {
		panic(err)
	}
	// a lot of code here
}
List & Watch
A normal Informer uses kube-apiserver as its data source. Take the VirtualMachineInstance Informer as an example (https://github.com/kubevirt/kubevirt/blob/bb7eef6380f3c1b2bb80e4d8369ffe3d347d36d1/pkg/controller/virtinformers.go#L433-L438):
func (f *kubeInformerFactory) VMI() cache.SharedIndexInformer {
	return f.getInformer("vmiInformer", func() cache.SharedIndexInformer {
		lw := cache.NewListWatchFromClient(f.restClient, "virtualmachineinstances", k8sv1.NamespaceAll, fields.Everything())
		return cache.NewSharedIndexInformer(lw, &kubev1.VirtualMachineInstance{}, f.defaultResync, GetVMIInformerIndexers())
	})
}
- Create a ListWatch object. The ListWatch struct is provided by client-go and implements the ListerWatcher interface. The argument f.restClient is a RESTClient connected to kube-apiserver, obtained from the clientset:
	app.restClient = app.clientSet.RestClient()
	// a lot of code here
	app.informerFactory = controller.NewKubeInformerFactory(app.restClient, app.clientSet, nil, app.kubevirtNamespace)
- Initialize a SharedIndexInformer with the ListWatch object.
Now let's look at the code that initializes domainInformer:
func NewSharedInformer(virtShareDir string, watchdogTimeout int, recorder record.EventRecorder, vmiStore cache.Store, resyncPeriod time.Duration) (cache.SharedInformer, error) {
	lw := newListWatchFromNotify(virtShareDir, watchdogTimeout, recorder, vmiStore, resyncPeriod)
	informer := cache.NewSharedInformer(lw, &api.Domain{}, 0)
	return informer, nil
}
这里 ListWatcher
接口由 KubeVirt
自行实现而非 client-go 包提供:
type DomainWatcher struct {
	lock                     sync.Mutex
	wg                       sync.WaitGroup
	stopChan                 chan struct{}
	eventChan                chan watch.Event
	backgroundWatcherStarted bool
	virtShareDir             string
	watchdogTimeout          int
	recorder                 record.EventRecorder
	vmiStore                 cache.Store
	resyncPeriod             time.Duration
	watchDogLock             sync.Mutex
	unresponsiveSockets      map[string]int64
}
The ListerWatcher interface is defined in client-go as follows:
// Lister is any object that knows how to perform an initial list.
type Lister interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List(options metav1.ListOptions) (runtime.Object, error)
}

// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
	// Watch should begin a watch at the specified version.
	Watch(options metav1.ListOptions) (watch.Interface, error)
}

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
	Lister
	Watcher
}
DomainWatcher implements both the Lister and the Watcher interface.
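To see the contract without client-go, here is a minimal stdlib-only sketch of a source that is not kube-apiserver. The Domain, Event and ListerWatcher types below are hypothetical, simplified stand-ins (no ListOptions, no runtime.Object); the point is only the shape: List returns a snapshot, Watch returns a channel of later events.

```go
package main

import "sync"

// EventType and Event are simplified, hypothetical stand-ins for
// watch.EventType and watch.Event from k8s.io/apimachinery.
type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
)

// Domain is a hypothetical, trimmed-down stand-in for api.Domain.
type Domain struct {
	Namespace string
	Name      string
}

type Event struct {
	Type   EventType
	Object *Domain
}

// ListerWatcher keeps only the shape of the client-go interface:
// List returns a snapshot, Watch returns a stream of later changes.
type ListerWatcher interface {
	List() ([]Domain, error)
	Watch() (<-chan Event, error)
}

// memoryWatcher is a hypothetical in-memory source: like DomainWatcher,
// it serves List and Watch without talking to kube-apiserver.
type memoryWatcher struct {
	mu      sync.Mutex
	domains []Domain
	events  chan Event
}

func newMemoryWatcher(domains ...Domain) *memoryWatcher {
	return &memoryWatcher{domains: domains, events: make(chan Event, 16)}
}

// List returns a copy so callers cannot mutate the internal snapshot.
func (m *memoryWatcher) List() ([]Domain, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := make([]Domain, len(m.domains))
	copy(out, m.domains)
	return out, nil
}

// Watch hands out the receive-only side of the event channel.
func (m *memoryWatcher) Watch() (<-chan Event, error) {
	return m.events, nil
}

// Publish announces an event to watchers; this sketch does not update
// the List snapshot.
func (m *memoryWatcher) Publish(ev Event) {
	m.events <- ev
}

// Compile-time check that memoryWatcher satisfies ListerWatcher.
var _ ListerWatcher = (*memoryWatcher)(nil)
```

An informer built on such a source needs nothing from kube-apiserver, which is exactly the property DomainWatcher relies on.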
Lister
The Lister part is relatively simple: it only needs a List method that returns a list of Domain objects:
func (d *DomainWatcher) List(_ k8sv1.ListOptions) (runtime.Object, error) {
	log.Log.V(3).Info("Synchronizing domains")
	err := d.startBackground()
	if err != nil {
		return nil, err
	}
	domains, err := d.listAllKnownDomains()
	if err != nil {
		return nil, err
	}
	list := api.DomainList{
		Items: []api.Domain{},
	}
	for _, domain := range domains {
		list.Items = append(list.Items, *domain)
	}
	return &list, nil
}
Here DomainWatcher calls listAllKnownDomains to get the Domains of all VMIs on this node. A running VM maps to exactly one VMI, which maps to one virt-launcher Pod, which in turn maps to one libvirt Domain:
VM - VMI - virt-launcher Pod - libvirt Domain
func (d *DomainWatcher) listAllKnownDomains() ([]*api.Domain, error) {
	var domains []*api.Domain
	socketFiles, err := listSockets()
	if err != nil {
		return nil, err
	}
	for _, socketFile := range socketFiles {
		exists, err := diskutils.FileExists(socketFile)
		if err != nil {
			log.Log.Reason(err).Error("failed access cmd client socket")
			continue
		}
		if !exists {
			record, recordExists := findGhostRecordBySocket(socketFile)
			if recordExists {
				domain := api.NewMinimalDomainWithNS(record.Namespace, record.Name)
				domain.ObjectMeta.UID = record.UID
				now := k8sv1.Now()
				domain.ObjectMeta.DeletionTimestamp = &now
				log.Log.Object(domain).Warning("detected stale domain from ghost record")
				domains = append(domains, domain)
			}
			continue
		}
		log.Log.V(3).Infof("List domains from sock %s", socketFile)
		client, err := cmdclient.NewClient(socketFile)
		if err != nil {
			log.Log.Reason(err).Error("failed to connect to cmd client socket")
			// Ignore failure to connect to client.
			// These are all local connections via unix socket.
			// A failure to connect means there's nothing on the other
			// end listening.
			continue
		}
		defer client.Close()
		domain, exists, err := client.GetDomain()
		if err != nil {
			log.Log.Reason(err).Error("failed to list domains on cmd client socket")
			// Failure to get domain list means that client
			// was unable to contact libvirt. As soon as the connection
			// is restored on the client's end, a domain notification will
			// be sent.
			continue
		}
		if exists == true {
			domains = append(domains, domain)
		}
	}
	return domains, nil
}
Background: virt-launcher runs a cmd server that it exposes to virt-handler over a Unix domain socket. Because the virt-handler DaemonSet runs in privileged mode and mounts the host path /var/lib/kubelet/pods, it can see virt-launcher's socket. For example:
$ kubectl get vmi ecs-rocky8-10 -n ns-demo
NAME AGE PHASE IP NODENAME READY
ecs-rocky8-10 11d Running 192.168.100.4 node173 True
$ kubectl get po -n ns-demo | grep ecs-rocky8-10
virt-launcher-ecs-rocky8-10-bxknl 1/1 Running 0 11d
$ kubectl get po virt-launcher-ecs-rocky8-10-bxknl -n ns-demo -o jsonpath='{.metadata.uid}'
69e41c74-7e47-45f7-a4e4-9c193efcf5f8
$ ll /var/lib/kubelet/pods/69e41c74-7e47-45f7-a4e4-9c193efcf5f8/volumes/kubernetes.io~empty-dir/
total 8
drwxrwsrwx 2 root qemu 6 Jun 16 09:09 container-disks
drwxrwsrwx 5 root qemu 67 Jun 17 11:32 ephemeral-disks
drwxrwsrwx 2 root qemu 6 Jun 16 09:09 hotplug-disks
drwxrwsrwx 5 root qemu 4096 Jun 27 10:51 libvirt-runtime
drwxrwsrwx 10 root qemu 4096 Jun 17 11:32 private
drwxrwsrwx 5 root qemu 96 Jun 20 10:25 public
drwxrwsrwx 2 root qemu 27 Jun 17 11:32 sockets
drwxrwsrwx 2 root qemu 6 Jun 16 09:09 vhostuser-sockets
$ stat /var/lib/kubelet/pods/69e41c74-7e47-45f7-a4e4-9c193efcf5f8/volumes/kubernetes.io~empty-dir/sockets/launcher-sock
File: /var/lib/kubelet/pods/69e41c74-7e47-45f7-a4e4-9c193efcf5f8/volumes/kubernetes.io~empty-dir/sockets/launcher-sock
Size: 0 Blocks: 0 IO Block: 4096 socket
Device: fd00h/64768d Inode: 3267213 Links: 1
Access: (0755/srwxr-xr-x) Uid: ( 107/ qemu) Gid: ( 107/ qemu)
Access: 2024-06-26 11:32:45.034447042 +0800
Modify: 2024-06-17 11:32:21.331893299 +0800
Change: 2024-06-17 11:32:21.342893055 +0800
Birth: 2024-06-17 11:32:21.331893299 +0800
# virt-handler-r64rw runs on the same node as the virt-launcher Pod
$ kubectl exec -it virt-handler-r64rw -n kubevirt -- ls -al /var/lib/kubelet/pods/69e41c74-7e47-45f7-a4e4-9c193efcf5f8/volumes/kubernetes.io~empty-dir/sockets/launcher-sock
Defaulted container "virt-handler" out of: virt-handler, virt-launcher (init)
srwxr-xr-x 1 qemu qemu 0 Jun 17 03:32 /var/lib/kubelet/pods/69e41c74-7e47-45f7-a4e4-9c193efcf5f8/volumes/kubernetes.io~empty-dir/sockets/launcher-sock
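Back to the listAllKnownDomains method:
- First, it collects the launcher-sock of every virt-launcher on the same node. If a socket file no longer exists but a ghost record is found for it, a minimal Domain marked with a DeletionTimestamp is reported instead.
- client, err := cmdclient.NewClient(socketFile) creates a cmd client that opens launcher-sock and connects to the cmd server on the virt-launcher side.
- The GetDomain API, implemented by VirtLauncherClient, queries the libvirt Domain associated with that virt-launcher.
- The client sends the request to virt-launcher, whose cmd server then queries the Domain through the libvirt API.
- Finally, it returns all Domains found on this node.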
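The decision order in listAllKnownDomains can be sketched with stdlib-only Go. Everything here is a hypothetical stand-in: launcherClient replaces the cmd client, ghostRecord replaces KubeVirt's ghost record, and the exists/ghost/dial callbacks replace diskutils.FileExists, findGhostRecordBySocket and cmdclient.NewClient. One deliberate difference: each connection is closed at the end of its own iteration, whereas a defer client.Close() inside a loop keeps every connection open until the enclosing function returns.

```go
package main

import (
	"errors"
	"time"
)

// Domain is a trimmed-down, hypothetical stand-in for api.Domain.
type Domain struct {
	Namespace, Name   string
	DeletionTimestamp *time.Time
}

// launcherClient is a hypothetical interface with the two calls the loop
// needs from a cmd client.
type launcherClient interface {
	GetDomain() (*Domain, bool, error)
	Close()
}

// ghostRecord is a hypothetical stand-in for a ghost record: the identity
// remembered for a socket after its launcher has gone away.
type ghostRecord struct{ Namespace, Name string }

var errNoListener = errors.New("nothing listening on socket")

// listKnownDomains keeps the decision order of listAllKnownDomains.
func listKnownDomains(
	sockets []string,
	exists func(string) bool,
	ghost func(string) (ghostRecord, bool),
	dial func(string) (launcherClient, error),
) []*Domain {
	var domains []*Domain
	for _, sock := range sockets {
		if !exists(sock) {
			// Socket is gone: report a stale Domain already marked for deletion.
			if rec, ok := ghost(sock); ok {
				now := time.Now()
				domains = append(domains, &Domain{Namespace: rec.Namespace, Name: rec.Name, DeletionTimestamp: &now})
			}
			continue
		}
		if d := queryOne(sock, dial); d != nil {
			domains = append(domains, d)
		}
	}
	return domains
}

// queryOne owns a single connection, so Close runs when this call returns
// rather than when the whole listing finishes.
func queryOne(sock string, dial func(string) (launcherClient, error)) *Domain {
	client, err := dial(sock)
	if err != nil {
		return nil // nothing listening on the other end: skip
	}
	defer client.Close()
	d, ok, err := client.GetDomain()
	if err != nil || !ok {
		return nil // libvirt unreachable or no Domain: skip this socket
	}
	return d
}

// fakeClient is an in-memory launcherClient for trying the loop out.
type fakeClient struct {
	domain *Domain
	closed *int
}

func (f fakeClient) GetDomain() (*Domain, bool, error) { return f.domain, f.domain != nil, nil }
func (f fakeClient) Close()                            { *f.closed++ }
```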
Watcher
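The Watcher part is more involved. Watch must return a watch.Interface, which means implementing two methods, Stop and ResultChan() <-chan Event; DomainWatcher implements both itself: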
// Interface can be implemented by anything that knows how to watch and report changes.
type Interface interface {
	// Stop stops watching. Will close the channel returned by ResultChan(). Releases
	// any resources used by the watch.
	Stop()

	// ResultChan returns a chan which will receive all the events. If an error occurs
	// or Stop() is called, the implementation will close this channel and
	// release any resources used by the watch.
	ResultChan() <-chan Event
}
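A consumer of this interface simply ranges over ResultChan() until the channel is closed, which per the comments happens on Stop() or an error. The sketch below uses hypothetical simplified Event/Domain types and mirrors each event into a map keyed by namespace/name (the key format of client-go's cache.MetaNamespaceKeyFunc); it is not the reflector's real code.

```go
package main

// Simplified, hypothetical stand-ins for watch.EventType, watch.Event and api.Domain.
type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
)

type Domain struct{ Namespace, Name, Status string }

type Event struct {
	Type   EventType
	Object *Domain
}

// key builds a namespace/name key.
func key(d *Domain) string { return d.Namespace + "/" + d.Name }

// drain consumes events until the producer closes the channel and mirrors
// each event into a local cache.
func drain(events <-chan Event, cache map[string]*Domain) {
	for ev := range events {
		switch ev.Type {
		case Added, Modified:
			cache[key(ev.Object)] = ev.Object
		case Deleted:
			delete(cache, key(ev.Object))
		}
	}
}
```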
1. Stop
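The Stop implementation is simple: it closes stopChan to signal the background goroutine, waits for it through the WaitGroup, and only then closes eventChan: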
func (d *DomainWatcher) Stop() {
	d.lock.Lock()
	defer d.lock.Unlock()
	if d.backgroundWatcherStarted == false {
		return
	}
	close(d.stopChan)
	d.wg.Wait()
	d.backgroundWatcherStarted = false
	close(d.eventChan)
}
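The order inside Stop matters: closing eventChan while a producer goroutine may still send on it would panic with "send on closed channel", so Stop first closes stopChan, waits on the WaitGroup, and only then closes eventChan. Below is a miniature of that pattern with a hypothetical watcher type and an int event stream. The producer in this sketch also selects on stopChan while sending, an addition of the sketch so that wg.Wait cannot hang when nobody is reading.

```go
package main

import (
	"sync"
	"time"
)

// watcher is a hypothetical miniature of the fields DomainWatcher uses for
// start/stop: a lock, a WaitGroup, a stop channel and an event channel.
type watcher struct {
	lock     sync.Mutex
	wg       sync.WaitGroup
	stopChan chan struct{}
	events   chan int
	started  bool
}

// start launches one producer goroutine that emits an increasing counter.
func (w *watcher) start() {
	w.lock.Lock()
	defer w.lock.Unlock()
	if w.started {
		return
	}
	w.stopChan = make(chan struct{})
	w.events = make(chan int)
	w.wg.Add(1)
	go func(stop <-chan struct{}, out chan<- int) {
		defer w.wg.Done()
		ticker := time.NewTicker(time.Millisecond)
		defer ticker.Stop()
		for i := 0; ; i++ {
			select {
			case <-stop:
				return
			case <-ticker.C:
				// Selecting on stop while sending keeps wg.Wait from
				// hanging when nobody reads events.
				select {
				case out <- i:
				case <-stop:
					return
				}
			}
		}
	}(w.stopChan, w.events)
	w.started = true
}

// stop follows DomainWatcher.Stop: signal, wait for the producer to exit,
// and only then close the event channel, so no send can hit a closed channel.
func (w *watcher) stop() {
	w.lock.Lock()
	defer w.lock.Unlock()
	if !w.started {
		return
	}
	close(w.stopChan)
	w.wg.Wait()
	w.started = false
	close(w.events)
}
```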
2. ResultChan
Next, let's look at DomainWatcher's implementation of ResultChan() <-chan Event and where its data comes from:
func (d *DomainWatcher) ResultChan() <-chan watch.Event {
	return d.eventChan
}
It simply returns a watch.Event channel, so something must be producing messages on it. When the Domain Informer calls DomainWatcher's List method, startBackground starts a goroutine:
func (d *DomainWatcher) startBackground() error {
	// a lot of code here
	go func() {
		defer d.wg.Done()
		resyncTicker := time.NewTicker(d.resyncPeriod)
		resyncTickerChan := resyncTicker.C
		defer resyncTicker.Stop()
		// Divide the watchdogTimeout by 3 for our ticker.
		// This ensures we always have at least 2 response failures
		// in a row before we mark the socket as unavailable (which results in shutdown of VMI)
		expiredWatchdogTicker := time.NewTicker(time.Duration((d.watchdogTimeout/3)+1) * time.Second)
		defer expiredWatchdogTicker.Stop()
		expiredWatchdogTickerChan := expiredWatchdogTicker.C
		srvErr := make(chan error)
		go func() {
			defer close(srvErr)
			err := notifyserver.RunServer(d.virtShareDir, d.stopChan, d.eventChan, d.recorder, d.vmiStore)
			srvErr <- err
		}()
		for {
			select {
			case <-resyncTickerChan:
				d.handleResync()
			case <-expiredWatchdogTickerChan:
				d.handleStaleWatchdogFiles()
				d.handleStaleSocketConnections()
			case err := <-srvErr:
				if err != nil {
					log.Log.Reason(err).Errorf("Unexpected err encountered with Domain Notify aggregation server")
				}
				// server exited so this goroutine is done.
				return
			}
		}
	}()
	d.backgroundWatcherStarted = true
	return nil
}
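The goroutine started by startBackground creates two Tickers. The watchdog Ticker triggers cleanup of stale watchdog files and socket connections; the resync Ticker fires every resyncPeriod and calls handleResync: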
func (d *DomainWatcher) handleResync() {
	socketFiles, err := listSockets()
	if err != nil {
		log.Log.Reason(err).Error("failed to list sockets")
		return
	}
	log.Log.Infof("resyncing virt-launcher domains")
	for _, socket := range socketFiles {
		client, err := cmdclient.NewClient(socket)
		if err != nil {
			log.Log.Reason(err).Error("failed to connect to cmd client socket during resync")
			// Ignore failure to connect to client.
			// These are all local connections via unix socket.
			// A failure to connect means there's nothing on the other
			// end listening.
			continue
		}
		defer client.Close()
		domain, exists, err := client.GetDomain()
		if err != nil {
			// this resync is best effort only.
			log.Log.Reason(err).Errorf("unable to retrieve domain at socket %s during resync", socket)
			continue
		} else if !exists {
			// nothing to sync if it doesn't exist
			continue
		}
		d.eventChan <- watch.Event{Type: watch.Modified, Object: domain}
	}
}
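On every resync it connects through a cmd client to the launcher-sock of every virt-launcher on this node, queries its libvirt Domain, and sends a watch.Event of the fixed type watch.Modified to eventChan, carrying the Domain it just fetched.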
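So on its resync path DomainWatcher simply polls every libvirt Domain on the node at a fixed interval and sends the Domain Informer a Modified event each time, even though in most cases the Domain's state has not changed. This differs from an Informer backed by kube-apiserver, whose Watch API pushes actual object changes. Polling is not the only producer for eventChan, however: startBackground also hands eventChan to notifyserver.RunServer, the Domain Notify aggregation server, through which virt-launcher pushes domain events to virt-handler without waiting for the next resync.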
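Some time after virt-launcher starts a Domain through the libvirt API (the equivalent of powering on the VM), DomainWatcher delivers the running Domain object to the Domain Informer. The Informer then invokes the handler that VirtualMachineController registered on it (addDomainFunc the first time it sees that Domain, updateDomainFunc afterwards), which adds the Domain's namespace key (identical to that of the VM/VMI) to the workqueue.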
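Why can a Modified event end up in addDomainFunc? client-go's shared informer does not choose between its add and update handlers by watch event type. When it processes an Added, Updated, Sync or Replaced delta, it looks the object up in its local store and calls OnUpdate if it was already there, OnAdd otherwise. The sketch below models only that rule, with hypothetical types and a plain map as the store.

```go
package main

// Domain is a hypothetical stand-in keyed by namespace/name.
type Domain struct{ Namespace, Name, Status string }

func key(d *Domain) string { return d.Namespace + "/" + d.Name }

// handlers mirrors the three callbacks registered via AddEventHandler.
type handlers struct {
	OnAdd    func(obj *Domain)
	OnUpdate func(old, obj *Domain)
	OnDelete func(obj *Domain)
}

// dispatch applies one change to the local store and picks the callback
// from what the store already holds, not from the incoming event type.
func dispatch(store map[string]*Domain, deleted bool, obj *Domain, h handlers) {
	k := key(obj)
	if deleted {
		delete(store, k)
		h.OnDelete(obj)
		return
	}
	if old, ok := store[k]; ok {
		store[k] = obj
		h.OnUpdate(old, obj)
		return
	}
	store[k] = obj
	h.OnAdd(obj)
}
```

Feeding the same Domain twice through dispatch therefore yields one add followed by one update, which is how a periodic Modified resync looks to VirtualMachineController.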
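A worker goroutine then calls updateVMIStatus from the execute method, which compares the current Domain with the VMI and updates the VMI to the Running phase. The phase is derived from the Domain state as in this excerpt: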
switch domain.Status.Status {
case api.Shutoff, api.Crashed:
	switch domain.Status.Reason {
	case api.ReasonCrashed, api.ReasonPanicked:
		return v1.Failed, nil
	case api.ReasonDestroyed:
		// When ACPI is available, the domain was tried to be shutdown,
		// and destroyed means that the domain was destroyed after the graceperiod expired.
		// Without ACPI a destroyed domain is ok.
		if isACPIEnabled(vmi, domain) {
			return v1.Failed, nil
		}
		return v1.Succeeded, nil
	case api.ReasonShutdown, api.ReasonSaved, api.ReasonFromSnapshot:
		return v1.Succeeded, nil
	case api.ReasonMigrated:
		// if the domain migrated, we no longer know the phase.
		return vmi.Status.Phase, nil
	}
case api.Running, api.Paused, api.Blocked, api.PMSuspended:
	return v1.Running, nil
}
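The mapping is easier to test once it is a pure function. Below, the excerpt's switch is restated with hypothetical string-typed stand-ins for the api.* Domain states and reasons and the v1.* phases, with ACPI reduced to a boolean. The second result reports whether the switch decided anything; in the original, cases that fall out of the switch continue into code not shown in the excerpt.

```go
package main

// Hypothetical string-typed stand-ins for the api.* Domain states and
// reasons and the v1.* VMI phases used in the excerpt.
type DomainState string
type Reason string
type Phase string

const (
	Shutoff     DomainState = "Shutoff"
	Crashed     DomainState = "Crashed"
	Running     DomainState = "Running"
	Paused      DomainState = "Paused"
	Blocked     DomainState = "Blocked"
	PMSuspended DomainState = "PMSuspended"
)

const (
	ReasonCrashed      Reason = "Crashed"
	ReasonPanicked     Reason = "Panicked"
	ReasonDestroyed    Reason = "Destroyed"
	ReasonShutdown     Reason = "Shutdown"
	ReasonSaved        Reason = "Saved"
	ReasonFromSnapshot Reason = "FromSnapshot"
	ReasonMigrated     Reason = "Migrated"
)

const (
	PhaseRunning   Phase = "Running"
	PhaseSucceeded Phase = "Succeeded"
	PhaseFailed    Phase = "Failed"
)

// vmiPhase maps a Domain state and reason to a VMI phase.
func vmiPhase(state DomainState, reason Reason, acpiEnabled bool, current Phase) (Phase, bool) {
	switch state {
	case Shutoff, Crashed:
		switch reason {
		case ReasonCrashed, ReasonPanicked:
			return PhaseFailed, true
		case ReasonDestroyed:
			// With ACPI a graceful shutdown was attempted first, so being
			// destroyed means the grace period ran out.
			if acpiEnabled {
				return PhaseFailed, true
			}
			return PhaseSucceeded, true
		case ReasonShutdown, ReasonSaved, ReasonFromSnapshot:
			return PhaseSucceeded, true
		case ReasonMigrated:
			// After migration the phase is unknown here; keep the current one.
			return current, true
		}
	case Running, Paused, Blocked, PMSuspended:
		return PhaseRunning, true
	}
	return current, false
}
```

For example, a Shutoff Domain with reason Destroyed maps to Failed when ACPI is enabled and to Succeeded otherwise.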