Kubernetes Pod CPU Pinning
Sep 15, 2023 21:00 · 3422 words · 7 minute read
By default, kubelet/Linux uses CFS (the Completely Fair Scheduler) to allocate CPU time to Pods. Workloads (the processes inside a Pod) get scheduled onto whatever CPU cores happen to be available, and most workloads are not sensitive to the performance cost of being migrated between cores.
CFS is the default scheduling algorithm for SCHED_NORMAL tasks (ordinary processes) in the Linux kernel. It drops the old notion of fixed time slices and priorities and instead aims to distribute CPU time fairly among tasks.
However, CPU cache affinity and scheduling latency do have a noticeable impact on a small class of processes. Such workloads need exclusive CPUs, which is what "CPU pinning" means.
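Before diving into kubelet, it helps to see what "which CPUs may this process run on" looks like from user space. Below is a minimal sketch, assuming Linux and the golang.org/x/sys/unix package (the PID flag is purely illustrative): it prints a process's CPU affinity. Under plain CFS scheduling this normally lists every core on the node, whereas a pinned workload lists only its dedicated cores.

```go
// affinity.go — print the CPUs a process may run on (Linux only).
// A minimal sketch; the target PID is passed via a flag and is hypothetical.
package main

import (
	"flag"
	"fmt"
	"log"

	"golang.org/x/sys/unix"
)

func main() {
	pid := flag.Int("pid", 0, "target PID (0 = current process)")
	flag.Parse()

	var set unix.CPUSet
	if err := unix.SchedGetaffinity(*pid, &set); err != nil {
		log.Fatalf("sched_getaffinity: %v", err)
	}

	// Collect the CPU IDs the kernel will consider for this task.
	var cpus []int
	for cpu := 0; cpu < 1024; cpu++ {
		if set.IsSet(cpu) {
			cpus = append(cpus, cpu)
		}
	}
	fmt.Printf("pid %d may run on %d CPUs: %v\n", *pid, set.Count(), cpus)
}
```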
kubelet CPU manager policies
The CPU manager policy is chosen for the kubelet with the --cpu-manager-policy flag or the cpuManagerPolicy field in its configuration file. Two policies are currently supported:
- none: the default policy. CPU limits for Guaranteed and Burstable Pods are enforced through the Linux default CFS quota.
- static: lets Guaranteed Pods whose CPU request is an integer own exclusive CPUs on the node, implemented with the Linux cpuset cgroup. The static policy manages a shared CPU pool that initially contains all of the node's CPUs. The number of exclusively allocatable CPUs equals the node's CPU count minus the CPUs reserved by the kubelet --kube-reserved or --system-reserved options (so that system processes such as kubelet, containerd and systemd are never starved of CPU). BestEffort Pods, Burstable Pods, and Guaranteed Pods that request a non-integer amount of CPU all run on the shared pool; only Guaranteed Pods that request an integer number of CPUs are granted exclusive CPUs (this check is sketched right after the list).
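As a quick illustration of that eligibility check, here is a hedged Go sketch. The helper name eligibleForExclusiveCPUs is mine, not kubelet's, and it uses the public k8s.io/api and apimachinery types rather than kubelet internals: a container is a candidate for exclusive CPUs only if its Pod is Guaranteed and its CPU request is a whole number of cores.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// eligibleForExclusiveCPUs is a hypothetical helper mirroring the static
// policy's rule: only Guaranteed Pods whose container requests a whole
// number of CPUs get dedicated cores; everything else stays in the shared pool.
func eligibleForExclusiveCPUs(pod *v1.Pod, c *v1.Container) bool {
	if pod.Status.QOSClass != v1.PodQOSGuaranteed {
		return false
	}
	cpu, ok := c.Resources.Requests[v1.ResourceCPU]
	if !ok || cpu.IsZero() {
		return false
	}
	// 1000m, 2000m, ... are integers; 1500m is not.
	return cpu.MilliValue()%1000 == 0
}

func main() {
	cpu := resource.MustParse("2")
	mem := resource.MustParse("200Mi")
	c := v1.Container{
		Name: "app",
		Resources: v1.ResourceRequirements{
			Requests: v1.ResourceList{v1.ResourceCPU: cpu, v1.ResourceMemory: mem},
			Limits:   v1.ResourceList{v1.ResourceCPU: cpu, v1.ResourceMemory: mem},
		},
	}
	pod := v1.Pod{Status: v1.PodStatus{QOSClass: v1.PodQOSGuaranteed}}
	fmt.Println(eligibleForExclusiveCPUs(&pod, &c)) // true
}
```

kubelet's static policy performs an equivalent check in policy_static.go before deciding how many exclusive CPUs to hand out.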
Pod QoS
Kubernetes assigns a QoS class to every Pod when it is created:
Guaranteed Pods have the strictest resource constraints and are the least likely to be evicted:
- every container in the Pod must have both a request and a limit for CPU and memory
- the CPU and memory requests must equal the corresponding limits
spec:
  containers:
  - name: guaranteed-pod
    image: nginx:latest
    resources:
      limits:
        memory: 200Mi
        cpu: 1
      requests:
        memory: 200Mi
        cpu: 1
Burstable Pods have a lower bound guaranteed by their requests, but limits are optional, which lets a Pod opportunistically consume extra resources when they are available:
- the Pod does not meet the criteria for Guaranteed
- at least one container in the Pod has a memory or CPU request or limit
spec:
  containers:
  - name: burstable-pod
    image: nginx:latest
    resources:
      limits:
        memory: 200Mi
      requests:
        memory: 200Mi
The Pod above is Burstable because cpu is missing from both requests and limits.
BestEffort Pods may use any node resources not specifically allocated to Pods in the other QoS classes, but when the node comes under resource pressure, the kubelet evicts these Pods first:
- no container in the Pod has any request or limit
spec:
  containers:
  - name: nginx
    image: nginx:latest
Exclusive CPUs for a Pod
As mentioned, kubelet relies on Linux cpusets to give a Pod exclusive CPUs (i.e. to pin it to cores).
A cpuset assigns a set of CPUs and memory nodes to a group of tasks (processes and threads are both tasks) and is exposed as a tree in a virtual filesystem. User code (here, the kubelet) creates and deletes cpusets in the cgroup virtual filesystem, assigns CPUs and memory nodes to each cpuset, and attaches tasks to cpusets, which restricts those tasks to the given CPUs and memory nodes.
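To make that concrete, here is a rough sketch of the raw cpuset workflow under cgroup v1 (cpuset controller mounted at /sys/fs/cgroup/cpuset, run as root; the demo cpuset name and values are illustrative). kubelet and the container runtime do essentially the same thing through their own layers.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// pinTask sketches the raw cpuset workflow under cgroup v1: create a child
// cpuset, give it CPUs and memory nodes, then move a task into it.
func pinTask(pid int, cpus, mems string) error {
	dir := "/sys/fs/cgroup/cpuset/demo" // hypothetical cpuset name

	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	// A cpuset needs both cpuset.cpus and cpuset.mems before it can host tasks.
	if err := os.WriteFile(filepath.Join(dir, "cpuset.cpus"), []byte(cpus), 0o644); err != nil {
		return err
	}
	if err := os.WriteFile(filepath.Join(dir, "cpuset.mems"), []byte(mems), 0o644); err != nil {
		return err
	}
	// Writing a PID to tasks (or cgroup.procs) confines that task to the cpuset.
	return os.WriteFile(filepath.Join(dir, "tasks"), []byte(fmt.Sprint(pid)), 0o644)
}

func main() {
	if err := pinTask(os.Getpid(), "2", "0"); err != nil {
		fmt.Fprintln(os.Stderr, "pin:", err)
	}
}
```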
Make sure the kubelet's CPU Manager policy is set to static:
$ /usr/local/bin/kubelet --bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf --kubeconfig=/etc/kubernetes/kubelet.conf --config=/var/lib/kubelet/config.yaml --cgroup-driver=systemd --container-runtime-endpoint=unix:///run/containerd/containerd.sock --pod-infra-container-image=registry-1.ict-mec.net:18443/kubesphere/pause:3.9 --node-ip=172.18.22.164 --hostname-override=node164 --authentication-token-webhook=true --authorization-mode=Webhook --kube-reserved=cpu=4,memory=4Gi --system-reserved=cpu=4,memory=24Gi --eviction-hard=memory.available<4096Mi,nodefs.available<10% --cpu-manager-policy=static --topology-manager-policy=single-numa-node --reserved-cpus=0-1,12-13,6-7,18-19 --memory-manager-policy=Static --reserved-memory 0:memory=32Gi
Check the node's NUMA topology:
$ numactl -H
available: 1 nodes (0)
node 0 cpus: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
node 0 size: 128357 MB
node 0 free: 91004 MB
node distances:
node 0
0: 10
This node has a single NUMA node, i.e. one CPU socket, and all 24 cores sit on it.
Next, look at the cpuset cgroup hierarchy managed by the kubelet:
$ cat /sys/fs/cgroup/cpuset/kubepods.slice/cpuset.cpus
0-23
All 24 cores are assigned to the kubepods.slice cgroup.
Let's create a demo Pod with exclusive CPUs and find its UID:
$ kubectl apply -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: pod-cpuset-demo
spec:
  containers:
  - name: stress
    image: polinux/stress
    resources:
      requests:
        memory: 512Mi
        cpu: 1
      limits:
        memory: 512Mi
        cpu: 1
    command: ["stress"]
    args: ["--cpu", "1"]
EOF
$ kubectl get po pod-cpuset-demo -o jsonpath='{.metadata.uid}'
33aa7aff-97ad-41eb-b321-40a3cb26804b
Assemble the path according to the naming rule and list it:
$ ll /sys/fs/cgroup/cpuset/kubepods.slice/kubepods-pod33aa7aff_97ad_41eb_b321_40a3cb26804b.slice/
total 0
drwxr-xr-x 2 root root 0 Sep 14 23:57 cri-containerd-8b0630c3bca547790c528cba84a2787f5133cec39f2259f8015396f3c5db8d2d.scope
drwxr-xr-x 2 root root 0 Sep 14 23:57 cri-containerd-e607aa77c01d0a079afc28b94768e5c08bda636dbd854f8154e612824d71099a.scope
# a lot of dirs here
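The "naming rule" for that path: with the systemd cgroup driver on cgroup v1, a Guaranteed Pod gets a slice directly under kubepods.slice named kubepods-pod<UID>.slice, with the dashes in the UID replaced by underscores, and each of its containers lands in a cri-containerd-<container-id>.scope below it. A hedged sketch of building the Pod-level path (the function name is mine):

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// guaranteedPodCpusetDir builds the cgroup v1 cpuset path of a Guaranteed Pod
// under the systemd cgroup driver. Burstable/BestEffort Pods live one level
// deeper (kubepods-burstable.slice / kubepods-besteffort.slice) instead.
func guaranteedPodCpusetDir(podUID string) string {
	slice := "kubepods-pod" + strings.ReplaceAll(podUID, "-", "_") + ".slice"
	return filepath.Join("/sys/fs/cgroup/cpuset/kubepods.slice", slice)
}

func main() {
	fmt.Println(guaranteedPodCpusetDir("33aa7aff-97ad-41eb-b321-40a3cb26804b"))
	// /sys/fs/cgroup/cpuset/kubepods.slice/kubepods-pod33aa7aff_97ad_41eb_b321_40a3cb26804b.slice
}
```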
There are directories for two containers, corresponding to pause and stress. Check the cpuset.cpus and tasks files of each:
- pause:

  $ cat /sys/fs/cgroup/cpuset/kubepods.slice/kubepods-pod33aa7aff_97ad_41eb_b321_40a3cb26804b.slice/cri-containerd-8b0630c3bca547790c528cba84a2787f5133cec39f2259f8015396f3c5db8d2d.scope/cpuset.cpus
  0-23
  $ cat /sys/fs/cgroup/cpuset/kubepods.slice/kubepods-pod33aa7aff_97ad_41eb_b321_40a3cb26804b.slice/cri-containerd-8b0630c3bca547790c528cba84a2787f5133cec39f2259f8015396f3c5db8d2d.scope/tasks
  558255
  $ ps --pid 558255
      PID TTY          TIME CMD
   558255 ?        00:00:00 pause

  - The cpuset.cpus file reads 0-23: all 24 cores are listed, so the process may run on any of CPUs 0-23.
  - The tasks file contains the PID of the pause process, binding this cpuset to that process, so the pause container may freely use CPUs 0-23.
- stress:

  $ cat /sys/fs/cgroup/cpuset/kubepods.slice/kubepods-pod33aa7aff_97ad_41eb_b321_40a3cb26804b.slice/cri-containerd-e607aa77c01d0a079afc28b94768e5c08bda636dbd854f8154e612824d71099a.scope/cpuset.cpus
  2
  $ cat /sys/fs/cgroup/cpuset/kubepods.slice/kubepods-pod33aa7aff_97ad_41eb_b321_40a3cb26804b.slice/cri-containerd-e607aa77c01d0a079afc28b94768e5c08bda636dbd854f8154e612824d71099a.scope/tasks
  558462
  558484
  $ pstree -p 558462
  stress(558462)───stress(558484)

  - The cpuset.cpus file reads 2, so these processes may only use CPU 2.
  - The tasks file contains the PIDs of the stress processes, binding this cpuset to them, so the stress container is pinned to CPU 2.
Check the utilization of CPU 2:
top - 23:50:47 up 13:54, 1 user, load average: 3.06, 3.07, 2.64
Tasks: 647 total, 2 running, 644 sleeping, 0 stopped, 1 zombie
%Cpu0 : 2.0 us, 1.0 sy, 0.0 ni, 96.0 id, 0.3 wa, 0.3 hi, 0.3 si, 0.0 st
%Cpu1 : 2.3 us, 2.3 sy, 0.0 ni, 94.7 id, 0.0 wa, 0.3 hi, 0.3 si, 0.0 st
%Cpu2 : 63.9 us, 8.6 sy, 0.0 ni, 27.5 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu3 : 1.0 us, 0.7 sy, 0.0 ni, 98.0 id, 0.0 wa, 0.3 hi, 0.0 si, 0.0 st
%Cpu4 : 0.7 us, 0.7 sy, 0.0 ni, 98.7 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu5 : 1.0 us, 0.3 sy, 0.0 ni, 98.7 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu6 : 3.6 us, 1.7 sy, 0.0 ni, 94.1 id, 0.0 wa, 0.3 hi, 0.3 si, 0.0 st
%Cpu7 : 3.3 us, 1.3 sy, 0.0 ni, 94.7 id, 0.0 wa, 0.3 hi, 0.3 si, 0.0 st
%Cpu8 : 2.0 us, 1.0 sy, 0.0 ni, 97.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
%Cpu9 : 1.0 us, 0.7 sy, 0.0 ni, 98.0 id, 0.0 wa, 0.0 hi, 0.3 si, 0.0 st
# a lot of cpus here
MiB Mem : 128357.9 total, 89245.1 free, 23076.4 used, 16036.5 buff/cache
MiB Swap: 0.0 total, 0.0 free, 0.0 used. 104039.1 avail Mem
This confirms that the stress process is indeed pinned to CPU 2.
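top is one way to eyeball this; the pinning can also be read straight from the kernel. A small sketch (the PID is the one observed above and purely illustrative) that prints Cpus_allowed_list from /proc/<pid>/status, which should show 2 for the pinned stress process:

```go
package main

import (
	"bufio"
	"fmt"
	"log"
	"os"
	"strings"
)

// printAllowedCPUs reads Cpus_allowed_list from /proc/<pid>/status,
// the kernel's own record of which CPUs the task may be scheduled on.
func printAllowedCPUs(pid int) error {
	f, err := os.Open(fmt.Sprintf("/proc/%d/status", pid))
	if err != nil {
		return err
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "Cpus_allowed_list:") {
			fmt.Println(strings.TrimSpace(strings.TrimPrefix(line, "Cpus_allowed_list:")))
			return nil
		}
	}
	return scanner.Err()
}

func main() {
	if err := printAllowedCPUs(558462); err != nil { // PID from the example above
		log.Fatal(err)
	}
}
```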
How kubelet implements cpusets
Checkpoint
The cpu_manager_state file mentioned in the Changing the CPU Manager Policy documentation is the kubelet's cpuset state checkpoint file:
$ cat /var/lib/kubelet/cpu_manager_state | jq
{
"policyName": "static",
"defaultCpuSet": "0-1,3-23",
"entries": {
"33aa7aff-97ad-41eb-b321-40a3cb26804b": {
"stress": "2"
}
},
"checksum": 1616165515
}
It records the cpuset state of the containers on this node: the stress container of the Pod whose UID is 33aa7aff-97ad-41eb-b321-40a3cb26804b is bound to CPU 2, and the shared pool (defaultCpuSet) has shrunk to 0-1,3-23 accordingly.
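The file is plain JSON, so it is easy to inspect programmatically. A hedged sketch that decodes it into a local struct mirroring the fields seen above (the struct here is my own stand-in, not kubelet's internal type; the checksum is left unverified):

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
)

// cpuManagerState mirrors the JSON layout of /var/lib/kubelet/cpu_manager_state:
// entries map pod UID -> container name -> cpuset string.
type cpuManagerState struct {
	PolicyName    string                       `json:"policyName"`
	DefaultCPUSet string                       `json:"defaultCpuSet"`
	Entries       map[string]map[string]string `json:"entries"`
	Checksum      uint64                       `json:"checksum"`
}

func main() {
	data, err := os.ReadFile("/var/lib/kubelet/cpu_manager_state")
	if err != nil {
		log.Fatal(err)
	}
	var st cpuManagerState
	if err := json.Unmarshal(data, &st); err != nil {
		log.Fatal(err)
	}
	fmt.Println("policy:", st.PolicyName, "shared pool:", st.DefaultCPUSet)
	for podUID, containers := range st.Entries {
		for name, cpus := range containers {
			fmt.Printf("pod %s container %s -> cpus %s\n", podUID, name, cpus)
		}
	}
}
```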
All of the kubelet CPU manager code lives under pkg/kubelet/cm/cpumanager. Let's start with the CPU manager initialization, https://github.com/kubernetes/kubernetes/blob/v1.26.8/pkg/kubelet/cm/cpumanager/cpu_manager.go:
// NewManager creates new cpu manager based on provided policy
func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
var topo *topology.CPUTopology
var policy Policy
var err error
switch policyName(cpuPolicyName) {
case PolicyNone:
policy, err = NewNonePolicy(cpuPolicyOptions)
if err != nil {
return nil, fmt.Errorf("new none policy error: %w", err)
}
case PolicyStatic:
topo, err = topology.Discover(machineInfo)
if err != nil {
return nil, err
}
klog.InfoS("Detected CPU topology", "topology", topo)
reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
if !ok {
// The static policy cannot initialize without this information.
return nil, fmt.Errorf("[cpumanager] unable to determine reserved CPU resources for static policy")
}
if reservedCPUs.IsZero() {
// The static policy requires this to be nonzero. Zero CPU reservation
// would allow the shared pool to be completely exhausted. At that point
// either we would violate our guarantee of exclusivity or need to evict
// any pod that has at least one container that requires zero CPUs.
// See the comments in policy_static.go for more details.
return nil, fmt.Errorf("[cpumanager] the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero")
}
// Take the ceiling of the reservation, since fractional CPUs cannot be
// exclusively allocated.
reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions)
if err != nil {
return nil, fmt.Errorf("new static policy error: %w", err)
}
default:
return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
}
// a lot of code here
}
When the CPU manager policy is set to static, CPU must be reserved for system services; otherwise the manager refuses to initialize.
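With the flags used on this node (--kube-reserved=cpu=4 and --system-reserved=cpu=4), numReservedCPUs works out to ceil(8000m / 1000) = 8. A small sketch of the same rounding (the quantities come from the flags shown earlier; the helper name is mine):

```go
package main

import (
	"fmt"
	"math"

	"k8s.io/apimachinery/pkg/api/resource"
)

// reservedCPUCount applies the same ceiling as NewManager: fractional
// reservations are rounded up, because a partial CPU cannot be excluded
// from exclusive allocation.
func reservedCPUCount(quantities ...resource.Quantity) int {
	var milli int64
	for _, q := range quantities {
		milli += q.MilliValue()
	}
	return int(math.Ceil(float64(milli) / 1000))
}

func main() {
	kubeReserved := resource.MustParse("4")   // --kube-reserved=cpu=4
	systemReserved := resource.MustParse("4") // --system-reserved=cpu=4
	fmt.Println(reservedCPUCount(kubeReserved, systemReserved)) // 8

	// A fractional reservation such as 500m still reserves a full core.
	fmt.Println(reservedCPUCount(resource.MustParse("500m"))) // 1
}
```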
Next we need to find where the manager object's state field is assigned, which happens in the Start method:
// cpuManagerStateFileName is the file name where cpu manager stores its state
const cpuManagerStateFileName = "cpu_manager_state"
func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
if err != nil {
klog.ErrorS(err, "Could not initialize checkpoint manager, please drain node and remove policy state file")
return err
}
m.state = stateImpl
}
Note that the /var/lib/kubelet/cpu_manager_state file is used here. stateCheckpoint, the implementation of the State interface, lives in state_checkpoint.go:
type stateCheckpoint struct {
mux sync.RWMutex
policyName string
cache State
checkpointManager checkpointmanager.CheckpointManager
checkpointName string
initialContainers containermap.ContainerMap
}
// NewCheckpointState creates new State for keeping track of cpu/pod assignment with checkpoint backend
func NewCheckpointState(stateDir, checkpointName, policyName string, initialContainers containermap.ContainerMap) (State, error) {
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
}
stateCheckpoint := &stateCheckpoint{
cache: NewMemoryState(),
policyName: policyName,
checkpointManager: checkpointManager,
checkpointName: checkpointName,
initialContainers: initialContainers,
}
// a lot of code here
}
When a container is admitted, the manager's Allocate method hands the actual CPU assignment down to the policy:
func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
// a lot of code here
// Call down into the policy to assign this container CPUs if required.
err := m.policy.Allocate(m.state, p, c)
if err != nil {
klog.ErrorS(err, "Allocate error")
return err
}
return nil
}
Now find the static policy's Allocate method:
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
// a lot of code here
if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
p.updateCPUsToReuse(pod, container, cpuset)
klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
return nil
}
// Call Topology Manager to get the aligned socket affinity across all hint providers.
hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)
// Allocate CPUs according to the NUMA affinity contained in the hint.
cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
if err != nil {
klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
return err
}
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
p.updateCPUsToReuse(pod, container, cpuset)
return nil
}
- The GetCPUSet method reads the container's cpuset from the checkpoint:

  // GetCPUSet returns current CPU set
  func (sc *stateCheckpoint) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
      sc.mux.RLock()
      defer sc.mux.RUnlock()
      res, ok := sc.cache.GetCPUSet(podUID, containerName)
      return res, ok
  }

  stateCheckpoint takes the cpuset from its in-memory cache, which was populated from the /var/lib/kubelet/cpu_manager_state file when the stateCheckpoint object was initialized. If an entry is found, the cpuset was assigned previously (the container was probably restarted rather than newly created).
- If nothing is found (the container is most likely new), allocateCPUs is called to pick a cpuset for it:

  func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
      klog.InfoS("AllocateCPUs", "numCPUs", numCPUs, "socket", numaAffinity)

      allocatableCPUs := p.GetAvailableCPUs(s).Union(reusableCPUs)

      // If there are aligned CPUs in numaAffinity, attempt to take those first.
      result := cpuset.NewCPUSet()
      if numaAffinity != nil {
          alignedCPUs := p.getAlignedCPUs(numaAffinity, allocatableCPUs)

          numAlignedToAlloc := alignedCPUs.Size()
          if numCPUs < numAlignedToAlloc {
              numAlignedToAlloc = numCPUs
          }

          alignedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc)
          if err != nil {
              return cpuset.NewCPUSet(), err
          }

          result = result.Union(alignedCPUs)
      }

      // Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
      remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result), numCPUs-result.Size())
      if err != nil {
          return cpuset.NewCPUSet(), err
      }
      result = result.Union(remainingCPUs)

      // Remove allocated CPUs from the shared CPUSet.
      s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))

      klog.InfoS("AllocateCPUs", "result", result)
      return result, nil
  }
- Finally, SetCPUSet saves the cpuset to the checkpoint:

  // SetCPUSet sets CPU set
  func (sc *stateCheckpoint) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) {
      sc.mux.Lock()
      defer sc.mux.Unlock()
      sc.cache.SetCPUSet(podUID, containerName, cset)
      err := sc.storeState()
      if err != nil {
          klog.InfoS("Store state to checkpoint error", "err", err)
      }
  }

  The container's cpuset is written to the cache and then persisted to the /var/lib/kubelet/cpu_manager_state file on disk.
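The side effect on the shared pool is exactly what produced the defaultCpuSet of 0-1,3-23 seen in the checkpoint file: exclusively allocated CPUs are subtracted from the default set. A hedged sketch of that set arithmetic, assuming the standalone k8s.io/utils/cpuset module (kubelet 1.26 still carries its own copy of this package, hence cpuset.NewCPUSet in the code above, but the operations are the same):

```go
package main

import (
	"fmt"
	"log"

	"k8s.io/utils/cpuset"
)

func main() {
	// The node's full shared pool before any exclusive allocation.
	shared, err := cpuset.Parse("0-23")
	if err != nil {
		log.Fatal(err)
	}

	// CPUs handed out exclusively to the stress container.
	exclusive := cpuset.New(2)

	// allocateCPUs removes them from the default set ...
	shared = shared.Difference(exclusive)
	fmt.Println(shared.String()) // 0-1,3-23

	// ... and a terminated container's CPUs are returned with Union.
	fmt.Println(shared.Union(exclusive).String()) // 0-23
}
```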
cgroup
Now let's look at how kubelet applies the cpuset to the cgroup. This part assumes some familiarity with kubelet's cgroup-related code; please read https://blog.crazytaxii.com/posts/cgroup_driver/ first. This article only covers the systemd cgroup driver on cgroup v1:
- CpusetGroup.Set:

  func (s *CpusetGroup) Set(path string, r *configs.Resources) error {
      if r.CpusetCpus != "" {
          if err := cgroups.WriteFile(path, "cpuset.cpus", r.CpusetCpus); err != nil {
              return err
          }
      }
      // a lot of code here
      return nil
  }

  This writes the value (e.g. 2) into the cpuset.cpus file.
- CpusetGroup.ApplyDir and WriteCgroupProc:

  func (s *CpusetGroup) ApplyDir(dir string, r *configs.Resources, pid int) error {
      // a lot of code here
      return cgroups.WriteCgroupProc(dir, pid)
  }

  func WriteCgroupProc(dir string, pid int) error {
      // a lot of code here
      file, err := OpenFile(dir, CgroupProcesses, os.O_WRONLY) // cgroup.procs
      if err != nil {
          return fmt.Errorf("failed to write %v: %w", pid, err)
      }
      // a lot of code here
  }

  This writes the PID (e.g. 558462) into the cgroup.procs file; the tasks file in the same directory is maintained by the cgroup itself and is updated automatically.
Working backwards, the value of r.CpusetCpus is assigned in the PreCreateContainer method of the internalContainerLifecycleImpl object:
func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
if i.cpuManager != nil {
allocatedCPUs := i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
if !allocatedCPUs.IsEmpty() {
containerConfig.Linux.Resources.CpusetCpus = allocatedCPUs.String()
}
}
// a lot of code here
return nil
}
It fetches the already-allocated cpuset.CPUSet from the CPU manager.
Keep tracing back to where PreCreateContainer is called:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
// a lot of code here
containerID, err := m.runtimeService.CreateContainer(ctx, podSandboxID, containerConfig, podSandboxConfig)
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
err = m.runtimeService.StartContainer(ctx, containerID)
// a lot of code here
}
As the comments suggest, the startContainer method of the kubeGenericRuntimeManager object inside the kubelet process creates the container (a CRI call), prepares the environment the container needs (including the cpuset cgroups), and then starts the container (another CRI call).
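So by the time CreateContainer is issued over CRI, the pinned CPUs already sit in the container config. A hedged sketch of the relevant fragment of that message, using the types from k8s.io/cri-api (the values match the example Pod; everything else is elided):

```go
package main

import (
	"fmt"

	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

func main() {
	// The fragment of the CRI ContainerConfig that carries the pinned CPUs;
	// containerd's CRI plugin turns CpusetCpus into the cpuset cgroup seen earlier.
	cfg := &runtimeapi.ContainerConfig{
		Metadata: &runtimeapi.ContainerMetadata{Name: "stress"},
		Linux: &runtimeapi.LinuxContainerConfig{
			Resources: &runtimeapi.LinuxContainerResources{
				CpusetCpus: "2", // filled in by PreCreateContainer above
			},
		},
	}
	fmt.Println(cfg.Linux.Resources.CpusetCpus)
}
```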