The Kubernetes Scheduler Before Binding
Jul 30, 2022 13:30 · 4001 words · 8 minute read
Continuing from the previous post on the scheduler, we now come to the Reserve and Permit extension points.
Assume
After the scheduler has picked a suitable node, it runs the Assume step before reaching the Reserve extension point https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L498-L509:
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
    metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    // This is most probably result of a BUG in retrying logic.
    // We report an error here so that pod scheduling can be retried.
    // This relies on the fact that Error will check if the pod has been bound
    // to a node and if so will not add it back to the unscheduled pods queue
    // (otherwise this would cause an infinite loop).
    sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
    return
}
First, assumedPodInfo := podInfo.DeepCopy() makes a deep copy of the current Pod's information; then the assume method is called:
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    // Optimistically assume that the binding will succeed and send it to apiserver
    // in the background.
    // If the binding fails, scheduler will release resources allocated to assumed pod
    // immediately.
    assumed.Spec.NodeName = host
    if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
        klog.ErrorS(err, "Scheduler cache AssumePod failed")
        return err
    }
    // if "assumed" is a nominated pod, we should remove it from internal cache
    if sched.SchedulingQueue != nil {
        sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
    }
    return nil
}
It calls the AssumePod method to add the Pod object to the scheduler cache; this is what allows the binding to run asynchronously https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/internal/cache/cache.go#L350-L369:
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
    key, err := framework.GetPodKey(pod)
    if err != nil {
        return err
    }
    cache.mu.Lock()
    defer cache.mu.Unlock()
    if _, ok := cache.podStates[key]; ok {
        return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
    }
    cache.addPod(pod)
    ps := &podState{
        pod: pod,
    }
    cache.podStates[key] = ps
    cache.assumedPods.Insert(key)
    return nil
}
The Pod is "assumed" to have been scheduled successfully: the scheduler finishes the cycle and starts scheduling the next Pod, without waiting for the current Pod to actually be bound to its node.
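To make the pattern concrete, here is a minimal standalone sketch of the same optimistic idea. The names (optimisticCache, bind) are invented for illustration; this is not the scheduler's actual code:

// Minimal sketch of the "assume, then bind asynchronously" pattern.
package main

import (
    "fmt"
    "sync"
)

// optimisticCache mimics the scheduler cache's assume/forget behavior.
type optimisticCache struct {
    mu      sync.Mutex
    assumed map[string]string // pod key -> assumed node
}

func (c *optimisticCache) Assume(pod, node string) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    if _, ok := c.assumed[pod]; ok {
        return fmt.Errorf("pod %v is in the cache, so can't be assumed", pod)
    }
    c.assumed[pod] = node // the node's resources are accounted for from now on
    return nil
}

func (c *optimisticCache) Forget(pod string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.assumed, pod) // release the resources if binding failed
}

// bind stands in for the real Binding request sent to the apiserver.
func bind(pod, node string) error {
    fmt.Printf("bound %s to %s\n", pod, node)
    return nil
}

func main() {
    cache := &optimisticCache{assumed: map[string]string{}}
    if err := cache.Assume("default/nginx", "node-1"); err != nil {
        fmt.Println(err)
        return
    }
    done := make(chan struct{})
    go func() { // binding runs in the background, like the binding cycle
        defer close(done)
        if err := bind("default/nginx", "node-1"); err != nil {
            cache.Forget("default/nginx") // roll back the optimistic assumption
        }
    }()
    // ...the scheduling loop would pick up the next Pod here...
    <-done
}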
Reserve
The Reserve extension point exists to prevent race conditions that could arise while the scheduler waits for the binding to succeed.
When the scheduler reaches the Reserve extension point, it runs the registered plugins https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L512:
func (sched *Scheduler) scheduleOne(ctx context.Context) {
    // a lot of code here
    // Run the Reserve method of reserve plugins.
    if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        // trigger un-reserve to clean up state associated with the reserved Pod
        fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
        }
        sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
        return
    }
    // a lot of code here
}
Reserve is where resources are reserved https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L1039-L1058:
func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
    startTime := time.Now()
    defer func() {
        metrics.FrameworkExtensionPointDuration.WithLabelValues(reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
    }()
    for _, pl := range f.reservePlugins {
        status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName)
        if !status.IsSuccess() {
            err := status.AsError()
            klog.ErrorS(err, "Failed running Reserve plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
            return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err))
        }
    }
    return nil
}
Conversely, Unreserve releases the reserved resources https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L1070-L1082:
// RunReservePluginsUnreserve runs the Unreserve method in the set of
// configured reserve plugins.
func (f *frameworkImpl) RunReservePluginsUnreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
    startTime := time.Now()
    defer func() {
        metrics.FrameworkExtensionPointDuration.WithLabelValues(unreserve, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
    }()
    // Execute the Unreserve operation of each reserve plugin in the
    // *reverse* order in which the Reserve operation was executed.
    for i := len(f.reservePlugins) - 1; i >= 0; i-- {
        f.runReservePluginUnreserve(ctx, f.reservePlugins[i], state, pod, nodeName)
    }
}
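Note the *reverse* iteration: plugins are unreserved in the opposite order from which they reserved, the same LIFO discipline as Go's defer, so a reservation that builds on an earlier one is rolled back first. A tiny standalone illustration (names invented):

package main

import "fmt"

func main() {
    reserved := []string{}
    reserve := func(name string) {
        reserved = append(reserved, name)
        fmt.Println("reserve:", name)
    }
    reserve("volumes") // reserved first
    reserve("device")  // hypothetically depends on the volume reservation

    // Roll back in reverse order, releasing dependents before dependencies,
    // the same loop shape used by RunReservePluginsUnreserve.
    for i := len(reserved) - 1; i >= 0; i-- {
        fmt.Println("unreserve:", reserved[i])
    }
}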
Like the other plugins, VolumeBinding, the only in-tree plugin that implements the Reserve extension point, is registered when the frameworkplugins.NewInTreeRegistry function instantiates a runtime.Registry object:
func NewInTreeRegistry() runtime.Registry {
    // a lot of code here
    return runtime.Registry{
        volumebinding.Name: runtime.FactoryAdapter(fts, volumebinding.New), // VolumeBinding plugin
    }
}
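For comparison, this is roughly what an out-of-tree Reserve plugin could look like. The framework.ReservePlugin interface and the factory signature are the framework's real APIs; the plugin name and its per-node counter are invented for illustration:

// Hypothetical Reserve plugin sketch.
package quotareserve

import (
    "context"
    "sync"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// Name is the hypothetical plugin's registry key.
const Name = "QuotaReserve"

// QuotaReserve tracks how many pods have been reserved per node.
type QuotaReserve struct {
    mu       sync.Mutex
    reserved map[string]int
}

var _ framework.ReservePlugin = &QuotaReserve{}

// New matches the factory signature expected by runtime.Registry.
func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
    return &QuotaReserve{reserved: map[string]int{}}, nil
}

func (q *QuotaReserve) Name() string { return Name }

// Reserve records the pod against the node before the binding cycle begins.
func (q *QuotaReserve) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.reserved[nodeName]++
    return nil // nil status means success
}

// Unreserve rolls the reservation back when a later extension point fails.
func (q *QuotaReserve) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
    q.mu.Lock()
    defer q.mu.Unlock()
    if q.reserved[nodeName] > 0 {
        q.reserved[nodeName]--
    }
}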
Next, let's look closely at VolumeBinding's implementation of the Reserve extension point, namely its Reserve method https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go#L299-L318:
// Reserve reserves volumes of pod and saves binding status in cycle state.
func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    state, err := getStateData(cs)
    if err != nil {
        return framework.AsStatus(err)
    }
    // we don't need to hold the lock as only one node will be reserved for the given pod
    podVolumes, ok := state.podVolumesByNode[nodeName]
    if ok {
        allBound, err := pl.Binder.AssumePodVolumes(pod, nodeName, podVolumes)
        if err != nil {
            return framework.AsStatus(err)
        }
        state.allBound = allBound
    } else {
        // may not exist if the pod does not reference any PVC
        state.allBound = true
    }
    return nil
}
state.podVolumesByNode is a map that stores the Pod's volume information, already resolved back in the Filter phase. The volumeBinder object's AssumePodVolumes method then binds the PVCs the Pod needs to their corresponding PVs in the cache https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/volumebinding/binder.go#L368-L433:
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error) {
    // a lot of code here
    // Assume PV
    newBindings := []*BindingInfo{}
    for _, binding := range podVolumes.StaticBindings {
        newPV, dirty, err := pvutil.GetBindVolumeToClaim(binding.pv, binding.pvc)
        klog.V(5).InfoS("AssumePodVolumes: GetBindVolumeToClaim",
            "pod", klog.KObj(assumedPod),
            "PV", klog.KObj(binding.pv),
            "PVC", klog.KObj(binding.pvc),
            "newPV", klog.KObj(newPV),
            "dirty", dirty,
        )
        if err != nil {
            klog.ErrorS(err, "AssumePodVolumes: fail to GetBindVolumeToClaim")
            b.revertAssumedPVs(newBindings)
            return false, err
        }
        // TODO: can we assume everytime?
        if dirty {
            err = b.pvCache.Assume(newPV)
            if err != nil {
                b.revertAssumedPVs(newBindings)
                return false, err
            }
        }
        newBindings = append(newBindings, &BindingInfo{pv: newPV, pvc: binding.pvc})
    }
    // Assume PVCs
    newProvisionedPVCs := []*v1.PersistentVolumeClaim{}
    for _, claim := range podVolumes.DynamicProvisions {
        // The claims from method args can be pointing to watcher cache. We must not
        // modify these, therefore create a copy.
        claimClone := claim.DeepCopy()
        metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnSelectedNode, nodeName)
        err = b.pvcCache.Assume(claimClone)
        if err != nil {
            b.revertAssumedPVs(newBindings)
            b.revertAssumedPVCs(newProvisionedPVCs)
            return
        }
        newProvisionedPVCs = append(newProvisionedPVCs, claimClone)
    }
    podVolumes.StaticBindings = newBindings
    podVolumes.DynamicProvisions = newProvisionedPVCs
    return
}
In Kubernetes, a PVC can be bound to a PV either statically or dynamically. AssumePodVolumes first iterates over the static bindings in podVolumes.StaticBindings, then over the dynamic provisions in podVolumes.DynamicProvisions.
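Here is a rough sketch of what each path amounts to, using simplified helpers of my own. The real logic lives in pvutil.GetBindVolumeToClaim and in the annotation call shown above; volume.kubernetes.io/selected-node is the string value behind pvutil.AnnSelectedNode:

// Conceptual sketch of the two assume paths; helper names are invented.
package sketch

import (
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// bindStatically: the static path points an existing PV at the claim via
// ClaimRef, roughly what pvutil.GetBindVolumeToClaim produces.
func bindStatically(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) *v1.PersistentVolume {
    pvCopy := pv.DeepCopy()
    pvCopy.Spec.ClaimRef = &v1.ObjectReference{
        Kind:      "PersistentVolumeClaim",
        Namespace: pvc.Namespace,
        Name:      pvc.Name,
        UID:       pvc.UID,
    }
    return pvCopy
}

// provisionDynamically: the dynamic path annotates the claim with the chosen
// node so that the volume provisioner creates a new PV there.
func provisionDynamically(pvc *v1.PersistentVolumeClaim, nodeName string) *v1.PersistentVolumeClaim {
    pvcCopy := pvc.DeepCopy()
    metav1.SetMetaDataAnnotation(&pvcCopy.ObjectMeta, "volume.kubernetes.io/selected-node", nodeName)
    return pvcCopy
}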
If anything goes wrong, the VolumeBinding plugin's Unreserve method is called to reset the cache https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go#L349-L363:
func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
    s, err := getStateData(cs)
    if err != nil {
        return
    }
    // we don't need to hold the lock as only one node may be unreserved
    podVolumes, ok := s.podVolumesByNode[nodeName]
    if !ok {
        return
    }
    pl.Binder.RevertAssumedPodVolumes(podVolumes)
    return
}
Permit
Finally, right before binding, the scheduler arrives at the Permit extension point https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L524:
func (sched *Scheduler) scheduleOne(ctx context.Context) {
    // Run "permit" plugins.
    runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
        var reason string
        if runPermitStatus.IsUnschedulable() {
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = v1.PodReasonUnschedulable
        } else {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = SchedulerError
        }
        // One of the plugins returned status different than success or wait.
        fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
        }
        sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
        return
    }
}
It runs the Permit method implemented by every registered Permit plugin https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L1094-L1137:
func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
    startTime := time.Now()
    defer func() {
        metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
    }()
    pluginsWaitTime := make(map[string]time.Duration)
    statusCode := framework.Success
    for _, pl := range f.permitPlugins {
        status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
        if !status.IsSuccess() {
            if status.IsUnschedulable() {
                klog.V(4).InfoS("Pod rejected by permit plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message())
                status.SetFailedPlugin(pl.Name())
                return status
            }
            if status.Code() == framework.Wait {
                // Not allowed to be greater than maxTimeout.
                if timeout > maxTimeout {
                    timeout = maxTimeout
                }
                pluginsWaitTime[pl.Name()] = timeout
                statusCode = framework.Wait
            } else {
                err := status.AsError()
                klog.ErrorS(err, "Failed running Permit plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
                return framework.AsStatus(fmt.Errorf("running Permit plugin %q: %w", pl.Name(), err)).WithFailedPlugin(pl.Name())
            }
        }
    }
    if statusCode == framework.Wait {
        waitingPod := newWaitingPod(pod, pluginsWaitTime)
        f.waitingPods.add(waitingPod)
        msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
        klog.V(4).InfoS("One or more plugins asked to wait and no plugin rejected pod", "pod", klog.KObj(pod))
        return framework.NewStatus(framework.Wait, msg)
    }
    return nil
}
In fact, Kubernetes does not currently ship any in-tree Permit plugin, and this step returns framework.Success by default, so we can only reason from the code and the framework specification. According to the spec, the Permit method has three possible verdicts:
- Approve: framework.Success. Once every Permit plugin approves, the Pod can proceed to binding.
- Deny: framework.Unschedulable or framework.UnschedulableAndUnresolvable. If any Permit plugin denies, the Pod is sent back to the scheduling queue, and Unreserve is triggered.
- Wait: framework.Wait. If any Permit plugin returns "wait", the Pod is put into an internal map of waiting Pods, f.waitingPods, and its binding cycle does not start until it is approved https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L1129-L1135:
if statusCode == framework.Wait {
    waitingPod := newWaitingPod(pod, pluginsWaitTime)
    f.waitingPods.add(waitingPod)
    msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
    klog.V(4).InfoS("One or more plugins asked to wait and no plugin rejected pod", "pod", klog.KObj(pod))
    return framework.NewStatus(framework.Wait, msg)
}
If the wait times out, the Pod is rejected and sent back to the scheduling queue, which likewise triggers Unreserve https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L557-L584:
waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
if !waitOnPermitStatus.IsSuccess() {
    var reason string
    if waitOnPermitStatus.IsUnschedulable() {
        metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        reason = v1.PodReasonUnschedulable
    } else {
        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        reason = SchedulerError
    }
    // trigger un-reserve plugins to clean up state associated with the reserved Pod
    fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
        klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
    } else {
        // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
        // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
        // TODO(#103853): de-duplicate the logic.
        // Avoid moving the assumed Pod itself as it's always Unschedulable.
        // It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
        // update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
        defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
            return assumedPod.UID != pod.UID
        })
    }
    sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
    return
}
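Since no in-tree example exists, here is a hypothetical Permit plugin sketch. The framework.PermitPlugin interface, Handle.GetWaitingPod, and WaitingPod.Allow are real framework APIs; the annotation gate and the approve helper are invented:

// Hypothetical Permit plugin sketch.
package permitgate

import (
    "context"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

const Name = "PermitGate"

// PermitGate holds pods at the Permit extension point until they are approved.
type PermitGate struct {
    handle framework.Handle
}

var _ framework.PermitPlugin = &PermitGate{}

func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
    return &PermitGate{handle: h}, nil
}

func (pg *PermitGate) Name() string { return Name }

// Permit approves pre-approved pods immediately; everything else gets a
// framework.Wait verdict, which parks the pod in f.waitingPods for up to 30s.
func (pg *PermitGate) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
    if pod.Annotations["example.com/approved"] == "true" { // hypothetical gate
        return nil, 0
    }
    return framework.NewStatus(framework.Wait, "waiting for approval"), 30 * time.Second
}

// approve is how the plugin would later release a waiting pod, e.g. from an
// event handler: the Allow call feeds the channel that WaitOnPermit reads.
func (pg *PermitGate) approve(uid types.UID) {
    if wp := pg.handle.GetWaitingPod(uid); wp != nil {
        wp.Allow(Name)
    }
}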
Delayed Binding
In Kubernetes, delayed Pod binding is implemented with the WaitOnPermit method https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/runtime/framework.go#L1149-L1173:
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framework.Status {
    waitingPod := f.waitingPods.get(pod.UID)
    if waitingPod == nil {
        return nil
    }
    defer f.waitingPods.remove(pod.UID)
    klog.V(4).InfoS("Pod waiting on permit", "pod", klog.KObj(pod))
    startTime := time.Now()
    s := <-waitingPod.s
    metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))
    if !s.IsSuccess() {
        if s.IsUnschedulable() {
            klog.V(4).InfoS("Pod rejected while waiting on permit", "pod", klog.KObj(pod), "status", s.Message())
            s.SetFailedPlugin(s.FailedPlugin())
            return s
        }
        err := s.AsError()
        klog.ErrorS(err, "Failed waiting on permit for pod", "pod", klog.KObj(pod))
        return framework.AsStatus(fmt.Errorf("waiting on permit for pod: %w", err)).WithFailedPlugin(s.FailedPlugin())
    }
    return nil
}
It first checks whether the current Pod is in the waiting-Pod map waitingPods; if so, s := <-waitingPod.s blocks until a status is sent.
Next, let's look at the waiting Pod type waitingPod and its constructor newWaitingPod:
type waitingPod struct {
    pod            *v1.Pod
    pendingPlugins map[string]*time.Timer
    s              chan *framework.Status
    mu             sync.RWMutex
}

func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
    wp := &waitingPod{
        pod: pod,
        // Allow() and Reject() calls are non-blocking. This property is guaranteed
        // by using non-blocking send to this channel. This channel has a buffer of size 1
        // to ensure that non-blocking send will not be ignored - possible situation when
        // receiving from this channel happens after non-blocking send.
        s: make(chan *framework.Status, 1),
    }
    wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))
    // The time.AfterFunc calls wp.Reject which iterates through pendingPlugins map. Acquire the
    // lock here so that time.AfterFunc can only execute after newWaitingPod finishes.
    wp.mu.Lock()
    defer wp.mu.Unlock()
    for k, v := range pluginsMaxWaitTime {
        plugin, waitTime := k, v
        wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
            msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
                waitTime, plugin)
            wp.Reject(plugin, msg)
        })
    }
    return wp
}
A waiting Pod (waitingPod) keeps an internal table of pending plugins, pendingPlugins; each "plugin" entry is in fact a timer. The type implements two operations, allow and reject:
- Allow:
func (w *waitingPod) Allow(pluginName string) {
    w.mu.Lock()
    defer w.mu.Unlock()
    if timer, exist := w.pendingPlugins[pluginName]; exist {
        timer.Stop()
        delete(w.pendingPlugins, pluginName)
    }
    // Only signal success status after all plugins have allowed
    if len(w.pendingPlugins) != 0 {
        return
    }
    // The select clause works as a non-blocking send.
    // If there is no receiver, it's a no-op (default case).
    select {
    case w.s <- framework.NewStatus(framework.Success, ""):
    default:
    }
}
Once a Pod in delayed binding has been allowed by every plugin, framework.Success is sent on the channel and the Pod is approved for binding.
- Reject:
func (w *waitingPod) Reject(pluginName, msg string) {
    w.mu.RLock()
    defer w.mu.RUnlock()
    for _, timer := range w.pendingPlugins {
        timer.Stop()
    }
    // The select clause works as a non-blocking send.
    // If there is no receiver, it's a no-op (default case).
    select {
    case w.s <- framework.NewStatus(framework.Unschedulable, msg).WithFailedPlugin(pluginName):
    default:
    }
}
If the wait times out, Reject is called and framework.Unschedulable is sent; the Pod is refused binding and goes back to the scheduling queue. The non-blocking channel send that both methods rely on is isolated in the sketch after this list.
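A buffered channel of size one combined with a select that has a default case means the first verdict wins and neither Allow nor Reject ever blocks. Here is a standalone demonstration of that pattern:

// Standalone demo of the buffered-channel signaling used by waitingPod.
package main

import "fmt"

func main() {
    // Buffer of one: the first non-blocking send is kept even if nobody is
    // receiving yet; any later send falls into default and is dropped.
    s := make(chan string, 1)

    send := func(verdict string) {
        select {
        case s <- verdict:
        default: // a verdict is already buffered; ignore this one
        }
    }

    send("allowed")  // wins the race: buffered for the eventual receiver
    send("rejected") // no-op: the first verdict already occupies the buffer

    fmt.Println(<-s) // prints "allowed", like s := <-waitingPod.s in WaitOnPermit
}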