How Dynamic PV Provisioning Works in Kubernetes

Feb 20, 2023 22:00 · 3614 words · 8 minute read Kubernetes Container Golang

When working with Kubernetes day to day, we are used to creating a PersistentVolumeClaim (PVC) with a storageClassName and having a PersistentVolume (PV) created for us automatically:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: myclaim
spec:
  accessModes:
    - ReadWriteOnce
  volumeMode: Filesystem
  resources:
    requests:
      storage: 8Gi
  storageClassName: slow

The PVC is then automatically bound to the PV, and Pods can use it to store data persistently. This way of consuming storage is called dynamic PV provisioning, and it is widely used because it is so convenient. There is also a static way of creating PVs, in which a cluster administrator creates the PV objects by hand; but since most users (developers) do not necessarily understand the storage service behind a PV, it has a higher barrier to entry and has gradually fallen out of favor.
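For contrast, a statically provisioned PV is written by hand by the administrator. A hypothetical NFS-backed example might look like this (the server address and export path are made up for illustration):

apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv0001
spec:
  capacity:
    storage: 8Gi
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  nfs:
    # Hypothetical NFS export; the admin must know these details up front.
    server: 192.168.1.100
    path: /exports/pv0001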

Let's look at the official explanations of the two provisioning modes:

  • Static

    A cluster administrator creates a number of PVs. They carry the details of the real storage, which is available for use by cluster users. They exist in the Kubernetes API and are available for consumption.

  • Dynamic

    When none of the static PVs the administrator created match a user’s PersistentVolumeClaim, the cluster may try to dynamically provision a volume specially for the PVC. This provisioning is based on StorageClasses: the PVC must request a storage class and the administrator must have created and configured that class for dynamic provisioning to occur.

This post digs into how Kubernetes provisions PVs "dynamically".

StorageClass

Dynamic provisioning is built on StorageClass. After deploying a storage system's CSI volume driver in Kubernetes, you create a StorageClass for it, for example:

  • CephRBD

    apiVersion: storage.k8s.io/v1
    kind: StorageClass
    metadata:
      name: rook-ceph-block-external
    parameters:
      # ...
    provisioner: rook-ceph.rbd.csi.ceph.com
    reclaimPolicy: Delete
    volumeBindingMode: Immediate
    
  • CephFS

    apiVersion: storage.k8s.io/v1
    kind: StorageClass
    metadata:
      name: rook-cephfs-external
    parameters:
      # ...
    provisioner: rook-ceph.cephfs.csi.ceph.com
    reclaimPolicy: Delete
    volumeBindingMode: Immediate
    

I won't dwell on the reclaimPolicy and volumeBindingMode fields here. The provisioner field points to the name of a CSIDriver object: when a storage system's CSI volume driver is deployed, it creates a CSIDriver object that registers the storage plugin with Kubernetes:

$ kubectl get CSIDriver
NAME                            ATTACHREQUIRED   PODINFOONMOUNT   STORAGECAPACITY   TOKENREQUESTS   REQUIRESREPUBLISH   MODES        AGE
rook-ceph.cephfs.csi.ceph.com   true             false            false             <unset>         false               Persistent   77d
rook-ceph.rbd.csi.ceph.com      true             false            false             <unset>         false               Persistent   77d

So to use a given storage system, we just set the PVC's storageClassName field to the name of the StorageClass bound to that CSI volume driver. Simple and explicit.
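The CSIDriver object itself is small. A minimal manifest consistent with the kubectl output above would look roughly like this (fields abridged):

apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
  # This name is what StorageClass.provisioner refers to.
  name: rook-ceph.rbd.csi.ceph.com
spec:
  attachRequired: true
  podInfoOnMount: false
  volumeLifecycleModes:
    - Persistent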

Provisioning

In Kubernetes, provisioning means creating a matching volume in the storage system (it might be a disk, a file path, and so on) according to the spec laid out in the PVC.

Deploying a CSI volume driver starts two kinds of components:

  • a controller plugin, deployed as a Deployment/StatefulSet
  • a node plugin, deployed as a DaemonSet

If you are not familiar with CSI volume drivers, read the earlier post Kubernetes 与 CSI first.

Provisioning is the controller plugin's job. Its Pod carries a sidecar container called csi-provisioner, an external provisioner maintained by the Kubernetes project, which sends a CreateVolume CSI request to the controller plugin to create a new volume in the storage system.

The csi-provisioner code lives in the https://github.com/kubernetes-csi/external-provisioner repository:

// https://github.com/kubernetes-csi/external-provisioner/blob/v3.1.0/cmd/csi-provisioner/csi-provisioner.go
func main() {
    // a lot of code here
    provisionController = controller.NewProvisionController(
        clientset,
        provisionerName,
        csiProvisioner,
        provisionerOptions...,
    )

    csiClaimController := ctrl.NewCloningProtectionController(
        clientset,
        claimLister,
        claimInformer,
        claimQueue,
        controllerCapabilities,
    )
    // a lot of code here

    run := func(ctx context.Context) {
        // a lot of code here
        provisionController.Run(ctx)
    }
    // a lot of code here
}

So csi-provisioner is essentially a collection of controllers. The informer inside the provisionController object watches PVC events in the cluster and reconciles the claims: https://github.com/kubernetes-csi/external-provisioner/blob/v3.1.0/vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/v8/controller/controller.go#L678-L692

// NewProvisionController creates a new provision controller using
// the given configuration parameters and with private (non-shared) informers.
func NewProvisionController(
    client kubernetes.Interface,
    provisionerName string,
    provisioner Provisioner,
    options ...func(*ProvisionController) error,
) *ProvisionController {
    // a lot of code here
    claimHandler := cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { controller.enqueueClaim(obj) },
        UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueClaim(newObj) },
        DeleteFunc: func(obj interface{}) {
            // NOOP. The claim is either in claimsInProgress and in the queue, so it will be processed as usual
            // or it's not in claimsInProgress and then we don't care
        },
    }

    if controller.claimInformer != nil {
        controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
    } else {
        controller.claimInformer = informer.Core().V1().PersistentVolumeClaims().Informer()
        controller.claimInformer.AddEventHandler(claimHandler)
    }
    // a lot of code here
}
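The enqueued claim keys are drained by worker goroutines following the standard informer + workqueue pattern. A simplified sketch of the loop (not the library's exact code; error handling and metrics are elided):

// Simplified sketch of the claim worker loop in controller.go.
func (ctrl *ProvisionController) runClaimWorker(ctx context.Context) {
    for {
        obj, shutdown := ctrl.claimQueue.Get()
        if shutdown {
            return
        }
        key := obj.(string) // "namespace/name" of the PVC
        // syncClaimHandler resolves the key via the informer cache and
        // calls syncClaim; a returned error means "requeue and retry".
        if err := ctrl.syncClaimHandler(ctx, key); err != nil {
            ctrl.claimQueue.AddRateLimited(obj)
        } else {
            ctrl.claimQueue.Forget(obj)
        }
        ctrl.claimQueue.Done(obj)
    }
}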

When we create a PVC, the ProvisionController sees the creation event and reconciles the claim: https://github.com/kubernetes-csi/external-provisioner/blob/v3.1.0/vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/v8/controller/controller.go#L1048-L1093

// syncClaim checks if the claim should have a volume provisioned for it and
// provisions one if so. Returns an error if the claim is to be requeued.
func (ctrl *ProvisionController) syncClaim(ctx context.Context, obj interface{}) error {
    claim, ok := obj.(*v1.PersistentVolumeClaim)
    if !ok {
        return fmt.Errorf("expected claim but got %+v", obj)
    }
    should, err := ctrl.shouldProvision(ctx, claim)
    if err != nil {
        ctrl.updateProvisionStats(claim, err, time.Time{})
        return err
    } else if should {
        startTime := time.Now()

        status, err := ctrl.provisionClaimOperation(ctx, claim) // here
        // a lot of code here
    }
    return nil
}
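Whether provisioning should happen at all is decided by shouldProvision. A simplified sketch of its checks (the real method also honors the newer "volume.kubernetes.io/storage-provisioner" annotation key and an optional Qualifier interface implemented by the provisioner):

// Simplified sketch of shouldProvision.
func (ctrl *ProvisionController) shouldProvision(ctx context.Context, claim *v1.PersistentVolumeClaim) (bool, error) {
    // A claim that already names a volume is bound (or pre-bound);
    // there is nothing to provision.
    if claim.Spec.VolumeName != "" {
        return false, nil
    }
    // kube-controller-manager stamps the PVC with the provisioner name
    // it resolved from the StorageClass; only the matching external
    // provisioner instance should handle the claim.
    if ann := claim.Annotations["volume.beta.kubernetes.io/storage-provisioner"]; ann != ctrl.provisionerName {
        return false, nil
    }
    return true, nil
}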

Next, let's look at the ProvisionController's provisionClaimOperation method: https://github.com/kubernetes-csi/external-provisioner/blob/v3.1.0/vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/v8/controller/controller.go#L1329-L1464

// provisionClaimOperation attempts to provision a volume for the given claim.
// Returns nil error only when the volume was provisioned (in which case it also returns ProvisioningFinished),
// a normal error when the volume was not provisioned and provisioning should be retried (requeue the claim),
// or the special errStopProvision when provisioning was impossible and no further attempts to provision should be tried.
func (ctrl *ProvisionController) provisionClaimOperation(ctx context.Context, claim *v1.PersistentVolumeClaim) (ProvisioningState, error) {
    // Most code here is identical to that found in controller.go of kube's PV controller...
    claimClass := util.GetPersistentVolumeClaimClass(claim)

    //  A previous doProvisionClaim may just have finished while we were waiting for
    //  the locks. Check that PV (with deterministic name) hasn't been provisioned
    //  yet.
    pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
    _, exists, err := ctrl.volumes.GetByKey(pvName)
    if err == nil && exists {
        // Volume has been already provisioned, nothing to do.
        klog.Info(logOperation(operation, "persistentvolume %q already exists, skipping", pvName))
        return ProvisioningFinished, errStopProvision
    }

    // a lot of code here

    // For any issues getting fields from StorageClass (including reclaimPolicy & mountOptions),
    // retry the claim because the storageClass can be fixed/(re)created independently of the claim
    class, err := ctrl.getStorageClass(claimClass)
    if err != nil {
        klog.Error(logOperation(operation, "error getting claim's StorageClass's fields: %v", err))
        return ProvisioningFinished, err
    }

    // a lot of code here
    options := ProvisionOptions{
        StorageClass: class,
        PVName:       pvName,
        PVC:          claim,
        SelectedNode: selectedNode,
    }

    ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim)))

    volume, result, err := ctrl.provisioner.Provision(ctx, options)

    // a lot of code here
}

The method does the following:

  1. Builds the PV name by concatenating pvc- with the PVC's UID (see the sketch below)
  2. Checks whether that PV already exists, which keeps the operation idempotent
  3. Fetches the StorageClass object named by the PVC's storageClassName field
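Step 1 is literally a string concatenation; getProvisionedVolumeNameForClaim boils down to this sketch:

// Deterministic PV name: "pvc-" plus the claim's UID. Because the UID is
// unique and stable, retries of provisionClaimOperation stay idempotent:
// step 2's existence check will find the PV created by an earlier attempt.
func getProvisionedVolumeNameForClaim(claim *v1.PersistentVolumeClaim) string {
    return "pvc-" + string(claim.UID)
}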

Next, the implementation of the Provision method: https://github.com/kubernetes-csi/external-provisioner/blob/v3.1.0/pkg/controller/controller.go#L701-L876

func (p *csiProvisioner) Provision(ctx context.Context, options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) {
    // a lot of code here
    result, state, err := p.prepareProvision(ctx, claim, options.StorageClass, options.SelectedNode)
    if result == nil {
        return nil, state, err
    }
    req := result.req
    volSizeBytes := req.CapacityRange.RequiredBytes
    pvName := req.Name
    provisionerCredentials := req.Secrets

    rep, err := p.csiClient.CreateVolume(createCtx, req)
}

Here csi-provisioner sends a CreateVolume CSI request to the controller plugin, which creates a volume in the storage system.
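CreateVolume is one of the RPCs of the CSI Controller gRPC service. The generated Go bindings from the CSI spec (github.com/container-storage-interface/spec/lib/go/csi) look roughly like this, abridged:

// Abridged from the CSI spec's generated Go bindings.
type ControllerClient interface {
    // CreateVolume asks the driver to provision a new volume in the
    // storage backend.
    CreateVolume(ctx context.Context, in *CreateVolumeRequest, opts ...grpc.CallOption) (*CreateVolumeResponse, error)
    // DeleteVolume is the counterpart used when a PV is reclaimed.
    DeleteVolume(ctx context.Context, in *DeleteVolumeRequest, opts ...grpc.CallOption) (*DeleteVolumeResponse, error)
    // ... other controller RPCs elided
}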

The CreateVolumeRequest object comes from the prepareProvision method: https://github.com/kubernetes-csi/external-provisioner/blob/v3.1.0/pkg/controller/controller.go#L507-L699

// prepareProvision does non-destructive parameter checking and preparations for provisioning a volume.
func (p *csiProvisioner) prepareProvision(ctx context.Context, claim *v1.PersistentVolumeClaim, sc *storagev1.StorageClass, selectedNode *v1.Node) (*prepareProvisionResult, controller.ProvisioningState, error) {
    // a lot of code here

    // Create a CSI CreateVolumeRequest and Response
    req := csi.CreateVolumeRequest{
        Name:               pvName,
        Parameters:         sc.Parameters,
        VolumeCapabilities: volumeCaps,
        CapacityRange: &csi.CapacityRange{
            RequiredBytes: int64(volSizeBytes),
        },
    }
    // a lot of code here
    return &prepareProvisionResult{
        fsType:         fsType,
        migratedVolume: migratedVolume,
        req:            &req,
        csiPVSource:    csiPVSource,
    }, controller.ProvisioningNoChange, nil
}

The request carries everything needed to create the volume: the parameters map from the StorageClass object, the access modes, the requested capacity, and so on.
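For reference, the parameters map elided as # ... in the Ceph RBD StorageClass earlier typically carries driver-specific keys like these (illustrative values; the exact keys depend on the CSI driver):

parameters:
  # Which Ceph cluster and pool the RBD images are carved out of.
  clusterID: rook-ceph
  pool: replicapool
  imageFormat: "2"
  imageFeatures: layering
  # Filesystem to format the block device with on first mount.
  csi.storage.k8s.io/fstype: ext4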

Once the volume is created, the controller plugin replies to csi-provisioner with a CreateVolumeResponse, which is also part of the CSI standard.

With the real volume's details in hand, csi-provisioner synchronously builds a PV object inside the same Provision method and returns it: https://github.com/kubernetes-csi/external-provisioner/blob/v3.1.0/pkg/controller/controller.go#L701-L876

func (p *csiProvisioner) Provision(ctx context.Context, options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) {
    // a lot of code here
    pv := &v1.PersistentVolume{
        ObjectMeta: metav1.ObjectMeta{
            Name: pvName,
        },
        Spec: v1.PersistentVolumeSpec{
            AccessModes:  options.PVC.Spec.AccessModes,
            MountOptions: options.StorageClass.MountOptions,
            Capacity: v1.ResourceList{
                v1.ResourceName(v1.ResourceStorage): bytesToQuantity(respCap),
            },
            // TODO wait for CSI VolumeSource API
            PersistentVolumeSource: v1.PersistentVolumeSource{
                CSI: result.csiPVSource,
            },
        },
    }

    // a lot of code here
    return pv, controller.ProvisioningFinished, nil
}

Back in provisionClaimOperation, the caller of Provision:

// provisionClaimOperation attempts to provision a volume for the given claim.
// Returns nil error only when the volume was provisioned (in which case it also returns ProvisioningFinished),
// a normal error when the volume was not provisioned and provisioning should be retried (requeue the claim),
// or the special errStopProvision when provisioning was impossible and no further attempts to provision should be tried.
func (ctrl *ProvisionController) provisionClaimOperation(ctx context.Context, claim *v1.PersistentVolumeClaim) (ProvisioningState, error) {
    // a lot of code here
    volume, result, err := ctrl.provisioner.Provision(ctx, options)
    // a lot of code here
    if err := ctrl.volumeStore.StoreVolume(claim, volume); err != nil {
        return ProvisioningFinished, err
    }
}

This is where the request to create the PV object is sent to the Kubernetes API server, and the PV finally appears in the cluster.
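A simplified sketch of what StoreVolume does (the type name here is illustrative; the real volumeStore implementations in external-provisioner add exponential backoff and failure cleanup):

// Simplified sketch of a volumeStore implementation.
func (s *backoffStore) StoreVolume(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) error {
    // Persist the in-memory PV object built by Provision.
    _, err := s.client.CoreV1().PersistentVolumes().Create(context.TODO(), volume, metav1.CreateOptions{})
    if apierrs.IsAlreadyExists(err) {
        // A previous retry already created the PV; idempotent success.
        return nil
    }
    return err
}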

Binding

Binding a PVC to a PV is done by the PersistentVolumeController inside kube-controller-manager, which reconciles the PVC objects in the cluster: https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/controller/volume/persistentvolume/pv_controller.go#L244-L268

// syncClaim is the main controller method to decide what to do with a claim.
// It's invoked by appropriate cache.Controller callbacks when a claim is
// created, updated or periodically synced. We do not differentiate between
// these events.
// For easier readability, it was split into syncUnboundClaim and syncBoundClaim
// methods.
func (ctrl *PersistentVolumeController) syncClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
    klog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))

    // Set correct "migrated-to" annotations on PVC and update in API server if
    // necessary
    newClaim, err := ctrl.updateClaimMigrationAnnotations(ctx, claim)
    if err != nil {
        // Nothing was saved; we will fall back into the same
        // condition in the next call to this method
        return err
    }
    claim = newClaim

    if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) {
        return ctrl.syncUnboundClaim(ctx, claim)
    } else {
        return ctrl.syncBoundClaim(claim)
    }
}

It binds the PVC to an unbound PV: https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/controller/volume/persistentvolume/pv_controller.go#L327-L467

// syncUnboundClaim is the main controller method to decide what to do with an
// unbound claim.
func (ctrl *PersistentVolumeController) syncUnboundClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {
    // a lot of code here
    obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
    if err != nil {
        return err
    }
    if !found {
        // User asked for a PV that does not exist.
        // OBSERVATION: pvc is "Pending"
        // Retry later.
        klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
        if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
            return err
        }
        return nil
    } else {
        volume, ok := obj.(*v1.PersistentVolume)
        if !ok {
            return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
        }
        klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
        if volume.Spec.ClaimRef == nil {
            // User asked for a PV that is not claimed
            // OBSERVATION: pvc is "Pending", pv is "Available"
            if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
                // a lot of code here
            } else if err = ctrl.bind(volume, claim); err != nil {
                // On any error saving the volume or the claim, subsequent
                // syncClaim will finish the binding.
                return err
            }
            // OBSERVATION: pvc is "Bound", pv is "Bound"
            return nil
        }
    }
}

The PV is found through the PVC's volumeName field. The "binding" itself is mostly bookkeeping on both objects, culminating in their phase fields being set to Bound; the original code is lengthy, but schematically it amounts to the sketch below.
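A simplified sketch of ctrl.bind in pv_controller.go (in the real code each mutation below is persisted to the API server individually, with any partial failure repaired by a later syncClaim):

// Simplified sketch of the bind operation.
func (ctrl *PersistentVolumeController) bind(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) error {
    // Point the PV back at the PVC via spec.claimRef.
    ref, _ := reference.GetReference(scheme.Scheme, claim)
    volume.Spec.ClaimRef = ref
    volume.Status.Phase = v1.VolumeBound
    // Record the PV name on the PVC and mark the binding as completed.
    claim.Spec.VolumeName = volume.Name
    metav1.SetMetaDataAnnotation(&claim.ObjectMeta, "pv.kubernetes.io/bind-completed", "yes")
    claim.Status.Phase = v1.ClaimBound
    return nil
}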

Summary

We have now traced the complete flow of "dynamically" provisioning persistent storage, in which several controller components cooperate asynchronously:

  • csi-provisioner watches PVC events in the cluster and synchronously creates both the real storage volume and the PV object
  • kube-controller-manager watches PVC events in the cluster, verifies that the PV was created correctly, and binds the PVC to the PV

None of this is particularly complicated, and the pattern is worth borrowing when designing systems on top of Kubernetes. In the next post I'll look at how WaitForFirstConsumer (delayed binding) is implemented in Kubernetes.