Kubernetes Scheduler and Node Scoring

Jun 28, 2022 22:30 · 3114 words · 7 minute read Kubernetes Golang

If only one node survives the Filter extension point, there is nothing left to prioritize: the scheduler simply uses it https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/generic_scheduler.go#L120-L127

func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    // a lot of code here
    // When only one node after predicate, just use it.
    if len(feasibleNodes) == 1 {
        return ScheduleResult{
            SuggestedHost:  feasibleNodes[0].Name,
            EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
            FeasibleNodes:  1,
        }, nil
    }
    // a lot of code here
}

If more than one node remains, they have to be prioritized https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/generic_scheduler.go#L129-L132

func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    // a lot of code here
    priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes)
    if err != nil {
        return result, err
    }
}

Like Filter, the Score extension point also invokes the registered plugins https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/generic_scheduler.go#L393-L495

func prioritizeNodes(
    ctx context.Context,
    extenders []framework.Extender,
    fwk framework.Framework,
    state *framework.CycleState,
    pod *v1.Pod,
    nodes []*v1.Node,
) (framework.NodeScoreList, error) {
    // If no priority configs are provided, then all nodes will have a score of one.
    // This is required to generate the priority list in the required format
    if len(extenders) == 0 && !fwk.HasScorePlugins() {
        result := make(framework.NodeScoreList, 0, len(nodes))
        for i := range nodes {
            result = append(result, framework.NodeScore{
                Name:  nodes[i].Name,
                Score: 1,
            })
        }
        return result, nil
    }

    // Run PreScore plugins.
    preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
    if !preScoreStatus.IsSuccess() {
        return nil, preScoreStatus.AsError()
    }

    // Run the Score plugins.
    scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
    if !scoreStatus.IsSuccess() {
        return nil, scoreStatus.AsError()
    }
    // a lot of code here
}

The plugin implementing these scoring methods is called SelectorSpread; like the Filter plugin Fit, it is registered when the in-tree registry is initialized https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/registry.go#L43-L80

func NewInTreeRegistry() runtime.Registry {
    // a lot of code here

    return runtime.Registry{
        selectorspread.Name:                  selectorspread.New,
        // a lot of code here
        noderesources.FitName:                runtime.FactoryAdapter(fts, noderesources.NewFit),
        // a lot of code here
    }
}
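
To make the factory entries concrete, here is a sketch of what one of them constructs, using a hypothetical out-of-tree plugin; the AlwaysFifty plugin is invented for illustration, only the framework types are real:

import (
    "context"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// alwaysFifty is a hypothetical score plugin that rates every node the same.
type alwaysFifty struct{}

func (alwaysFifty) Name() string { return "AlwaysFifty" }

func (alwaysFifty) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
    return 50, nil // every node looks equally good to this plugin
}

// No normalization needed, so expose no ScoreExtensions.
func (alwaysFifty) ScoreExtensions() framework.ScoreExtensions { return nil }

// newAlwaysFifty matches the PluginFactory signature the registry map expects
// (runtime.Object here is from k8s.io/apimachinery/pkg/runtime, not the
// scheduler's framework/runtime package that defines the Registry type above).
func newAlwaysFifty(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
    return alwaysFifty{}, nil
}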

The SelectorSpread plugin implements three extension points:

  • PreScore
  • Score
  • NormalizeScore
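
NormalizeScore is not actually a standalone plugin interface: the framework reaches it through the ScoreExtensions interface that a ScorePlugin may expose. Abridged from https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/interface.go (comments shortened):

type PreScorePlugin interface {
    Plugin
    // PreScore runs once per scheduling cycle, before Score.
    PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status
}

type ScorePlugin interface {
    Plugin
    // Score is invoked once per feasible node.
    Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status)
    // ScoreExtensions returns nil if the plugin does not normalize.
    ScoreExtensions() ScoreExtensions
}

type ScoreExtensions interface {
    // NormalizeScore rescales the plugin's NodeScoreList in place.
    NormalizeScore(ctx context.Context, state *CycleState, p *v1.Pod, scores NodeScoreList) *Status
}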

PreScore

When the scheduler reaches the PreScore extension point, it calls the SelectorSpread plugin's PreScore method https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go#L180-L197

type SelectorSpread struct {
    sharedLister           framework.SharedLister
    services               corelisters.ServiceLister
    replicationControllers corelisters.ReplicationControllerLister
    replicaSets            appslisters.ReplicaSetLister
    statefulSets           appslisters.StatefulSetLister
}

func (pl *SelectorSpread) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
    if skipSelectorSpread(pod) {
        return nil
    }
    selector := helper.DefaultSelector(
        pod,
        pl.services,
        pl.replicationControllers,
        pl.replicaSets,
        pl.statefulSets,
    )
    state := &preScoreState{
        selector: selector,
    }
    cycleState.Write(preScoreStateKey, state)
    return nil
}
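
The skipSelectorSpread guard at the top is worth seeing in full (from the same file): the plugin bows out entirely whenever the Pod declares its own topology spread constraints.

// skipSelectorSpread returns true if the pod's TopologySpreadConstraints are specified.
func skipSelectorSpread(pod *v1.Pod) bool {
    return len(pod.Spec.TopologySpreadConstraints) != 0
}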

It instantiates a selector via the helper.DefaultSelector function:

func DefaultSelector(
    pod *v1.Pod,
    sl corelisters.ServiceLister,
    cl corelisters.ReplicationControllerLister,
    rsl appslisters.ReplicaSetLister,
    ssl appslisters.StatefulSetLister,
) labels.Selector {
    labelSet := make(labels.Set)
    // Since services, RCs, RSs and SSs match the pod, they won't have conflicting
    // labels. Merging is safe.

    if services, err := GetPodServices(sl, pod); err == nil {
        for _, service := range services {
            labelSet = labels.Merge(labelSet, service.Spec.Selector)
        }
    }
    selector := labelSet.AsSelector()

    owner := metav1.GetControllerOfNoCopy(pod)
    if owner == nil {
        return selector
    }

    gv, err := schema.ParseGroupVersion(owner.APIVersion)
    if err != nil {
        return selector
    }

    gvk := gv.WithKind(owner.Kind)
    switch gvk {
    case rcKind:
        if rc, err := cl.ReplicationControllers(pod.Namespace).Get(owner.Name); err == nil {
            labelSet = labels.Merge(labelSet, rc.Spec.Selector)
            selector = labelSet.AsSelector()
        }
    case rsKind:
        if rs, err := rsl.ReplicaSets(pod.Namespace).Get(owner.Name); err == nil {
            if other, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
                if r, ok := other.Requirements(); ok {
                    selector = selector.Add(r...)
                }
            }
        }
    case ssKind:
        if ss, err := ssl.StatefulSets(pod.Namespace).Get(owner.Name); err == nil {
            if other, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
                if r, ok := other.Requirements(); ok {
                    selector = selector.Add(r...)
                }
            }
        }
    default:
        // Not owned by a supported controller.
    }

    return selector
}

  1. Find all Services in the same namespace whose label selectors match the current Pod
  2. Merge those Services' selectors
  3. Find the current Pod's owner (Owner), one of ReplicationController, ReplicaSet, or StatefulSet
  4. Merge the owner's selector in as well (see the sketch below)
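
A minimal sketch of that merging, using hypothetical objects rather than scheduler code: a Service selecting app=web plus an owning ReplicaSet selecting app=web,tier=frontend collapse into a single combined selector.

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // Steps 1-2: merge the matching Services' map-style selectors.
    labelSet := labels.Merge(labels.Set{}, labels.Set{"app": "web"})
    selector := labelSet.AsSelector()

    // Steps 3-4: add the owning ReplicaSet's requirements on top.
    if rsSelector, err := labels.Parse("app=web,tier=frontend"); err == nil {
        if r, ok := rsSelector.Requirements(); ok {
            selector = selector.Add(r...)
        }
    }

    // A Pod carrying both labels satisfies every merged requirement.
    fmt.Println(selector.Matches(labels.Set{"app": "web", "tier": "frontend"})) // true
    fmt.Println(selector.Matches(labels.Set{"app": "web"}))                     // false
}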

The selector is saved as the PreScore state and written into the scheduling cycle's cache: cycleState.Write(preScoreStateKey, state).

Score

Now for the Score extension point, where the scheduler calls the SelectorSpread plugin's Score method https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go#L80-L105

func (pl *SelectorSpread) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    // a lot of code here: the skipSelectorSpread check, plus reading the
    // preScoreState s saved at PreScore back from cycleState (see below)
    nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
    if err != nil {
        return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
    }

    count := countMatchingPods(pod.Namespace, s.selector, nodeInfo)
    return int64(count), nil
}
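
The elided part is where the otherwise-undefined s comes from: it reads the preScoreState saved at PreScore back out of the cycle state (abridged from the same file):

c, err := state.Read(preScoreStateKey)
if err != nil {
    return 0, framework.AsStatus(fmt.Errorf("reading %q from cycleState: %w", preScoreStateKey, err))
}
s, ok := c.(*preScoreState)
if !ok {
    return 0, framework.AsStatus(fmt.Errorf("cannot convert saved state to selectorspread.preScoreState"))
}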

Provided the Pod sets no topology spread constraints, the countMatchingPods function counts the Pods on the node named nodeName that match the label selector https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go#L218-L234

func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *framework.NodeInfo) int {
    if len(nodeInfo.Pods) == 0 || selector.Empty() {
        return 0
    }
    count := 0
    for _, p := range nodeInfo.Pods {
        // Ignore pods being deleted for spreading purposes
        // Similar to how it is done for SelectorSpreadPriority
        if namespace == p.Pod.Namespace && p.Pod.DeletionTimestamp == nil {
            if selector.Matches(labels.Set(p.Pod.Labels)) {
                count++
            }
        }
    }
    return count
}

The Score method thus returns the raw count of matching Pods. Note that at this stage a higher score is actually worse for spreading (more replicas of the same workload already sit on that node); the inversion happens next, in NormalizeScore.

NormalizeScore

At the NormalizeScore extension point, the scheduler calls the SelectorSpread plugin's method of the same name https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go#L107-L173

// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/interface.go
type NodeScore struct {
    Name  string
    Score int64
}

// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go
func (pl *SelectorSpread) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
    if skipSelectorSpread(pod) {
        return nil
    }

    countsByZone := make(map[string]int64, 10)
    maxCountByZone := int64(0)
    maxCountByNodeName := int64(0)

    for i := range scores {
        if scores[i].Score > maxCountByNodeName {
            maxCountByNodeName = scores[i].Score
        }
        nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)
        if err != nil {
            return framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", scores[i].Name, err))
        }
        zoneID := utilnode.GetZoneKey(nodeInfo.Node())
        if zoneID == "" {
            continue
        }
        countsByZone[zoneID] += scores[i].Score
    }

    for zoneID := range countsByZone {
        if countsByZone[zoneID] > maxCountByZone {
            maxCountByZone = countsByZone[zoneID]
        }
    }

    haveZones := len(countsByZone) != 0

    maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
    maxCountByZoneFloat64 := float64(maxCountByZone)
    MaxNodeScoreFloat64 := float64(framework.MaxNodeScore)

    for i := range scores {
        // initializing to the default/max node score of maxPriority
        fScore := MaxNodeScoreFloat64
        if maxCountByNodeName > 0 {
            fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-scores[i].Score) / maxCountByNodeNameFloat64)
        }
        // If there is zone information present, incorporate it
        if haveZones {
            nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)
            if err != nil {
                return framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", scores[i].Name, err))
            }

            zoneID := utilnode.GetZoneKey(nodeInfo.Node())
            if zoneID != "" {
                zoneScore := MaxNodeScoreFloat64
                if maxCountByZone > 0 {
                    zoneScore = MaxNodeScoreFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)
                }
                fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
            }
        }
        scores[i].Score = int64(fScore)
    }
    return nil
}

  1. First find the highest score in the node score list (maxCountByNodeName)
  2. Some nodes may belong to a zone, i.e. carry the topology.kubernetes.io/region and topology.kubernetes.io/zone labels; each such node's score is accumulated in the countsByZone map under its zone key
  3. Find the highest per-zone total in countsByZone (maxCountByZone)
  4. Walk the node score list again, with two cases:
    • No zone: the node's score becomes 100 * (maxCountByNodeName - current node score) / maxCountByNodeName
    • In a zone:
      1. zoneScore = 100 * (maxCountByZone - countsByZone[zoneID]) / maxCountByZone
      2. node score = fScore * 1/3 + zoneScore * 2/3 (the zone component carries 2/3 of the weight, per zoneWeighting)

So it is really the NormalizeScore extension point that produces each node's final score.
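
To make the arithmetic concrete, here is a minimal, self-contained sketch of the same math over hypothetical counts and zones (node-a and node-b in zone z1, node-c alone in z2); none of this is scheduler code:

package main

import "fmt"

const (
    maxNodeScore  = 100.0
    zoneWeighting = 2.0 / 3.0 // same weighting constant the plugin uses
)

func main() {
    nodes := []string{"node-a", "node-b", "node-c"}
    rawScores := map[string]int64{"node-a": 3, "node-b": 1, "node-c": 0} // matching-Pod counts from Score
    zoneOf := map[string]string{"node-a": "z1", "node-b": "z1", "node-c": "z2"}

    // Steps 1-3: per-zone totals and both maxima.
    countsByZone := map[string]int64{}
    var maxByNode, maxByZone int64
    for _, n := range nodes {
        if rawScores[n] > maxByNode {
            maxByNode = rawScores[n]
        }
        countsByZone[zoneOf[n]] += rawScores[n]
    }
    for _, c := range countsByZone {
        if c > maxByZone {
            maxByZone = c
        }
    }

    // Step 4: invert and blend; fewer matching Pods means a higher score.
    for _, n := range nodes {
        fScore := maxNodeScore
        if maxByNode > 0 {
            fScore = maxNodeScore * float64(maxByNode-rawScores[n]) / float64(maxByNode)
        }
        zoneScore := maxNodeScore
        if maxByZone > 0 {
            zoneScore = maxNodeScore * float64(maxByZone-countsByZone[zoneOf[n]]) / float64(maxByZone)
        }
        final := fScore*(1.0-zoneWeighting) + zoneWeighting*zoneScore
        fmt.Printf("%s: fScore=%.1f zoneScore=%.1f final=%d\n", n, fScore, zoneScore, int64(final))
    }
}

Running it prints final=0 for node-a, final=22 for node-b, and final=100 for node-c: node-b holds only one matching Pod itself, but is dragged down because its zone already holds four, while node-c wins on both counts.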

The scheduler

Back in the scheduling framework https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L857-L933

func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ps framework.PluginToNodeScores, status *framework.Status) {
    // a lot of code here (pluginToNodeScores, errCh and cancel are initialized above)

    // Run Score method for each node in parallel.
    f.Parallelizer().Until(ctx, len(nodes), func(index int) {
        for _, pl := range f.scorePlugins {
            nodeName := nodes[index].Name
            s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
            if !status.IsSuccess() {
                err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
                errCh.SendErrorWithCancel(err, cancel)
                return
            }
            pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
                Name:  nodeName,
                Score: s,
            }
        }
    })

    // Run NormalizeScore method for each ScorePlugin in parallel.
    f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
        pl := f.scorePlugins[index]
        nodeScoreList := pluginToNodeScores[pl.Name()]
        if pl.ScoreExtensions() == nil {
            return
        }
        status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
        if !status.IsSuccess() {
            err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
            errCh.SendErrorWithCancel(err, cancel)
            return
        }
    })
    if err := errCh.ReceiveError(); err != nil {
        return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
    }
    // a lot of code here
}

The Score methods of all registered score plugins are run in parallel across the nodes (len(nodes)) to give each node its initial score.

Then, parallelizing over the score plugins this time (len(f.scorePlugins)), each plugin's NormalizeScore method is called to rescore.

func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ps framework.PluginToNodeScores, status *framework.Status) {
    // a lot of code here
    // Apply score defaultWeights for each ScorePlugin in parallel.
    f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
        pl := f.scorePlugins[index]
        // Score plugins' weight has been checked when they are initialized.
        weight := f.scorePluginWeight[pl.Name()]
        nodeScoreList := pluginToNodeScores[pl.Name()]

        for i, nodeScore := range nodeScoreList {
            // return error if score plugin returns invalid score.
            if nodeScore.Score > framework.MaxNodeScore || nodeScore.Score < framework.MinNodeScore {
                err := fmt.Errorf("plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, framework.MinNodeScore, framework.MaxNodeScore)
                errCh.SendErrorWithCancel(err, cancel)
                return
            }
            nodeScoreList[i].Score = nodeScore.Score * int64(weight)
        }
    })

    return pluginToNodeScores, nil
}

Finally, one more parallel pass multiplies every score by its plugin's weight, and the per-plugin node score table is returned.
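
Back in prioritizeNodes, the weighted per-plugin scores are then summed into a single score per node (abridged from the part of the function elided earlier):

// Summarize all scores.
result := make(framework.NodeScoreList, 0, len(nodes))
for i := range nodes {
    result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
    for j := range scoresMap {
        result[i].Score += scoresMap[j][i].Score
    }
}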

With the final node scores in hand https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/generic_scheduler.go#L90-L142

func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    // a lot of code here
    priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes)
    if err != nil {
        return result, err
    }

    host, err := g.selectHost(priorityList)
    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
        FeasibleNodes:  len(feasibleNodes),
    }, err
}

the scheduler calls the selectHost method to pick the single best node https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/generic_scheduler.go#L144-L167

func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
    if len(nodeScoreList) == 0 {
        return "", fmt.Errorf("empty priorityList")
    }
    maxScore := nodeScoreList[0].Score
    selected := nodeScoreList[0].Name
    cntOfMaxScore := 1
    for _, ns := range nodeScoreList[1:] {
        if ns.Score > maxScore {
            maxScore = ns.Score
            selected = ns.Name
            cntOfMaxScore = 1
        } else if ns.Score == maxScore {
            cntOfMaxScore++
            if rand.Intn(cntOfMaxScore) == 0 {
                // Replace the candidate with probability of 1/cntOfMaxScore
                selected = ns.Name
            }
        }
    }
    return selected, nil
}

That is, the node with the highest score.
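
The rand.Intn(cntOfMaxScore) trick above is single-pass reservoir sampling: each of the k nodes tied at the top score ends up selected with probability 1/k, without knowing k in advance. A quick hypothetical check (not scheduler code):

package main

import (
    "fmt"
    "math/rand"
)

// pick mimics selectHost's tie-breaking across equally scored nodes.
func pick(names []string) string {
    selected := names[0]
    cnt := 1
    for _, n := range names[1:] {
        cnt++
        if rand.Intn(cnt) == 0 {
            selected = n // replace the candidate with probability 1/cnt
        }
    }
    return selected
}

func main() {
    counts := map[string]int{}
    for i := 0; i < 30000; i++ {
        counts[pick([]string{"a", "b", "c"})]++
    }
    fmt.Println(counts) // each name wins roughly 10000 times
}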

Next the scheduler moves on to the Reserve and Permit extension points, reserving resources for the Pod and making the final call on the scheduling result (approve, reject, or wait) before binding; and when no suitable node can be found at all, the scheduler attempts preemption. Both topics are covered in the next post.