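Kubernetes added a new resource object, Deployment, in version 1.2, and it has become one of the most frequently used resource objects. It is therefore well worth understanding how the deployment controller works.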

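Main functions of a deployment:

  1. Keep the specified number of pod replicas running healthily.
  2. Support pausing and resuming.
  3. Support rolling back a deployment.
  4. Elastic scaling.
  5. Rolling upgrades.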

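The previous article covered the startup flow of kube-controller-manager. This article introduces the Deployment controller, which is run and managed by the kube-controller-manager component.

Note: this analysis is based on kubernetes 1.9.0, commit Id: 925c127ec6b946659ad0fd596fa959be43f0cc05

Deployment Controller startup flow

Below is the sequence diagram of the Deployment Controller startup flow; we follow it to walk through how the Deployment Controller starts.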


[Figure: deployment-controller startup sequence diagram]

On startup, kube-controller-manager starts every controller it manages, listed below. If any one of them fails to start, kube-controller-manager itself does not start properly.

func NewControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["csrsigning"] = startCSRSigningController
	controllers["csrapproving"] = startCSRApprovingController
	controllers["csrcleaner"] = startCSRCleanerController
	controllers["ttl"] = startTTLController
	controllers["bootstrapsigner"] = startBootstrapSignerController
	controllers["tokencleaner"] = startTokenCleanerController
	controllers["service"] = startServiceController
	controllers["node"] = startNodeController
	controllers["route"] = startRouteController
	controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
	controllers["attachdetach"] = startAttachDetachController
	controllers["persistentvolume-expander"] = startVolumeExpandController
	controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
	controllers["pvc-protection"] = startPVCProtectionController

	return controllers
}

Among them is the Deployment Controller covered in this article:

controllers["deployment"] = startDeploymentController

The startDeploymentController method calls NewDeploymentController to initialize the DeploymentController object.

The DeploymentController struct is defined as follows:

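type DeploymentController struct {
	// rsControl is used for adopting/releasing replica sets.
	rsControl     controller.RSControlInterface
	client        clientset.Interface
	eventRecorder record.EventRecorder

	// syncHandler is the core sync logic (syncDeployment); a field so it can be swapped.
	syncHandler       func(dKey string) error
	enqueueDeployment func(deployment *extensions.Deployment)

	// Listers read deployments, replica sets and pods from the shared informer caches.
	dLister   extensionslisters.DeploymentLister
	rsLister  extensionslisters.ReplicaSetLister
	podLister corelisters.PodLister

	// The *Synced funcs report whether each informer cache has synced at least once.
	dListerSynced   cache.InformerSynced
	rsListerSynced  cache.InformerSynced
	podListerSynced cache.InformerSynced

	// queue holds the keys of the deployments that need to be synced.
	queue workqueue.RateLimitingInterface
}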

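syncHandler holds the core logic of the deployment controller. Every Deployment object the controller watches is put onto the queue work queue. When Run is called, it starts ConcurrentDeploymentSyncs goroutines that consume keys from queue and execute the core logic in syncHandler.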
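The queue-plus-workers pattern described above can be sketched with the standard library alone. This is only an illustration under simplifying assumptions: the controller type, run and syncAll are hypothetical names, a buffered channel stands in for the rate-limited work queue, and the requeue-on-error behavior of the real controller is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// controller is a hypothetical, stripped-down stand-in for DeploymentController.
type controller struct {
	queue       chan string            // stand-in for the rate-limited work queue
	syncHandler func(key string) error // core logic, injected like dc.syncHandler
}

// run starts `workers` goroutines that drain the queue. It returns once the
// queue has been closed and every key has been handed to syncHandler.
func (c *controller) run(workers int) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for key := range c.queue {
				if err := c.syncHandler(key); err != nil {
					// The real controller would requeue the key with backoff.
					fmt.Printf("sync %q failed: %v\n", key, err)
				}
			}
		}()
	}
	wg.Wait()
}

// syncAll enqueues the given keys, processes them with `workers` goroutines
// and reports how many keys were synced.
func syncAll(keys []string, workers int) int {
	var mu sync.Mutex
	synced := 0
	c := &controller{
		queue: make(chan string, len(keys)),
		syncHandler: func(key string) error {
			mu.Lock()
			defer mu.Unlock()
			synced++
			return nil
		},
	}
	for _, k := range keys {
		c.queue <- k
	}
	close(c.queue)
	c.run(workers)
	return synced
}

func main() {
	n := syncAll([]string{"default/web", "default/api", "kube-system/dns"}, 2)
	fmt.Println("synced deployments:", n)
}
```

Here the `workers` argument plays the role of ConcurrentDeploymentSyncs: it bounds how many deployments are reconciled at the same time.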

That covers the startup flow of the Deployment Controller. Next we look at its core logic: syncDeployment.

Deployment Controller core logic

The sequence diagram below mainly shows how the core logic of the Deployment Controller is carried out.


[Figure: deployment-controller core logic sequence diagram]

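  • syncDeployment: the entry point of the deployment controller's core logic.
  • splitMetaNamespaceKey: splits the deployment key (in namespace/name form) to obtain the deployment's namespace and name.
  • getDeployment: uses the namespace and name obtained from splitMetaNamespaceKey to fetch the Deployment object from the local cache. Note that the deployment fetched from the cache must be deep-copied with deployment.DeepCopy(); otherwise any modification is made directly to the data held in the cache.
  • getReplicaSetsForDeployment: gets all ReplicaSets that belong to this deployment.
    • rsList: lists all ReplicaSets in the deployment's namespace.
    • canAdoptFunc: checks whether the deployment has been deleted. If it was deleted while being processed, return immediately; none of the following steps are needed.
    • NewReplicaSetControllerRefManager: creates a ReplicaSetControllerRefManager object, which exposes methods for managing the ReplicaSets that belong to this Deployment (for example, updating a ReplicaSet object).
    • ClaimReplicaSets: adopts the ReplicaSets that belong to this deployment.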
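The first two steps, splitting the key and taking a private copy of the cached object, can be sketched as follows. This is a minimal illustration, not the controller's code: the deployment type, splitKey, getDeployment and the map-based cache are hypothetical stand-ins (client-go ships the real key splitter as cache.SplitMetaNamespaceKey).

```go
package main

import (
	"fmt"
	"strings"
)

// deployment is a hypothetical, minimal stand-in for the real API object.
type deployment struct {
	Namespace, Name string
	Labels          map[string]string
}

// deepCopy returns a copy that shares no memory with the cached object.
func (d *deployment) deepCopy() *deployment {
	out := &deployment{Namespace: d.Namespace, Name: d.Name, Labels: map[string]string{}}
	for k, v := range d.Labels {
		out.Labels[k] = v
	}
	return out
}

// splitKey splits a "namespace/name" key; a bare "name" means no namespace.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

// getDeployment looks the key up in a local cache and hands back a private
// copy, so callers can mutate it without corrupting the shared cache.
func getDeployment(cache map[string]*deployment, key string) (*deployment, error) {
	if _, _, err := splitKey(key); err != nil {
		return nil, err
	}
	d, ok := cache[key]
	if !ok {
		return nil, fmt.Errorf("deployment %q not found", key)
	}
	return d.deepCopy(), nil
}

func main() {
	cache := map[string]*deployment{
		"default/web": {Namespace: "default", Name: "web", Labels: map[string]string{"app": "web"}},
	}
	d, err := getDeployment(cache, "default/web")
	if err != nil {
		panic(err)
	}
	d.Labels["app"] = "changed" // only the copy is modified
	fmt.Println(cache["default/web"].Labels["app"], d.Labels["app"])
}
```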

The ReplicaSetControllerRefManager struct is defined as follows:

type ReplicaSetControllerRefManager struct {
	BaseControllerRefManager
	controllerKind schema.GroupVersionKind
	rsControl      RSControlInterface
}

type RSControlInterface interface {
	PatchReplicaSet(namespace, name string, data []byte) error
}
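To make the adoption step more concrete, here is a simplified sketch of the claim decision. It assumes a ReplicaSet is kept when it is already owned and its labels match the selector, released when it is owned but no longer matches, adopted when it is an orphan that matches, and ignored when another controller owns it. The replicaSet type, matches and claim are hypothetical; in the real controller the ownership change is sent to the API server through PatchReplicaSet, and canAdoptFunc is consulted before adopting.

```go
package main

import "fmt"

// replicaSet is a hypothetical, minimal stand-in for the real API object.
type replicaSet struct {
	Name     string
	Labels   map[string]string
	OwnerUID string // UID of the controlling owner; "" means orphan
}

// matches reports whether every key/value pair in selector is present in labels.
func matches(selector, labels map[string]string) bool {
	for k, v := range selector {
		if got, ok := labels[k]; !ok || got != v {
			return false
		}
	}
	return true
}

// claim returns the replica sets the deployment should manage, adopting
// matching orphans and releasing owned replica sets that no longer match.
func claim(deployUID string, selector map[string]string, all []*replicaSet) []*replicaSet {
	var claimed []*replicaSet
	for _, rs := range all {
		switch {
		case rs.OwnerUID == deployUID && matches(selector, rs.Labels):
			claimed = append(claimed, rs) // already ours and still matching
		case rs.OwnerUID == deployUID:
			rs.OwnerUID = "" // release: ours, but the selector no longer matches
		case rs.OwnerUID == "" && matches(selector, rs.Labels):
			rs.OwnerUID = deployUID // adopt a matching orphan
			claimed = append(claimed, rs)
		}
		// Anything owned by a different controller is left untouched.
	}
	return claimed
}

func main() {
	selector := map[string]string{"app": "web"}
	all := []*replicaSet{
		{Name: "owned", Labels: map[string]string{"app": "web"}, OwnerUID: "d1"},
		{Name: "orphan", Labels: map[string]string{"app": "web"}},
		{Name: "foreign", Labels: map[string]string{"app": "web"}, OwnerUID: "d2"},
	}
	for _, rs := range claim("d1", selector, all) {
		fmt.Println("claimed:", rs.Name)
	}
}
```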

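  • getPodMapForDeployment: gets all Pods under this deployment and groups them (group by) by the ReplicaSet of this deployment that owns them.
  • DeletionTimestamp != nil: checks whether the Deployment has been deleted. If the deployment being processed has been deleted, syncStatusOnly is called to sync the deployment's status.

syncStatusOnly is defined as follows: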

// syncStatusOnly only updates Deployments Status and doesn't take any mutating actions.
func (dc *DeploymentController) syncStatusOnly(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(allRSs, newRS, d)
}
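The podMap argument in the signature above is the result of getPodMapForDeployment. A minimal sketch of that group-by step is shown below, assuming each pod records the UID of its controlling ReplicaSet; the pod type and groupPodsByReplicaSet are hypothetical simplifications of the real types.

```go
package main

import "fmt"

// pod is a hypothetical, minimal stand-in for v1.Pod.
type pod struct {
	Name     string
	OwnerUID string // UID of the controlling ReplicaSet; "" means orphan
}

// groupPodsByReplicaSet groups pods by the UID of the owning replica set.
// Every replica set UID gets an entry, even when it owns no pods, and pods
// owned by anything outside rsUIDs are skipped.
func groupPodsByReplicaSet(rsUIDs []string, pods []pod) map[string][]pod {
	podMap := make(map[string][]pod, len(rsUIDs))
	for _, uid := range rsUIDs {
		podMap[uid] = nil
	}
	for _, p := range pods {
		if _, ok := podMap[p.OwnerUID]; !ok {
			continue // orphan, or owned by something outside this deployment
		}
		podMap[p.OwnerUID] = append(podMap[p.OwnerUID], p)
	}
	return podMap
}

func main() {
	pods := []pod{
		{Name: "web-a-1", OwnerUID: "rs-a"},
		{Name: "web-a-2", OwnerUID: "rs-a"},
		{Name: "web-b-1", OwnerUID: "rs-b"},
		{Name: "stray", OwnerUID: ""},
	}
	podMap := groupPodsByReplicaSet([]string{"rs-a", "rs-b", "rs-c"}, pods)
	for _, uid := range []string{"rs-a", "rs-b", "rs-c"} {
		fmt.Println(uid, len(podMap[uid]))
	}
}
```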
  • getAllReplicaSetsAndSyncRevision: gets all ReplicaSets under this Deployment (the newest one and all old ones) and updates the revision number of the newest ReplicaSet and of the Deployment.
    • rsAndPodsWithHashKeySynced: adds the pod-template-hash label to all ReplicaSets and Pods under this deployment. The label is mainly used by a ReplicaSet to adopt the Pods that belong to it.
    • FindOldReplicaSets: gets all old ReplicaSets under this deployment.
    • getNewReplicaSet: gets the newest ReplicaSet under this deployment.
  • syncDeploymentStatus: syncs the deployment's status.

getAllReplicaSetsAndSyncRevision is defined as follows:

func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
	// List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods
	rsList, err := dc.rsAndPodsWithHashKeySynced(d, rsList, podMap)
	if err != nil {
		return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err)
	}
	_, allOldRSs := deploymentutil.FindOldReplicaSets(d, rsList)
	// Get new replica set with the updated revision number
	newRS, err := dc.getNewReplicaSet(d, rsList, allOldRSs, createIfNotExisted)
	if err != nil {
		return nil, nil, err
	}
	return newRS, allOldRSs, nil
}
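The split into a new ReplicaSet and old ReplicaSets can be pictured with the sketch below. It assumes the new ReplicaSet is the one whose pod template equals the deployment's current template, and that the revision given to a new ReplicaSet is one more than the highest revision among the old ones. The replicaSet type (with the template reduced to a string), splitNewAndOld and nextRevision are hypothetical; the real code compares full pod templates and stores the revision in an annotation.

```go
package main

import "fmt"

// replicaSet is a hypothetical stand-in; Template abbreviates the pod template.
type replicaSet struct {
	Name     string
	Template string
	Revision int64
}

// splitNewAndOld returns the replica set matching the deployment's current
// template (nil if none exists yet) and every other replica set as "old".
func splitNewAndOld(deployTemplate string, all []*replicaSet) (*replicaSet, []*replicaSet) {
	var newRS *replicaSet
	var oldRSs []*replicaSet
	for _, rs := range all {
		if newRS == nil && rs.Template == deployTemplate {
			newRS = rs
			continue
		}
		oldRSs = append(oldRSs, rs)
	}
	return newRS, oldRSs
}

// nextRevision is the revision a new replica set would receive:
// one more than the highest revision among the old replica sets.
func nextRevision(oldRSs []*replicaSet) int64 {
	var highest int64
	for _, rs := range oldRSs {
		if rs.Revision > highest {
			highest = rs.Revision
		}
	}
	return highest + 1
}

func main() {
	all := []*replicaSet{
		{Name: "web-1", Template: "nginx:1.12", Revision: 1},
		{Name: "web-2", Template: "nginx:1.13", Revision: 2},
	}
	// The deployment was just updated to a template no replica set has yet.
	newRS, oldRSs := splitNewAndOld("nginx:1.14", all)
	fmt.Println(newRS == nil, len(oldRSs), nextRevision(oldRSs))
}
```

When no matching ReplicaSet exists, the createIfNotExisted flag decides whether one is created; the sketch only reports that none was found.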
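  • checkPausedConditions: checks whether the deployment is paused and, if so, updates the deployment's status.
  • paused = true: if the deployment is paused, sync is called to sync the deployment's state. sync calls the following methods:
    • getAllReplicaSetsAndSyncRevision: same as above.
    • scale: decides whether the ReplicaSets should be scaled up or down and, if so, scales them according to the deployment's MaxSurge and MaxUnavailable.
    • cleanupDeployment: cleans up surplus ReplicaSets according to the deployment's RevisionHistoryLimit.
    • syncDeploymentStatus: syncs the deployment's status.

sync is defined as follows: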

func (dc *DeploymentController) sync(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
	if err != nil {
		return err
	}
	if err := dc.scale(d, newRS, oldRSs); err != nil {
		// If we get an error while trying to scale, the deployment will be requeued
		// so we can abort this resync
		return err
	}
	// Clean up the deployment when it's paused and no rollback is in flight.
	if d.Spec.Paused && d.Spec.RollbackTo == nil {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}
	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(allRSs, newRS, d)
}
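The cleanupDeployment call above trims the revision history. A minimal sketch of that idea follows, assuming the old ReplicaSets are ordered by creation time, only the oldest ones beyond the limit are candidates, and a candidate that still has replicas is skipped. The replicaSet type and replicaSetsToDelete are hypothetical, and the real method performs further checks before deleting.

```go
package main

import (
	"fmt"
	"sort"
)

// replicaSet is a hypothetical, minimal stand-in for the real API object.
type replicaSet struct {
	Name     string
	Created  int64 // creation time, e.g. Unix seconds
	Replicas int32
}

// replicaSetsToDelete returns the names of old replica sets that exceed
// historyLimit, oldest first, skipping any that still have replicas.
func replicaSetsToDelete(oldRSs []replicaSet, historyLimit int) []string {
	diff := len(oldRSs) - historyLimit
	if diff <= 0 {
		return nil // history is within the limit, nothing to clean up
	}
	sorted := append([]replicaSet(nil), oldRSs...) // do not reorder the caller's slice
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Created < sorted[j].Created })

	var doomed []string
	for _, rs := range sorted[:diff] {
		if rs.Replicas != 0 {
			continue // never delete a replica set that still runs pods
		}
		doomed = append(doomed, rs.Name)
	}
	return doomed
}

func main() {
	old := []replicaSet{
		{Name: "web-3", Created: 300},
		{Name: "web-1", Created: 100},
		{Name: "web-2", Created: 200, Replicas: 1},
		{Name: "web-4", Created: 400},
	}
	fmt.Println(replicaSetsToDelete(old, 2))
}
```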
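  • rollbackTo: checks whether the deployment needs to be rolled back and, if so, performs the rollback.
    • getAllReplicaSetsAndSyncRevision: same as above.
    • toRevision decides which revision to roll back to. If toRevision is not specified, the deployment rolls back to the last revision, that is, the ReplicaSet revision immediately before the current one. If toRevision is specified, it rolls back to that revision.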
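The revision selection can be sketched as below. It assumes that an unset toRevision (0) resolves to the second-highest revision among the deployment's ReplicaSets, and that the rollback is only possible when a ReplicaSet with the resolved revision exists. targetRevision is a hypothetical helper written for illustration.

```go
package main

import "fmt"

// targetRevision resolves which revision a rollback should go to.
// toRevision == 0 means "not specified" and resolves to the previous
// revision (the second-highest one). The bool reports whether a replica
// set with the resolved revision exists.
func targetRevision(toRevision int64, revisions []int64) (int64, bool) {
	if toRevision == 0 {
		var latest, previous int64
		for _, r := range revisions {
			if r > latest {
				previous = latest
				latest = r
			} else if r > previous {
				previous = r
			}
		}
		toRevision = previous
	}
	if toRevision == 0 {
		return 0, false // there is no earlier revision to roll back to
	}
	for _, r := range revisions {
		if r == toRevision {
			return toRevision, true
		}
	}
	return 0, false
}

func main() {
	revisions := []int64{1, 2, 3}
	fmt.Println(targetRevision(0, revisions)) // previous revision
	fmt.Println(targetRevision(1, revisions)) // explicit revision
	fmt.Println(targetRevision(7, revisions)) // unknown revision
}
```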
  • isScalingEvent: checks whether this is only a scaling event.
scalingEvent, err := dc.isScalingEvent(d, rsList, podMap)
if err != nil {
	return err
}
if scalingEvent {
	return dc.sync(d, rsList, podMap)
}

isScalingEvent is defined as follows:

func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) (bool, error) {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
	if err != nil {
		return false, err
	}
	allRSs := append(oldRSs, newRS)
	for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
		desired, ok := deploymentutil.GetDesiredReplicasAnnotation(rs)
		if !ok {
			continue
		}
		if desired != *(d.Spec.Replicas) {
			return true, nil
		}
	}
	return false, nil
}
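
If it is a scaling event, `sync` is called to scale the Deployment up or down to the desired number of replicas.

  • Based on the Strategy type, decide whether the update is a Recreate or a RollingUpdate, and run the corresponding rollout.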

    • For the Recreate strategy, rolloutRecreate is executed:

      • getAllReplicaSetsAndSyncRevision: same as above.
      • FilterActiveReplicaSets: gets the ReplicaSets whose replica count is not 0.
      • scaleDownOldReplicaSetsForRecreate: scales down the old ReplicaSets.
      • scaleUpNewReplicaSetForRecreate: scales up the new ReplicaSet.
      • syncRolloutStatus: syncs the deployment status.
    • For the RollingUpdate strategy, rolloutRolling is executed:

      • getAllReplicaSetsAndSyncRevision: same as above.
      • reconcileNewReplicaSet: scales up the new ReplicaSet.
      • reconcileOldReplicaSets: scales down the old ReplicaSets.
      • syncRolloutStatus: if a scale-up or scale-down happened, updates the status and returns.
      • cleanupDeployment: cleans up surplus ReplicaSets according to the deployment's RevisionHistoryLimit.
      • syncRolloutStatus: syncs the deployment status.

    rolloutRolling is defined as follows:

    // rolloutRolling implements the logic for rolling a new replica set.
    func (dc *DeploymentController) rolloutRolling(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
    	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true)
    	if err != nil {
    		return err
    	}
    	allRSs := append(oldRSs, newRS)
    	// Scale up, if we can.
    	scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
    	if err != nil {
    		return err
    	}
    	if scaledUp {
    		// Update DeploymentStatus
    		return dc.syncRolloutStatus(allRSs, newRS, d)
    	}
    	// Scale down, if we can.
    	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
    	if err != nil {
    		return err
    	}
    	if scaledDown {
    		// Update DeploymentStatus
    		return dc.syncRolloutStatus(allRSs, newRS, d)
    	}
    	if deploymentutil.DeploymentComplete(d, &d.Status) {
    		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
    			return err
    		}
    	}
    	// Sync deployment status
    	return dc.syncRolloutStatus(allRSs, newRS, d)
    }
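
The arithmetic behind one scale-up and one scale-down step of a rolling update can be sketched as follows. It assumes the total pod count may not exceed desired + maxSurge, and that old pods may only be removed while at least desired - maxUnavailable pods stay available. newRSReplicas and maxScaleDown are hypothetical helpers that take already-resolved integer values; they are not the controller's own functions.

```go
package main

import "fmt"

// newRSReplicas returns the replica count the new replica set may be scaled
// to in one step, keeping the total pod count at or below desired + maxSurge.
func newRSReplicas(desired, maxSurge, newRSCurrent, totalCurrent int32) int32 {
	maxTotal := desired + maxSurge
	if totalCurrent >= maxTotal {
		return newRSCurrent // no room left to surge
	}
	scaleUp := maxTotal - totalCurrent
	if room := desired - newRSCurrent; scaleUp > room {
		scaleUp = room // never exceed the desired replica count
	}
	return newRSCurrent + scaleUp
}

// maxScaleDown returns how many old pods may be removed in one step while
// keeping at least desired - maxUnavailable pods available. Pods of the new
// replica set that are not yet available are counted as unavailable.
func maxScaleDown(desired, maxUnavailable, totalCurrent, newRSUnavailable int32) int32 {
	minAvailable := desired - maxUnavailable
	n := totalCurrent - minAvailable - newRSUnavailable
	if n < 0 {
		return 0
	}
	return n
}

func main() {
	// 10 desired replicas, maxSurge 3, maxUnavailable 2; the rollout starts
	// with 10 old pods and an empty new replica set.
	up := newRSReplicas(10, 3, 0, 10)
	fmt.Println("new replica set scaled to:", up)
	// The new pods exist but are not ready yet, so they count as unavailable.
	fmt.Println("old pods removable:", maxScaleDown(10, 2, 10+up, up))
}
```

Repeating these two steps as new pods become available moves all replicas from the old ReplicaSets to the new one without leaving the bounds set by the two parameters.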

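Summary

From this first pass over the Deployment controller we have learned: how the Deployment controller starts; how Deployment, ReplicaSet and Pod objects relate to one another; how pausing and resuming work; how rollback relates to a Deployment's revision history; and how elastic scaling and rolling upgrades are carried out. When a related problem comes up while using deployments, each of these features can be analyzed in more depth.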