Kubernetes Pod CPU Pinning

Sep 15, 2023 21:00 · Kubernetes · Container · Linux

Under the default configuration, kubelet/Linux uses CFS (the Completely Fair Scheduler) to allocate CPU time to Pods. Workloads (the processes in a Pod) get scheduled onto whichever CPU cores are available, and most workloads are not sensitive to the performance cost of being migrated between cores.

CFS is the default scheduling algorithm in the Linux kernel for SCHED_NORMAL tasks (ordinary processes). It abandons the old fixed-timeslice, priority-based model and aims to distribute CPU time fairly among tasks.

However, CPU cache affinity and scheduling latency do have a significant impact on a small class of processes; such workloads need exclusive CPUs (so-called "CPU pinning").
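
Before any pinning happens, an ordinary process is simply a CFS task that may run on every online CPU; both facts can be checked from the shell. A quick sketch (the PID is hypothetical):

$ chrt -p 12345      # scheduling policy, normally SCHED_OTHER, i.e. handled by CFS
$ taskset -cp 12345  # CPU affinity list, by default all online CPUs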

kubelet CPU manager policies

Specify a CPU manager policy for the kubelet with the --cpu-manager-policy flag or the cpuManagerPolicy field in its configuration file. Two policies are currently supported (a configuration sketch follows the list):

  • none: the default policy; the CPU limits of Guaranteed and Burstable Pods are enforced through the standard Linux CFS quota mechanism.

  • static: allows Guaranteed Pods whose CPU requests are integers to get exclusive CPUs on the node, implemented with the Linux cpuset cgroup.

    The static policy manages a shared pool of CPUs that initially contains all CPUs on the node. The number of exclusively allocatable CPUs equals the node's total CPU count minus the CPUs reserved through the kubelet's --kube-reserved and --system-reserved options (reserved so that system processes such as the kubelet, containerd and systemd always have CPU available). BestEffort Pods, Burstable Pods, and Guaranteed Pods that request a non-integer amount of CPU all run on the shared pool; only Guaranteed Pods that request an integer number of CPUs are granted exclusive CPUs.
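
A minimal sketch of enabling the static policy through the kubelet configuration file; the field names come from KubeletConfiguration, while the reservation values below are made up for illustration:

$ cat <<EOF > /tmp/cpu-manager-snippet.yaml
cpuManagerPolicy: static
kubeReserved:
  cpu: "1"
systemReserved:
  cpu: "1"
EOF

Merge these fields into /var/lib/kubelet/config.yaml (or pass --cpu-manager-policy=static on the command line, as in the kubelet invocation shown later), remove the old cpu_manager_state checkpoint file, and restart the kubelet.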

Pod QoS

When a Pod is created, Kubernetes assigns it a QoS class:

Guaranteed Pods have the strictest resource constraints and are the least likely to be evicted:

  • Every container in the Pod must have both a request and a limit for CPU and memory
  • For both CPU and memory, the request must equal the limit
spec:
  containers:
  - name: guaranteed-pod
    image: nginx:latest
    resources:
      limits:
        memory: 200Mi
        cpu: 1
      requests:
        memory: 200Mi
        cpu: 1

Burstable Pods have a lower bound on resources guaranteed by their requests, but limits are optional, allowing the Pod to flexibly consume extra resources when they are available:

  • The Pod does not meet the criteria for Guaranteed
  • At least one container in the Pod has a memory or CPU request or limit
spec:
  containers:
  - name: burstable-pod
    image: nginx:latest
    resources:
      limits:
        memory: 200Mi
      requests:
        memory: 200Mi

The Pod above is Burstable because the CPU quantity is missing from both its requests and its limits.

BestEffort Pods may try to use node resources that are not specifically allocated to Pods in the other QoS classes, but when the node comes under resource pressure, the kubelet evicts these Pods first:

  • No container in the Pod has any request or limit
spec:
  containers:
  - name: nginx
    image: nginx:latest

Exclusive CPUs for a Pod

As established above, the kubelet uses Linux cpusets to give a Pod exclusive CPUs (i.e. to pin it to cores).

A cpuset assigns a set of CPUs and memory nodes to a group of tasks (processes and threads are both tasks) and is presented as a tree in a virtual filesystem. User code (here, the kubelet) creates and deletes cpusets in the cgroup virtual filesystem, assigns CPUs and memory nodes to each cpuset, and attaches tasks to a cpuset, restricting those tasks to only the CPUs and memory nodes it contains.
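
The same mechanism can be exercised by hand outside Kubernetes; a sketch against the cgroup v1 hierarchy (the cpuset name and the PID are made up):

$ mkdir /sys/fs/cgroup/cpuset/demo
$ echo 2 > /sys/fs/cgroup/cpuset/demo/cpuset.cpus   # this cpuset owns CPU 2
$ echo 0 > /sys/fs/cgroup/cpuset/demo/cpuset.mems   # and memory node 0
$ echo 12345 > /sys/fs/cgroup/cpuset/demo/tasks     # attach a (hypothetical) task to it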

Make sure the kubelet's CPU manager policy has been set to static:

$ /usr/local/bin/kubelet --bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf --kubeconfig=/etc/kubernetes/kubelet.conf --config=/var/lib/kubelet/config.yaml --cgroup-driver=systemd --container-runtime-endpoint=unix:///run/containerd/containerd.sock --pod-infra-container-image=registry-1.ict-mec.net:18443/kubesphere/pause:3.9 --node-ip=172.18.22.164 --hostname-override=node164 --authentication-token-webhook=true --authorization-mode=Webhook --kube-reserved=cpu=4,memory=4Gi --system-reserved=cpu=4,memory=24Gi --eviction-hard=memory.available<4096Mi,nodefs.available<10% --cpu-manager-policy=static --topology-manager-policy=single-numa-node --reserved-cpus=0-1,12-13,6-7,18-19 --memory-manager-policy=Static --reserved-memory 0:memory=32Gi

Check the node's NUMA topology:

$ numactl -H
available: 1 nodes (0)
node 0 cpus: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
node 0 size: 128357 MB
node 0 free: 91004 MB
node distances:
node   0
  0:  10

This confirms the node has a single NUMA node, i.e. one CPU socket, with all 24 cores on that NUMA node.

Next, look at the kubelet-managed cgroup in the cpuset hierarchy:

$ cat /sys/fs/cgroup/cpuset/kubepods.slice/cpuset.cpus
0-23

All 24 cores are assigned to the kubepods.slice cgroup.

Create a sample Pod that should get an exclusive CPU and look up its UID:

$ kubectl apply -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: pod-cpuset-demo
spec:
  containers:
  - name: stress
    image: polinux/stress
    resources:
      requests:
        memory: 512Mi
        cpu: 1
      limits:
        memory: 512Mi
        cpu: 1
    command: ["stress"]
    args: ["--cpu", "1"]
EOF

$ kubectl get po pod-cpuset-demo -o jsonpath='{.metadata.uid}'
33aa7aff-97ad-41eb-b321-40a3cb26804b
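
The rule is straightforward: the pod-level slice name is the UID with dashes turned into underscores. A sketch of the conversion, assuming the systemd cgroup driver and a Guaranteed Pod (which sits directly under kubepods.slice):

$ uid=$(kubectl get po pod-cpuset-demo -o jsonpath='{.metadata.uid}')
$ echo "/sys/fs/cgroup/cpuset/kubepods.slice/kubepods-pod${uid//-/_}.slice/"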

Assemble the path according to this rule and list it:

$ ll /sys/fs/cgroup/cpuset/kubepods.slice/kubepods-pod33aa7aff_97ad_41eb_b321_40a3cb26804b.slice/
total 0
drwxr-xr-x 2 root root 0 Sep 14 23:57 cri-containerd-8b0630c3bca547790c528cba84a2787f5133cec39f2259f8015396f3c5db8d2d.scope
drwxr-xr-x 2 root root 0 Sep 14 23:57 cri-containerd-e607aa77c01d0a079afc28b94768e5c08bda636dbd854f8154e612824d71099a.scope
# a lot of dirs here

There are directories for two containers, corresponding to pause and stress. Check the cpuset.cpus and tasks files of each:

  • pause:

    $ cat /sys/fs/cgroup/cpuset/kubepods.slice/kubepods-pod33aa7aff_97ad_41eb_b321_40a3cb26804b.slice/cri-containerd-8b0630c3bca547790c528cba84a2787f5133cec39f2259f8015396f3c5db8d2d.scope/cpuset.cpus
    0-23
    
    $ cat /sys/fs/cgroup/cpuset/kubepods.slice/kubepods-pod33aa7aff_97ad_41eb_b321_40a3cb26804b.slice/cri-containerd-8b0630c3bca547790c528cba84a2787f5133cec39f2259f8015396f3c5db8d2d.scope/tasks
    558255
    
    $ ps --pid 558255
    PID TTY          TIME CMD
    558255 ?        00:00:00 pause
    
    • The cpuset.cpus file shows 0-23, i.e. all 24 cores, so the process may run on any of CPUs 0-23.
    • The tasks file contains the pause process's PID, which attaches the pause process to this cpuset; the pause container may therefore use any of CPUs 0-23.
  • stress:

    $ cat /sys/fs/cgroup/cpuset/kubepods.slice/kubepods-pod33aa7aff_97ad_41eb_b321_40a3cb26804b.slice/cri-containerd-e607aa77c01d0a079afc28b94768e5c08bda636dbd854f8154e612824d71099a.scope/cpuset.cpus
    2
    
    $ cat /sys/fs/cgroup/cpuset/kubepods.slice/kubepods-pod33aa7aff_97ad_41eb_b321_40a3cb26804b.slice/cri-containerd-e607aa77c01d0a079afc28b94768e5c08bda636dbd854f8154e612824d71099a.scope/tasks
    558462
    558484
    
    $ pstree -p 558462
    stress(558462)───stress(558484)
    
    • The cpuset.cpus file shows 2, so the process may only run on CPU 2.
    • The tasks file contains the stress processes' PIDs, which attaches them to this cpuset; the stress container is therefore pinned to CPU 2.

Check the utilization of CPU 2:

top - 23:50:47 up 13:54,  1 user,  load average: 3.06, 3.07, 2.64
Tasks: 647 total,   2 running, 644 sleeping,   0 stopped,   1 zombie
%Cpu0  :  2.0 us,  1.0 sy,  0.0 ni, 96.0 id,  0.3 wa,  0.3 hi,  0.3 si,  0.0 st
%Cpu1  :  2.3 us,  2.3 sy,  0.0 ni, 94.7 id,  0.0 wa,  0.3 hi,  0.3 si,  0.0 st
%Cpu2  : 63.9 us,  8.6 sy,  0.0 ni, 27.5 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu3  :  1.0 us,  0.7 sy,  0.0 ni, 98.0 id,  0.0 wa,  0.3 hi,  0.0 si,  0.0 st
%Cpu4  :  0.7 us,  0.7 sy,  0.0 ni, 98.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu5  :  1.0 us,  0.3 sy,  0.0 ni, 98.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu6  :  3.6 us,  1.7 sy,  0.0 ni, 94.1 id,  0.0 wa,  0.3 hi,  0.3 si,  0.0 st
%Cpu7  :  3.3 us,  1.3 sy,  0.0 ni, 94.7 id,  0.0 wa,  0.3 hi,  0.3 si,  0.0 st
%Cpu8  :  2.0 us,  1.0 sy,  0.0 ni, 97.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu9  :  1.0 us,  0.7 sy,  0.0 ni, 98.0 id,  0.0 wa,  0.0 hi,  0.3 si,  0.0 st
# a lot of cpus here
MiB Mem : 128357.9 total,  89245.1 free,  23076.4 used,  16036.5 buff/cache
MiB Swap:      0.0 total,      0.0 free,      0.0 used. 104039.1 avail Mem

This confirms that the stress process is indeed pinned to CPU 2.
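
The binding can also be cross-checked per process (the PIDs are the ones found above; output omitted):

$ taskset -cp 558462                   # the affinity list should contain only CPU 2
$ ps -o pid,psr,comm -p 558462,558484  # PSR shows the CPU each process last ran on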

kubelet cpuset implementation

Checkpoint

The cpu_manager_state file mentioned in the "Changing the CPU Manager policy" documentation is the kubelet's cpuset state checkpoint file:

$ cat /var/lib/kubelet/cpu_manager_state | jq
{
  "policyName": "static",
  "defaultCpuSet": "0-1,3-23",
  "entries": {
    "33aa7aff-97ad-41eb-b321-40a3cb26804b": {
      "stress": "2"
    }
  },
  "checksum": 1616165515
}

It records the cpuset state of the containers on this node: the stress container in the Pod with UID 33aa7aff-97ad-41eb-b321-40a3cb26804b is pinned to CPU 2.

The kubelet CPU manager implementation lives under pkg/kubelet/cm/cpumanager. Start with the CPU manager's initialization in https://github.com/kubernetes/kubernetes/blob/v1.26.8/pkg/kubelet/cm/cpumanager/cpu_manager.go:

// NewManager creates new cpu manager based on provided policy
func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
    var topo *topology.CPUTopology
    var policy Policy
    var err error

    switch policyName(cpuPolicyName) {

    case PolicyNone:
        policy, err = NewNonePolicy(cpuPolicyOptions)
        if err != nil {
            return nil, fmt.Errorf("new none policy error: %w", err)
        }

    case PolicyStatic:
        topo, err = topology.Discover(machineInfo)
        if err != nil {
            return nil, err
        }
        klog.InfoS("Detected CPU topology", "topology", topo)

        reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
        if !ok {
            // The static policy cannot initialize without this information.
            return nil, fmt.Errorf("[cpumanager] unable to determine reserved CPU resources for static policy")
        }
        if reservedCPUs.IsZero() {
            // The static policy requires this to be nonzero. Zero CPU reservation
            // would allow the shared pool to be completely exhausted. At that point
            // either we would violate our guarantee of exclusivity or need to evict
            // any pod that has at least one container that requires zero CPUs.
            // See the comments in policy_static.go for more details.
            return nil, fmt.Errorf("[cpumanager] the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero")
        }

        // Take the ceiling of the reservation, since fractional CPUs cannot be
        // exclusively allocated.
        reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
        numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
        policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions)
        if err != nil {
            return nil, fmt.Errorf("new static policy error: %w", err)
        }

    default:
        return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
    }

    // a lot of code here
}

When the CPU manager policy is set to static, CPU must be reserved for system services, otherwise the policy refuses to initialize, as the code above shows.
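
The rounding above is easy to reproduce by hand; a sketch with made-up reservation values:

$ reserved_millicpu=$((500 + 1500))             # e.g. --kube-reserved=cpu=500m plus --system-reserved=cpu=1500m
$ echo $(( (reserved_millicpu + 999) / 1000 ))  # integer ceiling: 2 CPUs leave the exclusive pool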

First, locate where the manager object's state field is assigned; this happens in its Start method:

// cpuManagerStateFileName is the file name where cpu manager stores its state
const cpuManagerStateFileName = "cpu_manager_state"

func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
    stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
    if err != nil {
        klog.ErrorS(err, "Could not initialize checkpoint manager, please drain node and remove policy state file")
        return err
    }
    m.state = stateImpl
    // a lot of code here
}

Note that the /var/lib/kubelet/cpu_manager_state file comes into play here. stateCheckpoint, the implementation of the State interface, lives in the state_checkpoint.go file:

type stateCheckpoint struct {
    mux               sync.RWMutex
    policyName        string
    cache             State
    checkpointManager checkpointmanager.CheckpointManager
    checkpointName    string
    initialContainers containermap.ContainerMap
}

// NewCheckpointState creates new State for keeping track of cpu/pod assignment with checkpoint backend
func NewCheckpointState(stateDir, checkpointName, policyName string, initialContainers containermap.ContainerMap) (State, error) {
    checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
    }
    stateCheckpoint := &stateCheckpoint{
        cache:             NewMemoryState(),
        policyName:        policyName,
        checkpointManager: checkpointManager,
        checkpointName:    checkpointName,
        initialContainers: initialContainers,
    }

    // a lot of code here
}

Next, look at the manager object's Allocate method:

func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
    // a lot of code here

    // Call down into the policy to assign this container CPUs if required.
    err := m.policy.Allocate(m.state, p, c)
    if err != nil {
        klog.ErrorS(err, "Allocate error")
        return err
    }

    return nil
}

Which leads to the static policy's Allocate method:

func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
    // a lot of code here
    if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
        p.updateCPUsToReuse(pod, container, cpuset)
        klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
        return nil
    }

    // Call Topology Manager to get the aligned socket affinity across all hint providers.
    hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
    klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)

    // Allocate CPUs according to the NUMA affinity contained in the hint.
    cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
    if err != nil {
        klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
        return err
    }
    s.SetCPUSet(string(pod.UID), container.Name, cpuset)
    p.updateCPUsToReuse(pod, container, cpuset)

    return nil
}
  1. The GetCPUSet method reads the container's cpuset from the checkpoint:

    // GetCPUSet returns current CPU set
    func (sc *stateCheckpoint) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
        sc.mux.RLock()
        defer sc.mux.RUnlock()
    
        res, ok := sc.cache.GetCPUSet(podUID, containerName)
        return res, ok
    }
    

    stateCheckpoint fetches the cpuset from its cache, which is populated from the /var/lib/kubelet/cpu_manager_state file when the stateCheckpoint object is initialized.

    If a cpuset is found, it was already assigned earlier (the container was probably restarted rather than newly created).

  2. If nothing is found (the container is most likely new), the allocateCPUs method is called to assign it a cpuset:

    func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
        klog.InfoS("AllocateCPUs", "numCPUs", numCPUs, "socket", numaAffinity)
    
        allocatableCPUs := p.GetAvailableCPUs(s).Union(reusableCPUs)
    
        // If there are aligned CPUs in numaAffinity, attempt to take those first.
        result := cpuset.NewCPUSet()
        if numaAffinity != nil {
            alignedCPUs := p.getAlignedCPUs(numaAffinity, allocatableCPUs)
    
            numAlignedToAlloc := alignedCPUs.Size()
            if numCPUs < numAlignedToAlloc {
                numAlignedToAlloc = numCPUs
            }
    
            alignedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc)
            if err != nil {
                return cpuset.NewCPUSet(), err
            }
    
            result = result.Union(alignedCPUs)
        }
    
        // Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
        remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result), numCPUs-result.Size())
        if err != nil {
            return cpuset.NewCPUSet(), err
        }
        result = result.Union(remainingCPUs)
    
        // Remove allocated CPUs from the shared CPUSet.
        s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))
    
        klog.InfoS("AllocateCPUs", "result", result)
        return result, nil
    }
    
  3. Finally, SetCPUSet saves the cpuset to the checkpoint:

    // SetCPUSet sets CPU set
    func (sc *stateCheckpoint) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) {
        sc.mux.Lock()
        defer sc.mux.Unlock()
        sc.cache.SetCPUSet(podUID, containerName, cset)
        err := sc.storeState()
        if err != nil {
            klog.InfoS("Store state to checkpoint error", "err", err)
        }
    }
    

    The container's cpuset is written to the cache and then persisted to the /var/lib/kubelet/cpu_manager_state file on disk. This persistence is also why switching policies needs extra care; see the sketch below.
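
Because the checkpoint survives restarts, changing the CPU manager policy on an existing node requires getting rid of the stale file, which is exactly what the error log in Start hints at ("please drain node and remove policy state file"). A rough sequence, assuming the kubelet runs as a systemd service and the node has been drained first:

$ systemctl stop kubelet
$ rm -f /var/lib/kubelet/cpu_manager_state  # checkpoint written under the old policy
$ # update cpuManagerPolicy in the kubelet configuration, then
$ systemctl start kubelet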

cgroup

Now let's see how the kubelet applies the cpuset to the cgroup. This requires some familiarity with the kubelet's cgroup-related code; please read https://blog.crazytaxii.com/posts/cgroup_driver/ first. Only the systemd driver on cgroup v1 is covered here:

  • cpuset.cpus

    func (s *CpusetGroup) Set(path string, r *configs.Resources) error {
        if r.CpusetCpus != "" {
            if err := cgroups.WriteFile(path, "cpuset.cpus", r.CpusetCpus); err != nil {
                return err
            }
        }
        // a lot of code here
        return nil
    }
    

    Writes the value (e.g. 2) into the cpuset.cpus file.

  • cgroup.procs

    func (s *CpusetGroup) ApplyDir(dir string, r *configs.Resources, pid int) error {
        // a lot of code here
        return cgroups.WriteCgroupProc(dir, pid)
    }
    
    func WriteCgroupProc(dir string, pid int) error {
        // a lot of code here
        file, err := OpenFile(dir, CgroupProcesses, os.O_WRONLY) // cgroup.procs
        if err != nil {
            return fmt.Errorf("failed to write %v: %w", pid, err)
        }
        // a lot of code here
    }
    

    Writes the PID (e.g. 558462) into the cgroup.procs file. The tasks file in the same directory is maintained by cgroup itself and is updated automatically (a quick check is sketched right after this list).
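
A quick way to see the two files side by side for the stress container (same cgroup directory as above; since stress forks child processes rather than threads, both files show the same PIDs here):

$ cg=/sys/fs/cgroup/cpuset/kubepods.slice/kubepods-pod33aa7aff_97ad_41eb_b321_40a3cb26804b.slice/cri-containerd-e607aa77c01d0a079afc28b94768e5c08bda636dbd854f8154e612824d71099a.scope
$ cat $cg/cgroup.procs  # process (thread-group) IDs written by the runtime
$ cat $cg/tasks         # every task ID, maintained by the kernel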

Working backwards, r.CpusetCpus receives its value in the PreCreateContainer method of the internalContainerLifecycleImpl object:

func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
    if i.cpuManager != nil {
        allocatedCPUs := i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
        if !allocatedCPUs.IsEmpty() {
            containerConfig.Linux.Resources.CpusetCpus = allocatedCPUs.String()
        }
    }

    // a lot of code here

    return nil
}

It fetches the already-allocated cpuset.CPUSet from the CPU manager.

Tracing back one more step to where PreCreateContainer is called:

// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
    // a lot of code here
    err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig)

    containerID, err := m.runtimeService.CreateContainer(ctx, podSandboxID, containerConfig, podSandboxConfig)

    err = m.internalLifecycle.PreStartContainer(pod, container, containerID)

    err = m.runtimeService.StartContainer(ctx, containerID)
    // a lot of code here
}

As the comments indicate, the kubelet's kubeGenericRuntimeManager.startContainer method creates the container (a CRI call), prepares the environment the container needs (including its cpuset cgroup and so on), and then starts the container (another CRI call).
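
The end result can also be read back through the CRI; an example assuming containerd, whose crictl inspect output exposes the generated OCI runtime spec under .info.runtimeSpec (the container ID is the stress container from earlier):

$ crictl inspect e607aa77c01d0a079afc28b94768e5c08bda636dbd854f8154e612824d71099a | jq '.info.runtimeSpec.linux.resources.cpu.cpus'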

References