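Having already analyzed how the deployment controller and the replicaset controller are implemented, I recently took some time to study the internals of the statefulset controller. The goal is to understand in depth how all the main resource controllers in kube-controller-manager work.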

StatefulSet Controller Logic Analysis

startStatefulSetController

The main logic of the statefulset controller is shown in the figure below:


[Figure: sequence diagram of the statefulset controller]

Following the sequence diagram above, the sections below analyze the statefulset controller's implementation in detail.

The statefulset controller is started in the same way as the other controllers:

```go
func startStatefulSetController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}] {
		return nil, false, nil
	}
	go statefulset.NewStatefulSetController(
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.InformerFactory.Apps().V1().StatefulSets(),
		ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
		ctx.InformerFactory.Apps().V1().ControllerRevisions(),
		ctx.ClientBuilder.ClientOrDie("statefulset-controller"),
	).Run(int(ctx.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Stop)
	return nil, true, nil
}
```

NewStatefulSetController mainly initializes the StatefulSetController struct, which is defined as follows:

```go
// StatefulSetController controls statefulsets.
type StatefulSetController struct {
	// client interface
	kubeClient clientset.Interface
	// control returns an interface capable of syncing a stateful set.
	// Abstracted out for testing.
	control StatefulSetControlInterface
	// podControl is used for patching pods.
	podControl controller.PodControlInterface
	// podLister is able to list/get pods from a shared informer's store
	podLister corelisters.PodLister
	// podListerSynced returns true if the pod shared informer has synced at least once
	podListerSynced cache.InformerSynced
	// setLister is able to list/get stateful sets from a shared informer's store
	setLister appslisters.StatefulSetLister
	// setListerSynced returns true if the stateful set shared informer has synced at least once
	setListerSynced cache.InformerSynced
	// pvcListerSynced returns true if the pvc shared informer has synced at least once
	pvcListerSynced cache.InformerSynced
	// revListerSynced returns true if the rev shared informer has synced at least once
	revListerSynced cache.InformerSynced
	// StatefulSets that need to be synced.
	queue workqueue.RateLimitingInterface
}
```

defaultStatefulSetControl implements every method of the StatefulSetControlInterface interface:

```go
type StatefulSetControlInterface interface {
	UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error
	ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
	AdoptOrphanRevisions(set *apps.StatefulSet, revisions []*apps.ControllerRevision) error
}
```
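  • UpdateStatefulSet implements the core logic of the statefulset controller: scaling Pods up, deleting Pods, updating Pods according to the configured update strategy, and so on. It is the most important method of the controller.
  • ListRevisions returns the controller revisions associated with a statefulset. A statefulset uses controller revisions to roll back to a specific version, and the revisionHistoryLimit field controls how many revisions are retained (default 10).
  • AdoptOrphanRevisions lets the statefulset adopt matching orphaned controller revisions by setting an ownerReference on them.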

RealPodControl implements every method of the PodControlInterface interface. These methods mainly create, delete, and update (patch) Pods.

```go
type PodControlInterface interface {
	CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error
	CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
	CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
	DeletePod(namespace string, podID string, object runtime.Object) error
	PatchPod(namespace, name string, data []byte) error
}
```

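The statefulset controller lists/watches two kinds of resources: statefulsets and pods. When one of these objects changes (ADD/DELETE/UPDATE), the key of the affected statefulset (for a Pod event, the statefulset that owns the Pod) is added to the queue. The sync method processes these keys later.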
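The following minimal sketch shows how a Pod event can be turned into the key of its owning statefulset. The types `pod` and `ownerRef` and the helper `keyForPodEvent` are hypothetical simplifications, not client-go types. The real controller also resolves the Pod's controller OwnerReference against the statefulset lister (checking the UID) before enqueuing. Orphan Pods are handled differently there and are simply ignored here. A map stands in for the workqueue's de-duplication of keys that are already queued.

```go
package main

import "fmt"

// ownerRef is a simplified stand-in for metav1.OwnerReference.
type ownerRef struct {
	Kind       string
	Name       string
	Controller bool
}

// pod is a simplified stand-in for v1.Pod.
type pod struct {
	Namespace string
	Name      string
	Owners    []ownerRef
}

// keyForPodEvent returns the "namespace/name" key of the StatefulSet that
// controls p, or ok=false if p has no StatefulSet controller (an orphan).
func keyForPodEvent(p pod) (key string, ok bool) {
	for _, ref := range p.Owners {
		if ref.Controller && ref.Kind == "StatefulSet" {
			return p.Namespace + "/" + ref.Name, true
		}
	}
	return "", false
}

func main() {
	owner := []ownerRef{{Kind: "StatefulSet", Name: "web", Controller: true}}
	events := []pod{
		{Namespace: "default", Name: "web-0", Owners: owner},
		{Namespace: "default", Name: "web-1", Owners: owner},
		{Namespace: "default", Name: "standalone"},
	}
	// A set of keys mimics the workqueue: repeated events for the same
	// StatefulSet collapse into a single queued key.
	queued := map[string]bool{}
	for _, p := range events {
		if key, ok := keyForPodEvent(p); ok {
			queued[key] = true
		}
	}
	fmt.Println(len(queued), queued["default/web"]) // 1 true
}
```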

Run

After the StatefulSetController struct is initialized, the controller is started by its Run method:

```go
// Run runs the statefulset controller.
func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer ssc.queue.ShutDown()

	klog.Infof("Starting stateful set controller")
	defer klog.Infof("Shutting down statefulset controller")

	if !cache.WaitForNamedCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(ssc.worker, time.Second, stopCh)
	}

	<-stopCh
}
```

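Run first calls WaitForNamedCacheSync to wait until the Pod, StatefulSet, PVC, and ControllerRevision informer caches have synced. Once they have, it starts the configured number of workers concurrently to process statefulsets from the queue. The default concurrency is 5 (ConcurrentStatefulSetSyncs). The queue holds statefulset keys, and a worker uses a key to fetch the corresponding statefulset object from the cache.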

worker&&processNextWorkItem

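The worker method repeatedly calls processNextWorkItem. Each call takes one statefulset key from the queue and passes it to sync. If sync succeeds, the key is forgotten (Forget), which clears its rate-limiting history. If sync fails, the key is re-added to the queue with AddRateLimited and retried after a backoff delay.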

processNextWorkItem is implemented as follows:

```go
func (ssc *StatefulSetController) processNextWorkItem() bool {
	key, quit := ssc.queue.Get()
	if quit {
		return false
	}
	defer ssc.queue.Done(key)
	if err := ssc.sync(key.(string)); err != nil {
		utilruntime.HandleError(fmt.Errorf("Error syncing StatefulSet %v, requeuing: %v", key.(string), err))
		ssc.queue.AddRateLimited(key)
	} else {
		ssc.queue.Forget(key)
	}
	return true
}
```

sync

sync is where a single statefulset is actually processed. It mainly does the following:

  1. Call SplitMetaNamespaceKey to split the statefulset key into namespace and name (the namespace the statefulset lives in and its name).
  2. Use the namespace and name to fetch the statefulset object from the controller's local cache.
  3. Call adoptOrphanRevisions to adopt orphaned controller revisions. As with deployments, replicasets, and pods, adoption works by setting an OwnerReference.
  4. Call getPodsForStatefulSet to get all Pods that belong to the statefulset, using the statefulset and its selector (spec.selector converted into a label selector).
  5. Call syncStatefulSet to reconcile the statefulset with its Pods (this is the core logic of the statefulset controller).

The code for sync is shown below. Apart from syncStatefulSet, the core method analyzed next, the other methods are not covered in detail.

```go
func (ssc *StatefulSetController) sync(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	set, err := ssc.setLister.StatefulSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.Infof("StatefulSet has been deleted %v", key)
		return nil
	}
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
		return err
	}

	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
		// This is a non-transient error, so don't retry.
		return nil
	}

	if err := ssc.adoptOrphanRevisions(set); err != nil {
		return err
	}

	pods, err := ssc.getPodsForStatefulSet(set, selector)
	if err != nil {
		return err
	}

	return ssc.syncStatefulSet(set, pods)
}
```
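Step 1 of sync relies on the "namespace/name" key format. The block below is an illustrative re-implementation of the splitting rule used by cache.SplitMetaNamespaceKey. It is written as the hypothetical `splitMetaNamespaceKey` so the example has no client-go dependency. A key with no slash is treated as a cluster-scoped name, and more than one slash is an error.

```go
package main

import (
	"fmt"
	"strings"
)

// splitMetaNamespaceKey illustrates the rule: "ns/name" -> (ns, name),
// "name" -> ("", name), anything else is an error.
func splitMetaNamespaceKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		// name only, no namespace
		return "", parts[0], nil
	case 2:
		// namespace and name
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	ns, name, err := splitMetaNamespaceKey("default/web")
	fmt.Println(ns, name, err) // default web <nil>
}
```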

syncStatefulSet

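syncStatefulSet is the core method of the statefulset controller. It delegates to ssc.control.UpdateStatefulSet (implemented by defaultStatefulSetControl), which mainly does the following: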

  1. Call ListRevisions to get all controller revisions belonging to the statefulset, then sort them with SortControllerRevisions (in ascending order of revision number).
  2. Call getStatefulSetRevisions to get the statefulset's current and update controller revisions. When the statefulset is first created, the current and update revisions are the same.
  3. Call updateStatefulSet to create, delete, and update the statefulset's Pods.
  4. Call updateStatefulSetStatus to update the statefulset's status.
  5. Call truncateHistory to limit the number of retained controller revisions to revisionHistoryLimit (default 10).

syncStatefulSet, and the UpdateStatefulSet method it delegates to, are implemented as follows:

```go
func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
	if err := ssc.control.UpdateStatefulSet(set.DeepCopy(), pods); err != nil {
		return err
	}
	klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)
	return nil
}
```

```go
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {

	// list all revisions and sort them
	revisions, err := ssc.ListRevisions(set)
	if err != nil {
		return err
	}
	history.SortControllerRevisions(revisions)

	// get the current, and update revisions
	currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
	if err != nil {
		return err
	}

	// perform the main update function and get the status
	status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
	if err != nil {
		return err
	}

	// update the set's status
	err = ssc.updateStatefulSetStatus(set, status)
	if err != nil {
		return err
	}

	// maintain the set's revision history limit
	return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
}
```
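Steps 1 and 5 handle revision bookkeeping. The sketch below uses a simplified `revision` type, a hypothetical stand-in for apps.ControllerRevision, and a hypothetical helper `revisionsToDelete`. It assumes, based on my reading of truncateHistory, that revisions still in use are never deleted. "In use" means the current revision, the update revision, and any revision referenced by a live Pod. Only the oldest remaining revisions beyond revisionHistoryLimit are removed.

```go
package main

import (
	"fmt"
	"sort"
)

// revision is a simplified stand-in for apps.ControllerRevision.
type revision struct {
	Name     string
	Revision int64
}

// revisionsToDelete sorts revs in place by Revision number and returns the names
// of the oldest non-live revisions that exceed limit. Live revisions (current,
// update, and any still referenced by a Pod) are never returned.
func revisionsToDelete(revs []revision, live map[string]bool, limit int) []string {
	sort.SliceStable(revs, func(i, j int) bool { return revs[i].Revision < revs[j].Revision })
	var history []revision
	for _, r := range revs {
		if !live[r.Name] {
			history = append(history, r)
		}
	}
	if len(history) <= limit {
		return nil
	}
	var out []string
	for _, r := range history[:len(history)-limit] {
		out = append(out, r.Name)
	}
	return out
}

func main() {
	revs := []revision{{"web-v3", 3}, {"web-v1", 1}, {"web-v4", 4}, {"web-v2", 2}}
	live := map[string]bool{"web-v4": true} // current == update revision
	fmt.Println(revisionsToDelete(revs, live, 2)) // [web-v1]
}
```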

Next we look at updateStatefulSet. The other methods called from UpdateStatefulSet are not covered in detail here.

updateStatefulSet

updateStatefulSet holds the core logic of the statefulset controller. It mainly does the following:

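1. Obtain the statefulset objects corresponding to currentRevision and updateRevision (currentSet and updateSet), and initialize the status that this call computes.
2. Split the Pods returned by getPodsForStatefulSet into two slices, replicas and condemned. Pods with 0 <= ord < replicaCount (ord is the Pod's ordinal) are stored in replicas at index ord. Pods with ord >= replicaCount are stored in condemned and will be deleted.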

The code for this part is as follows:

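```go
replicaCount := int(*set.Spec.Replicas)
// slice that will contain all Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas
replicas := make([]*v1.Pod, replicaCount)
// slice that will contain all Pods such that set.Spec.Replicas <= getOrdinal(pod)
condemned := make([]*v1.Pod, 0, len(pods))

for i := range pods {
	if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
		replicas[ord] = pods[i]
	} else if ord >= replicaCount {
		condemned = append(condemned, pods[i])
	}
	// Pods whose name carries no valid ordinal (ord == -1) are ignored
}
```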
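getOrdinal parses the ordinal from the Pod name, which has the form <statefulset name>-<ordinal>. The sketch below shows the parsing and the replicas/condemned split in a self-contained form. It works on Pod names instead of *v1.Pod and uses an empty string to mark a missing slot. The name pattern `(.*)-([0-9]+)$` is my assumption about what the controller uses, and `splitPods` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// statefulPodRegex matches "<parent>-<ordinal>" Pod names.
var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$")

// getOrdinal returns the ordinal suffix of a Pod name, or -1 if there is none.
func getOrdinal(podName string) int {
	m := statefulPodRegex.FindStringSubmatch(podName)
	if len(m) < 3 {
		return -1
	}
	ord, err := strconv.ParseInt(m[2], 10, 32)
	if err != nil {
		return -1
	}
	return int(ord)
}

// splitPods places names with 0 <= ord < replicaCount at index ord of replicas
// ("" marks a missing Pod) and collects names with ord >= replicaCount in condemned.
func splitPods(podNames []string, replicaCount int) (replicas, condemned []string) {
	replicas = make([]string, replicaCount)
	for _, name := range podNames {
		if ord := getOrdinal(name); 0 <= ord && ord < replicaCount {
			replicas[ord] = name
		} else if ord >= replicaCount {
			condemned = append(condemned, name)
		}
	}
	return replicas, condemned
}

func main() {
	replicas, condemned := splitPods([]string{"web-0", "web-2", "web-3", "web-4"}, 3)
	fmt.Printf("%q %q\n", replicas, condemned) // ["web-0" "" "web-2"] ["web-3" "web-4"]
}
```

In this example, slot 1 is empty, so step 3 below would build a new Pod for ordinal 1. web-3 and web-4 are candidates for scale-down.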

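3. If some ordinal in replicas has no corresponding Pod, newVersionedStatefulSetPod builds a Pod object of the appropriate revision for it (in memory only). Later in updateStatefulSet, the controller notices that this Pod has not actually been created and creates it.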

The code for this part is as follows:

```go
for ord := 0; ord < replicaCount; ord++ {
	if replicas[ord] == nil {
		replicas[ord] = newVersionedStatefulSetPod(
			currentSet,
			updateSet,
			currentRevision.Name,
			updateRevision.Name, ord)
	}
}
```
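Which revision does newVersionedStatefulSetPod assign to a Pod it builds? The sketch below encodes my reading of the rule. Under RollingUpdate, ordinals below the partition (or, with no partition, below status.currentReplicas) keep the current revision. Every other ordinal gets the update revision. `updateStrategy` and `revisionForNewPod` are hypothetical simplifications, not the controller's code.

```go
package main

import "fmt"

// updateStrategy is a simplified stand-in for apps.StatefulSetUpdateStrategy.
type updateStrategy struct {
	RollingUpdate bool // false means OnDelete
	Partition     *int // nil when no rollingUpdate block is set
}

// revisionForNewPod returns the revision a freshly built Pod with the given
// ordinal would be stamped with.
func revisionForNewPod(s updateStrategy, currentReplicas, ordinal int, current, update string) string {
	if s.RollingUpdate {
		if s.Partition == nil && ordinal < currentReplicas {
			return current
		}
		if s.Partition != nil && ordinal < *s.Partition {
			return current
		}
	}
	return update
}

func main() {
	p := 2
	s := updateStrategy{RollingUpdate: true, Partition: &p}
	var revs []string
	for ord := 0; ord < 4; ord++ {
		revs = append(revs, revisionForNewPod(s, 4, ord, "web-v1", "web-v2"))
	}
	fmt.Println(revs) // [web-v1 web-v1 web-v2 web-v2]
}
```

This rule is why a Pod below the partition that gets deleted comes back at the old revision instead of jumping ahead to the new one.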

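4. Find the first unhealthy Pod (the unhealthy Pod with the lowest ordinal) in replicas and condemned.
5. If the statefulset itself is being deleted (DeletionTimestamp != nil), do nothing and return the current status.
6. Iterate over the Pods in replicas, making sure every Pod with an index in [0, spec.replicas) is Running and Ready. Specifically:
6.1 If the Pod is in the Failed phase, delete it, update the status counters, and build a new Pod object in its place.
6.2 If the Pod has not been created, call CreateStatefulPod to create it and update the status. Then, if monotonic is true, return the status immediately; otherwise continue with the next Pod. monotonic is !allowsBurst(set): it is true unless podManagementPolicy is Parallel, i.e. under OrderedReady, Pods are created one at a time.
6.3 If the Pod is terminating (DeletionTimestamp != nil) and monotonic is true, return the status and wait for the Pod to be deleted before doing any further work.
6.4 If the Pod has been created but is not yet Running and Ready and monotonic is true, return the status without doing any further work.
6.5 Check whether the Pod's identity and storage match the statefulset. If either does not match, call UpdateStatefulPod to fix the identity and storage (creating any missing PVCs), then move on to the next Pod.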

The code for this part is as follows:

```go
for i := range replicas {
	// delete and recreate failed pods
	if isFailed(replicas[i]) {
		ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
			"StatefulSet %s/%s is recreating failed Pod %s",
			set.Namespace,
			set.Name,
			replicas[i].Name)
		if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
			return &status, err
		}
		if getPodRevision(replicas[i]) == currentRevision.Name {
			status.CurrentReplicas--
		}
		if getPodRevision(replicas[i]) == updateRevision.Name {
			status.UpdatedReplicas--
		}
		status.Replicas--
		replicas[i] = newVersionedStatefulSetPod(
			currentSet,
			updateSet,
			currentRevision.Name,
			updateRevision.Name,
			i)
	}
	// If we find a Pod that has not been created we create the Pod
	if !isCreated(replicas[i]) {
		if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
			return &status, err
		}
		status.Replicas++
		if getPodRevision(replicas[i]) == currentRevision.Name {
			status.CurrentReplicas++
		}
		if getPodRevision(replicas[i]) == updateRevision.Name {
			status.UpdatedReplicas++
		}

		// if the set does not allow bursting, return immediately
		if monotonic {
			return &status, nil
		}
		// pod created, no more work possible for this round
		continue
	}
	// If we find a Pod that is currently terminating, we must wait until graceful deletion
	// completes before we continue to make progress.
	if isTerminating(replicas[i]) && monotonic {
		klog.V(4).Infof(
			"StatefulSet %s/%s is waiting for Pod %s to Terminate",
			set.Namespace,
			set.Name,
			replicas[i].Name)
		return &status, nil
	}
	// If we have a Pod that has been created but is not running and ready we can not make progress.
	// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
	// ordinal, are Running and Ready.
	if !isRunningAndReady(replicas[i]) && monotonic {
		klog.V(4).Infof(
			"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
			set.Namespace,
			set.Name,
			replicas[i].Name)
		return &status, nil
	}
	// Enforce the StatefulSet invariants
	if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
		continue
	}
	// Make a deep copy so we don't mutate the shared cache
	replica := replicas[i].DeepCopy()
	if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
		return &status, err
	}
}
```
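The effect of monotonic on the creation part of this loop can be simulated in isolation. This sketch uses a hypothetical `podState` and `createPass`. It ignores failed Pods, terminating Pods, and the identity/storage checks. It only shows how many Pods a single pass creates under OrderedReady (monotonic) versus Parallel.

```go
package main

import "fmt"

type podState int

const (
	missing podState = iota // no Pod object exists yet
	pending                 // created, but not Running and Ready
	ready                   // Running and Ready
)

// createPass simulates one pass of the replicas loop and returns the ordinals
// created in that pass. With monotonic=true it creates at most one Pod and stops
// at the first Pod that is not Running and Ready; with monotonic=false it
// creates every missing Pod.
func createPass(replicas []podState, monotonic bool) []int {
	var created []int
	for ord, s := range replicas {
		switch s {
		case missing:
			replicas[ord] = pending
			created = append(created, ord)
			if monotonic {
				return created
			}
		case pending:
			if monotonic {
				return created
			}
		}
	}
	return created
}

func main() {
	fmt.Println(createPass([]podState{ready, missing, missing}, true))  // [1]
	fmt.Println(createPass([]podState{ready, missing, missing}, false)) // [1 2]
	fmt.Println(createPass([]podState{pending, missing}, true))         // []
}
```

Under OrderedReady, a StatefulSet with N missing Pods therefore needs at least N sync rounds to come up. Each round is triggered by the previous Pod's status changes.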

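7. The Pods in condemned are to be deleted, starting from the highest ordinal. Specifically:
7.1 If the Pod at condemned[target] is terminating (DeletionTimestamp != nil) and monotonic is true (Pods are handled one at a time), return the status. Otherwise, skip to the next Pod.
7.2 If the Pod at condemned[target] is not Running and Ready, monotonic is true, and it is not the first unhealthy Pod, return the status. The Pod may only be deleted once it is Running and Ready.
7.3 If neither condition applies, call DeleteStatefulPod to delete the Pod and update the status. When monotonic is true, return after deleting this one Pod.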

The code for this part is as follows:

```go
for target := len(condemned) - 1; target >= 0; target-- {
	// wait for terminating pods to expire
	if isTerminating(condemned[target]) {
		klog.V(4).Infof(
			"StatefulSet %s/%s is waiting for Pod %s to Terminate prior to scale down",
			set.Namespace,
			set.Name,
			condemned[target].Name)
		// block if we are in monotonic mode
		if monotonic {
			return &status, nil
		}
		continue
	}
	// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
	if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
		klog.V(4).Infof(
			"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to scale down",
			set.Namespace,
			set.Name,
			firstUnhealthyPod.Name)
		return &status, nil
	}
	klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down",
		set.Namespace,
		set.Name,
		condemned[target].Name)

	if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
		return &status, err
	}
	if getPodRevision(condemned[target]) == currentRevision.Name {
		status.CurrentReplicas--
	}
	if getPodRevision(condemned[target]) == updateRevision.Name {
		status.UpdatedReplicas--
	}
	if monotonic {
		return &status, nil
	}
}
```
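The scale-down loop can be simulated the same way. This hypothetical sketch assumes `condemned` is sorted by ascending ordinal. As far as I can tell, the controller sorts it before this loop, so iterating from the end deletes the highest ordinal first. `condemnedPod` and `scaleDownPass` are illustrative names only.

```go
package main

import "fmt"

// condemnedPod is a simplified view of a Pod that is being scaled away.
type condemnedPod struct {
	Name        string
	Healthy     bool // Running and Ready
	Terminating bool // DeletionTimestamp != nil
}

// scaleDownPass simulates one pass of the condemned loop and returns the names
// deleted in that pass. firstUnhealthy is the name of the first unhealthy Pod
// ("" if there is none).
func scaleDownPass(condemned []condemnedPod, firstUnhealthy string, monotonic bool) []string {
	var deleted []string
	for t := len(condemned) - 1; t >= 0; t-- {
		p := condemned[t]
		if p.Terminating {
			if monotonic {
				return deleted // wait for the terminating Pod to go away
			}
			continue
		}
		if monotonic && !p.Healthy && p.Name != firstUnhealthy {
			return deleted // wait for this Pod to become Running and Ready
		}
		deleted = append(deleted, p.Name)
		if monotonic {
			return deleted // at most one deletion per pass
		}
	}
	return deleted
}

func main() {
	c := []condemnedPod{{Name: "web-3", Healthy: true}, {Name: "web-4", Healthy: true}}
	fmt.Println(scaleDownPass(c, "", true))  // [web-4]
	fmt.Println(scaleDownPass(c, "", false)) // [web-4 web-3]
}
```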

8. Check the update strategy (UpdateStrategy.Type). If it is OnDelete, return the status. Note that under the OnDelete strategy, the statefulset controller only creates Pods of the new version after the user manually deletes the old Pods.
9. If the strategy is RollingUpdate and a Partition is set, the minimum ordinal to update, updateMin, is taken from Partition. Note that with RollingUpdate and a Partition, the statefulset only updates Pods whose ord (ordinal) is >= Partition. If Partition is greater than or equal to the number of replicas, no Pod is updated to the new version.

The code for this part:

```go
updateMin := 0
if set.Spec.UpdateStrategy.RollingUpdate != nil {
	updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
}
```
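In other words, the rolling update visits ordinals from len(replicas)-1 down to updateMin. The tiny sketch below (hypothetical `updateOrder`) makes that range explicit, including the case where the partition is at or above the replica count.

```go
package main

import "fmt"

// updateOrder returns the ordinals a RollingUpdate with the given partition may
// move to the update revision, in the order they are visited (highest first).
func updateOrder(replicas, partition int) []int {
	var order []int
	for ord := replicas - 1; ord >= partition; ord-- {
		order = append(order, ord)
	}
	return order
}

func main() {
	fmt.Println(updateOrder(5, 2)) // [4 3 2]
	fmt.Println(updateOrder(3, 3)) // []
	fmt.Println(updateOrder(3, 0)) // [2 1 0]
}
```

Setting the partition to the replica count and lowering it step by step is a common way to stage a canary rollout.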

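10. Update the Pods with an index in [updateMin, len(replicas)) in descending index order:
10.1 If the revision of replicas[target] is not updateRevision and the Pod is not terminating, call DeleteStatefulPod to delete it, decrement status.CurrentReplicas, and return the status. A later sync recreates the deleted Pod at the update revision (steps 3 and 6.2).
10.2 If replicas[target] is not healthy (Running and Ready and not terminating), return the status and wait for it to become healthy.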

The code for this part:

```go
for target := len(replicas) - 1; target >= updateMin; target-- {

	// delete the Pod if it is not already terminating and does not match the update revision.
	if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
		klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update",
			set.Namespace,
			set.Name,
			replicas[target].Name)
		err := ssc.podControl.DeleteStatefulPod(set, replicas[target])
		status.CurrentReplicas--
		return &status, err
	}

	// wait for unhealthy Pods on update
	if !isHealthy(replicas[target]) {
		klog.V(4).Infof(
			"StatefulSet %s/%s is waiting for Pod %s to update",
			set.Namespace,
			set.Name,
			replicas[target].Name)
		return &status, nil
	}

}
```
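One pass of this update loop can also be modelled as a pure function. `replica` and `rollingUpdateStep` are hypothetical simplifications. The sketch returns the ordinal that would be deleted in this pass. It returns -1 when everything in range is already up to date or when the loop has to wait for an unhealthy Pod.

```go
package main

import "fmt"

// replica is a simplified view of a Pod in the replicas slice.
type replica struct {
	Revision    string
	Healthy     bool // Running and Ready and not terminating
	Terminating bool
}

// rollingUpdateStep returns the ordinal deleted in this pass, or -1 if nothing
// is deleted (everything in [updateMin, len) is updated, or a Pod is unhealthy).
func rollingUpdateStep(replicas []replica, updateMin int, updateRevision string) int {
	for t := len(replicas) - 1; t >= updateMin; t-- {
		r := replicas[t]
		if r.Revision != updateRevision && !r.Terminating {
			return t // delete it; a later sync recreates it at updateRevision
		}
		if !r.Healthy {
			return -1 // wait for this Pod before touching lower ordinals
		}
	}
	return -1
}

func main() {
	old := replica{Revision: "web-v1", Healthy: true}
	fmt.Println(rollingUpdateStep([]replica{old, old, old}, 0, "web-v2"))                    // 2
	fmt.Println(rollingUpdateStep([]replica{old, old, {Revision: "web-v2"}}, 0, "web-v2")) // -1
}
```

The second call returns -1 because web-2 is already at the new revision but not yet healthy, so web-1 is left alone until it is.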

That covers the overall logic of the statefulset controller. I'm writing it down here as a record :)

Fri Dec 27 10:55:31 CST 2019