Kube-Controller-manager之Deployment Controller源码解析
kubernetes从1.2
版本增加了一个新的资源对象deployment
,并且deployment
资源对象是使用频率最高的一个资源对象之一。因此很有必要对deployment controller的机制有所了解。
deployment的主要功能:
- 保证指定的pod副本数健康运行。
- 支持暂停/恢复机制。
- 支持deployment回滚机制。
- 弹性伸缩。
- 滚动升级。
上篇文章已经介绍了kube-controller-manaer解析之启动流程。这篇文章开始介绍被kube-controller-manager组件控制并管理的Deployment控制器。
注意: kubernetes 1.9.0, commit Id: 925c127ec6b946659ad0fd596fa959be43f0cc05
Deployment Controller启动的流程
下面是Deployment Controller
启动流程的时序图,按照这个时序图介绍下Deployment Controller是如何启动的。
kube-controller-manager会在启动的时候把下面的所有被管理的控制器都启动。其中任何一个控制器启动失败,kube-controller-manager则不会正常的启动。
1 | func NewControllerInitializers() map[string]InitFunc { |
其中就包含我我们这篇文章介绍的Deployment Controlelr
控制器:
1 | controllers["deployment"] = startDeploymentController |
在startDeploymentController
方法中调用NewDeploymentController
方法对DeploymentController
对象进行初始化。
DeploymentController对象定义的结构如下:
1 | type DeploymentController struct { |
其中syncHandler
是deployment的核心逻辑,控制器watch到的所有Deployment资源对象都会被放到queue
工作队列中。然后在Run
的时候启动指定ConcurrentDeploymentSyncs
数量的goroutine从queue
消费去执行syncHandler
部分的核心逻辑。
好了,Deployment Controlelr
的启动流程现在没有问题了,下面我们就介绍下Deployment Controller
的核心逻辑:syncDeployment
。
Deployment Controller核心逻辑解析
下面这张时序图最主要是介绍Deployment Controller
的核心的逻辑的一个实现流程。(双击图片放大)
- syncDeployment: deployment控制器的核心逻辑入口。
- splitMetaNamespaceKey: 对deployment key(namespace/name 形式)进行切分,获取deployment所在的namespace及deplyment name。
- getDeployment: 通过上面
splitMetaNamespaceKey
获取的namespace
和name
从本地的cache中获取该Deployment资源对象(注意需要对从cache中获取的deployment进行深度拷贝. deployment.DeepCopy()否则修改的只是cache中的数据)。 - getReplicaSetsForDeployment:获取属于该deployment的所有replicaset。
- rsList: 获取deployment namespace下的所有replicaset。
- canAdoptFunc: 检查deployment是否被删除。如果在处理过程deployment被删除了,则直接返回,不需要进行下面操作。
- NewReplicaSetControllerRefManager: 创建
ReplicaSetControllerRefManager
对象并暴露出一些方法用于管理属于该Deployment的ReplciaSet.(如对ReplicaSet对象的更新操作等) - ClaimReplicaSets: 对属于该deploymeng的replicaset进行领养。
ReplicaSetControllerRefManager
对象的定义结构如下:
1
2
3
4
5
6
7
8
9type ReplicaSetControllerRefManager struct {
BaseControllerRefManager
controllerKind schema.GroupVersionKind
rsControl RSControlInterface
}
type RSControlInterface interface {
PatchReplicaSet(namespace, name string, data []byte) error
}
- getPodMapForDeployment: 获取该deployment下的所有Pods,并基于该deployment下的replicaset对属于该deployment下的所有Pod进行(group by)分组。
- DeletionTimestamp != nil: 判断该Deployment是否已经被删除,如果正在处理的deployment被删除了,则调用
syncStatusOnly
方法,对deployment的状态进行同步。
syncStatusOnly
方法的定义:
1 | // syncStatusOnly only updates Deployments Status and doesn't take any mutating actions. |
- getAllReplicaSetsAndSyncRevision: 获取该Deployment下的所有的ReplicaSet(包括最新的,和所有老的ReplicaSet)并更新最新的ReplicaSet和Deployment的Revision number。
- rsAndPodsWithHashKeySynced:对该deployment下的所有replicaset和pod增加
pod_template_label
.增加pod_template_label
的作用主要用于ReplicaSet
对属于它的Pod进行领养操作。 - FindOldReplicaSets: 获取该deployment下的所有的老的replicaset.
- getNewReplicaSet: 获取该deployment下的最新的replicaset.
- rsAndPodsWithHashKeySynced:对该deployment下的所有replicaset和pod增加
- syncDeploymentStatus: 同步deployment的状态信息。
getAllReplicaSetsAndSyncRevision
方法的定义:
1 | func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { |
- checkPausedConditions: 判断deployment是否pause,如果是则更新该deployment的状态。
- paused = true: 如果deployment处于暂停状态,调用
sync
方法,执行deployment状态的同步。下面的sync
中调用的一些方法:- getAllReplicaSetsAndSyncRevision: 解释同上。
- scale: 判断ReplicaSet是否应该扩缩容,如果是则根据deployment的MaxSure和MaxUnaviable进行扩缩容。
- cleanupDeployment: 依据deployment的
RevisionHistoryLimit
参数,对多余的ReplicaSet进行清理。 - syncDeploymentStatus: 同步deployment的状态信息。
sync
方法的定义:
1 | func (dc *DeploymentController) sync(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error { |
- rollbackTo: 判断是否需要对deployment进行回滚。如果是则进行回滚操作。
- getAllReplicaSetsAndSyncRevision: 解释同上。
- 根据
toRevision
来决定回滚到具体的哪一个版本。如果没有指定toRevision
则回滚到最新的一个ReplicaSet(就是最近的一次ReplicaSet版本)。如果指定了toRevision
版本,则回滚到指定的版本就ok了。
- isScalingEvent: 判断是不是只是执行规模调整
1 | scalingEvent, err := dc.isScalingEvent(d, rsList, podMap) |
isScalingEvent方法的定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) (bool, error) {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
if err != nil {
return false, err
}
allRSs := append(oldRSs, newRS)
for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
desired, ok := deploymentutil.GetDesiredReplicasAnnotation(rs)
if !ok {
continue
}
if desired != *(d.Spec.Replicas) {
return true, nil
}
}
return false, nil
}
如果是则执行`sync`对Deployment期望的副本数进行scale up/scale down操作。
根据Strategy类型,判断执行的更新操作是直接更新还是滚动更新,并分别执行对应操作Strategy
如果是Recreate操作,则执行
rolloutRecreate
方法:- getAllReplicaSetsAndSyncRevision: 解释同上。
- FilterActiveReplicaSets:获取副本数不为0的ReplicaSets.
- scaleDownOldReplicaSetsForRecreate: scale down old replica sets.
- scaleUpNewReplicaSetForRecreate: scale up new replica set.
- syncRolloutStatus: Sync deployment status.
如果是RollingUpdate操作,则指定
rolloutRolling
方法:- getAllReplicaSetsAndSyncRevision: 解释同上。
- reconcileNewReplicaSet: 扩容新的ReplicaSet.
- reconcileOldReplicaSets: 收容老的ReplicaSet.
- syncRolloutStatus: 缩容后设置状态并退出.
- cleanupDeployment: 依据deployment的RevisionHistoryLimit参数,对多余的ReplicaSet进行清理。
- syncRolloutStatus: Sync deployment status
rolloutRolling
方法定义:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)
// Scale up, if we can.
scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
if err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus
return dc.syncRolloutStatus(allRSs, newRS, d)
}
// Scale down, if we can.
scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus
return dc.syncRolloutStatus(allRSs, newRS, d)
}
if deploymentutil.DeploymentComplete(d, &d.Status) {
if err := dc.cleanupDeployment(oldRSs, d); err != nil {
return err
}
}
// Sync deployment status
return dc.syncRolloutStatus(allRSs, newRS, d)
}
总结
通过上面对Deployment控制器的初步分析让我们了解了: Deployment控制器的启动过程。Deployment, ReplicaSet和Pod资源对象之间的关系。Deployment的暂停及恢复机制以及回滚和Deployment的历史版本记录之间的关系,弹性扩缩容,滚动升级。当我们在使用deployment过程中遇到相关的问题,可以进一步针对每一个功能点更进行更深入的分析。