The Kubernetes Scheduler Before Binding

Jul 30, 2022 13:30 · 4001 words · 8 minute read Kubernetes Golang

Picking up from the previous post, the scheduler now reaches the Reserve and Permit extension points.

Assume

After the scheduler picks a suitable node, it runs the Assume step before reaching the Reserve extension point https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L498-L509

// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
    metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    // This is most probably result of a BUG in retrying logic.
    // We report an error here so that pod scheduling can be retried.
    // This relies on the fact that Error will check if the pod has been bound
    // to a node and if so will not add it back to the unscheduled pods queue
    // (otherwise this would cause an infinite loop).
    sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
    return
}

assumedPodInfo := podInfo.DeepCopy() makes a deep copy of the current Pod info, and the assume method is then called on the copy:

// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    // Optimistically assume that the binding will succeed and send it to apiserver
    // in the background.
    // If the binding fails, scheduler will release resources allocated to assumed pod
    // immediately.
    assumed.Spec.NodeName = host

    if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
        klog.ErrorS(err, "Scheduler cache AssumePod failed")
        return err
    }
    // if "assumed" is a nominated pod, we should remove it from internal cache
    if sched.SchedulingQueue != nil {
        sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
    }

    return nil
}

The AssumePod method adds the Pod object to the scheduler cache so that binding can run asynchronously https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/internal/cache/cache.go#L350-L369

func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
    key, err := framework.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()
    if _, ok := cache.podStates[key]; ok {
        return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
    }

    cache.addPod(pod)
    ps := &podState{
        pod: pod,
    }
    cache.podStates[key] = ps
    cache.assumedPods.Insert(key)
    return nil
}

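The Pod is "assumed" to have been scheduled successfully: once the scheduling cycle for it finishes, the scheduler moves on to the next Pod without waiting for the current Pod to be bound to its node.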
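To make the optimistic "assume" idea concrete, here is a minimal sketch of a cache that records a Pod on a node before the binding is confirmed. The type assumeCache and the PodsOnNode helper are hypothetical names invented for this illustration; only the AssumePod/ForgetPod behaviour mirrors the excerpts quoted in this post.

```go
package main

import (
	"fmt"
	"sync"
)

// assumeCache is a hypothetical, heavily simplified stand-in for the
// scheduler cache. It is not the Kubernetes implementation.
type assumeCache struct {
	mu      sync.Mutex
	nodeOf  map[string]string // pod key -> node name
	assumed map[string]bool   // pod keys that are assumed but not yet bound
}

func newAssumeCache() *assumeCache {
	return &assumeCache{nodeOf: map[string]string{}, assumed: map[string]bool{}}
}

// AssumePod records the pod on the node before the binding is confirmed.
func (c *assumeCache) AssumePod(key, node string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.nodeOf[key]; ok {
		return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
	}
	c.nodeOf[key] = node
	c.assumed[key] = true
	return nil
}

// ForgetPod removes an assumed pod, for example after a failed binding.
func (c *assumeCache) ForgetPod(key string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.assumed[key] {
		return fmt.Errorf("pod %v is not assumed, so can't be forgotten", key)
	}
	delete(c.nodeOf, key)
	delete(c.assumed, key)
	return nil
}

// PodsOnNode counts the pods the cache currently places on a node.
func (c *assumeCache) PodsOnNode(node string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	n := 0
	for _, v := range c.nodeOf {
		if v == node {
			n++
		}
	}
	return n
}

func main() {
	c := newAssumeCache()
	_ = c.AssumePod("default/web-0", "node-a")
	// The next scheduling cycle already sees the assumed pod on node-a.
	fmt.Println(c.PodsOnNode("node-a"))
	// Assuming the same pod twice is an error.
	fmt.Println(c.AssumePod("default/web-0", "node-b") != nil)
	// Forgetting the pod frees the node again.
	_ = c.ForgetPod("default/web-0")
	fmt.Println(c.PodsOnNode("node-a"))
}
```

The point of the sketch is that later scheduling decisions read the cache, not the API server, so an assumed Pod occupies its node immediately and is released again by ForgetPod if a later step fails.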

Reserve

The Reserve extension point exists to prevent race conditions while the scheduler waits for the binding to succeed.

The scheduler reaches the Reserve extension point and calls the registered plugins https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L512

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    // a lot of code here
    // Run the Reserve method of reserve plugins.
    if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        // trigger un-reserve to clean up state associated with the reserved Pod
        fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
        }
        sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
        return
    }
    // a lot of code here
}

Reserve reserves resources for the Pod https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L1039-L1058

func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
    startTime := time.Now()
    defer func() {
        metrics.FrameworkExtensionPointDuration.WithLabelValues(reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
    }()
    for _, pl := range f.reservePlugins {
        status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName)
        if !status.IsSuccess() {
            err := status.AsError()
            klog.ErrorS(err, "Failed running Reserve plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
            return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err))
        }
    }
    return nil
}

Conversely, Unreserve releases the reserved resources https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L1070-L1082

// RunReservePluginsUnreserve runs the Unreserve method in the set of
// configured reserve plugins.
func (f *frameworkImpl) RunReservePluginsUnreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
    startTime := time.Now()
    defer func() {
        metrics.FrameworkExtensionPointDuration.WithLabelValues(unreserve, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
    }()
    // Execute the Unreserve operation of each reserve plugin in the
    // *reverse* order in which the Reserve operation was executed.
    for i := len(f.reservePlugins) - 1; i >= 0; i-- {
        f.runReservePluginUnreserve(ctx, f.reservePlugins[i], state, pod, nodeName)
    }
}
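The ordering in the two functions above can be sketched in a few lines: Reserve runs in registration order and stops at the first failure, while Unreserve walks every registered plugin in reverse. The plugin and runner types below are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// plugin is a hypothetical stand-in for a Reserve plugin.
type plugin struct {
	name string
	fail bool // when true, reserve reports an error
}

// runner records the order in which plugin hooks are called.
type runner struct {
	plugins []plugin
	calls   []string
}

// reserveAll calls plugins in registration order and stops at the first failure.
func (r *runner) reserveAll() error {
	for _, p := range r.plugins {
		r.calls = append(r.calls, "reserve:"+p.name)
		if p.fail {
			return fmt.Errorf("running Reserve plugin %q failed", p.name)
		}
	}
	return nil
}

// unreserveAll calls every plugin in reverse order, including plugins
// whose reserve hook was never reached.
func (r *runner) unreserveAll() {
	for i := len(r.plugins) - 1; i >= 0; i-- {
		r.calls = append(r.calls, "unreserve:"+r.plugins[i].name)
	}
}

// trace returns the recorded calls as a single space-separated string.
func (r *runner) trace() string { return strings.Join(r.calls, " ") }

func main() {
	r := &runner{plugins: []plugin{{name: "a"}, {name: "b", fail: true}, {name: "c"}}}
	if err := r.reserveAll(); err != nil {
		r.unreserveAll()
	}
	fmt.Println(r.trace())
	// prints: reserve:a reserve:b unreserve:c unreserve:b unreserve:a
}
```

Note that plugin c is unreserved even though its reserve hook never ran, which is why an Unreserve implementation has to tolerate being called without a preceding Reserve.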

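Like the other plugins, VolumeBinding, the only in-tree plugin that implements the Reserve extension point, is registered when the frameworkplugins.NewInTreeRegistry function instantiates a runtime.Registry object: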

func NewInTreeRegistry() runtime.Registry {
    // a lot of code here
    return runtime.Registry{
        volumebinding.Name:                   runtime.FactoryAdapter(fts, volumebinding.New), // VolumeBinding plugin
    }
}

Next, let's look in detail at how VolumeBinding implements the Reserve extension point, that is, at its Reserve method https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go#L299-L318

// Reserve reserves volumes of pod and saves binding status in cycle state.
func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    state, err := getStateData(cs)
    if err != nil {
        return framework.AsStatus(err)
    }
    // we don't need to hold the lock as only one node will be reserved for the given pod
    podVolumes, ok := state.podVolumesByNode[nodeName]
    if ok {
        allBound, err := pl.Binder.AssumePodVolumes(pod, nodeName, podVolumes)
        if err != nil {
            return framework.AsStatus(err)
        }
        state.allBound = allBound
    } else {
        // may not exist if the pod does not reference any PVC
        state.allBound = true
    }
    return nil
}

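state.podVolumesByNode is a map, keyed by node name, that stores the Pod's volume information already resolved in the Filter phase. Reserve calls the AssumePodVolumes method of the volumeBinder object, which binds the PVCs the Pod needs to their matching PVs in the cache https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/volumebinding/binder.go#L368-L433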

func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error) {
    // a lot of code here

    // Assume PV
    newBindings := []*BindingInfo{}
    for _, binding := range podVolumes.StaticBindings {
        newPV, dirty, err := pvutil.GetBindVolumeToClaim(binding.pv, binding.pvc)
        klog.V(5).InfoS("AssumePodVolumes: GetBindVolumeToClaim",
            "pod", klog.KObj(assumedPod),
            "PV", klog.KObj(binding.pv),
            "PVC", klog.KObj(binding.pvc),
            "newPV", klog.KObj(newPV),
            "dirty", dirty,
        )
        if err != nil {
            klog.ErrorS(err, "AssumePodVolumes: fail to GetBindVolumeToClaim")
            b.revertAssumedPVs(newBindings)
            return false, err
        }
        // TODO: can we assume everytime?
        if dirty {
            err = b.pvCache.Assume(newPV)
            if err != nil {
                b.revertAssumedPVs(newBindings)
                return false, err
            }
        }
        newBindings = append(newBindings, &BindingInfo{pv: newPV, pvc: binding.pvc})
    }

    // Assume PVCs
    newProvisionedPVCs := []*v1.PersistentVolumeClaim{}
    for _, claim := range podVolumes.DynamicProvisions {
        // The claims from method args can be pointing to watcher cache. We must not
        // modify these, therefore create a copy.
        claimClone := claim.DeepCopy()
        metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnSelectedNode, nodeName)
        err = b.pvcCache.Assume(claimClone)
        if err != nil {
            b.revertAssumedPVs(newBindings)
            b.revertAssumedPVCs(newProvisionedPVCs)
            return
        }

        newProvisionedPVCs = append(newProvisionedPVCs, claimClone)
    }

    podVolumes.StaticBindings = newBindings
    podVolumes.DynamicProvisions = newProvisionedPVCs
    return
}

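In Kubernetes, PVCs and PVs can be bound statically or dynamically. The method first iterates over the static bindings in podVolumes.StaticBindings and then over the dynamic provisions in podVolumes.DynamicProvisions. If any step fails, everything assumed so far is reverted.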
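The two-phase "assume, and roll back on error" pattern used by AssumePodVolumes can be sketched as follows. This is a simplified illustration under stated assumptions: volumeCache, newVolumeCache, assumeVolumes and revert are hypothetical names, and plain strings stand in for PV and PVC objects.

```go
package main

import "fmt"

// volumeCache is a hypothetical in-memory cache of assumed objects.
type volumeCache struct {
	assumed map[string]bool
	reject  map[string]bool // names for which Assume fails, to simulate errors
}

func newVolumeCache(reject ...string) *volumeCache {
	c := &volumeCache{assumed: map[string]bool{}, reject: map[string]bool{}}
	for _, name := range reject {
		c.reject[name] = true
	}
	return c
}

// Assume marks the object as assumed, or fails for rejected names.
func (c *volumeCache) Assume(name string) error {
	if c.reject[name] {
		return fmt.Errorf("cannot assume %q", name)
	}
	c.assumed[name] = true
	return nil
}

// Restore undoes a previous Assume.
func (c *volumeCache) Restore(name string) { delete(c.assumed, name) }

func revert(c *volumeCache, names []string) {
	for _, n := range names {
		c.Restore(n)
	}
}

// assumeVolumes assumes all PVs first, then all PVCs. On any error it
// reverts everything assumed so far, so the caches are left unchanged.
func assumeVolumes(pvs, pvcs []string, pvCache, pvcCache *volumeCache) error {
	var donePVs, donePVCs []string
	for _, pv := range pvs {
		if err := pvCache.Assume(pv); err != nil {
			revert(pvCache, donePVs)
			return err
		}
		donePVs = append(donePVs, pv)
	}
	for _, pvc := range pvcs {
		if err := pvcCache.Assume(pvc); err != nil {
			revert(pvCache, donePVs)
			revert(pvcCache, donePVCs)
			return err
		}
		donePVCs = append(donePVCs, pvc)
	}
	return nil
}

func main() {
	pvCache := newVolumeCache()
	pvcCache := newVolumeCache("claim-b") // the second claim will fail
	err := assumeVolumes([]string{"pv-1"}, []string{"claim-a", "claim-b"}, pvCache, pvcCache)
	fmt.Println(err != nil, len(pvCache.assumed), len(pvcCache.assumed))
	// prints: true 0 0
}
```

Because the failing claim triggers a revert of both lists, the caches end up exactly as they were before the call, which is the property the scheduler relies on when it retries the Pod later.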

If something goes wrong, the Unreserve method of the VolumeBinding plugin is called to reset the cache https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go#L349-L363

func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
    s, err := getStateData(cs)
    if err != nil {
        return
    }
    // we don't need to hold the lock as only one node may be unreserved
    podVolumes, ok := s.podVolumesByNode[nodeName]
    if !ok {
        return
    }
    pl.Binder.RevertAssumedPodVolumes(podVolumes)
    return
}

Permit

Finally, just before binding, the scheduler reaches the Permit extension point https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L524

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    // Run "permit" plugins.
    runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
        var reason string
        if runPermitStatus.IsUnschedulable() {
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = v1.PodReasonUnschedulable
        } else {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = SchedulerError
        }
        // One of the plugins returned status different than success or wait.
        fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
        }
        sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
        return
    }
}

RunPermitPlugins runs the Permit method of every registered Permit plugin https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L1094-L1137

func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
    startTime := time.Now()
    defer func() {
        metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
    }()
    pluginsWaitTime := make(map[string]time.Duration)
    statusCode := framework.Success
    for _, pl := range f.permitPlugins {
        status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
        if !status.IsSuccess() {
            if status.IsUnschedulable() {
                klog.V(4).InfoS("Pod rejected by permit plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message())
                status.SetFailedPlugin(pl.Name())
                return status
            }
            if status.Code() == framework.Wait {
                // Not allowed to be greater than maxTimeout.
                if timeout > maxTimeout {
                    timeout = maxTimeout
                }
                pluginsWaitTime[pl.Name()] = timeout
                statusCode = framework.Wait
            } else {
                err := status.AsError()
                klog.ErrorS(err, "Failed running Permit plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
                return framework.AsStatus(fmt.Errorf("running Permit plugin %q: %w", pl.Name(), err)).WithFailedPlugin(pl.Name())
            }
        }
    }
    if statusCode == framework.Wait {
        waitingPod := newWaitingPod(pod, pluginsWaitTime)
        f.waitingPods.add(waitingPod)
        msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
        klog.V(4).InfoS("One or more plugins asked to wait and no plugin rejected pod", "pod", klog.KObj(pod))
        return framework.NewStatus(framework.Wait, msg)
    }
    return nil
}

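In fact, Kubernetes currently ships no in-tree Permit plugin, so RunPermitPlugins returns a nil status by default, which counts as framework.Success. We can only go by the code and the specification:

According to the specification, a Permit method has three possible outcomes:

  1. Approve: framework.Success

    Once every Permit plugin approves, the Pod can be bound.

  2. Deny: framework.Unschedulable & framework.UnschedulableAndUnresolvable

    If any Permit plugin denies the Pod, it is put back in the scheduling queue and Unreserve is triggered.

  3. Wait: framework.Wait

    If any Permit plugin returns "wait", the Pod is added to an internal map of waiting Pods, f.waitingPods. The binding cycle for the Pod starts but blocks right away until the Pod is approved https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L1129-L1135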

    if statusCode == framework.Wait {
        waitingPod := newWaitingPod(pod, pluginsWaitTime)
        f.waitingPods.add(waitingPod)
        msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
        klog.V(4).InfoS("One or more plugins asked to wait and no plugin rejected pod", "pod", klog.KObj(pod))
        return framework.NewStatus(framework.Wait, msg)
    }
    

    If the wait times out, the Pod is rejected and put back in the scheduling queue, which also triggers Unreserve https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L557-L584

    waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
    if !waitOnPermitStatus.IsSuccess() {
        var reason string
        if waitOnPermitStatus.IsUnschedulable() {
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = v1.PodReasonUnschedulable
        } else {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = SchedulerError
        }
        // trigger un-reserve plugins to clean up state associated with the reserved Pod
        fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
        } else {
            // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
            // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
            // TODO(#103853): de-duplicate the logic.
            // Avoid moving the assumed Pod itself as it's always Unschedulable.
            // It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
            // update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
            defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
                return assumedPod.UID != pod.UID
            })
        }
        sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
        return
    }
    

Delayed binding

Kubernetes implements delayed binding of Pods with the WaitOnPermit method https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L1149-L1173

// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framework.Status {
    waitingPod := f.waitingPods.get(pod.UID)
    if waitingPod == nil {
        return nil
    }
    defer f.waitingPods.remove(pod.UID)
    klog.V(4).InfoS("Pod waiting on permit", "pod", klog.KObj(pod))

    startTime := time.Now()
    s := <-waitingPod.s
    metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))

    if !s.IsSuccess() {
        if s.IsUnschedulable() {
            klog.V(4).InfoS("Pod rejected while waiting on permit", "pod", klog.KObj(pod), "status", s.Message())
            s.SetFailedPlugin(s.FailedPlugin())
            return s
        }
        err := s.AsError()
        klog.ErrorS(err, "Failed waiting on permit for pod", "pod", klog.KObj(pod))
        return framework.AsStatus(fmt.Errorf("waiting on permit for pod: %w", err)).WithFailedPlugin(s.FailedPlugin())
    }
    return nil
}

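WaitOnPermit first checks whether the current Pod is in the waiting-Pod map waitingPods. If it is not, the method returns immediately; otherwise s := <-waitingPod.s blocks until a status arrives.

Next, let's look at the waiting Pod type waitingPod and its constructor newWaitingPod: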

type waitingPod struct {
    pod            *v1.Pod
    pendingPlugins map[string]*time.Timer
    s              chan *framework.Status
    mu             sync.RWMutex
}

func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
    wp := &waitingPod{
        pod: pod,
        // Allow() and Reject() calls are non-blocking. This property is guaranteed
        // by using non-blocking send to this channel. This channel has a buffer of size 1
        // to ensure that non-blocking send will not be ignored - possible situation when
        // receiving from this channel happens after non-blocking send.
        s: make(chan *framework.Status, 1),
    }

    wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))
    // The time.AfterFunc calls wp.Reject which iterates through pendingPlugins map. Acquire the
    // lock here so that time.AfterFunc can only execute after newWaitingPod finishes.
    wp.mu.Lock()
    defer wp.mu.Unlock()
    for k, v := range pluginsMaxWaitTime {
        plugin, waitTime := k, v
        wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
            msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
                waitTime, plugin)
            wp.Reject(plugin, msg)
        })
    }

    return wp
}
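The comment on the s channel in newWaitingPod explains why the channel has a buffer of size 1. A small sketch makes the difference visible; trySend is a hypothetical helper that wraps the same select-with-default pattern used by Allow and Reject.

```go
package main

import "fmt"

// trySend performs a non-blocking send and reports whether the value
// was accepted by the channel.
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan string)
	buffered := make(chan string, 1)

	// No receiver is waiting yet, so the unbuffered send is dropped.
	fmt.Println(trySend(unbuffered, "success")) // false

	// The buffer holds the first status until a receiver arrives.
	fmt.Println(trySend(buffered, "success")) // true

	// A second send finds the buffer full and is dropped: the first status wins.
	fmt.Println(trySend(buffered, "rejected")) // false

	// A receiver that shows up later still gets the first status.
	fmt.Println(<-buffered) // success
}
```

With an unbuffered channel, a status sent before WaitOnPermit starts receiving would be lost and the receiver would block forever; the one-slot buffer keeps the first status around for it.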

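A waitingPod keeps a table of pending plugins, pendingPlugins, in which each plugin name maps to a timer that fires when that plugin's wait time runs out. The type supports two operations, Allow and Reject: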

  • Allow

    func (w *waitingPod) Allow(pluginName string) {
        w.mu.Lock()
        defer w.mu.Unlock()
        if timer, exist := w.pendingPlugins[pluginName]; exist {
            timer.Stop()
            delete(w.pendingPlugins, pluginName)
        }
    
        // Only signal success status after all plugins have allowed
        if len(w.pendingPlugins) != 0 {
            return
        }
    
        // The select clause works as a non-blocking send.
        // If there is no receiver, it's a no-op (default case).
        select {
        case w.s <- framework.NewStatus(framework.Success, ""):
        default:
        }
    }
    

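    When a plugin allows a waiting Pod, its timer is stopped and it is removed from pendingPlugins. Once no plugin is left pending, framework.Success is sent and the Pod is approved for binding.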

  • Reject

    func (w *waitingPod) Reject(pluginName, msg string) {
        w.mu.RLock()
        defer w.mu.RUnlock()
        for _, timer := range w.pendingPlugins {
            timer.Stop()
        }
    
        // The select clause works as a non-blocking send.
        // If there is no receiver, it's a no-op (default case).
        select {
        case w.s <- framework.NewStatus(framework.Unschedulable, msg).WithFailedPlugin(pluginName):
        default:
        }
    }
    

    If the wait times out, Reject is called and sends framework.Unschedulable. The Pod is rejected for binding and put back in the scheduling queue.