Kubernetes Storage Architecture: A Walkthrough of the PV Controller Source Code
Abstract
This article introduces the basics of the Kubernetes storage architecture and then works through the source code of the PV controller.
Introduction
Start from a business scenario: your company runs a web server on a physical machine, and as the business grows the local disk fills up. The business team asks your infrastructure department for an extra 1TB of block storage. You offer two solutions:
- Option 1: add a new disk to the physical machine
Steps:
- Obtain a disk (bought or borrowed)
- Insert the disk into a server slot
- Let the OS detect the disk, then format it and mount it to a directory
- Option 2: if a Ceph storage cluster is already available, carve a volume out of Ceph and map it to the physical machine
Steps:
- rbd create to create a volume
- rbd map to map the volume to the physical machine
- Let the OS detect the disk, then format it and mount it to a directory
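Strung together, option 2 is only a few commands. A minimal sketch, assuming a pool named rbd-pool and an image named web-data (both hypothetical), and that the kernel exposes the image as /dev/rbd0:
# create a 1TB image (size suffixes like 1T depend on the Ceph release; a bare number means MB)
rbd create rbd-pool/web-data --size 1T
# map the image to this machine; the command prints the device name, e.g. /dev/rbd0
rbd map rbd-pool/web-data
# format the device and mount it where the application needs it
mkfs.ext4 /dev/rbd0
mount /dev/rbd0 /mnt/web-data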
Now take it one step further: what if the workload is a container pod instead of a physical machine?
Option 1: treat the pod like a physical machine. Install the ceph rbd client inside the pod, rbd map the volume to the pod, then exec into the pod and mount the volume to the desired directory. Everything is done by hand: inefficient, and far from mainstream.
Option 2: use Kubernetes + ceph csi to automate attaching persistent storage to pods.
Kubernetes storage architecture
Storage basics
In Kubernetes, storage volumes are abstracted by the PVC and PV objects.
PV and PVC
- PersistentVolumeClaim (PVC): a user's request for storage. It is analogous to a Pod: Pods consume node resources, PVCs consume PV resources. A Pod can request specific amounts of resources (CPU and memory); a PVC can request a PV of a specific size and access mode.
- PersistentVolume (PV): storage provisioned by an administrator; it is part of the cluster. Just as a node is a cluster resource, a PV is a cluster resource. It carries a storage type, a size and access modes. Its lifecycle is independent of any Pod: deleting a Pod that uses a PV has no effect on the PV.
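For a concrete picture, here is a minimal, hypothetical statically-provisioned PV backed by NFS (the server address and export path are made up):
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-demo
spec:
  capacity:
    storage: 10Gi
  accessModes:
  - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  nfs:
    server: 10.0.0.10      # hypothetical NFS server
    path: /exports/demo    # hypothetical export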
How PVs are provisioned
- Static PVs: the cluster administrator creates a number of PVs up front; they carry the details of the real storage available to cluster users.
- Dynamic PVs: when none of the administrator's static PVs matches a user's PersistentVolumeClaim, the cluster dynamically provisions a volume for the PVC. This is driven by StorageClasses: the PVC must request a StorageClass, and the administrator must have created and configured that class for dynamic provisioning to happen.
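A minimal sketch of a PVC that triggers dynamic provisioning (the class name fast-rbd is hypothetical and must match an existing StorageClass):
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: data-pvc
spec:
  accessModes:
  - ReadWriteOnce
  storageClassName: fast-rbd   # must name an existing StorageClass
  resources:
    requests:
      storage: 10Gi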
PV access modes
- ReadWriteOnce - the volume can be mounted read-write by a single node
- ReadOnlyMany - the volume can be mounted read-only by many nodes
- ReadWriteMany - the volume can be mounted read-write by many nodes
PV reclaim policies
- Retain - manual reclamation. After the PVC is deleted, the PV becomes Released and cannot be reused until an administrator deletes and recreates it. Deleting the PV only removes the PV object, not the underlying storage. Use Retain when you want to keep the data.
- Recycle - basic scrub (rm -rf /thevolume/*). Deleting the PVC automatically wipes the data in the PV, equivalent to running rm -rf /thevolume/*. When the PVC is deleted, the PV goes from Bound back to Available and can be claimed again.
- Delete - delete the underlying storage resource. The associated storage asset (an AWS EBS, GCE PD, Azure Disk or OpenStack Cinder volume) is deleted. NFS does not support the Delete policy.
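The reclaim policy is an ordinary field on the PV (spec.persistentVolumeReclaimPolicy), so it can be changed on a live PV, for example with kubectl patch (the PV name is taken from the example later in this article):
kubectl patch pv pvc-020270a5-52f4-11ef-b4be-e8ebd398881c \
  -p '{"spec":{"persistentVolumeReclaimPolicy":"Retain"}}'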
PV states
- Available - a free resource not yet bound to any claim
- Bound - the PV has been bound to a PVC
- Released - the PVC has been deleted; the PV is waiting to be reclaimed
- Failed - the PV's automatic reclamation failed
(diagram from the web, omitted here)
PVC states
- Pending - waiting to be bound to a PV
- Bound - bound to a PV
- Lost - the PVC has lost its underlying PV; if the PV is bound again later, the PVC returns to Bound
(diagram from the web, omitted here)
PVC YAML fields explained
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  annotations:
    # Marks that this PVC's binding has completed; Kubernetes sets it once the
    # PV and PVC are successfully bound. The PV controller checks this
    # annotation to determine the state of the binding process.
    pv.kubernetes.io/bind-completed: "yes"
    # Marks whether the PV was bound to this PVC by the controller, e.g. when
    # the PV was dynamically created through a StorageClass or auto-bound.
    pv.kubernetes.io/bound-by-controller: "yes"
    # Names the storage provisioner (from the StorageClass) that created the PV.
    # With dynamic provisioning, the provisioner creates and configures the PV
    # from the PVC request; no administrator has to create the PV by hand.
    volume.beta.kubernetes.io/storage-provisioner: rbd.csi.obs.com
  creationTimestamp: 2024-08-05T06:29:03Z
  finalizers:
  - kubernetes.io/pvc-protection
  # name of the PVC
  name: basepvc-dg03dev8jxdh
  # namespace the PVC lives in
  namespace: public
  resourceVersion: "11508437467"
  selfLink: /api/v1/namespaces/public/persistentvolumeclaims/basepvc-dg03dev80407964jxdh-swap
  # Uniquely identifies the PVC; also used to manage the binding between the
  # PVC and its PersistentVolume (PV).
  uid: 020270a5-52f4-11ef-b4be-e8ebd398881c
spec:
  # access modes of the PVC
  accessModes:
  - ReadWriteOnce
  # for a cloned volume, identifies the snapshot it was created from
  dataSource: null
  resources:
    requests:
      # requested volume size
      storage: 51Gi
  storageClassName: csi-virt-rbd
  # Block or Filesystem: whether the volume needs a filesystem
  volumeMode: Filesystem
  # name of the bound PV; by convention "pvc-<PVC uid>"
  volumeName: pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
status:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 51Gi
  # current phase of the PVC
  phase: Bound
PV YAML fields explained
---
apiVersion: v1
kind: PersistentVolume
metadata:
  annotations:
    # names the storage provisioner that created this PV
    pv.kubernetes.io/provisioned-by: rbd.csi.obs.com
  creationTimestamp: 2024-08-05T06:29:03Z
  finalizers:
  - kubernetes.io/pv-protection
  labels:
    baymax.io/rbd-cluster-name: ceph10
  name: pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
  resourceVersion: "10132283575"
  selfLink: /api/v1/persistentvolumes/pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
  uid: 0220ec79-52f4-11ef-b4be-e8ebd398881c
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 51Gi
  # claimRef records the PVC this PV is bound to: name, namespace and UID
  claimRef:
    apiVersion: v1
    kind: PersistentVolumeClaim
    # name of the PVC
    name: basepvc-dg03dev80407964jxdh-swap
    namespace: public
    resourceVersion: "10132283544"
    # uid of the PVC
    uid: 020270a5-52f4-11ef-b4be-e8ebd398881c
  # the volume behind this PV is lifecycle-managed by CSI
  csi:
    # name of the storage driver
    driver: rbd.csi.obs.com
    # filesystem type used when formatting the volume
    fsType: ext4
    volumeAttributes:
      adminKeyring: AQBdZFtkGyvfxxxxxxxxxxxxxxxxxxxxxxx (key redacted)
      baymax.io/rbd-pvc-name: basepvc-dg03dev80407964jxdh-swap
      baymax.io/rbd-pvc-namespace: public
      clustername: ceph10
      monitors: 10.x.x.x:6789,10.x.x.x:6789,10.x.x.x:6789 (IPs redacted)
      storage.kubernetes.io/csiProvisionerIdentity: 1678171935067-8081-rbd.csi.obs.com
      userKeyring: AQBKqlxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx (key redacted)
    # volumeHandle is a unique identifier generated by the CSI driver for the
    # underlying storage volume. After the PV and PVC are bound, volumeHandle
    # is what lets Kubernetes associate the PV with that volume.
    volumeHandle: csi-rbd-pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
  # reclaim policy
  persistentVolumeReclaimPolicy: Delete
  # name of the StorageClass
  storageClassName: csi-virt-rbd
  volumeMode: Filesystem
status:
  phase: Bound
StorageClass YAML fields explained
# kubectl get storageclasses.storage.k8s.io ceph-nbd
NAME PROVISIONER AGE
ceph-nbd ceph-nbd.csi.obs.com 53d
# kubectl get storageclasses.storage.k8s.io ceph-nbd -oyaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"storage.k8s.io/v1","kind":"StorageClass","metadata":{"annotations":{},"name":"ceph-nbd"},"provisioner":"ceph-nbd.csi.obs.com","reclaimPolicy":"Delete"}
  creationTimestamp: 2024-10-25T02:03:28Z
  name: ceph-nbd
  resourceVersion: "10961600994"
  selfLink: /apis/storage.k8s.io/v1/storageclasses/ceph-nbd
  uid: 53c697ad-9275-11ef-88f0-e8ebd3986310
provisioner: ceph-nbd.csi.obs.com
# volume reclaim policy
reclaimPolicy: Delete
# volume binding mode: immediate or delayed
volumeBindingMode: Immediate
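For contrast, a minimal sketch of a StorageClass that delays binding until a consumer appears (the class name is hypothetical; the provisioner is the one shown above):
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: ceph-nbd-delayed          # hypothetical
provisioner: ceph-nbd.csi.obs.com
reclaimPolicy: Delete
# PVCs using this class stay Pending until a pod that mounts them is scheduled
volumeBindingMode: WaitForFirstConsumer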
Kubernetes storage components
In Kubernetes, the storage-related components are as follows:
(diagram from the web, omitted here)
- PV Controller: handles PV/PVC binding and lifecycle management, and performs Provision/Delete operations on volumes as needed;
- AD Controller: handles Attach/Detach of storage devices, attaching a device to the target node;
- Volume Manager: handles Mount/Unmount of volumes, formatting of volume devices, and mounting them onto shared directories;
- Volume Plugins: implement all of the attach/mount operations above.
PV Controller, AD Controller and Volume Manager mainly issue the operation calls; the operations themselves are implemented by the Volume Plugins.
Next, this article analyzes the PV controller source code to learn in depth how the PV controller is implemented.
Starting the PV controller
If you know the kube-controller-manager component, you know it bundles many controllers: the node controller, deployment controller, service controller, DaemonSet controller, PV controller, AD controller, and so on.
When the kube-controller-manager process starts, it starts each of these controllers in turn.
The PV controller startup flow is:
main() —> NewHyperKubeCommand() —> kubecontrollermanager.NewControllerManagerCommand() —> Run() —> StartControllers() —> NewControllerInitializers() —> startPersistentVolumeBinderController(ctx) —> NewController() —> go volumeController.Run(ctx.Stop)
When kube-controller-manager runs Run(), it calls startPersistentVolumeBinderController(ctx), which in turn calls NewController() to instantiate a PV controller and then starts it in a goroutine: go volumeController.Run(ctx.Stop).
Source path: k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go
Note: the code in this article is from Kubernetes v1.12.
NewController
The PV controller struct contains, most notably: several listers, through which the controller reads resource state from the local informer cache; two local caches that hold the volumes and claims the cluster currently has; and two work queues, onto which create/update/delete events for claims and volumes are pushed for the PV controller to process.
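An abridged sketch of those fields (trimmed to the ones discussed here; see pv_controller.go in v1.12 for the full definition):
// abridged; field set reduced for readability
type PersistentVolumeController struct {
	kubeClient    clientset.Interface
	eventRecorder record.EventRecorder

	// listers backed by the shared informer cache
	volumeLister corelisters.PersistentVolumeLister
	claimLister  corelisters.PersistentVolumeClaimLister
	classLister  storagelisters.StorageClassLister

	// local caches of the volumes and claims the controller knows about
	volumes persistentVolumeOrderedIndex
	claims  cache.Store

	// work queues fed by informer events and by resync()
	volumeQueue *workqueue.Type
	claimQueue  *workqueue.Type
}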
startPersistentVolumeBinderController(ctx) instantiates a PV controller and starts it in a goroutine:
// cmd/kube-controller-manager/app/core.go
func startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler, bool, error) {
	params := persistentvolumecontroller.ControllerParameters{
		KubeClient:                ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
		SyncPeriod:                ctx.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration,
		VolumePlugins:             ProbeControllerVolumePlugins(ctx.Cloud, ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
		Cloud:                     ctx.Cloud,
		ClusterName:               ctx.ComponentConfig.KubeCloudShared.ClusterName,
		VolumeInformer:            ctx.InformerFactory.Core().V1().PersistentVolumes(),
		ClaimInformer:             ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
		ClassInformer:             ctx.InformerFactory.Storage().V1().StorageClasses(),
		PodInformer:               ctx.InformerFactory.Core().V1().Pods(),
		NodeInformer:              ctx.InformerFactory.Core().V1().Nodes(),
		EnableDynamicProvisioning: ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning,
	}
	// instantiate a PV controller
	volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
	if volumeControllerErr != nil {
		return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
	}
	// start the PV controller in a goroutine
	go volumeController.Run(ctx.Stop)
	return nil, true, nil
}
NewController() creates a new PV controller:
// kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go
// NewController creates a new PersistentVolume controller
func NewController(p ControllerParameters) (*PersistentVolumeController, error) {
	// initialize an event recorder
	eventRecorder := p.EventRecorder
	if eventRecorder == nil {
		broadcaster := record.NewBroadcaster()
		broadcaster.StartLogging(glog.Infof)
		broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: p.KubeClient.CoreV1().Events("")})
		eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"})
	}

	// instantiate a PV controller
	controller := &PersistentVolumeController{
		volumes:                       newPersistentVolumeOrderedIndex(),
		claims:                        cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
		kubeClient:                    p.KubeClient,
		eventRecorder:                 eventRecorder,
		runningOperations:             goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
		cloud:                         p.Cloud,
		enableDynamicProvisioning:     p.EnableDynamicProvisioning,
		clusterName:                   p.ClusterName,
		createProvisionedPVRetryCount: createProvisionedPVRetryCount,
		createProvisionedPVInterval:   createProvisionedPVInterval,
		claimQueue:                    workqueue.NewNamed("claims"),
		volumeQueue:                   workqueue.NewNamed("volumes"),
		resyncPeriod:                  p.SyncPeriod,
	}

	// Prober is nil because PV is not aware of Flexvolume.
	if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, nil /* prober */, controller); err != nil {
		return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err)
	}

	// Register the volume event handlers: every volume add/update/delete event
	// is pushed onto the controller.volumeQueue work queue.
	p.VolumeInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
			UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
			DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
		},
	)
	controller.volumeLister = p.VolumeInformer.Lister()
	controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced

	// Register the claim event handlers: every claim add/update/delete event
	// is pushed onto the controller.claimQueue work queue.
	p.ClaimInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
			UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
			DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
		},
	)
	controller.claimLister = p.ClaimInformer.Lister()
	controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced

	controller.classLister = p.ClassInformer.Lister()
	controller.classListerSynced = p.ClassInformer.Informer().HasSynced
	controller.podLister = p.PodInformer.Lister()
	controller.podListerSynced = p.PodInformer.Informer().HasSynced
	controller.NodeLister = p.NodeInformer.Lister()
	controller.NodeListerSynced = p.NodeInformer.Informer().HasSynced
	return controller, nil
}
ctrl.Run()
Run() is the PV controller's entry point. It starts three worker goroutines:
- ctrl.resync: periodically re-syncs the controller with the actual cluster state. Concretely, it lists the PVs and PVCs on a timer and pushes them onto volumeQueue and claimQueue, where volumeWorker and claimWorker consume them.
- ctrl.volumeWorker: consumes volumeQueue and handles the PersistentVolume (PV) operations: creating, binding, releasing and deleting PVs.
- ctrl.claimWorker: consumes claimQueue and handles the PersistentVolumeClaim (PVC) operations: creating, binding and deleting PVCs.
// kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer ctrl.claimQueue.ShutDown()
	defer ctrl.volumeQueue.ShutDown()

	klog.Infof("Starting persistent volume controller")
	defer klog.Infof("Shutting down persistent volume controller")

	if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
		return
	}

	ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

	// Core code: start the three worker goroutines.
	go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
	go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
	go wait.Until(ctrl.claimWorker, time.Second, stopCh)

	metrics.Register(ctrl.volumes.store, ctrl.claims)

	<-stopCh
}
(simplified diagram omitted)
ctrl.resync
resync periodically lists the PVCs from the cache and pushes them onto the ctrl.claimQueue work queue, which the ctrl.claimWorker loop then consumes; likewise it periodically lists the PVs and pushes them onto ctrl.volumeQueue, which the ctrl.volumeWorker loop consumes.
// kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go
func (ctrl *PersistentVolumeController) resync() {
	klog.V(4).Infof("resyncing PV controller")

	// list the PVCs and push each one onto ctrl.claimQueue for processing
	pvcs, err := ctrl.claimLister.List(labels.NewSelector())
	if err != nil {
		klog.Warningf("cannot list claims: %s", err)
		return
	}
	for _, pvc := range pvcs {
		ctrl.enqueueWork(ctrl.claimQueue, pvc)
	}

	// list the PVs and push each one onto ctrl.volumeQueue for processing
	pvs, err := ctrl.volumeLister.List(labels.NewSelector())
	if err != nil {
		klog.Warningf("cannot list persistent volumes: %s", err)
		return
	}
	for _, pv := range pvs {
		ctrl.enqueueWork(ctrl.volumeQueue, pv)
	}
}
(simplified diagram omitted)
This is a classic producer/consumer model: ctrl.resync produces data into the two queues, and ctrl.volumeWorker and ctrl.claimWorker consume it.
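A minimal, self-contained sketch of that pattern using client-go's workqueue package (the Println stands in for the real sync logic; the key names are made up):
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewNamed("volumes")

	// producer: plays the role of ctrl.resync, enqueueing object keys
	go func() {
		for _, key := range []string{"pv-a", "pv-b"} {
			q.Add(key)
		}
		time.Sleep(time.Second)
		q.ShutDown()
	}()

	// consumer: plays the role of ctrl.volumeWorker, draining the queue
	for {
		key, quit := q.Get()
		if quit {
			return
		}
		fmt.Println("processing", key.(string)) // stand-in for syncVolume/syncClaim
		q.Done(key)
	}
}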
ctrl.volumeWorker
volumeWorker loops forever consuming items from volumeQueue, fetching the corresponding PV for each item and running updateVolume on it.
volumeWorker() is fairly long, so let's unpack it.
// volumeWorker processes items from volumeQueue. It must run only once,
// syncVolume is not assured to be reentrant.
func (ctrl *PersistentVolumeController) volumeWorker() {
	workFunc := func() bool {
		// take one PV (volume) key from the ctrl.volumeQueue work queue
		keyObj, quit := ctrl.volumeQueue.Get()
		if quit {
			return true
		}
		defer ctrl.volumeQueue.Done(keyObj)
		key := keyObj.(string)
		glog.V(5).Infof("volumeWorker[%s]", key)

		_, name, err := cache.SplitMetaNamespaceKey(key)
		if err != nil {
			glog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
			return false
		}
		// look up the volume object in the local cache by name
		volume, err := ctrl.volumeLister.Get(name)
		if err == nil {
			// The volume still exists in informer cache, the event must have
			// been add/update/sync.
			// The volume exists, so hand it to updateVolume().
			ctrl.updateVolume(volume)
			return false
		}
		if !errors.IsNotFound(err) {
			glog.V(2).Infof("error getting volume %q from informer: %v", key, err)
			return false
		}

		// The volume is not in informer cache, the event must have been
		// "delete"
		volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
		if err != nil {
			glog.V(2).Infof("error getting volume %q from cache: %v", key, err)
			return false
		}
		if !found {
			// The controller has already processed the delete event and
			// deleted the volume from its cache
			glog.V(2).Infof("deletion of volume %q was already processed", key)
			return false
		}
		volume, ok := volumeObj.(*v1.PersistentVolume)
		if !ok {
			glog.Errorf("expected volume, got %+v", volumeObj)
			return false
		}
		// The volume resource is gone, which means it was deleted; call
		// ctrl.deleteVolume(volume) to process the deletion.
		ctrl.deleteVolume(volume)
		return false
	}
	// keep running workFunc() until the queue shuts down
	for {
		if quit := workFunc(); quit {
			glog.Infof("volume worker queue shutting down")
			return
		}
	}
}
updateVolume() does a freshness check and then calls ctrl.syncVolume(volume) for the real work.
// updateVolume runs in worker thread and handles "volume added",
// "volume updated" and "periodic sync" events.
func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {
	// Store the new volume version in the cache and do not process it if this
	// is an old version.
	new, err := ctrl.storeVolumeUpdate(volume)
	if err != nil {
		glog.Errorf("%v", err)
	}
	if !new {
		return
	}

	// bind or unbind the PV and PVC according to the current PV spec
	err = ctrl.syncVolume(volume)
	if err != nil {
		if errors.IsConflict(err) {
			// Version conflict error happens quite often and the controller
			// recovers from it easily.
			glog.V(3).Infof("could not sync volume %q: %+v", volume.Name, err)
		} else {
			glog.Errorf("could not sync volume %q: %+v", volume.Name, err)
		}
	}
}
ctrl.syncVolume(volume)
This is an important function and worth a careful read.
Note: in the code, "volume" means PV and "claim" means PVC.
syncVolume is the core method; it reconciles the PV's state:
(1) if spec.claimRef is unset, the PV has never been used; updateVolumePhase sets its phase to Available;
(2) if spec.claimRef is set, the PV has been bound to a PVC before; if that PVC no longer exists, the PV's phase is updated to Released;
(3) once the PV's PVC is gone, ctrl.reclaimVolume applies the PV's reclaim policy: Retain does nothing; Delete calls the volume plugin to delete the backing storage and then deletes the PV object (with a CSI plugin this takes the out-of-tree path: the PV controller deletes neither the storage nor the PV object; the external-provisioner component does that).
// syncVolume is the main controller method to decide what to do with a volume.
// It's invoked by appropriate cache.Controller callbacks when a volume is
// created, updated or periodically synced. We do not differentiate between
// these events.
func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume) error {
	glog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))

	// [Unit test set 4]
	// If volume.Spec.ClaimRef is nil the volume is unused, so call
	// ctrl.updateVolumePhase() to set its phase to Available.
	if volume.Spec.ClaimRef == nil {
		// Volume is unused
		glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
		if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
			// Nothing was saved; we will fall back into the same
			// condition in the next call to this method
			return err
		}
		return nil
	} else /* pv.Spec.ClaimRef != nil */ {
		// Volume is bound to a claim.
		// If volume.Spec.ClaimRef.UID is "" the volume is only reserved, not
		// yet bound; set its phase to Available as well.
		if volume.Spec.ClaimRef.UID == "" {
			// The PV is reserved for a PVC; that PVC has not yet been
			// bound to this PV; the PVC sync will handle it.
			glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
			if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
				// Nothing was saved; we will fall back into the same
				// condition in the next call to this method
				return err
			}
			return nil
		}
		glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))

		// Get the PVC by _name_
		// look up the corresponding PVC object in the local cache via
		// volume.Spec.ClaimRef
		var claim *v1.PersistentVolumeClaim
		claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
		obj, found, err := ctrl.claims.GetByKey(claimName)
		if err != nil {
			return err
		}
		// If the PVC is not in the local cache but the PV carries the
		// "pv.kubernetes.io/bound-by-controller" annotation, double-check
		// against the apiserver.
		if !found && metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
			// If PV is bound by external PV binder (e.g. kube-scheduler), it's
			// possible on heavy load that corresponding PVC is not synced to
			// controller local cache yet. So we need to double-check PVC in
			//   1) informer cache
			//   2) apiserver if not found in informer cache
			// to make sure we will not reclaim a PV wrongly.
			// Note that only non-released and non-failed volumes will be
			// updated to Released state when PVC does not exist.
			if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
				// look up the PVC in the informer cache
				obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
				if err != nil && !apierrs.IsNotFound(err) {
					return err
				}
				found = !apierrs.IsNotFound(err)
				if !found {
					// not in the cache either; ask the apiserver directly
					obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name, metav1.GetOptions{})
					if err != nil && !apierrs.IsNotFound(err) {
						return err
					}
					found = !apierrs.IsNotFound(err)
				}
			}
		}
		if !found {
			// the PVC could not be found anywhere; fall through with claim = nil
			glog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
			// Fall through with claim = nil
		} else {
			// the PVC was found; convert the cached object into a PVC
			var ok bool
			claim, ok = obj.(*v1.PersistentVolumeClaim)
			if !ok {
				return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
			}
			glog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
		}
		// If the PVC's UID differs from the UID recorded in the PV, the
		// binding is stale: the original claim must have been deleted and a
		// new one created under the same name.
		if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
			// The claim that the PV was pointing to was deleted, and another
			// with the same name created.
			glog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has different UID, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
			// Treat the volume as bound to a missing claim.
			claim = nil
		}

		if claim == nil {
			// The PVC is gone. Unless the PV is already Released or Failed,
			// set its phase to Released both in the apiserver and in the
			// local cache, then run the reclaim policy.
			// NOTE: reclaimVolume may either release the PV back into the pool or
			// recycle it or do nothing (retain)
			// Do not overwrite previous Failed state - let the user see that
			// something went wrong, while we still re-try to reclaim the
			// volume.
			if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
				// Also, log this only once:
				glog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
				if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
					// Nothing was saved; we will fall back into the same condition
					// in the next call to this method
					return err
				}
			}
			if err = ctrl.reclaimVolume(volume); err != nil {
				// Release failed, we will fall back into the same condition
				// in the next call to this method
				return err
			}
			return nil
		} else if claim.Spec.VolumeName == "" {
			// The claim exists but has no volume name yet; first check that
			// the volumeMode matches. If it does not, record events on both
			// objects and return.
			if isMisMatch, err := checkVolumeModeMisMatches(&claim.Spec, &volume.Spec); err != nil || isMisMatch {
				// Binding for the volume won't be called in syncUnboundClaim,
				// because findBestMatchForClaim won't return the volume due to volumeMode mismatch.
				volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
				ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)
				claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)
				ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)
				// Skipping syncClaim
				return nil
			}
			// volumeMode matches; check whether the PV carries the
			// "pv.kubernetes.io/bound-by-controller" annotation. If it does,
			// the PVC sync will finish the binding; if it does not, the link
			// was broken (by the user?) and the PVC sync will re-establish it.
			if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
				// The binding is not completed; let PVC sync handle it
				glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)
			} else {
				// Dangling PV; try to re-establish the link in the PVC sync
				glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)
			}
			// In both cases, the volume is Bound and the claim is Pending.
			// Next syncClaim will fix it. To speed it up, we enqueue the claim
			// into the controller, which results in syncClaim to be called
			// shortly (and in the right worker goroutine).
			// This speeds up binding of provisioned volumes - provisioner saves
			// only the new PV and it expects that next syncClaim will bind the
			// claim to it.
			// enqueue the PVC onto claimQueue for the syncClaim() worker
			ctrl.claimQueue.Add(claimToClaimKey(claim))
			return nil
		} else if claim.Spec.VolumeName == volume.Name {
			// claim.Spec.VolumeName == volume.Name means the binding is
			// correct in both directions; just update the volume's phase
			// to Bound if necessary.
			glog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
			if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
				// Nothing was saved; we will fall back into the same
				// condition in the next call to this method
				return err
			}
			return nil
		} else {
			// Volume is bound to a claim, but the claim is bound elsewhere.
			// If the PV carries the "pv.kubernetes.io/provisioned-by"
			// annotation and its reclaim policy is Delete, set its phase to
			// Released (unless it is already Released or Failed) and reclaim it.
			if metav1.HasAnnotation(volume.ObjectMeta, annDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
				// This volume was dynamically provisioned for this claim. The
				// claim got bound elsewhere, and thus this volume is not
				// needed. Delete it.
				// Mark the volume as Released for external deleters and to let
				// the user know. Don't overwrite existing Failed status!
				if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
					// Also, log this only once:
					glog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)
					// update the PV's phase
					if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
						// Nothing was saved; we will fall back into the same condition
						// in the next call to this method
						return err
					}
				}
				// then reclaim the PV via ctrl.reclaimVolume(volume)
				if err = ctrl.reclaimVolume(volume); err != nil {
					// Deletion failed, we will fall back into the same condition
					// in the next call to this method
					return err
				}
				return nil
			} else {
				// otherwise run unbindVolume()
				// Volume is bound to a claim, but the claim is bound elsewhere
				// and it's not dynamically provisioned.
				if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
					// This is part of the normal operation of the controller; the
					// controller tried to use this volume for a claim but the claim
					// was fulfilled by another volume. We did this; fix it.
					glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
					if err = ctrl.unbindVolume(volume); err != nil {
						return err
					}
					return nil
				} else {
					// The PV must have been created with this ptr; leave it alone.
					glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
					// This just updates the volume phase and clears
					// volume.Spec.ClaimRef.UID. It leaves the volume pre-bound
					// to the claim.
					if err = ctrl.unbindVolume(volume); err != nil {
						return err
					}
					return nil
				}
			}
		}
	}
}
Since the code is long, the main logic is easiest to follow as a flowchart (diagram omitted).
updateVolumePhase()
- updates the volume's phase in etcd (via the apiserver)
- updates the phase in the local informer cache
// updateVolumePhase saves new volume phase to API server.
func (ctrl *PersistentVolumeController) updateVolumePhase(volume *v1.PersistentVolume, phase v1.PersistentVolumePhase, message string) (*v1.PersistentVolume, error) {
	glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s", volume.Name, phase)
	// if the phase is already set, there is nothing to do
	if volume.Status.Phase == phase {
		// Nothing to do.
		glog.V(4).Infof("updating PersistentVolume[%s]: phase %s already set", volume.Name, phase)
		return volume, nil
	}

	// core code: set the new phase on a deep copy of the volume
	volumeClone := volume.DeepCopy()
	volumeClone.Status.Phase = phase
	volumeClone.Status.Message = message

	// core code: ask the apiserver to update the volume's status
	newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().UpdateStatus(volumeClone)
	if err != nil {
		glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s failed: %v", volume.Name, phase, err)
		return newVol, err
	}
	// then update the local informer cache as well
	_, err = ctrl.storeVolumeUpdate(newVol)
	if err != nil {
		glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
		return newVol, err
	}
	glog.V(2).Infof("volume %q entered phase %q", volume.Name, phase)
	return newVol, err
}
unbindVolume(volume)
unbinds a volume that is currently in the Bound state:
- the volume object is modified: 1) volumeClone.Spec.ClaimRef is set to nil; 2) the "pv.kubernetes.io/bound-by-controller" annotation is removed; 3) the volume's phase is set back to Available
- the change is sent to the apiserver over HTTP, and the local cache is updated to match.
// unbindVolume rolls back previous binding of the volume. This may be necessary
// when two controllers bound two volumes to single claim - when we detect this,
// only one binding succeeds and the second one must be rolled back.
// This method updates both Spec and Status.
// It returns on first error, it's up to the caller to implement some retry
// mechanism.
func (ctrl *PersistentVolumeController) unbindVolume(volume *v1.PersistentVolume) error {
	glog.V(4).Infof("updating PersistentVolume[%s]: rolling back binding from %q", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))

	// Save the PV only when any modification is necessary.
	volumeClone := volume.DeepCopy()
	if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
		// The volume was bound by the controller.
		// clear the volume's ClaimRef
		volumeClone.Spec.ClaimRef = nil
		// remove the bound-by-controller annotation
		delete(volumeClone.Annotations, annBoundByController)
		if len(volumeClone.Annotations) == 0 {
			// No annotations look better than empty annotation map (and it's easier
			// to test).
			volumeClone.Annotations = nil
		}
	} else {
		// The volume was pre-bound by user. Clear only the binding UID.
		// for a user-created pre-bound volume, only reset the UID to ""
		volumeClone.Spec.ClaimRef.UID = ""
	}

	newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Update(volumeClone)
	if err != nil {
		glog.V(4).Infof("updating PersistentVolume[%s]: rollback failed: %v", volume.Name, err)
		return err
	}
	_, err = ctrl.storeVolumeUpdate(newVol)
	if err != nil {
		glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
		return err
	}
	glog.V(4).Infof("updating PersistentVolume[%s]: rolled back", newVol.Name)

	// Update the status
	_, err = ctrl.updateVolumePhase(newVol, v1.VolumeAvailable, "")
	return err
}
ctrl.reclaimVolume
reclaimVolume dispatches on the volume's reclaim policy:
// reclaimVolume implements volume.Spec.PersistentVolumeReclaimPolicy and
// starts appropriate reclaim action.
func (ctrl *PersistentVolumeController) reclaimVolume(volume *v1.PersistentVolume) error {
	switch volume.Spec.PersistentVolumeReclaimPolicy {
	// policy Retain: do nothing
	case v1.PersistentVolumeReclaimRetain:
		glog.V(4).Infof("reclaimVolume[%s]: policy is Retain, nothing to do", volume.Name)

	// policy Recycle: have the plugin scrub the volume so the PV can be reused
	case v1.PersistentVolumeReclaimRecycle:
		glog.V(4).Infof("reclaimVolume[%s]: policy is Recycle", volume.Name)
		opName := fmt.Sprintf("recycle-%s[%s]", volume.Name, string(volume.UID))
		ctrl.scheduleOperation(opName, func() error {
			ctrl.recycleVolumeOperation(volume)
			return nil
		})

	// policy Delete: have the plugin delete the volume
	case v1.PersistentVolumeReclaimDelete:
		glog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name)
		opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID))
		startTime := time.Now()
		// schedule the volume delete operation
		ctrl.scheduleOperation(opName, func() error {
			pluginName, err := ctrl.deleteVolumeOperation(volume)
			timeTaken := time.Since(startTime).Seconds()
			metrics.RecordVolumeOperationMetric(pluginName, "delete", timeTaken, err)
			return err
		})

	default:
		// Unknown PersistentVolumeReclaimPolicy
		if _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, "VolumeUnknownReclaimPolicy", "Volume has unrecognized PersistentVolumeReclaimPolicy"); err != nil {
			return err
		}
	}
	return nil
}
ctrl.deleteVolumeOperation()
// deleteVolumeOperation deletes a volume. This method is running in standalone
// goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) deleteVolumeOperation(volume *v1.PersistentVolume) (string, error) {
	glog.V(4).Infof("deleteVolumeOperation [%s] started", volume.Name)

	// This method may have been waiting for a volume lock for some time.
	// Previous deleteVolumeOperation might just have saved an updated version, so
	// read current volume state now.
	newVolume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
	if err != nil {
		glog.V(3).Infof("error reading persistent volume %q: %v", volume.Name, err)
		return "", nil
	}
	needsReclaim, err := ctrl.isVolumeReleased(newVolume)
	if err != nil {
		glog.V(3).Infof("error reading claim for volume %q: %v", volume.Name, err)
		return "", nil
	}
	if !needsReclaim {
		glog.V(3).Infof("volume %q no longer needs deletion, skipping", volume.Name)
		return "", nil
	}

	// Core: perform the actual volume deletion.
	pluginName, deleted, err := ctrl.doDeleteVolume(volume)
	if err != nil {
		// Delete failed, update the volume and emit an event.
		glog.V(3).Infof("deletion of volume %q failed: %v", volume.Name, err)
		if vol.IsDeletedVolumeInUse(err) {
			// The plugin needs more time, don't mark the volume as Failed
			// and send Normal event only
			ctrl.eventRecorder.Event(volume, v1.EventTypeNormal, events.VolumeDelete, err.Error())
		} else {
			// The plugin failed, mark the volume as Failed and send Warning
			// event
			if _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, events.VolumeFailedDelete, err.Error()); err != nil {
				glog.V(4).Infof("deleteVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
				// Save failed, retry on the next deletion attempt
				return pluginName, err
			}
		}
		// Despite the volume being Failed, the controller will retry deleting
		// the volume in every syncVolume() call.
		return pluginName, err
	}
	if !deleted {
		// The volume waits for deletion by an external plugin. Do nothing.
		return pluginName, nil
	}

	glog.V(4).Infof("deleteVolumeOperation [%s]: success", volume.Name)
	// Delete the volume
	// once the underlying volume is gone, delete the PV object from the apiserver
	if err = ctrl.kubeClient.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
		// Oops, could not delete the volume and therefore the controller will
		// try to delete the volume again on next update. We _could_ maintain a
		// cache of "recently deleted volumes" and avoid unnecessary deletion,
		// this is left out as future optimization.
		glog.V(3).Infof("failed to delete volume %q from database: %v", volume.Name, err)
		return pluginName, nil
	}
	return pluginName, nil
}

// doDeleteVolume finds appropriate delete plugin and deletes given volume, returning
// the volume plugin name. Also, it returns 'true', when the volume was deleted and
// 'false' when the volume cannot be deleted because of the deleter is external. No
// error should be reported in this case.
func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolume) (string, bool, error) {
	glog.V(4).Infof("doDeleteVolume [%s]", volume.Name)
	var err error

	// look up the volume plugin
	plugin, err := ctrl.findDeletablePlugin(volume)
	if err != nil {
		return "", false, err
	}
	if plugin == nil {
		// External deleter is requested, do nothing
		glog.V(3).Infof("external deleter for volume %q requested, ignoring", volume.Name)
		return "", false, nil
	}

	// Plugin found
	pluginName := plugin.GetPluginName()
	glog.V(5).Infof("found a deleter plugin %q for volume %q", pluginName, volume.Name)
	spec := vol.NewSpecFromPersistentVolume(volume, false)
	// create the plugin's deleter
	deleter, err := plugin.NewDeleter(spec)
	if err != nil {
		// Cannot create deleter
		return pluginName, false, fmt.Errorf("Failed to create deleter for volume %q: %v", volume.Name, err)
	}

	// call Delete() to remove the volume at the storage layer
	opComplete := util.OperationCompleteHook(pluginName, "volume_delete")
	err = deleter.Delete()
	opComplete(&err)
	if err != nil {
		// Deleter failed
		return pluginName, false, err
	}

	glog.V(2).Infof("volume %q deleted", volume.Name)
	return pluginName, true, nil
}

// findDeletablePlugin finds a deleter plugin for a given volume. It returns
// either the deleter plugin or nil when an external deleter is requested.
func (ctrl *PersistentVolumeController) findDeletablePlugin(volume *v1.PersistentVolume) (vol.DeletableVolumePlugin, error) {
	// Find a plugin. Try to find the same plugin that provisioned the volume
	var plugin vol.DeletableVolumePlugin
	if metav1.HasAnnotation(volume.ObjectMeta, annDynamicallyProvisioned) {
		provisionPluginName := volume.Annotations[annDynamicallyProvisioned]
		if provisionPluginName != "" {
			// look up the plugin named by the volume's
			// "pv.kubernetes.io/provisioned-by" annotation
			plugin, err := ctrl.volumePluginMgr.FindDeletablePluginByName(provisionPluginName)
			if err != nil {
				if !strings.HasPrefix(provisionPluginName, "kubernetes.io/") {
					// External provisioner is requested, do not report error
					return nil, nil
				}
				return nil, err
			}
			return plugin, nil
		}
	}

	// if the annotation lookup failed, fall back to the volume's Spec
	// The plugin that provisioned the volume was not found or the volume
	// was not dynamically provisioned. Try to find a plugin by spec.
	spec := vol.NewSpecFromPersistentVolume(volume, false)
	plugin, err := ctrl.volumePluginMgr.FindDeletablePluginBySpec(spec)
	if err != nil {
		// No deleter found. Emit an event and mark the volume Failed.
		return nil, fmt.Errorf("Error getting deleter volume plugin for volume %q: %v", volume.Name, err)
	}
	return plugin, nil
}
ctrl.claimWorker
ctrl.claimWorker() runs as a goroutine and manages the lifecycle of claims (PVCs): binding, creating and deleting them.
(1) It first looks the claim up in the local informer cache. If the claim is there, this is a live PVC event waiting to be processed, and it is handed to updateClaim();
(2) if it is not in the informer cache, the PVC has been deleted, and the deletion must be processed.
// claimWorker processes items from claimQueue. It must run only once,
// syncClaim is not reentrant.
func (ctrl *PersistentVolumeController) claimWorker() {
	workFunc := func() bool {
		keyObj, quit := ctrl.claimQueue.Get()
		if quit {
			return true
		}
		defer ctrl.claimQueue.Done(keyObj)
		key := keyObj.(string)
		glog.V(5).Infof("claimWorker[%s]", key)

		namespace, name, err := cache.SplitMetaNamespaceKey(key)
		if err != nil {
			glog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
			return false
		}
		claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
		if err == nil {
			// The claim still exists in informer cache, the event must have
			// been add/update/sync.
			// Core: the PVC is in the informer cache, so this is a live event;
			// hand it to updateClaim().
			ctrl.updateClaim(claim)
			return false
		}
		if !errors.IsNotFound(err) {
			glog.V(2).Infof("error getting claim %q from informer: %v", key, err)
			return false
		}

		// The claim is not in informer cache, the event must have been "delete"
		claimObj, found, err := ctrl.claims.GetByKey(key)
		if err != nil {
			glog.V(2).Infof("error getting claim %q from cache: %v", key, err)
			return false
		}
		if !found {
			// The controller has already processed the delete event and
			// deleted the claim from its cache
			glog.V(2).Infof("deletion of claim %q was already processed", key)
			return false
		}
		claim, ok := claimObj.(*v1.PersistentVolumeClaim)
		if !ok {
			glog.Errorf("expected claim, got %+v", claimObj)
			return false
		}
		// Core: the PVC is gone from the informer cache, so it has been
		// deleted; process the deletion.
		ctrl.deleteClaim(claim)
		return false
	}
	// Core: keep pulling PersistentVolumeClaims off the claimQueue and
	// processing them.
	for {
		if quit := workFunc(); quit {
			glog.Infof("claim worker queue shutting down")
			return
		}
	}
}
The key functions here are ctrl.updateClaim(claim) and ctrl.deleteClaim(claim).
ctrl.updateClaim(claim)
ctrl.updateClaim(claim) handles the claim "added", "updated" and periodic-sync events.
// updateClaim runs in worker thread and handles "claim added",
// "claim updated" and "periodic sync" events.
func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeClaim) {
	// Store the new claim version in the cache and do not process it if this is
	// an old version.
	// first check the claim's version; skip it if it is not newer
	new, err := ctrl.storeClaimUpdate(claim)
	if err != nil {
		glog.Errorf("%v", err)
	}
	if !new {
		return
	}
	// Core: the claim passed the check; hand it to ctrl.syncClaim(claim).
	err = ctrl.syncClaim(claim)
	if err != nil {
		if errors.IsConflict(err) {
			// Version conflict error happens quite often and the controller
			// recovers from it easily.
			glog.V(3).Infof("could not sync claim %q: %+v", claimToClaimKey(claim), err)
		} else {
			glog.Errorf("could not sync volume %q: %+v", claimToClaimKey(claim), err)
		}
	}
}
ctrl.syncClaim(claim)
syncClaim is the PV controller's main claim-handling method; it decides what to do with a claim. Depending on whether the claim's annotations include "pv.kubernetes.io/bind-completed", it hands the claim to either ctrl.syncUnboundClaim(claim) or ctrl.syncBoundClaim(claim).
// syncClaim is the main controller method to decide what to do with a claim.
// It's invoked by appropriate cache.Controller callbacks when a claim is
// created, updated or periodically synced. We do not differentiate between
// these events.
// For easier readability, it was split into syncUnboundClaim and syncBoundClaim
// methods.
func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClaim) error {
	glog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))
	if !metav1.HasAnnotation(claim.ObjectMeta, annBindCompleted) {
		// The "pv.kubernetes.io/bind-completed" annotation marks whether
		// binding has completed. If it is absent, the claim has never been
		// bound (e.g. it is still Pending), so hand it to syncUnboundClaim().
		return ctrl.syncUnboundClaim(claim)
	} else {
		// The annotation is present, so the claim has been bound before;
		// hand it to syncBoundClaim().
		return ctrl.syncBoundClaim(claim)
	}
}
ctrl.syncBoundClaim()
ctrl.syncBoundClaim() is one of the PV controller's most important methods and deserves careful analysis. Its logic:
- if claim.Spec.VolumeName is empty, the claim was bound before but the reference has gone missing: set the claim's phase to Lost, emit an event, and return
- otherwise, look up claim.Spec.VolumeName in the local informer cache
- if no such volume is found, the claim is bound to a volume that no longer exists: set the phase to Lost, emit an event, and return
- if the volume is found:
  - if volume.Spec.ClaimRef == nil, the claim is bound to the volume but the volume is not bound back; call ctrl.bind(volume, claim) to restore the binding
  - if volume.Spec.ClaimRef.UID == claim.UID, everything is consistent; ctrl.bind() is still called, but it is usually a no-op because everything is already set
  - otherwise (volume.Spec.ClaimRef.UID != claim.UID), the binding went wrong: set the claim's phase to Lost, emit an event, and return
// syncBoundClaim is the main controller method to decide what to do with a
// bound claim.
func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
	// HasAnnotation(pvc, annBindCompleted)
	// This PVC has previously been bound
	// OBSERVATION: pvc is not "Pending"
	// [Unit test set 3]

	// Case 1: claim.Spec.VolumeName is empty. The claim was bound before but
	// the reference is gone; emit an event and return.
	if claim.Spec.VolumeName == "" {
		// Claim was bound before but not any more.
		if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {
			return err
		}
		return nil
	}

	// look up the bound volume
	obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
	if err != nil {
		return err
	}
	// Case 2: claim.Spec.VolumeName is set and the claim was bound before,
	// but the volume no longer exists (perhaps deleted); emit an event and
	// return.
	if !found {
		// Claim is bound to a non-existing volume.
		if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {
			return err
		}
		return nil
	} else {
		volume, ok := obj.(*v1.PersistentVolume)
		if !ok {
			return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
		}
		glog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))

		// Case 3: the volume exists but volume.Spec.ClaimRef is nil. Either
		// the claim is bound and the volume came unbound, or the controller
		// has not yet received the updated volume; the two cases cannot be
		// distinguished, so run ctrl.bind() again either way.
		if volume.Spec.ClaimRef == nil {
			// Claim is bound but volume has come unbound.
			// Or, a claim was bound and the controller has not received updated
			// volume yet. We can't distinguish these cases.
			// Bind the volume again and set all states to Bound.
			glog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
			if err = ctrl.bind(volume, claim); err != nil {
				// Objects not saved, next syncPV or syncClaim will try again
				return err
			}
			return nil
		} else if volume.Spec.ClaimRef.UID == claim.UID {
			// Case 4: the volume exists, ClaimRef is set, and its UID matches
			// the claim's UID; everything is consistent. ctrl.bind() is still
			// called, but in most cases it is a no-op.
			// All is well
			// NOTE: syncPV can handle this so it can be left out.
			// NOTE: bind() call here will do nothing in most cases as
			// everything should be already set.
			glog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
			if err = ctrl.bind(volume, claim); err != nil {
				// Objects not saved, next syncPV or syncClaim will try again
				return err
			}
			return nil
		} else {
			// Otherwise volume.Spec.ClaimRef.UID != claim.UID: the binding is
			// wrong, so emit an event.
			// Claim is bound but volume has a different claimant.
			// Set the claim phase to 'Lost', which is a terminal
			// phase.
			if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil {
				return err
			}
			return nil
		}
	}
}
ctrl.bind(volume, claim)
// bind saves binding information both to the volume and the claim and marks
// both objects as Bound. Volume is saved first.
// It returns on first error, it's up to the caller to implement some retry
// mechanism.
func (ctrl *PersistentVolumeController) bind(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) error {
	var err error
	var updatedClaim *v1.PersistentVolumeClaim
	var updatedVolume *v1.PersistentVolume

	// bind the volume to the claim
	if updatedVolume, err = ctrl.bindVolumeToClaim(volume, claim); err != nil {
		return err
	}
	volume = updatedVolume

	// set the volume's phase to Bound
	if updatedVolume, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
		return err
	}
	volume = updatedVolume

	// bind the claim to the volume
	if updatedClaim, err = ctrl.bindClaimToVolume(claim, volume); err != nil {
		return err
	}
	claim = updatedClaim

	// set the claim's phase to Bound
	if updatedClaim, err = ctrl.updateClaimStatus(claim, v1.ClaimBound, volume); err != nil {
		return err
	}
	claim = updatedClaim

	return nil
}
syncUnboundClaim
- If claim.Spec.VolumeName == "", the PVC is new and has not completed binding; it is in the Pending phase.
  - The controller searches the cluster's existing volumes for one that matches the claim (large enough, compatible access modes, and in the Available phase). There are two cases:
  - If no matching volume is found:
    - check the claim's StorageClass: if the class's volumeBindingMode enables delayed binding, just return; the claim will be bound when its consumer, e.g. a pod, needs it.
    - if delayed binding is not enabled (the default Immediate mode) and the StorageClass name is non-empty, use the claim's StorageClass and call ctrl.provisionClaim(claim) to create a new volume.
  - If a matching volume is found, call ctrl.bind(volume, claim) to bind them; return normally on success, or return the error if binding fails.
- If claim.Spec.VolumeName != "", the claim asks for one specific volume by name. Look that volume up in the local cache:
  - if it is not found, the volume may simply not have been created yet: set the claim's phase to Pending and retry in the next sync period, by which time the volume may exist.
  - if the named volume is found, check its volume.Spec.ClaimRef field:
    - if it is nil, the volume is unused and Available: bind it to the claim; after binding, the claim is Bound and the PV is Bound, and we return normally
    - if it is not nil, call isVolumeBoundToClaim(volume, claim) to check whether the volume is already bound to this very claim (the claim wants this volume and the volume wants this claim); if so, call ctrl.bind(volume, claim) again to finish the binding
// syncUnboundClaim is the main controller method to decide what to do with an
// unbound claim.
func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
	// This is a new PVC that has not completed binding
	// OBSERVATION: pvc is "Pending"
	// claim.Spec.VolumeName == "" means the PVC has not completed binding
	// yet and is still Pending.
	if claim.Spec.VolumeName == "" {
		// User did not care which PV they get.
		// Check the PVC's StorageClass to see whether its volumeBindingMode
		// enables delayed binding (the default is Immediate).
		delayBinding, err := ctrl.shouldDelayBinding(claim)
		if err != nil {
			return err
		}

		// [Unit test set 1]
		// search the cluster's existing volumes for one that matches the claim
		volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
		if err != nil {
			glog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
			return fmt.Errorf("Error finding PV for claim %q: %v", claimToClaimKey(claim), err)
		}
		// volume == nil means no matching volume was found
		if volume == nil {
			glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
			// No PV could be found
			// OBSERVATION: pvc is "Pending", will retry
			switch {
			// If the StorageClass's volumeBindingMode requests delayed
			// binding, only emit an event and do not provision a PV for the
			// PVC; provisioning and binding happen once the consumer (e.g. a
			// pod that mounts the PVC) is ready.
			case delayBinding:
				ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.WaitForFirstConsumer, "waiting for first consumer to be created before binding")
			// If the claim sets a non-empty StorageClass, use it to provision
			// a new volume via ctrl.provisionClaim(claim).
			case v1helper.GetPersistentVolumeClaimClass(claim) != "":
				if err = ctrl.provisionClaim(claim); err != nil {
					return err
				}
				return nil
			// Nothing was found and no StorageClass is set: emit an event.
			default:
				ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
			}

			// Mark the claim as Pending and try to find a match in the next
			// periodic syncClaim
			if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
				return err
			}
			return nil
		} else /* pv != nil */ {
			// A matching Available volume was found; bind it to the claim
			// with ctrl.bind(volume, claim).
			// Found a PV for this claim
			// OBSERVATION: pvc is "Pending", pv is "Available"
			glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), volume.Name, getVolumeStatusForLogging(volume))
			if err = ctrl.bind(volume, claim); err != nil {
				// On any error saving the volume or the claim, subsequent
				// syncClaim will finish the binding.
				return err
			}
			// OBSERVATION: claim is "Bound", pv is "Bound"
			// After binding, both the claim and the pv are Bound; return.
			return nil
		}
	} else /* pvc.Spec.VolumeName != nil */ {
		// The claim asks for a specific volume by name; look it up in the
		// local cache first.
		// [Unit test set 2]
		// User asked for a specific PV.
		glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
		obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
		if err != nil {
			return err
		}
		// If it is not in the local cache, the volume may not have been
		// created yet: mark the claim Pending and retry next period, by
		// which time the volume may exist.
		if !found {
			// User asked for a PV that does not exist.
			// OBSERVATION: pvc is "Pending"
			// Retry later.
			glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
			// set the claim's phase to Pending
			if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
				return err
			}
			return nil
		} else {
			// The named volume was found in the local cache; check its
			// volume.Spec.ClaimRef field.
			volume, ok := obj.(*v1.PersistentVolume)
			if !ok {
				return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
			}
			glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
			if volume.Spec.ClaimRef == nil {
				// ClaimRef == nil: the volume is unused and Available, so
				// bind it to the claim.
				// User asked for a PV that is not claimed
				// OBSERVATION: pvc is "Pending", pv is "Available"
				glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
				// run some checks before binding
				if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
					glog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
					// send an event
					msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
					ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
					// volume does not satisfy the requirements of the claim
					if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
						return err
					}
				} else if err = ctrl.bind(volume, claim); err != nil {
					// bind the claim to the volume
					// On any error saving the volume or the claim, subsequent
					// syncClaim will finish the binding.
					return err
				}
				// OBSERVATION: pvc is "Bound", pv is "Bound"
				// After binding, both the claim and the pv are Bound; return.
				return nil
			} else if isVolumeBoundToClaim(volume, claim) {
				// ClaimRef != nil and the volume is already bound to this
				// very claim, while the PVC is still Pending; call
				// ctrl.bind(volume, claim) to finish the binding.
				// User asked for a PV that is claimed by this PVC
				// OBSERVATION: pvc is "Pending", pv is "Bound"
				glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))
				// Finish the volume binding by adding claim UID.
				if err = ctrl.bind(volume, claim); err != nil {
					return err
				}
				// OBSERVATION: pvc is "Bound", pv is "Bound"
				return nil
			} else {
				// User asked for a PV that is claimed by someone else
				// OBSERVATION: pvc is "Pending", pv is "Bound"
				if !metav1.HasAnnotation(claim.ObjectMeta, annBoundByController) {
					glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
					// User asked for a specific PV, retry later
					if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
						return err
					}
					return nil
				} else {
					// This should never happen because someone had to remove
					// annBindCompleted annotation on the claim.
					glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
					return fmt.Errorf("Invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
				}
			}
		}
	}
}
provisionClaim
Next, how provisionClaim creates a volume for a claim. The call flow is:
provisionClaim(claim) —> ctrl.scheduleOperation(opName, func) —> ctrl.provisionClaimOperation(claim)
provisionClaim(claim) first checks whether dynamic provisioning is enabled and returns immediately if it is not; otherwise it schedules ctrl.provisionClaimOperation(claim) to do the work.
// provisionClaim starts new asynchronous operation to provision a claim if
// provisioning is enabled.
func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolumeClaim) error {
	if !ctrl.enableDynamicProvisioning {
		return nil
	}
	glog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
	opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
	startTime := time.Now()
	// ctrl.scheduleOperation() guarantees that only one goroutine runs this
	// operation at a time.
	ctrl.scheduleOperation(opName, func() error {
		// ctrl.provisionClaimOperation(claim) is the core of provisioning
		pluginName, err := ctrl.provisionClaimOperation(claim)
		timeTaken := time.Since(startTime).Seconds()
		metrics.RecordVolumeOperationMetric(pluginName, "provision", timeTaken, err)
		return err
	})
	return nil
}
provisionClaimOperation() is the core volume-creation method. Its logic:
(1) get the StorageClass the claim was created with
(2) add a provisioner annotation to the claim (volume.beta.kubernetes.io/storage-provisioner = class.Provisioner), marking which plugin provisions this claim
(3) look up the provisioning plugin; if plugin == nil no plugin was found, so emit an event and return
(4) if a plugin was found:
- assemble the PV name from the claim (the PV name is "pvc-" + claim.UID)
- ask the apiserver whether a volume named pvName already exists; if it does, nothing needs to be provisioned, so return
- if the volume does not exist, prepare the parameters needed to create it and construct a provisioner, which implements the actual volume provisioning
- call Provision(), which invokes the volume plugin to create a new volume; what kind of volume depends on the plugin type, e.g. ceph rbd or nfs
- attach the PVC reference (ClaimRef) to the new volume and try, several times if necessary, to create a PV object for it
// provisionClaimOperation provisions a volume. This method is running in
// standalone goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) (string, error) {
    // Get the storageclass the claim was created with
    claimClass := v1helper.GetPersistentVolumeClaimClass(claim)
    glog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)

    // Find the provisioning plugin
    plugin, storageClass, err := ctrl.findProvisionablePlugin(claim)
    if err != nil {
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, err.Error())
        glog.V(2).Infof("error finding provisioning plugin for claim %s: %v", claimToClaimKey(claim), err)
        // The controller will retry provisioning the volume in every
        // syncVolume() call.
        return "", err
    }

    var pluginName string
    if plugin != nil {
        pluginName = plugin.GetPluginName()
    }

    // Add provisioner annotation so external provisioners know when to start
    newClaim, err := ctrl.setClaimProvisioner(claim, storageClass)
    if err != nil {
        // Save failed, the controller will retry in the next sync
        glog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
        return pluginName, err
    }
    claim = newClaim

    // If no plugin was found, record an event and return.
    if plugin == nil {
        // findProvisionablePlugin returned no error nor plugin.
        // This means that an unknown provisioner is requested. Report an event
        // and wait for the external provisioner
        msg := fmt.Sprintf("waiting for a volume to be created, either by external provisioner %q or manually created by system administrator", storageClass.Provisioner)
        ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ExternalProvisioning, msg)
        glog.V(3).Infof("provisioning claim %q: %s", claimToClaimKey(claim), msg)
        return pluginName, nil
    }

    // internal provisioning

    // A previous doProvisionClaim may just have finished while we were waiting for
    // the locks. Check that PV (with deterministic name) hasn't been provisioned
    // yet.
    // plugin != nil
    // Assemble the PV name from the claim (the PV name is "pvc-<claim.UID>")
    pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
    // Ask the API server whether a volume named pvName already exists;
    // if it does, there is nothing left to provision, so return.
    volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
    if err == nil && volume != nil {
        // Volume has been already provisioned, nothing to do.
        glog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
        return pluginName, err
    }

    // Prepare a claimRef to the claim early (to fail before a volume is
    // provisioned)
    claimRef, err := ref.GetReference(scheme.Scheme, claim)
    if err != nil {
        glog.V(3).Infof("unexpected error getting claim reference: %v", err)
        return pluginName, err
    }

    // Gather provisioning options
    tags := make(map[string]string)
    tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
    tags[CloudVolumeCreatedForClaimNameTag] = claim.Name
    tags[CloudVolumeCreatedForVolumeNameTag] = pvName

    // Prepare the options needed to create the volume
    options := vol.VolumeOptions{
        PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
        MountOptions:                  storageClass.MountOptions,
        CloudTags:                     &tags,
        ClusterName:                   ctrl.clusterName,
        PVName:                        pvName,
        PVC:                           claim,
        Parameters:                    storageClass.Parameters,
    }

    // Refuse to provision if the plugin doesn't support mount options, creation
    // of PV would be rejected by validation anyway
    if !plugin.SupportsMountOption() && len(options.MountOptions) > 0 {
        strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions)
        glog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
        return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName())
    }

    // Provision the volume
    // Create a provisioner object; it implements the concrete volume-creation method
    provisioner, err := plugin.NewProvisioner(options)
    if err != nil {
        strerr := fmt.Sprintf("Failed to create provisioner: %v", err)
        glog.V(2).Infof("failed to create provisioner for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
        return pluginName, err
    }

    var selectedNode *v1.Node = nil
    if nodeName, ok := claim.Annotations[annSelectedNode]; ok {
        selectedNode, err = ctrl.NodeLister.Get(nodeName)
        if err != nil {
            strerr := fmt.Sprintf("Failed to get target node: %v", err)
            glog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err)
            ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
            return pluginName, err
        }
    }

    allowedTopologies := storageClass.AllowedTopologies

    opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
    // Call Provision(); it invokes the volume plugin to create a new volume.
    // What kind of volume is created depends on the plugin type, e.g. Ceph RBD or NFS.
    volume, err = provisioner.Provision(selectedNode, allowedTopologies)
    opComplete(&err)
    if err != nil {
        // Other places of failure have nothing to do with VolumeScheduling,
        // so just let controller retry in the next sync. We'll only call func
        // rescheduleProvisioning here when the underlying provisioning actually failed.
        ctrl.rescheduleProvisioning(claim)

        strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
        glog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
        return pluginName, err
    }

    glog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))

    // Create Kubernetes PV object for the volume.
    if volume.Name == "" {
        volume.Name = pvName
    }
    // Bind it to the claim
    volume.Spec.ClaimRef = claimRef
    volume.Status.Phase = v1.VolumeBound
    volume.Spec.StorageClassName = claimClass

    // Add annBoundByController (used in deleting the volume)
    metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annBoundByController, "yes")
    metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, plugin.GetPluginName())

    // Try to create the PV object several times
    for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
        glog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
        var newVol *v1.PersistentVolume
        if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
            // Save succeeded.
            if err != nil {
                glog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim))
                err = nil
            } else {
                glog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))

                _, updateErr := ctrl.storeVolumeUpdate(newVol)
                if updateErr != nil {
                    // We will get an "volume added" event soon, this is not a big error
                    glog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr)
                }
            }
            break
        }
        // Save failed, try again after a while.
        glog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)
        time.Sleep(ctrl.createProvisionedPVInterval)
    }

    // err != nil here means creating the PV object failed; the volume that was
    // provisioned outside of Kubernetes must be cleaned up.
    if err != nil {
        // Save failed. Now we have a storage asset outside of Kubernetes,
        // but we don't have appropriate PV object for it.
        // Emit some event here and try to delete the storage asset several
        // times.
        strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
        glog.V(3).Info(strerr)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)

        var deleteErr error
        var deleted bool
        for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
            // Delete the volume
            _, deleted, deleteErr = ctrl.doDeleteVolume(volume)
            if deleteErr == nil && deleted {
                // Delete succeeded
                glog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)
                break
            }
            if !deleted {
                // This is unreachable code, the volume was provisioned by an
                // internal plugin and therefore there MUST be an internal
                // plugin that deletes it.
                glog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName())
                break
            }
            // Delete failed, try again after a while.
            glog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr)
            time.Sleep(ctrl.createProvisionedPVInterval)
        }

        if deleteErr != nil {
            // Delete failed several times. There is an orphaned volume and there
            // is nothing we can do about it.
            strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr)
            glog.V(2).Info(strerr)
            ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr)
        }
    } else {
        // err == nil means provisioning succeeded; record an event and return.
        glog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))
        msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName())
        ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg)
    }

    return pluginName, nil
}
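Note how the deterministic PV name makes the whole operation idempotent: a retried provisionClaimOperation derives the same name and finds the already-created PV. As a minimal sketch of the naming rule described above ("pvc-" plus the claim's UID; the helper below is an illustrative re-implementation with a made-up UID, not the controller's getProvisionedVolumeNameForClaim):

```go
package main

import "fmt"

// provisionedVolumeName mirrors the naming rule described above:
// the PV for a dynamically provisioned claim is named "pvc-<claim.UID>".
func provisionedVolumeName(claimUID string) string {
	return "pvc-" + claimUID
}

func main() {
	// Hypothetical claim UID, for illustration only.
	uid := "11111111-2222-3333-4444-555555555555"
	fmt.Println(provisionedVolumeName(uid)) // pvc-11111111-2222-3333-4444-555555555555
}
```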
Summary

The PV controller manages the lifecycle of PV and PVC resources. First, it binds each PVC to a suitable PV, matching storage resources against the PVC's request and the PVs' availability. It also manages PV state: it ensures a PV that is in use cannot be bound by another PVC, and it handles PV reclamation and deletion. Second, when a PVC specifies a StorageClass, the PV controller can dynamically create a new PV for it.
This article walked through the controller's source code to see how these mechanisms are implemented.
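For the binding half of the summary, the sketch below is a deliberately simplified approximation of how a claim is matched to a volume: among Available PVs, pick the smallest one whose capacity and access modes satisfy the request. The real controller's volume index (findBestMatchForClaim) also weighs pre-bound volumes, StorageClass, label selectors, and volume mode; none of that is modeled here, and the types are invented for illustration:

```go
package main

import "fmt"

// Simplified models; the real controller works with v1.PersistentVolume
// and v1.PersistentVolumeClaim objects.
type pv struct {
	Name        string
	CapacityGi  int
	AccessModes map[string]bool
	Phase       string // "Available", "Bound", ...
}

type pvc struct {
	RequestGi  int
	AccessMode string
}

// bestMatch returns the smallest Available PV that satisfies the claim's
// size and access mode, or nil if none fits.
func bestMatch(claim pvc, volumes []pv) *pv {
	var best *pv
	for i := range volumes {
		v := &volumes[i]
		if v.Phase != "Available" || !v.AccessModes[claim.AccessMode] || v.CapacityGi < claim.RequestGi {
			continue
		}
		if best == nil || v.CapacityGi < best.CapacityGi {
			best = v
		}
	}
	return best
}

func main() {
	volumes := []pv{
		{Name: "pv-a", CapacityGi: 100, AccessModes: map[string]bool{"ReadWriteOnce": true}, Phase: "Available"},
		{Name: "pv-b", CapacityGi: 60, AccessModes: map[string]bool{"ReadWriteOnce": true}, Phase: "Available"},
		{Name: "pv-c", CapacityGi: 60, AccessModes: map[string]bool{"ReadWriteOnce": true}, Phase: "Bound"},
	}
	claim := pvc{RequestGi: 51, AccessMode: "ReadWriteOnce"}
	if m := bestMatch(claim, volumes); m != nil {
		fmt.Println("bound to", m.Name) // bound to pv-b, the smallest fit
	}
}
```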
GitLab 是一个全球知名的一体化 DevOps 平台,很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab 是 GitLab 在中国的发行版,专门为中国程序员服务。可以一键式部署极狐GitLab。 学习极狐GitLab 的相关资料: 极狐GitLab 官网极狐…...