本篇文章主要介绍kubelet服务如何启动及创建pod的流程,也对想要看kubelet源码的同学作为一个参考。

(kubelet版本: 1.7.4)

Kubelet介绍

在Kubernetes集群中,在每个Node节点上都会启动一个kubelet服务进程。该进程用于处理Master节点下发到本节点的任务,管理Pod及Pod中的容器。每个Kubelet进程会在APIServer上注册节点自身信息,定期向Master节点汇报节点资源的使用情况,并通过cAdvise监控容器和节点资源。

Kubelet功能

  • Pod管理
  • 容器的健康检测
  • 容器监控

Kubelet 代码结构


systemd
  • cmd/kubelet/kubelet.go kubelet服务的入口(main)。
  • cmd/kubelet/app/server.go 主要负责校验参数,创建和api-server交互的client及对运行kubelet权限检测,启动Kubelet等等。

除了入口,kubelet其它的主要功能实现在pkg/kubelet下。这里就不一一介绍了,在下面的时序图中,会标记pkg中用到了哪些文件,并主要实现了什么功能。

Kubelet服务启动流程


systemd

上面的时序图就是整个kubelet的启动流程。

  • validateConfig 主要对kubelet的NewKubeletServer结构体进行参数校验。
  • CreateAPIServerClientConfig 创建与控制节点api-server交互的client(kubeClient, eventClient)。
  • checkPermission 对运行kubelet进程的用户的权限验证(是否为root用户)。

RunKubelet中主要做CreateAndInitKubeletstartKubelet两件事。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, crOptions *options.ContainerRuntimeOptions, standaloneMode bool, hostnameOverride, nodeIP, providerID string) (k kubelet.KubeletBootstrap, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations

k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, crOptions, standaloneMode, hostnameOverride, nodeIP, providerID)
if err != nil {
return nil, err
}

k.BirthCry()

k.StartGarbageCollection()

return k, nil
}
  • NewMainKubelet 实例化一个kubelet对象,并对kubelet内部各个component进行初始化工作,如:

    • makePodSourceConfig pod元数据的来源(FILE, URL, api-server).
    • diskSpaceManager 磁盘空间的管理
    • secretManager secret资源的管理
    • configMapManager configMap资源的管理
    • InitNetworkPlugin 网络插件的初始化
    • PodCache Pod缓存的管理
    • PodManager 对pod的管理(如: 增删改等)
    • ContainerRuntime 容器运行时的选择(docker or rkt)
    • containerGC 容器的垃圾回收
    • imageManager 镜像的管路
    • statusManager pod状态的管理
    • probeManager 容器健康检测
    • gpuManager 对GPU的支持
  • BirthCry 通知api-server服务kubelet启动了
  • StartGarbageCollection 开启垃圾回收服务

当之前所有的预处理工作处理完成之后,准备启动我们的kubelet服务startKubelet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) {
// start the kubelet
go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)

// start the kubelet server
if kubeCfg.EnableServer {
go wait.Until(func() {
k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
}, 0, wait.NeverStop)
}
if kubeCfg.ReadOnlyPort > 0 {
go wait.Until(func() {
k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}, 0, wait.NeverStop)
}
}

startKubelet内的第一个goroutine负责启动kubelet,而后面则是创建一个kubelet http server。通过该server获取pod及node的相关信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.")
}

if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
glog.Error(err)
kl.runtimeState.setInitError(err)
}

// Start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

if kl.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
}
go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

// Start loop to sync iptables util rules
if kl.makeIPTablesUtilChains {
go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
}

// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

// Start gorouting responsible for checking limits in resolv.conf
if kl.resolverConfig != "" {
go wait.Until(func() { kl.checkLimitsForResolvConf() }, 30*time.Second, wait.NeverStop)
}

// Start component sync loops.
kl.statusManager.Start()
kl.probeManager.Start()

// Start the pod lifecycle event generator.
kl.pleg.Start()
kl.syncLoop(updates, kl)
}

上面的这段代码就是对上面介绍的各个component组件的启动,每个组件都是以goroutine运行的。这里就不细说了。创建pod的入口在syncLoop这里开始。

Pod的创建流程


systemd

syncLoop 是kubelet的主循环方法,它从不同的管道(FILE,URL, API-SERVER)监听到pod的变化,并把它们汇聚起来。当有新的变化发生时,它会调用对应的函数,保证Pod处于期望的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
glog.Info("Starting kubelet main sync loop.")
// The resyncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
for {
if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
glog.Infof("skipping pod synchronization - %v", rs)
time.Sleep(5 * time.Second)
continue
}

kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}

kl.syncLoopIteration这个方法会对多个管道进行遍历,如果有pod动作,则会调用相应的Handler。下面是对应的Interface

1
2
3
4
5
6
7
8
9
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
HandlePodAdditions(pods []*v1.Pod)
HandlePodUpdates(pods []*v1.Pod)
HandlePodRemoves(pods []*v1.Pod)
HandlePodReconcile(pods []*v1.Pod)
HandlePodSyncs(pods []*v1.Pod)
HandlePodCleanups() error
}

我们以创建Pod为例,它会调用对应的HandlePodAdditionsHandler进行处理。HandlePodAdditions做的任务就是通过canAdmitPod方法校验Pod能否在该计算节点创建(如:磁盘空间)。之后把创建Pod的事交给dispatchWorkdispatchWork主要工作就是把接收到的参数封装成UpdatePodOptions,调用 UpdatePod 方法.

syncPod是创建Pod的核心逻辑。其中有几个主要的方法:

  • computePodContainerChanges 根据最新拿到的pod配置与当前运行的容器配置进行对比,计算其中的变化(一个具体的hash值),得到需要重启的容器的信息。
  • createPodSandBox 创建一个PodSandBox。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error).
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
if err != nil {
//......
}

// Create pod logs directory
err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
if err != nil {
//......
}

podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig)
if err != nil {
//......
}

return podSandBoxID, "", nil
}
  • generatePodSandboxConfig 获取PodSandbox的配置(如:metadata,clusterDNS,容器的端口映射等)

  • RunPodSandbox 创建并开启一个Pod级别的sandbox。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id string, err error) {
// Step 1: Pull the image for the sandbox.
image := defaultSandboxImage
podSandboxImage := ds.podSandboxImage
if len(podSandboxImage) != 0 {
image = podSandboxImage
}
if err := ensureSandboxImageExists(ds.client, image); err != nil {
return "", err
}

// Step 2: Create the sandbox container.
createConfig, err := ds.makeSandboxDockerConfig(config, image)
createResp, err := ds.client.CreateContainer(*createConfig)

// Step 3: Create Sandbox Checkpoint.
if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
return createResp.ID, err
}

// Step 4: Start the sandbox container.
err = ds.client.StartContainer(createResp.ID)

// Step 5: Setup networking for the sandbox.
cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations)

return createResp.ID
}
  • ensureSandboxImageExists 检测用户是否设置了自己的pause镜像,如果没有设置则使用默认的gcr.io/google_containers/pause-amd64:3.0镜像。
  • makeSandboxDockerConfig 生成创建pause容器的配置信息。
  • CreateContainer 创建容器
  • StartContainer 启动容器
  • network.SetUpPod 设置容器的网络(kubelet加载cni插件对容器的网络进行设置等)

上面的这些操作就把我们Pod中的第一个pause容器创建并启动了。之后要做的就是把该Pod中的其它业务容器逐一的启动。但是在启动真正的业务容器之前,首先会检查用户是否设置了init_container。如果设置了,则会按init_container设置的顺序依次的执行init_container(注意:当其中的init_container执行失败了,则Pod会异常,并且业务容器不会被创建)。当init_container执行完成之后,我们真正的业务容器才会被逐一的启动。

业务容器启动的逻辑和Pod的初始化pause容器的启动的流程基本一致。下面的代码是循环的启动业务容器。

1
2
3
4
5
6
7
8
9
10
11
for idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]

//.....

glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
//.....
continue
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string) (string, error) {
// Step 1: pull the image.
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)

// Step 2: create the container.
containerConfig, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef)
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)

// Step 3: start the container.
err = m.runtimeService.StartContainer(containerID)

// Step 4: execute the post start hook.
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
kubeContainerID := kubecontainer.ContainerID{
Type: m.runtimeName,
ID: containerID,
}
msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil)
return "PostStart Hook Failed", err
}
}

return "", nil
}
  • EnsureImageExists 检查业务镜像是否存在,不存在则到Docker Registry或是Private Registry拉取镜像。
  • generateContainerConfig 生成业务容器的配置信息
  • CreateContainer 通过client.CreateContainer调用docker engine-api创建业务容器。
  • StartContainer 启动业务容器
  • runner.Run 这个方法的主要作用就是在业务容器起来的时候,首先会执行一个container hook(PostStart和PreStop),做一些预处理工作。只有container kook执行成功才会运行具体的业务服务,否则容器异常。

这样Pod大体的启动流程就描述完了,但是对于kubelet中其它的中间服务,如: volumeManager,diskSpaceManager,secretManager,configMapManager等等就需要更深层的了解了。