Kubernetes Scheduler and Binding
Sep 10, 2022 11:30
A Pod's scheduling process can be divided into two cycles:
- Scheduling cycle
  - PreFilter
  - Filter
  - PostFilter
  - PreScore
  - Score
  - NormalizeScore
  - Reserve
  - Permit
- Binding cycle
  - WaitOnPermit
  - PreBind
  - Bind
  - PostBind
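Once the most suitable node for the current Pod has been determined, scheduling moves on to the binding cycle.
The binding operation is asynchronous: while it runs, the scheduler starts scheduling the next Pod https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L550-L637: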
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
	bindingCycleCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	// a lot of code here
}()
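To make the serial-scheduling, concurrent-binding split concrete, here is a minimal sketch. It is not the scheduler's code: `scheduleAll`, `pickNode`, and `bind` are hypothetical names, and the `sync.WaitGroup` exists only so the example can wait for every binding before it exits (the real scheduler does not wait).

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// scheduleAll picks a node for each pod serially, then hands the slow
// binding step to a goroutine so the next pod can be scheduled right away.
// pickNode and bind are hypothetical stand-ins, not scheduler APIs.
func scheduleAll(pods []string, pickNode func(string) string, bind func(pod, node string)) []string {
	var (
		mu    sync.Mutex
		bound []string
		wg    sync.WaitGroup
	)
	for _, pod := range pods {
		node := pickNode(pod) // scheduling cycle: one pod at a time
		wg.Add(1)
		go func(pod, node string) { // binding cycle: runs concurrently
			defer wg.Done()
			bind(pod, node)
			mu.Lock()
			bound = append(bound, pod+"@"+node)
			mu.Unlock()
		}(pod, node)
	}
	wg.Wait() // only so this example can collect the results
	sort.Strings(bound)
	return bound
}

func main() {
	pick := func(pod string) string { return "node-" + pod[len(pod)-1:] }
	slowBind := func(pod, node string) { time.Sleep(10 * time.Millisecond) }
	fmt.Println(scheduleAll([]string{"pod-a", "pod-b", "pod-c"}, pick, slowBind))
}
```

The three simulated bindings overlap, so the run takes roughly one binding's duration rather than three.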
WaitOnPermit
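WaitOnPermit is a somewhat special step, used to implement delayed binding of Pods. Strictly speaking it is not an extension point: users cannot hook into it with a custom plugin (note that the real extension points are all invoked in scheduleOne through methods named like fwk.RunXXXPlugins). The wait itself is triggered earlier, by a Permit plugin that returns Wait.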
https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L557-L584:
waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
if !waitOnPermitStatus.IsSuccess() {
	var reason string
	if waitOnPermitStatus.IsUnschedulable() {
		metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
		reason = v1.PodReasonUnschedulable
	} else {
		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
		reason = SchedulerError
	}
	// trigger un-reserve plugins to clean up state associated with the reserved Pod
	fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
		klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
	} else {
		// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
		// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
		// TODO(#103853): de-duplicate the logic.
		// Avoid moving the assumed Pod itself as it's always Unschedulable.
		// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
		// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
		defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
			return assumedPod.UID != pod.UID
		})
	}
	sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
	return
}
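While fwk.WaitOnPermit runs, this goroutine is blocked. As soon as it receives framework.Success, which means the Pod held for delayed binding has been approved, the binding flow continues. Any other status takes the failure branch shown above.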
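The blocking behaviour can be pictured as a goroutine waiting on a channel until someone allows or rejects the Pod, or a timeout fires. The sketch below only illustrates that idea under simplified assumptions; `waitingPod`, `Allow`, `Reject`, and `waitOnPermit` are hypothetical names and not the framework's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitingPod is a hypothetical stand-in for a Pod parked by a Permit plugin.
type waitingPod struct {
	decision chan error // a nil error means "allowed"
}

func newWaitingPod() *waitingPod {
	return &waitingPod{decision: make(chan error, 1)}
}

// Allow approves the pod; Reject refuses it with a reason.
func (w *waitingPod) Allow()               { w.send(nil) }
func (w *waitingPod) Reject(reason string) { w.send(errors.New(reason)) }

func (w *waitingPod) send(err error) {
	select {
	case w.decision <- err:
	default: // a decision was already recorded; keep the first one
	}
}

// waitOnPermit blocks until a decision arrives or the timeout expires.
func waitOnPermit(w *waitingPod, timeout time.Duration) error {
	select {
	case err := <-w.decision:
		return err
	case <-time.After(timeout):
		return errors.New("timed out waiting for permit")
	}
}

func main() {
	w := newWaitingPod()
	go func() {
		time.Sleep(20 * time.Millisecond)
		w.Allow() // some other component approves the pod later
	}()
	fmt.Println("allowed:", waitOnPermit(w, time.Second) == nil)
}
```

The buffered channel of size one lets `Allow` or `Reject` return immediately even if nobody is waiting yet.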
PreBind
The binding cycle then reaches the PreBind extension point and calls the registered plugins https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L586-L602:
// Run "prebind" plugins.
preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
	// a lot of code here
	for _, pl := range f.preBindPlugins {
		status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName)
		if !status.IsSuccess() {
			err := status.AsError()
			klog.ErrorS(err, "Failed running PreBind plugin", "plugin", pl.Name(), "pod", klog.KObj(pod))
			return framework.AsStatus(fmt.Errorf("running PreBind plugin %q: %w", pl.Name(), err))
		}
	}
	return nil
}

func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	// a lot of code here
	status := pl.PreBind(ctx, state, pod, nodeName)
	f.metricsRecorder.observePluginDurationAsync(preBind, pl.Name(), status, metrics.SinceInSeconds(startTime))
	return status
}
The in-tree plugin that implements PreBind is VolumeBinding, which is registered when the registry is initialized https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/registry.go#L43-L80:
func NewInTreeRegistry() runtime.Registry {
	// a lot of code here
	return runtime.Registry{
		// a lot of code here
		volumebinding.Name: runtime.FactoryAdapter(fts, volumebinding.New),
	}
}
The scheduler calls the PreBind method of the VolumeBinding plugin https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go#L320-L347:
type VolumeBinding struct {
	Binder    SchedulerVolumeBinder
	PVCLister corelisters.PersistentVolumeClaimLister
	scorer    volumeCapacityScorer
	fts       feature.Features
}

func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	s, err := getStateData(cs)
	if err != nil {
		return framework.AsStatus(err)
	}
	if s.allBound {
		// no need to bind volumes
		return nil
	}
	// we don't need to hold the lock as only one node will be pre-bound for the given pod
	podVolumes, ok := s.podVolumesByNode[nodeName]
	if !ok {
		return framework.AsStatus(fmt.Errorf("no pod volumes found for node %q", nodeName))
	}
	klog.V(5).InfoS("Trying to bind volumes for pod", "pod", klog.KObj(pod))
	err = pl.Binder.BindPodVolumes(pod, podVolumes)
	if err != nil {
		klog.V(1).InfoS("Failed to bind volumes for pod", "pod", klog.KObj(pod), "err", err)
		return framework.AsStatus(err)
	}
	klog.V(5).InfoS("Success binding volumes for pod", "pod", klog.KObj(pod))
	return nil
}
Following the pl.Binder.BindPodVolumes call leads to its implementation https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/volumebinding/binder.go#L441-L470:
func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod, podVolumes *PodVolumes) (err error) {
	// a lot of code here
	bindings := podVolumes.StaticBindings
	claimsToProvision := podVolumes.DynamicProvisions
	// Start API operations
	err = b.bindAPIUpdate(assumedPod, bindings, claimsToProvision)
	if err != nil {
		return err
	}
	err = wait.Poll(time.Second, b.bindTimeout, func() (bool, error) {
		b, err := b.checkBindings(assumedPod, bindings, claimsToProvision)
		return b, err
	})
	if err != nil {
		return fmt.Errorf("binding volumes: %w", err)
	}
	return nil
}
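BindPodVolumes first issues the API updates and then polls once per second until the bindings are confirmed or the timeout expires. The helper below is a simplified, standard-library-only approximation of that polling pattern; `poll` and `errTimeout` are hypothetical names and do not reproduce every detail of wait.Poll.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeout = errors.New("timed out waiting for the condition")

// poll calls cond every interval until it reports done, returns an error,
// or the timeout elapses. Like wait.Poll, it waits one interval before the
// first check. This is an illustrative stand-in, not the library function.
func poll(interval, timeout time.Duration, cond func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		time.Sleep(interval)
		done, err := cond()
		if err != nil {
			return err // a hard failure stops polling immediately
		}
		if done {
			return nil
		}
		if time.Now().After(deadline) {
			return errTimeout
		}
	}
}

func main() {
	checks := 0
	err := poll(5*time.Millisecond, time.Second, func() (bool, error) {
		checks++
		return checks >= 3, nil // pretend the volumes are bound on the third check
	})
	fmt.Println(checks, err)
}
```

Returning an error from the condition aborts the wait early, which is how a failed binding check can surface without waiting for the full timeout.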
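Earlier, at the Reserve extension point, the scheduler already bound the PVCs and PVs that this Pod needs in its cache. Binding here means pointing the PV's spec.claimRef at the PVC https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/controller/volume/persistentvolume/util/util.go#L131-L143: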
// Bind the volume to the claim if it is not bound yet
if volume.Spec.ClaimRef == nil ||
	volume.Spec.ClaimRef.Name != claim.Name ||
	volume.Spec.ClaimRef.Namespace != claim.Namespace ||
	volume.Spec.ClaimRef.UID != claim.UID {
	claimRef, err := reference.GetReference(scheme.Scheme, claim)
	if err != nil {
		return nil, false, fmt.Errorf("unexpected error getting claim reference: %w", err)
	}
	volumeClone.Spec.ClaimRef = claimRef
	dirty = true
}
At the PreBind extension point, the cached PVs are written back through the API server, which persists the binding between PVC and PV.
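The "bind in cache first, persist later" idea rests on a small check: does the PV already reference this claim, and if not, produce an updated copy and mark it dirty. The sketch below mirrors that check with simplified, hypothetical types (`PV`, `PVC`, `ClaimRef`, `bindVolumeToClaim`); the real API objects carry many more fields.

```go
package main

import "fmt"

// Simplified, hypothetical stand-ins for the real API types.
type ClaimRef struct{ Namespace, Name, UID string }

type PVC struct{ Namespace, Name, UID string }

type PV struct {
	Name     string
	ClaimRef *ClaimRef
}

// bindVolumeToClaim returns a copy of pv whose ClaimRef points at pvc and
// reports whether the copy differs from the original (and so needs a write).
func bindVolumeToClaim(pv PV, pvc PVC) (PV, bool) {
	ref := pv.ClaimRef
	if ref != nil && ref.Namespace == pvc.Namespace && ref.Name == pvc.Name && ref.UID == pvc.UID {
		return pv, false // already bound to this claim, nothing to persist
	}
	clone := pv // work on a copy so the caller's object stays untouched
	clone.ClaimRef = &ClaimRef{Namespace: pvc.Namespace, Name: pvc.Name, UID: pvc.UID}
	return clone, true
}

func main() {
	pv := PV{Name: "pv-1"}
	pvc := PVC{Namespace: "default", Name: "data", UID: "uid-123"}

	bound, dirty := bindVolumeToClaim(pv, pvc)
	fmt.Println(bound.ClaimRef.Name, dirty)

	_, dirty = bindVolumeToClaim(bound, pvc)
	fmt.Println(dirty)
}
```

Only a dirty copy needs to be sent to the API server; an already bound PV produces no write.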
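If something goes wrong, the scheduler, just as it does after a failed Reserve, calls fwk.RunReservePluginsUnreserve, which invokes the Unreserve method of the VolumeBinding plugin to reset the cache.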
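The Reserve and Unreserve pairing is essentially a rollback pattern: state claimed optimistically in memory is released when a later step fails. A minimal sketch of that pattern follows; `reservePlugin` and `bindWithRollback` are hypothetical, and undoing in reverse order is a design choice of this example.

```go
package main

import (
	"errors"
	"fmt"
)

var errBindFailed = errors.New("bind failed")

// reservePlugin is a hypothetical, trimmed-down plugin that keeps
// per-pod reservations in memory.
type reservePlugin struct {
	reserved map[string]string // pod -> node
}

func newReservePlugin() *reservePlugin {
	return &reservePlugin{reserved: map[string]string{}}
}

func (p *reservePlugin) Reserve(pod, node string) { p.reserved[pod] = node }

// Unreserve is idempotent: deleting a missing key is a no-op.
func (p *reservePlugin) Unreserve(pod string) { delete(p.reserved, pod) }

// bindWithRollback reserves on every plugin, runs bind, and on failure
// calls Unreserve on every plugin in reverse order.
func bindWithRollback(plugins []*reservePlugin, pod, node string, bind func() error) error {
	for _, p := range plugins {
		p.Reserve(pod, node)
	}
	if err := bind(); err != nil {
		for i := len(plugins) - 1; i >= 0; i-- {
			plugins[i].Unreserve(pod)
		}
		return fmt.Errorf("binding %s to %s: %w", pod, node, err)
	}
	return nil
}

func main() {
	vol := newReservePlugin()
	err := bindWithRollback([]*reservePlugin{vol}, "pod-a", "node-1", func() error {
		return errBindFailed
	})
	fmt.Println(err, len(vol.reserved))
}
```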
Bind
The Bind extension point likewise calls the registered plugins https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L604-L636:
err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)

func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
	// a lot of code here
	for _, bp := range f.bindPlugins {
		status = f.runBindPlugin(ctx, bp, state, pod, nodeName)
		if status != nil && status.Code() == framework.Skip {
			continue
		}
		if !status.IsSuccess() {
			err := status.AsError()
			klog.ErrorS(err, "Failed running Bind plugin", "plugin", bp.Name(), "pod", klog.KObj(pod))
			return framework.AsStatus(fmt.Errorf("running Bind plugin %q: %w", bp.Name(), err))
		}
		return status
	}
	return status
}

func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	// a lot of code here
	status := bp.Bind(ctx, state, pod, nodeName)
	f.metricsRecorder.observePluginDurationAsync(bind, bp.Name(), status, metrics.SinceInSeconds(startTime))
	return status
}
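Unlike the PreBind loop, the Bind loop stops at the first plugin that does not return Skip, so Bind plugins behave like a chain of responsibility. The following sketch illustrates that control flow with hypothetical types (`code`, `bindPlugin`, `runBindPlugins`); the plugin names are made up for the example.

```go
package main

import (
	"fmt"
	"strings"
)

type code int

const (
	success code = iota
	skip
	failure
)

// bindPlugin is a hypothetical, simplified Bind plugin.
type bindPlugin struct {
	name string
	bind func(pod, node string) code
}

// runBindPlugins asks each plugin in turn. A plugin that returns skip passes
// the pod on; the first plugin that returns anything else decides the outcome.
// It returns the name of the deciding plugin and its result.
func runBindPlugins(plugins []bindPlugin, pod, node string) (string, code) {
	for _, p := range plugins {
		c := p.bind(pod, node)
		if c == skip {
			continue
		}
		return p.name, c
	}
	return "", skip // nobody chose to handle the pod
}

func main() {
	plugins := []bindPlugin{
		{name: "gpu-binder", bind: func(pod, node string) code {
			if !strings.HasPrefix(pod, "gpu-") {
				return skip // not ours, let the next plugin try
			}
			return success
		}},
		{name: "default-binder", bind: func(pod, node string) code { return success }},
	}
	for _, pod := range []string{"gpu-train", "web"} {
		handler, _ := runBindPlugins(plugins, pod, "node-1")
		fmt.Printf("%s handled by %s\n", pod, handler)
	}
}
```

Because of this ordering, a catch-all binder only makes sense at the end of the list.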
Exactly one in-tree plugin implements binding, DefaultBinder, through its Bind method https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go#L50-L62:
func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
	klog.V(3).InfoS("Attempting to bind pod to node", "pod", klog.KObj(p), "node", nodeName)
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
	if err != nil {
		return framework.AsStatus(err)
	}
	return nil
}
It creates a Binding object for the current Pod, which represents the binding between the Pod and the node.
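On the wire, that call amounts to a POST of a small JSON document to the Pod's binding subresource. The sketch below builds such a request with local, simplified structs rather than the real client types, so `binding`, `objectMeta`, `objectReference`, and `bindingRequest` are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified local mirrors of the fields DefaultBinder fills in.
type objectMeta struct {
	Name      string `json:"name"`
	Namespace string `json:"namespace"`
	UID       string `json:"uid,omitempty"`
}

type objectReference struct {
	Kind string `json:"kind"`
	Name string `json:"name"`
}

type binding struct {
	APIVersion string          `json:"apiVersion"`
	Kind       string          `json:"kind"`
	Metadata   objectMeta      `json:"metadata"`
	Target     objectReference `json:"target"`
}

// bindingRequest returns the URL path and JSON body of a request that
// binds the given pod to the given node.
func bindingRequest(namespace, pod, uid, node string) (string, []byte, error) {
	body, err := json.Marshal(binding{
		APIVersion: "v1",
		Kind:       "Binding",
		Metadata:   objectMeta{Name: pod, Namespace: namespace, UID: uid},
		Target:     objectReference{Kind: "Node", Name: node},
	})
	path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/binding", namespace, pod)
	return path, body, err
}

func main() {
	path, body, err := bindingRequest("default", "web", "uid-1", "node-1")
	if err != nil {
		panic(err)
	}
	fmt.Println("POST", path)
	fmt.Println(string(body))
}
```

Once the API server accepts the binding, the Pod's spec.nodeName is set to the target node.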
Likewise, if something goes wrong, fwk.RunReservePluginsUnreserve is called, which invokes the Unreserve method of the VolumeBinding plugin to reset the cache.
PostBind
After a successful bind, the scheduler runs the last extension point of the whole scheduling flow, PostBind, and calls the registered plugins https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/scheduler.go#L627-L628:
// Run "postbind" plugins.
fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

// RunPostBindPlugins runs the set of configured postbind plugins.
func (f *frameworkImpl) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
	// a lot of code here
	for _, pl := range f.postBindPlugins {
		f.runPostBindPlugin(ctx, pl, state, pod, nodeName)
	}
}

func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.PostBindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) {
	// a lot of code here
	startTime := time.Now()
	pl.PostBind(ctx, state, pod, nodeName)
	f.metricsRecorder.observePluginDurationAsync(postBind, pl.Name(), nil, metrics.SinceInSeconds(startTime))
}
Currently no in-tree plugin implements the PostBind extension point; users can implement it themselves according to their needs.
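As a rough idea of what such a plugin could look like, the sketch below counts bound Pods per node. It is self-contained and therefore uses local stand-ins: `Pod`, `CycleState`, and `PostBindPlugin` only mimic the shape of the PostBind call shown above, and `BindRecorder` is a hypothetical plugin. A real plugin would implement the framework's interface and be registered with the scheduler.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Pod and CycleState are simplified stand-ins for v1.Pod and framework.CycleState.
type Pod struct{ Namespace, Name string }
type CycleState struct{}

// PostBindPlugin mimics the shape of the PostBind call; note that
// PostBind returns nothing, so it cannot fail the binding.
type PostBindPlugin interface {
	Name() string
	PostBind(ctx context.Context, state *CycleState, pod *Pod, nodeName string)
}

// BindRecorder is a hypothetical plugin that counts bound Pods per node.
type BindRecorder struct {
	mu     sync.Mutex
	counts map[string]int
}

func NewBindRecorder() *BindRecorder {
	return &BindRecorder{counts: map[string]int{}}
}

func (r *BindRecorder) Name() string { return "BindRecorder" }

// PostBind takes a lock because binding cycles run in separate goroutines.
func (r *BindRecorder) PostBind(_ context.Context, _ *CycleState, _ *Pod, nodeName string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.counts[nodeName]++
}

// Count reports how many Pods were recorded for the node.
func (r *BindRecorder) Count(nodeName string) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.counts[nodeName]
}

// Compile-time check that BindRecorder satisfies the interface.
var _ PostBindPlugin = (*BindRecorder)(nil)

func main() {
	rec := NewBindRecorder()
	ctx := context.Background()
	rec.PostBind(ctx, &CycleState{}, &Pod{Namespace: "default", Name: "web"}, "node-1")
	rec.PostBind(ctx, &CycleState{}, &Pod{Namespace: "default", Name: "db"}, "node-1")
	fmt.Println(rec.Count("node-1"))
}
```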
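After scheduling
Once the Pod has settled on a node, the kubelet on that node observes the event and starts the Pod's containers there. That is also a "long" process, which a separate post will cover in detail.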