Kubernetes 调度器与节点筛选

Jun 22, 2022 22:30 · 3458 words · 7 minute read Kubernetes Golang

在定义 Kubernetes Pod 时,可以选择设置每个容器所需的资源。最常见的资源无非就是 CPU 和 RAM。

---
apiVersion: v1
kind: Pod
metadata:
  name: frontend
spec:
  containers:
  - name: app
    image: images.my-company.example/app:v4
    resources:
      requests:
        memory: "64Mi"
        cpu: "250m"
      limits:
        memory: "128Mi"
        cpu: "500m"
  - name: log-aggregator
    image: images.my-company.example/log-aggregator:v6
    resources:
      requests:
        memory: "64Mi"
        cpu: "250m"
      limits:
        memory: "128Mi"
        cpu: "500m"

当声明了 Pod 中容器的资源 request,kube-scheduler 会使用该信息来决策将 Pod 调度到集群中的哪个节点上。

而如果在 Pod 定义中声明 limit,kubelet 会利用 Linux cgroup 来限制容器的最大可使用资源,本文不做详解。

Kubernetes 调度器

Kubernetes 调度器(kube-scheduler)是将 Pod 分配至 Node 的控制面进程:从调度队列中取出 Pod,根据 Pod 定义中的限制声明以及集群中的可用资源找到 Pod 落点,并将 Pod 和节点绑定。

调度器基于的调度框架是插件化的。将各组插件添加至调度器中,并与调度器一起编译。这个架构使得大多数调度行为或功能以插件的形式实现,保持调度器内核的轻量化和可维护。

请自行阅读调度器框架的设计提案

一个 Pod 调度过程可以分为两个周期:

  1. 调度周期(scheduling cycle)
    1. PreFilter
    2. Filter
    3. PostFilter
    4. PreScore
    5. Score
    6. PostScore
    7. Normalize Score
    8. Reserve
    9. Permit
  2. 绑定周期(binding cycle)
    1. WaitOnPermit
    2. PreBind
    3. Bind
    4. PostBind

调度周期为 Pod 选择合适的节点;绑定周期将决策应用至集群。调度周期是串行化的;而绑定周期则是并发执行的。

Pod 调度过程如图所示,在两个周期中都已经定义好各阶段的扩展点,随着调度的进行,各扩展点都会调用到注册过的插件。

调度器初始化

kube-scheduler 启动参数请参考 https://kubernetes.io/docs/reference/command-line-tools-reference/kube-scheduler/

选主:虽然 Kubernetes 集群中可能存在多个 kube-scheduler Pod,但只有一个调度器在真正地运作 https://github.com/kubernetes/kubernetes/blob/v1.23.6/cmd/kube-scheduler/app/server.go#L193-L221

    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                close(waitingForLeader)
                sched.Run(ctx)
            },
            OnStoppedLeading: func() {
                select {
                case <-ctx.Done():
                    // We were asked to terminate. Exit 0.
                    klog.InfoS("Requested to terminate, exiting")
                    os.Exit(0)
                default:
                    // We lost the lock.
                    klog.ErrorS(nil, "Leaderelection lost")
                    os.Exit(1)
                }
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

sched 就是调度器实例,当选的 kube-scheduler 进程将运行调度器 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L293-L298

func (sched *Scheduler) Run(ctx context.Context) {
    sched.SchedulingQueue.Run()
    wait.UntilWithContext(ctx, sched.scheduleOne, 0)
    sched.SchedulingQueue.Close()
}

调度器的 sched.scheduleOne 方法包含了对 Pod 的整一个调度流程,将按图上所有扩展点的顺序往下执行 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L424-L638

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    fwk, err := sched.frameworkForPod(pod)
    if err != nil {
        // This shouldn't happen, because we only accept for scheduling the pods
        // which specify a scheduler name that matches one of the profiles.
        klog.ErrorS(err, "Error occurred")
        return
    }
    if sched.skipPodSchedule(fwk, pod) {
        return
    }
    // a lot of code here

    schedulingCycleCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)
    // a lot of code here
}

sched.Algorithm.Schedule 方法包含了 PreFilter、Filter、Score 和 Normalize Score 等步骤,本文只解析 Filter 相关。

我们再来看调度器实例的初始化 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L207-L279

    configurator := &Configurator{
        componentConfigVersion:   options.componentConfigVersion,
        client:                   client,
        kubeConfig:               options.kubeConfig,
        recorderFactory:          recorderFactory,
        informerFactory:          informerFactory,
        schedulerCache:           schedulerCache,
        StopEverything:           stopEverything,
        percentageOfNodesToScore: options.percentageOfNodesToScore,
        podInitialBackoffSeconds: options.podInitialBackoffSeconds,
        podMaxBackoffSeconds:     options.podMaxBackoffSeconds,
        profiles:                 append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
        registry:                 registry,
        nodeInfoSnapshot:         snapshot,
        extenders:                options.extenders,
        frameworkCapturer:        options.frameworkCapturer,
        parallellism:             options.parallelism,
        clusterEventMap:          clusterEventMap,
    }

    metrics.Register()

    // Create the config from component config
    sched, err := configurator.create()

sched 对象是从一个 Configurator 对象实例化来的 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/factory.go#L86-L198

func (c *Configurator) create() (*Scheduler, error) {
    // a lot of code here
    profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory,
        frameworkruntime.WithComponentConfigVersion(c.componentConfigVersion),
        frameworkruntime.WithClientSet(c.client),
        frameworkruntime.WithKubeConfig(c.kubeConfig),
        frameworkruntime.WithInformerFactory(c.informerFactory),
        frameworkruntime.WithSnapshotSharedLister(c.nodeInfoSnapshot),
        frameworkruntime.WithRunAllFilters(c.alwaysCheckAllPredicates),
        frameworkruntime.WithPodNominator(nominator),
        frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(c.frameworkCapturer)),
        frameworkruntime.WithClusterEventMap(c.clusterEventMap),
        frameworkruntime.WithParallelism(int(c.parallellism)),
        frameworkruntime.WithExtenders(extenders),
    )
    if err != nil {
        return nil, fmt.Errorf("initializing profiles: %v", err)
    }
    // a lot of code here
    algo := NewGenericScheduler(
        c.schedulerCache,
        c.nodeInfoSnapshot,
        c.percentageOfNodesToScore,
    )

    return &Scheduler{
        SchedulerCache:  c.schedulerCache,
        Algorithm:       algo,
        Extenders:       extenders,
        Profiles:        profiles,
        NextPod:         internalqueue.MakeNextPodFunc(podQueue),
        Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
        StopEverything:  c.StopEverything,
        SchedulingQueue: podQueue,
    }, nil
}

在这里看到了 sched.Algorithmsched.Profiles 是如何被初始化的:NewGenericScheduler 函数返回了一个通用调度器 genericScheduler 对象

再来挖一下定义了调度行为的插件是如何被注册到调度框架中的。

上面的 Configurator 结构中的 registry 对象由 frameworkplugins.NewInTreeRegistry 函数实例化 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L236

// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go
registry := frameworkplugins.NewInTreeRegistry()

// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/registry.go
func NewInTreeRegistry() runtime.Registry {
    fts := plfeature.Features{
        EnablePodAffinityNamespaceSelector: feature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector),
        EnablePodDisruptionBudget:          feature.DefaultFeatureGate.Enabled(features.PodDisruptionBudget),
        EnablePodOverhead:                  feature.DefaultFeatureGate.Enabled(features.PodOverhead),
        EnableReadWriteOncePod:             feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
        EnableVolumeCapacityPriority:       feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
        EnableCSIStorageCapacity:           feature.DefaultFeatureGate.Enabled(features.CSIStorageCapacity),
    }

    return runtime.Registry{
        selectorspread.Name:                  selectorspread.New,
        imagelocality.Name:                   imagelocality.New,
        tainttoleration.Name:                 tainttoleration.New,
        nodename.Name:                        nodename.New,
        nodeports.Name:                       nodeports.New,
        nodeaffinity.Name:                    nodeaffinity.New,
        podtopologyspread.Name:               podtopologyspread.New,
        nodeunschedulable.Name:               nodeunschedulable.New,
        noderesources.FitName:                runtime.FactoryAdapter(fts, noderesources.NewFit),
        noderesources.BalancedAllocationName: runtime.FactoryAdapter(fts, noderesources.NewBalancedAllocation),
        volumebinding.Name:                   runtime.FactoryAdapter(fts, volumebinding.New),
        volumerestrictions.Name:              runtime.FactoryAdapter(fts, volumerestrictions.New),
        volumezone.Name:                      volumezone.New,
        nodevolumelimits.CSIName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI),
        nodevolumelimits.EBSName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS),
        nodevolumelimits.GCEPDName:           runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD),
        nodevolumelimits.AzureDiskName:       runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk),
        nodevolumelimits.CinderName:          runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder),
        interpodaffinity.Name:                runtime.FactoryAdapter(fts, interpodaffinity.New),
        queuesort.Name:                       queuesort.New,
        defaultbinder.Name:                   defaultbinder.New,
        defaultpreemption.Name:               runtime.FactoryAdapter(fts, defaultpreemption.New),
    }
}

runtime.Registry 对象是一个插件表,各插件都在此初始化和注册。

根据 request 过滤节点

Fit 插件已被注册到调度框架中,实现了两个扩展点:

  • PreFilter
  • Filter

PreFilter

当调度器来到 PreFilter 扩展点将调用插件的 PreFilter 方法 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/noderesources/fit.go#181-L185

func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
    cycleState.Write(preFilterStateKey, computePodResourceRequest(pod, f.enablePodOverhead))
    return nil
}

通过 computePodResourceRequest 函数计算当前 Pod 所需要的资源 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/noderesources/fit.go#135-L179

func computePodResourceRequest(pod *v1.Pod, enablePodOverhead bool) *preFilterState {
    result := &preFilterState{}
    for _, container := range pod.Spec.Containers {
        result.Add(container.Resources.Requests)
    }

    // take max_resource(sum_pod, any_init_container)
    for _, container := range pod.Spec.InitContainers {
        result.SetMaxResource(container.Resources.Requests)
    }

    // If Overhead is being utilized, add to the total requests for the pod
    if pod.Spec.Overhead != nil && enablePodOverhead {
        result.Add(pod.Spec.Overhead)
    }

    return result
}
  1. 累加 Pod 对象中所有容器 request 资源(CPU、内存和存储)
  2. 遍历 Pod 所有 init 容器,只要有 init 容器 request 资源超过上一步计算出的资源和,则使该值覆盖
  3. 如果 Pod 定义了 spec.overhead,就一起加上

将计算好的 Pod 所需资源写入调度器缓存中 cycleState.Write(preFilterStateKey, computePodResourceRequest(pod, f.enablePodOverhead))

Filter

当调度器来到 Filter 扩展点将调用其 Filter 方法 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/noderesources/fit.go#L218-L238

func (f *Fit) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    s, err := getPreFilterState(cycleState)
    if err != nil {
        return framework.AsStatus(err)
    }

    insufficientResources := fitsRequest(s, nodeInfo, f.ignoredResources, f.ignoredResourceGroups)
    // a lot of code here
}

真正判断某个节点的资源是否足够的是 fitsRequest 函数 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/noderesources/fit.go#L256-L329

func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo, ignoredExtendedResources, ignoredResourceGroups sets.String) []InsufficientResource {
    insufficientResources := make([]InsufficientResource, 0, 4)

    allowedPodNumber := nodeInfo.Allocatable.AllowedPodNumber
    if len(nodeInfo.Pods)+1 > allowedPodNumber {
        insufficientResources = append(insufficientResources, InsufficientResource{
            v1.ResourcePods,
            "Too many pods",
            1,
            int64(len(nodeInfo.Pods)),
            int64(allowedPodNumber),
        })
    }

    if podRequest.MilliCPU == 0 &&
        podRequest.Memory == 0 &&
        podRequest.EphemeralStorage == 0 &&
        len(podRequest.ScalarResources) == 0 {
        return insufficientResources
    }

    if podRequest.MilliCPU > (nodeInfo.Allocatable.MilliCPU - nodeInfo.Requested.MilliCPU) {
        insufficientResources = append(insufficientResources, InsufficientResource{
            v1.ResourceCPU,
            "Insufficient cpu",
            podRequest.MilliCPU,
            nodeInfo.Requested.MilliCPU,
            nodeInfo.Allocatable.MilliCPU,
        })
    }
    if podRequest.Memory > (nodeInfo.Allocatable.Memory - nodeInfo.Requested.Memory) {
        insufficientResources = append(insufficientResources, InsufficientResource{
            v1.ResourceMemory,
            "Insufficient memory",
            podRequest.Memory,
            nodeInfo.Requested.Memory,
            nodeInfo.Allocatable.Memory,
        })
    }
    if podRequest.EphemeralStorage > (nodeInfo.Allocatable.EphemeralStorage - nodeInfo.Requested.EphemeralStorage) {
        insufficientResources = append(insufficientResources, InsufficientResource{
            v1.ResourceEphemeralStorage,
            "Insufficient ephemeral-storage",
            podRequest.EphemeralStorage,
            nodeInfo.Requested.EphemeralStorage,
            nodeInfo.Allocatable.EphemeralStorage,
        })
    }

    for rName, rQuant := range podRequest.ScalarResources {
        if v1helper.IsExtendedResourceName(rName) {
            // If this resource is one of the extended resources that should be ignored, we will skip checking it.
            // rName is guaranteed to have a slash due to API validation.
            var rNamePrefix string
            if ignoredResourceGroups.Len() > 0 {
                rNamePrefix = strings.Split(string(rName), "/")[0]
            }
            if ignoredExtendedResources.Has(string(rName)) || ignoredResourceGroups.Has(rNamePrefix) {
                continue
            }
        }
        if rQuant > (nodeInfo.Allocatable.ScalarResources[rName] - nodeInfo.Requested.ScalarResources[rName]) {
            insufficientResources = append(insufficientResources, InsufficientResource{
                rName,
                fmt.Sprintf("Insufficient %v", rName),
                podRequest.ScalarResources[rName],
                nodeInfo.Requested.ScalarResources[rName],
                nodeInfo.Allocatable.ScalarResources[rName],
            })
        }
    }

    return insufficientResources
}
  1. 判断是否已经达到节点允许的最大 Pod 数量
  2. 判断 Pod 所需资源(CPU、内存和存储)是否超过节点可分配的资源
  3. 判断其他扩展资源(比如 GPU),不能超过该节点上可分配的数量

通用调度器会对的一个给定的节点列表进行筛选,挑选出一个或多个合适的节点 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/generic_scheduler.go#L256-L338

func (g *genericScheduler) findNodesThatPassFilters(
    ctx context.Context,
    fwk framework.Framework,
    state *framework.CycleState,
    pod *v1.Pod,
    diagnosis framework.Diagnosis,
    nodes []*framework.NodeInfo) ([]*v1.Node, error) {
    numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))
    feasibleNodes := make([]*v1.Node, numNodesToFind)

    if !fwk.HasFilterPlugins() {
        length := len(nodes)
        for i := range feasibleNodes {
            feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node()
        }
        g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
        return feasibleNodes, nil
    }

    checkNode := func(i int) {
        // We check the nodes starting from where we left off in the previous scheduling cycle,
        // this is to make sure all nodes have the same chance of being examined across pods.
        nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)]
        status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
        if status.Code() == framework.Error {
            errCh.SendErrorWithCancel(status.AsError(), cancel)
            return
        }
        if status.IsSuccess() {
            length := atomic.AddInt32(&feasibleNodesLen, 1)
            if length > numNodesToFind {
                cancel()
                atomic.AddInt32(&feasibleNodesLen, -1)
            } else {
                feasibleNodes[length-1] = nodeInfo.Node()
            }
        } else {
            statusesLock.Lock()
            diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = status
            diagnosis.UnschedulablePlugins.Insert(status.FailedPlugin())
            statusesLock.Unlock()
        }
    }
    // a lot of code here
}

为了避免候选节点过多导致效率低下,调度器只会对一部分节点筛选。先通过 numFeasibleNodesToFind 方法算出将被筛选的节点数量:

const minFeasibleNodesToFind = 100

func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
    if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
        return numAllNodes
    }

    adaptivePercentage := g.percentageOfNodesToScore
    if adaptivePercentage <= 0 {
        basePercentageOfNodesToScore := int32(50)
        adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
        if adaptivePercentage < minFeasibleNodesPercentageToFind {
            adaptivePercentage = minFeasibleNodesPercentageToFind
        }
    }

    numNodes = numAllNodes * adaptivePercentage / 100
    if numNodes < minFeasibleNodesToFind {
        return minFeasibleNodesToFind
    }

    return numNodes
}

如果经过筛选只有一个节点符合资源要求,那没得选就是它了,调度器会将 Pod 与该节点绑定;反之筛选后有多个节点满足条件,那就要进入打分(Score)环节,将在打分篇中详细阐述。