Kubernetes 调度器与节点打分
Jun 28, 2022 22:30 · 3114 words · 7 minute read
Filter 扩展点过滤节点后如果只剩一个,也谈不上优选了,直接使用它 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/generic_scheduler.go#L120-L127:
// Schedule (excerpt): only the shortcut taken when exactly one node is feasible is shown.
func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
// a lot of code here
// When only one node after predicate, just use it.
// With a single feasible node there is nothing to rank, so the function
// returns here without reaching the scoring code.
if len(feasibleNodes) == 1 {
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Name,
// The feasible node plus every entry in diagnosis.NodeToStatusMap
// (presumably the nodes rejected during filtering — confirm in the omitted code).
EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
FeasibleNodes: 1,
}, nil
}
// a lot of code here
}
如果可用节点多于一个,就需要进入优选(打分)阶段 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/generic_scheduler.go#L129-L132:
// Schedule (excerpt): with more than one feasible node, the nodes are scored.
func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
// a lot of code here
// prioritizeNodes returns one score per feasible node; any error it reports
// is propagated to the caller unchanged.
priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes)
if err != nil {
return result, err
}
}
和 Filter 一样,Score 扩展点也是调用注册过的插件 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/generic_scheduler.go#L393-L495:
// prioritizeNodes (excerpt) scores each node in nodes by running the
// framework's PreScore and Score plugins. The remainder of the function —
// including how extender results and the per-plugin scores are combined into
// the returned list — is omitted here.
func prioritizeNodes(
ctx context.Context,
extenders []framework.Extender,
fwk framework.Framework,
state *framework.CycleState,
pod *v1.Pod,
nodes []*v1.Node,
) (framework.NodeScoreList, error) {
// If no priority configs are provided, then all nodes will have a score of one.
// This is required to generate the priority list in the required format
if len(extenders) == 0 && !fwk.HasScorePlugins() {
result := make(framework.NodeScoreList, 0, len(nodes))
for i := range nodes {
result = append(result, framework.NodeScore{
Name: nodes[i].Name,
Score: 1,
})
}
return result, nil
}
// Run PreScore plugins.
// A non-success status is converted to an error and scoring stops.
preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
if !preScoreStatus.IsSuccess() {
return nil, preScoreStatus.AsError()
}
// Run the Score plugins.
// scoresMap holds the node scores produced by each Score plugin.
scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
if !scoreStatus.IsSuccess() {
return nil, scoreStatus.AsError()
}
// a lot of code here
}
实现打分方法的插件有多个,下面以 SelectorSpread 插件为例。它和 Filter 插件 Fit 一样,在初始化插件注册表(registry)时注册 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/registry.go#L43-L80:
// NewInTreeRegistry (excerpt) builds the registry that maps in-tree plugin
// names to their factory functions; only the SelectorSpread and Fit entries
// are shown.
func NewInTreeRegistry() runtime.Registry {
// a lot of code here
return runtime.Registry{
selectorspread.Name: selectorspread.New,
// a lot of code here
// fts is declared in the omitted code (presumably feature-gate settings for
// the plugin factories — confirm); runtime.FactoryAdapter appears to adapt
// NewFit, which takes fts, to the registry's factory signature.
noderesources.FitName: runtime.FactoryAdapter(fts, noderesources.NewFit),
// a lot of code here
}
}
SelectorSpread 插件实现了三个扩展点:
- PreScore
- Score
- NormalizeScore
PreScore
当调度器来到 PreScore 扩展点将调用 SelectorSpread 插件的 PreScore 方法 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go#L180-L197:
// SelectorSpread is the scoring plugin that favors nodes running fewer pods
// from the same Service/controller group as the pod being scheduled.
type SelectorSpread struct {
// sharedLister is used to look up a node's NodeInfo by name in Score and NormalizeScore.
sharedLister framework.SharedLister
// The listers below are handed to helper.DefaultSelector in PreScore to find
// the Services and the owning controller that select the pod.
services corelisters.ServiceLister
replicationControllers corelisters.ReplicationControllerLister
replicaSets appslisters.ReplicaSetLister
statefulSets appslisters.StatefulSetLister
}
// PreScore builds the label selector that identifies the pod's peers (via
// helper.DefaultSelector) and stores it in cycleState under preScoreStateKey,
// presumably for Score to read back. Pods for which skipSelectorSpread reports
// true are left untouched. It always returns a nil status.
func (pl *SelectorSpread) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
	if !skipSelectorSpread(pod) {
		peers := helper.DefaultSelector(pod, pl.services, pl.replicationControllers, pl.replicaSets, pl.statefulSets)
		cycleState.Write(preScoreStateKey, &preScoreState{selector: peers})
	}
	return nil
}
其中通过 helper.DefaultSelector 函数构造了一个标签选择器(selector):
// DefaultSelector returns the label selector used to find pods belonging to
// the same group as pod. It merges the selectors of every Service returned by
// GetPodServices and then, if pod is controlled by a ReplicationController,
// ReplicaSet or StatefulSet that the listers can find, folds in that
// controller's selector as well. Lookup or parse failures are ignored and the
// selector built so far is returned.
func DefaultSelector(
	pod *v1.Pod,
	sl corelisters.ServiceLister,
	cl corelisters.ReplicationControllerLister,
	rsl appslisters.ReplicaSetLister,
	ssl appslisters.StatefulSetLister,
) labels.Selector {
	// Services, RCs, RSs and SSs that all match the same pod cannot carry
	// conflicting labels, so merging their selectors is safe.
	merged := make(labels.Set)
	if svcs, err := GetPodServices(sl, pod); err == nil {
		for _, svc := range svcs {
			merged = labels.Merge(merged, svc.Spec.Selector)
		}
	}
	sel := merged.AsSelector()

	ref := metav1.GetControllerOfNoCopy(pod)
	if ref == nil {
		return sel
	}
	groupVersion, err := schema.ParseGroupVersion(ref.APIVersion)
	if err != nil {
		return sel
	}

	// withRequirements returns sel extended with the requirements of ls, or
	// sel unchanged when ls cannot be converted or yields no requirements.
	withRequirements := func(ls *metav1.LabelSelector) labels.Selector {
		converted, convErr := metav1.LabelSelectorAsSelector(ls)
		if convErr != nil {
			return sel
		}
		reqs, selectable := converted.Requirements()
		if !selectable {
			return sel
		}
		return sel.Add(reqs...)
	}

	switch groupVersion.WithKind(ref.Kind) {
	case rcKind:
		if rc, getErr := cl.ReplicationControllers(pod.Namespace).Get(ref.Name); getErr == nil {
			merged = labels.Merge(merged, rc.Spec.Selector)
			sel = merged.AsSelector()
		}
	case rsKind:
		if rs, getErr := rsl.ReplicaSets(pod.Namespace).Get(ref.Name); getErr == nil {
			sel = withRequirements(rs.Spec.Selector)
		}
	case ssKind:
		if ss, getErr := ssl.StatefulSets(pod.Namespace).Get(ref.Name); getErr == nil {
			sel = withRequirements(ss.Spec.Selector)
		}
	default:
		// Not owned by a supported controller.
	}
	return sel
}
- 寻找同命名空间下所有与当前 Pod 标签匹配的 Service
- 归并这些 Service 的选择器(Selector)
- 找到当前 Pod 的所有者(Owner),即 ReplicationController、ReplicaSet、StatefulSet 中的一种;如果没有所有者,或所有者不属于这几种控制器,就直接使用上一步得到的选择器
- 把所有者自身的选择器也归并进来
最后通过 cycleState.Write(preScoreStateKey, state) 把选择器作为 PreScore 的状态,写入本轮调度周期的状态 CycleState(而不是调度器缓存)中,供后续的 Score 扩展点读取。
Score
下面来看 Score 扩展点,调度器在此调用 SelectorSpread 插件的 Score 方法 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go#L80-L105:
// Score (excerpt) returns the raw number of pods on nodeName that match the
// selector prepared in PreScore; NormalizeScore later turns these counts into
// node scores.
func (pl *SelectorSpread) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
// a lot of code here
nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
}
// NOTE(review): s is obtained in the omitted code above — presumably the
// preScoreState that PreScore wrote into the cycle state; confirm.
count := countMatchingPods(pod.Namespace, s.selector, nodeInfo)
return int64(count), nil
}
如果 Pod 未设置拓扑分布约束,则调用 countMatchingPods 函数,统计节点 nodeName 上与标签选择器匹配的 Pod 数量(只统计同一命名空间、且未处于删除中的 Pod) https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go#L218-L234:
// countMatchingPods reports how many pods on nodeInfo live in namespace, are
// not marked for deletion, and carry labels matched by selector. A node with
// no pods, or an empty selector, yields 0.
func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *framework.NodeInfo) int {
	if len(nodeInfo.Pods) == 0 || selector.Empty() {
		return 0
	}
	matched := 0
	for _, info := range nodeInfo.Pods {
		candidate := info.Pod
		// Pods from other namespaces, and pods already being deleted, do not
		// count toward spreading (mirrors SelectorSpreadPriority).
		if candidate.Namespace != namespace || candidate.DeletionTimestamp != nil {
			continue
		}
		if selector.Matches(labels.Set(candidate.Labels)) {
			matched++
		}
	}
	return matched
}
Score 方法最终返回的就是匹配的 Pod 数量;这只是一个原始计数,尚未归一化为节点得分。
NormalizeScore
调度器 NormalizeScore 扩展点将调用 SelectorSpread 插件的同名方法 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go#L107-L173:
// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/interface.go
// NodeScore pairs a node's name with the score it was given.
type NodeScore struct {
// Name is the node's name.
Name string
// Score is the node's score. A plugin's Score method may store a raw value
// here; after normalization it must lie within
// [framework.MinNodeScore, framework.MaxNodeScore], as checked in RunScorePlugins.
Score int64
}
// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/selectorspread/selector_spread.go
// NormalizeScore rewrites, in place, the raw match counts in scores into node
// scores: the fewer matching pods a node (and, when zone information exists,
// its zone) has, the higher its score. It returns an error status when a node
// cannot be found in the snapshot, and leaves scores untouched when
// skipSelectorSpread reports true for pod.
func (pl *SelectorSpread) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
if skipSelectorSpread(pod) {
return nil
}
// Total matching pods per zone key.
countsByZone := make(map[string]int64, 10)
maxCountByZone := int64(0)
maxCountByNodeName := int64(0)
// First pass: find the highest per-node count and accumulate counts per zone.
for i := range scores {
if scores[i].Score > maxCountByNodeName {
maxCountByNodeName = scores[i].Score
}
nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)
if err != nil {
return framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", scores[i].Name, err))
}
zoneID := utilnode.GetZoneKey(nodeInfo.Node())
// An empty key means the node has no zone information; it is left out of the per-zone totals.
if zoneID == "" {
continue
}
countsByZone[zoneID] += scores[i].Score
}
// Highest per-zone total.
for zoneID := range countsByZone {
if countsByZone[zoneID] > maxCountByZone {
maxCountByZone = countsByZone[zoneID]
}
}
haveZones := len(countsByZone) != 0
maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
maxCountByZoneFloat64 := float64(maxCountByZone)
MaxNodeScoreFloat64 := float64(framework.MaxNodeScore)
// Second pass: convert each count into a score.
for i := range scores {
// initializing to the default/max node score of maxPriority
// (kept as-is when no node has any matching pod).
fScore := MaxNodeScoreFloat64
if maxCountByNodeName > 0 {
// Linear scale: the node with the most matches gets 0, a node with none gets MaxNodeScore.
fScore = MaxNodeScoreFloat64 * (float64(maxCountByNodeName-scores[i].Score) / maxCountByNodeNameFloat64)
}
// If there is zone information present, incorporate it
if haveZones {
// NOTE(review): this repeats the snapshot lookup from the first pass;
// the zone IDs could be cached per index instead.
nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)
if err != nil {
return framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", scores[i].Name, err))
}
zoneID := utilnode.GetZoneKey(nodeInfo.Node())
// Nodes without a zone keep their node-only score.
if zoneID != "" {
zoneScore := MaxNodeScoreFloat64
if maxCountByZone > 0 {
// Same linear scale, applied to the node's zone total.
zoneScore = MaxNodeScoreFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)
}
// Blend node and zone scores; zoneWeighting is defined outside this excerpt.
fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
}
}
// The int64 conversion drops the fractional part.
scores[i].Score = int64(fScore)
}
return nil
}
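- 先遍历节点得分列表,找出单个节点的最高分 maxCountByNodeName
- 有些节点可能属于某个区域,即携带 topology.kubernetes.io/region 或 topology.kubernetes.io/zone 标签,按区域累加这些节点的得分,存入 countsByZone map 中
- 找出 countsByZone 中的区域最高分 maxCountByZone
- 再次遍历节点得分列表,分成两种情况:
  - 集群中没有任何区域信息,或该节点不属于任何区域:节点得分 fScore = MaxNodeScore × (maxCountByNodeName − 该节点匹配数) / maxCountByNodeName,即匹配的 Pod 越少得分越高;如果所有节点都没有匹配的 Pod(maxCountByNodeName 为 0),则统一取满分 MaxNodeScore
  - 存在区域信息且该节点属于某个区域:先按同样的方式算出区域得分 zoneScore = MaxNodeScore × (maxCountByZone − 该区域匹配数) / maxCountByZone,再与节点得分加权合并,fScore = fScore × (1 − zoneWeighting) + zoneScore × zoneWeighting
  - 最终得分截断为整数后写回得分列表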
可见 Score 扩展点只负责统计匹配的 Pod 数量,NormalizeScore 扩展点才真正把这些计数换算成每个节点的得分:同组 Pod 越少的节点(及区域)得分越高,从而让同一组 Pod 尽量分散部署。
调度器
// RunScorePlugins (excerpt) runs every Score plugin on every node, then lets
// each plugin that provides ScoreExtensions normalize its own node scores.
// A failure in either phase cancels the remaining work and is returned as an
// error status.
//
// NOTE: pluginToNodeScores, errCh and cancel are declared in code omitted
// from this excerpt.
func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ps framework.PluginToNodeScores, status *framework.Status) {
	// a lot of code here
	// Run Score method for each node in parallel.
	f.Parallelizer().Until(ctx, len(nodes), func(index int) {
		nodeName := nodes[index].Name
		for _, pl := range f.scorePlugins {
			s, pluginStatus := f.runScorePlugin(ctx, pl, state, pod, nodeName)
			if !pluginStatus.IsSuccess() {
				err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), pluginStatus.AsError())
				errCh.SendErrorWithCancel(err, cancel)
				return
			}
			pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
				Name:  nodeName,
				Score: s,
			}
		}
	})
	// Stop here on a Score failure. Otherwise NormalizeScore would run over
	// partially filled scores and the error would be misreported as a
	// normalization failure.
	if err := errCh.ReceiveError(); err != nil {
		return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err))
	}

	// Run NormalizeScore method for each ScorePlugin in parallel.
	f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
		pl := f.scorePlugins[index]
		// Plugins without ScoreExtensions keep their raw scores.
		if pl.ScoreExtensions() == nil {
			return
		}
		nodeScoreList := pluginToNodeScores[pl.Name()]
		normStatus := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
		if !normStatus.IsSuccess() {
			err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), normStatus.AsError())
			errCh.SendErrorWithCancel(err, cancel)
			return
		}
	})
	if err := errCh.ReceiveError(); err != nil {
		return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
	}
	// a lot of code here
}
首先按节点(共 len(nodes) 个)并行,对每个节点依次调用所有打分插件实现的 Score 方法,得到初始分数。
然后按打分插件(共 len(f.scorePlugins) 个)并行,对实现了 ScoreExtensions 的插件调用其 NormalizeScore 方法,把初始分数归一化。任一插件出错都会通过 errCh 上报并取消其余任务。
// RunScorePlugins (excerpt, final step) validates each plugin's normalized
// node scores and multiplies them by the plugin's weight. It returns the
// weighted per-plugin score table, or an error status if any score falls
// outside [framework.MinNodeScore, framework.MaxNodeScore].
//
// NOTE: pluginToNodeScores, errCh and cancel are declared in code omitted
// from this excerpt.
func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ps framework.PluginToNodeScores, status *framework.Status) {
	// a lot of code here
	// Apply score defaultWeights for each ScorePlugin in parallel.
	f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
		pl := f.scorePlugins[index]
		// Score plugins' weight has been checked when they are initialized.
		weight := f.scorePluginWeight[pl.Name()]
		nodeScoreList := pluginToNodeScores[pl.Name()]
		for i, nodeScore := range nodeScoreList {
			// return error if score plugin returns invalid score.
			if nodeScore.Score > framework.MaxNodeScore || nodeScore.Score < framework.MinNodeScore {
				err := fmt.Errorf("plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, framework.MinNodeScore, framework.MaxNodeScore)
				errCh.SendErrorWithCancel(err, cancel)
				return
			}
			nodeScoreList[i].Score = nodeScore.Score * int64(weight)
		}
	})
	// Without this check an out-of-range score reported above would be
	// silently dropped and the partially weighted table returned as success.
	if err := errCh.ReceiveError(); err != nil {
		return nil, framework.AsStatus(fmt.Errorf("applying score defaultWeights on Score plugins: %w", err))
	}
	return pluginToNodeScores, nil
}
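最后再按插件并行一轮:先校验每个得分是否落在 [MinNodeScore, MaxNodeScore] 区间内(超出则报错),再乘以该插件的权重,然后返回按插件划分的节点得分表。随后 prioritizeNodes 会把同一节点在各插件上的加权得分累加起来(这部分在上文 prioritizeNodes 省略的代码中;若配置了 extender,还会加上 extender 的得分),得到每个节点的最终得分。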
调度器拿到最终节点得分表后 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/generic_scheduler.go#L90-L142:
// Schedule (excerpt): after scoring, the best-scored node is selected.
func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
// a lot of code here
priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes)
if err != nil {
return result, err
}
// selectHost picks the highest-scoring node; host and err are presumably
// consumed by the remainder of the function, which is omitted here.
host, err := g.selectHost(priorityList)
}
会调用 selectHost 方法选出最优的节点 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/generic_scheduler.go#L144-L167:
// selectHost returns the name of the highest-scoring node in nodeScoreList.
// Ties are broken uniformly at random: the k-th node seen with the current
// best score replaces the candidate with probability 1/k. An empty list is
// an error.
func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
	if len(nodeScoreList) == 0 {
		return "", fmt.Errorf("empty priorityList")
	}
	best := nodeScoreList[0]
	ties := 1
	for i := 1; i < len(nodeScoreList); i++ {
		cur := nodeScoreList[i]
		switch {
		case cur.Score > best.Score:
			best, ties = cur, 1
		case cur.Score == best.Score:
			ties++
			// Replace the candidate with probability 1/ties.
			if rand.Intn(ties) == 0 {
				best = cur
			}
		}
	}
	return best.Name, nil
}
也就是得分最高的节点。如果有多个节点并列最高分,则用蓄水池抽样在它们之间等概率随机选出一个:第 k 个并列节点以 1/k 的概率替换当前候选。
接下来调度器将来到 Reserve 和 Permit 扩展点,为 Pod 预留资源以及在执行绑定操作前对当前 Pod 的调度结果进行最终决策(批准、拒绝或延迟调度);如果找不到合适的节点,调度器会尝试抢占(preemption),都将在下篇论述。