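Recently, in my spare time, I have been planning to go through several of the core controllers in kube-controller-manager, which are also the ones container platforms mainly rely on today, such as Deployment, ReplicaSet, Garbage Collection, ResourceQuota and Node Controller. Before analyzing these controllers, let me first introduce what kube-controller-manager does and how it starts up.

Note: based on Kubernetes 1.9.0, commit ID: 925c127ec6b946659ad0fd596fa959be43f0cc05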

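kube-controller-manager overview

controller-manager is the brain of Kubernetes: it watches the state of the whole cluster through the apiserver and works to keep the cluster in its desired state.

kube-controller-manager is made up of a series of controllers: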

  • Replication Controller
  • Node Controller
  • CronJob Controller
  • Daemon Controller
  • Deployment Controller
  • Endpoint Controller
  • Garbage Collector
  • Job Controller
  • Pod AutoScaler
  • ReplicaSet
  • Service Controller
  • ServiceAccount Controller
  • StatefulSet Controller
  • Volume Controller
  • Resource quota Controller

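Besides the controllers above, kube-controller-manager also provides some controllers specific to cloud providers, which are not covered here.

Through these resource controllers, kube-controller-manager keeps every resource in the cluster in its "desired state". This article covers the startup flow of kube-controller-manager; later posts will analyze some of the more important controllers one by one.

kube-controller-manager startup flow

The entry point of the kube-controller-manager program is in the source directory /kubernetes/cmd/kube-controller-manager; its contents are as follows: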


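Apart from this entry point, the concrete implementations of the controllers mostly live in the kubernetes/pkg/controller directory; the implementation of each controller in that directory will be covered in detail later.

The figure below is a sequence diagram of the kube-controller-manager startup flow: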



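Based on that figure, let's analyze how kube-controller-manager starts up.

  • NewCMServer() initializes the CMServer struct with default parameters and returns a CMServer instance.
  • AddFlags() registers the command-line flags on the CMServer instance.
  • InitFlags() parses the command-line arguments for the flags registered by AddFlags.
  • InitLogs() initializes logging for the kube-controller-manager service.
  • Run() is the entry point that starts the kube-controller-manager service.

Next, let's focus on the Run() method. Below is the main flowchart of its logic: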


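  • Validate() checks the controller configuration before controller-manager starts, validating the requested controllers against the known controllers and those that are disabled by default.
  • createClients() creates kubeClient, leaderElectionClient and kubeconfig. kubeClient is used to talk to the apiserver, leaderElectionClient is used for leader election among multiple kube-controller-manager instances, and kubeconfig is used later to create other kinds of clients.
  • startHTTP() starts an HTTP server, listening on port 10252 by default. It serves profiling endpoints for controller-manager (such as /debug/pprof/) and exposes service metrics for Prometheus to scrape.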

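LeaderElect determines whether controller-manager runs as a single instance or as multiple instances.

If LeaderElect is false, controller-manager runs as a single instance, and the run method is called directly to start the controllers that need to be started.

If LeaderElect is true, controller-manager runs as multiple instances. The instances hold an election, and the one elected leader does the work: it invokes the run method through a callback to start the controllers that need to be started.

The run method that starts the controllers is covered later; first, let's look at controller-manager high availability. To deploy multiple controller-managers, set --leader-elect=true on every instance, which tells controller-manager to start in HA mode. A controller-manager that wants to do work must first grab the lock and be elected leader. The instances that fail to get the lock can only retry periodically. Once the current leader goes down, one of the remaining instances grabs the lock and continues to provide service.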

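The general principle of a distributed lock is that everyone first competes for the lock; whoever gets it becomes the leader and then periodically updates the lock's state to declare that it is still alive, so that no one else can take the lock. The Kubernetes resource lock works similarly: the node that grabs the lock sets its own identity (currently the hostname) as the lock holder, the others compare the lock's update time and holder to decide whether they can become the new leader, and the leader keeps the lock by updating RenewTime.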

controller-manager creates such a resource lock:

```go
rl, err := resourcelock.New(s.LeaderElection.ResourceLock,
	"kube-system",
	"kube-controller-manager",
	leaderElectionClient.CoreV1(),
	resourcelock.ResourceLockConfig{
		Identity:      id,
		EventRecorder: recorder,
	})
if err != nil {
	glog.Fatalf("error creating lock: %v", err)
}
```

This rl resource lock is then used for controller-manager's leader election:

```go
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
	Lock:          rl,
	LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
	RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
	RetryPeriod:   s.LeaderElection.RetryPeriod.Duration,
	Callbacks: leaderelection.LeaderCallbacks{
		OnStartedLeading: run,
		OnStoppedLeading: func() {
			glog.Fatalf("leaderelection lost")
		},
	},
})
```

The argument to RunOrDie is a leaderelection.LeaderElectionConfig. Let's look at its structure:

```go
type LeaderElectionConfig struct {
	// Lock is the resource that will be used for locking
	Lock rl.Interface

	// LeaseDuration is the duration that non-leader candidates will
	// wait to force acquire leadership. This is measured against time of
	// last observed ack.
	LeaseDuration time.Duration
	// RenewDeadline is the duration that the acting master will retry
	// refreshing leadership before giving up.
	RenewDeadline time.Duration
	// RetryPeriod is the duration the LeaderElector clients should wait
	// between tries of actions.
	RetryPeriod time.Duration

	// Callbacks are callbacks that are triggered during certain lifecycle
	// events of the LeaderElector
	Callbacks LeaderCallbacks
}
```
```go
type LeaderCallbacks struct {
	// OnStartedLeading is called when a LeaderElector client starts leading
	OnStartedLeading func(stop <-chan struct{})
	// OnStoppedLeading is called when a LeaderElector client stops leading
	OnStoppedLeading func()
	// OnNewLeader is called when the client observes a leader that is
	// not the previously observed leader. This includes the first observed
	// leader when the client starts.
	OnNewLeader func(identity string)
}
```

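Once a controller-manager acquires the lock and becomes leader, the OnStartedLeading field of the LeaderCallbacks struct calls the run method. When it is no longer the leader, OnStoppedLeading is called; in the configuration above it calls glog.Fatalf, which makes the process exit.

OK, let's continue with the RunOrDie() method, defined as follows: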

```go
// RunOrDie starts a client with the provided config or panics if the config
// fails to validate.
func RunOrDie(lec LeaderElectionConfig) {
	le, err := NewLeaderElector(lec)
	if err != nil {
		panic(err)
	}
	le.Run()
}
```
  • NewLeaderElector() creates a LeaderElector instance.
  • Run() is then called on that LeaderElector instance to run the election (grab the lock).

The Run() method is defined as follows:

```go
// Run starts the leader election loop
func (le *LeaderElector) Run() {
	defer func() {
		runtime.HandleCrash()
		le.config.Callbacks.OnStoppedLeading()
	}()
	le.acquire()
	stop := make(chan struct{})
	go le.config.Callbacks.OnStartedLeading(stop)
	le.renew()
	close(stop)
}
```
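  • acquire() tries to grab the lock and blocks until it succeeds.
  • Callbacks.OnStartedLeading(stop): once the lock is held, controller-manager's core function run() is started in a new goroutine.
  • renew() periodically renews the lock after it has been acquired, so the instance keeps holding it. It only returns when renewal fails; stop is then closed and the deferred OnStoppedLeading runs.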
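The control flow of Run can be reproduced with plain functions and a channel. runElection and simulate are hypothetical; acquire and renew are stubs standing in for the blocking loops. The point is the ordering: the leader work starts only after acquire returns, and it sees stop closed only after renew gives up.

```go
package main

import (
	"fmt"
	"sync"
)

// runElection has the same shape as LeaderElector.Run: block in acquire,
// start the leader work in a goroutine, block in renew, then close stop.
// onStopped runs last because it is deferred.
func runElection(acquire, renew func(), onStarted func(stop <-chan struct{}), onStopped func()) {
	defer onStopped()
	acquire()
	stop := make(chan struct{})
	go onStarted(stop)
	renew()
	close(stop)
}

// simulate drives runElection with stubs and returns the recorded events.
func simulate() []string {
	var mu sync.Mutex
	var events []string
	record := func(e string) {
		mu.Lock()
		defer mu.Unlock()
		events = append(events, e)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	leading := make(chan struct{})

	runElection(
		func() { record("acquired") },
		// renew "fails" as soon as the leader work has started
		func() { <-leading; record("renew failed") },
		func(stop <-chan struct{}) {
			defer wg.Done()
			record("started leading")
			close(leading)
			<-stop // the controllers would run until this channel closes
			record("worker stopped")
		},
		func() { record("stopped leading") },
	)
	wg.Wait()
	return events
}

func main() {
	// The last two events may appear in either order, since they happen on
	// different goroutines after stop is closed.
	fmt.Println(simulate())
}
```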

Below we mainly look at the acquire and renew methods; the core run function is covered at the end.

acquire() is defined as follows:

```go
// acquire loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew succeeds.
func (le *LeaderElector) acquire() {
	stop := make(chan struct{})
	glog.Infof("attempting to acquire leader lease...")
	wait.JitterUntil(func() {
		succeeded := le.tryAcquireOrRenew()
		le.maybeReportTransition()
		desc := le.config.Lock.Describe()
		if !succeeded {
			glog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("became leader")
		glog.Infof("successfully acquired lease %v", desc)
		close(stop)
	}, le.config.RetryPeriod, JitterFactor, true, stop)
}
```

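acquire uses wait.JitterUntil to call le.tryAcquireOrRenew periodically until it obtains the resource lock. While it fails, it keeps retrying at intervals of RetryPeriod plus random jitter. Once it succeeds, it closes the stop channel, which tells wait.JitterUntil to stop retrying. tryAcquireOrRenew is the core method, and renew uses it as well.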

renew() is defined as follows:

```go
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails.
func (le *LeaderElector) renew() {
	stop := make(chan struct{})
	wait.Until(func() {
		err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) {
			return le.tryAcquireOrRenew(), nil
		})
		le.maybeReportTransition()
		desc := le.config.Lock.Describe()
		if err == nil {
			glog.V(4).Infof("successfully renewed lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("stopped leading")
		glog.Infof("failed to renew lease %v: %v", desc, err)
		close(stop)
	}, 0, stop)
}
```

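renew is only called after the lock has been acquired. It keeps updating the resource lock's data to hold on to the lock and stay leader. In each round it retries tryAcquireOrRenew every RetryPeriod; if a round cannot renew the lock within RenewDeadline, renew records a "stopped leading" event, closes its stop channel and returns.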

tryAcquireOrRenew() is defined as follows:

```go
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew() bool {
	now := metav1.Now()
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. obtain or create the ElectionRecord
	oldLeaderElectionRecord, err := le.config.Lock.Get()
	if err != nil {
		if !errors.IsNotFound(err) {
			glog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
			glog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		le.observedRecord = leaderElectionRecord
		le.observedTime = time.Now()
		return true
	}

	// 2. Record obtained, check the Identity & Time
	if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
		le.observedRecord = *oldLeaderElectionRecord
		le.observedTime = time.Now()
	}
	if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		oldLeaderElectionRecord.HolderIdentity != le.config.Lock.Identity() {
		glog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
	// here. Let's correct it before updating.
	if oldLeaderElectionRecord.HolderIdentity == le.config.Lock.Identity() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// update the lock itself
	if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
		glog.Errorf("Failed to update lock: %v", err)
		return false
	}
	le.observedRecord = leaderElectionRecord
	le.observedTime = time.Now()
	return true
}
```

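The main logic of this function is:

  • Get the ElectionRecord.
    • If it does not exist, create a new ElectionRecord with this instance as the holder. This acquires the lock, and the function returns true.
    • If it already exists, compare it with the last observed record and refresh observedTime if it has changed. If the lock is held by another identity and LeaseDuration has not yet elapsed since observedTime, the lock is still valid and the function returns false.
  • Otherwise, update the resource lock object. When renewing its own lock, the instance keeps the original AcquireTime and LeaderTransitions; when taking over from another holder, it increments LeaderTransitions. The function returns true if the update succeeds.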

Once the controller-manager on some node wins the election, the callback invokes the run method, defined as follows:

```go
run := func(stop <-chan struct{}) {
	rootClientBuilder := controller.SimpleControllerClientBuilder{
		ClientConfig: kubeconfig,
	}

	clientBuilder = rootClientBuilder

	ctx, err := CreateControllerContext(s, rootClientBuilder, clientBuilder, stop)
	if err != nil {
		glog.Fatalf("error building controller context: %v", err)
	}
	saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

	if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers()); err != nil {
		glog.Fatalf("error starting controllers: %v", err)
	}

	ctx.InformerFactory.Start(ctx.Stop)
	close(ctx.InformersStarted)

	select {}
}
```
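  • CreateControllerContext() creates a context shared by the controllers in controller-manager (holding, for example, the clients and the resources that are available).
  • saTokenControllerInitFunc() starts the service account token controller. It must be the first controller started, because it provides the other controllers with the credentials they use to access resources.
  • StartControllers() starts all the controllers that are enabled by default.
  • ctx.InformerFactory.Start(ctx.Stop) then starts the shared informers, and select {} blocks forever so that run never returns.

All the controllers that controller-manager contains are defined as follows: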

```go
func NewControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["csrsigning"] = startCSRSigningController
	controllers["csrapproving"] = startCSRApprovingController
	controllers["csrcleaner"] = startCSRCleanerController
	controllers["ttl"] = startTTLController
	controllers["bootstrapsigner"] = startBootstrapSignerController
	controllers["tokencleaner"] = startTokenCleanerController
	controllers["service"] = startServiceController
	controllers["node"] = startNodeController
	controllers["route"] = startRouteController
	controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
	controllers["attachdetach"] = startAttachDetachController
	controllers["persistentvolume-expander"] = startVolumeExpandController
	controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
	controllers["pvc-protection"] = startPVCProtectionController

	return controllers
}
```

StartControllers() is defined as follows:

```go
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
	if _, err := startSATokenController(ctx); err != nil {
		return err
	}

	for controllerName, initFn := range controllers {
		if !ctx.IsControllerEnabled(controllerName) {
			glog.Warningf("%q is disabled", controllerName)
			continue
		}

		started, err := initFn(ctx)
		if err != nil {
			glog.Errorf("Error starting %q", controllerName)
			return err
		}
		if !started {
			glog.Warningf("Skipping %q", controllerName)
			continue
		}
		glog.Infof("Started %q", controllerName)
	}
	return nil
}
```

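StartControllers() thus starts every controller that controller-manager is allowed to start: the service account token controller first, then each remaining controller that is enabled, skipping disabled ones and any whose init function reports that it did not start.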
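Whether a controller counts as enabled comes from ctx.IsControllerEnabled. The sketch below reproduces the rule for the --controllers flag as I understand it for this release: an exact name enables a controller, "-name" disables it, and "*" enables everything except controllers that are disabled by default. isControllerEnabled is a hypothetical stand-in, and the disabled-by-default set in main is example data.

```go
package main

import "fmt"

// isControllerEnabled (hypothetical) decides whether name is enabled, given
// the controllers disabled by default and the values of --controllers.
func isControllerEnabled(name string, disabledByDefault map[string]bool, controllers []string) bool {
	hasStar := false
	for _, c := range controllers {
		if c == name {
			return true // explicitly enabled
		}
		if c == "-"+name {
			return false // explicitly disabled
		}
		if c == "*" {
			hasStar = true
		}
	}
	if !hasStar {
		return false // not listed and no "*" wildcard
	}
	return !disabledByDefault[name]
}

func main() {
	disabled := map[string]bool{"bootstrapsigner": true, "tokencleaner": true}
	flags := []string{"*", "-ttl"}
	for _, name := range []string{"deployment", "ttl", "bootstrapsigner"} {
		fmt.Println(name, isControllerEnabled(name, disabled, flags))
	}
	// deployment true
	// ttl false
	// bootstrapsigner false
}
```

Because StartControllers ranges over a Go map, the order in which the controllers after the service account token controller are started is not fixed from one run to the next.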

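Summary

The kube-controller-manager startup flow falls into two cases:

  • Single-instance startup: the run method is called directly to start the controllers that are enabled by default.
  • Multi-instance startup: the controller-manager instances hold an election, and the instance that becomes leader calls run through a callback to start the controllers that are enabled by default. When the leader goes down, a new election is held and the new leader starts the controllers again.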