Kubernetes 调度器与绑定

Sep 10, 2022 11:30 · 2788 words · 6 minute read Kubernetes Golang

一个 Pod 调度过程可以分为两个周期:

  1. 调度周期(scheduling cycle)
    1. PreFilter
    2. Filter
    3. PostFilter
    4. PreScore
    5. Score
    6. PostScore
    7. Normalize Score
    8. Reserve
    9. Permit
  2. 绑定周期(binding cycle)
    1. WaitOnPermit
    2. PreBind
    3. Bind
    4. PostBind

确定了最适合当前 Pod 的节点后,调度来到绑定周期

绑定操作是异步的,同时调度器将开始调度下一个 Pod https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L550-L637

// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
    bindingCycleCtx, cancel := context.WithCancel(ctx)
    defer cancel()
}()

WaitOnPermit

WaitOnPermit 是一个比较特殊的扩展点,用于实现 Pod 延迟绑定。严格意义上它并不是一个扩展点,用户无法通过自定义插件来扩展该锚点(注意其他扩展点在 schedulerOne 中都是调用类似 fwk.RunXXXPlugins 的方法)。

https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L557-L584

waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
if !waitOnPermitStatus.IsSuccess() {
    var reason string
    if waitOnPermitStatus.IsUnschedulable() {
        metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        reason = v1.PodReasonUnschedulable
    } else {
        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        reason = SchedulerError
    }
    // trigger un-reserve plugins to clean up state associated with the reserved Pod
    fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
        klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
    } else {
        // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
        // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
        // TODO(#103853): de-duplicate the logic.
        // Avoid moving the assumed Pod itself as it's always Unschedulable.
        // It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
        // update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
        defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
            return assumedPod.UID != pod.UID
        })
    }
    sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
    return
}

在执行 fwk.WaitOnPermit 方法时该 goroutine 处于阻塞状态,期间一旦接收到 framework.Success,表示延时绑定中的 Pod 被批准绑定,绑定流程就会继续往下进行。

PreBind

绑定周期来到 PreBind 扩展点,调用注册过的插件 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L586-L602

// Run "prebind" plugins.
preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L955-L982

func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
    // a lot of code here
    for _, pl := range f.preBindPlugins {
        status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName)
        if !status.IsSuccess() {
            err := status.AsError()
            klog.ErrorS(err, "Failed running PreBind plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
            return framework.AsStatus(fmt.Errorf("running PreBind plugin %q: %w", pl.Name(), err))
        }
    }
    return nil
}

func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    // a lot of code here
    status := pl.PreBind(ctx, state, pod, nodeName)
    f.metricsRecorder.observePluginDurationAsync(preBind, pl.Name(), status, metrics.SinceInSeconds(startTime))
    return status
}

实现预绑定的插件叫 VolumeBinding,在 registry 初始化时注册 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/registry.go#L43-L80

func NewInTreeRegistry() runtime.Registry {
    // a lot of code here
    return runtime.Registry{
        // a lot of code here
        volumebinding.Name:                   runtime.FactoryAdapter(fts, volumebinding.New),
    }
}

调度器调用 VolumeBinding 插件的 PreBind 方法 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go#L320-L347

type VolumeBinding struct {
    Binder    SchedulerVolumeBinder
    PVCLister corelisters.PersistentVolumeClaimLister
    scorer    volumeCapacityScorer
    fts       feature.Features
}

func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    s, err := getStateData(cs)
    if err != nil {
        return framework.AsStatus(err)
    }
    if s.allBound {
        // no need to bind volumes
        return nil
    }
    // we don't need to hold the lock as only one node will be pre-bound for the given pod
    podVolumes, ok := s.podVolumesByNode[nodeName]
    if !ok {
        return framework.AsStatus(fmt.Errorf("no pod volumes found for node %q", nodeName))
    }
    klog.V(5).InfoS("Trying to bind volumes for pod", "pod", klog.KObj(pod))
    err = pl.Binder.BindPodVolumes(pod, podVolumes)
    if err != nil {
        klog.V(1).InfoS("Failed to bind volumes for pod", "pod", klog.KObj(pod), "err", err)
        return framework.AsStatus(err)
    }
    klog.V(5).InfoS("Success binding volumes for pod", "pod", klog.KObj(pod))
    return nil
}

顺着 pl.Binder.BindPodVolumes 调用找到它的实现所在 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/volumebinding/binder.go#L441-L470

func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod, podVolumes *PodVolumes) (err error) {
    // a lot of code here
    bindings := podVolumes.StaticBindings
    claimsToProvision := podVolumes.DynamicProvisions

    // Start API operations
    err = b.bindAPIUpdate(assumedPod, bindings, claimsToProvision)
    if err != nil {
        return err
    }

    err = wait.Poll(time.Second, b.bindTimeout, func() (bool, error) {
        b, err := b.checkBindings(assumedPod, bindings, claimsToProvision)
        return b, err
    })
    if err != nil {
        return fmt.Errorf("binding volumes: %w", err)
    }
    return nil
}

在先前 Reserve 扩展点调度器已经将当前 Pod 所需 PVC 和 PV 在缓存中绑定了,所谓绑定即将 PV 的 spec.claimRef 指向 PVC https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/controller/volume/persistentvolume/util/util.go#L131-L143

// Bind the volume to the claim if it is not bound yet
if volume.Spec.ClaimRef == nil ||
    volume.Spec.ClaimRef.Name != claim.Name ||
    volume.Spec.ClaimRef.Namespace != claim.Namespace ||
    volume.Spec.ClaimRef.UID != claim.UID {

    claimRef, err := reference.GetReference(scheme.Scheme, claim)
    if err != nil {
        return nil, false, fmt.Errorf("unexpected error getting claim reference: %w", err)
    }
    volumeClone.Spec.ClaimRef = claimRef
    dirty = true
}

PreBind 扩展点将缓存中的 PV 更新至数据库,持久化 PVC 与 PV 的绑定关系。

如果出问题,和 Reserve 扩展点一样,走 fwk.RunReservePluginsUnreserve 方法调用 VolumeBinding 插件的 Unreserve 方法重置缓存。

Bind

Bind 扩展点同样也是调用注册过的插件 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L604-L636

err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)

https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L984-L1016

func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
    // a lot of code here
    for _, bp := range f.bindPlugins {
        status = f.runBindPlugin(ctx, bp, state, pod, nodeName)
        if status != nil && status.Code() == framework.Skip {
            continue
        }
        if !status.IsSuccess() {
            err := status.AsError()
            klog.ErrorS(err, "Failed running Bind plugin", "plugin", bp.Name(), "pod", klog.KObj(pod))
            return framework.AsStatus(fmt.Errorf("running Bind plugin %q: %w", bp.Name(), err))
        }
        return status
    }
    return status
}

func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    // a lot of code here
    status := bp.Bind(ctx, state, pod, nodeName)
    f.metricsRecorder.observePluginDurationAsync(bind, bp.Name(), status, metrics.SinceInSeconds(startTime))
    return status
}

实现绑定的插件有且只有一个叫 DefaultBinder,实现了 Bind 方法 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go#L50-L62

func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
    klog.V(3).InfoS("Attempting to bind pod to node", "pod", klog.KObj(p), "node", nodeName)
    binding := &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
        Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
    }
    err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
    if err != nil {
        return framework.AsStatus(err)
    }
    return nil
}

为当前 Pod 创建一个 Binding 对象表示 Pod 与节点之间的绑定关系。

同样地如果出问题,走 fwk.RunReservePluginsUnreserve 方法调用 VolumeBinding 插件的 Unreserve 方法重置缓存。

PostBind

绑定成功后,调度器将执行整个调度流程中最后一个扩展点 PostBind。调用注册过的插件 https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#627-628

// Run "postbind" plugins.
fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L1018-L1037

// RunPostBindPlugins runs the set of configured postbind plugins.
func (f *frameworkImpl) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
    //a lot of code here
    for _, pl := range f.postBindPlugins {
        f.runPostBindPlugin(ctx, pl, state, pod, nodeName)
    }
}

func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.PostBindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) {
    // a lot of code here
    startTime := time.Now()
    pl.PostBind(ctx, state, pod, nodeName)
    f.metricsRecorder.observePluginDurationAsync(postBind, pl.Name(), nil, metrics.SinceInSeconds(startTime))
}

目前调度器中还没有内置插件实现 PostBind 扩展点,用户可以根据自己的需求自己来实现。

调度后

当 Pod 尘埃落定,节点上的 kubelet 能监听到这一事件,它会在节点上启动 Pod 中的容器——这也是一个“漫长”的过程,将另开新篇详细阐述。

Kubernetes 调度器系列