kube-proxy currently supports three ways of implementing load balancing: userspace, iptables, and IPVS. The first two hit performance bottlenecks as the number of Services grows, which is not acceptable in production, so this article focuses on a source-code analysis of the IPVS mode.

Code version: release-1.15

Overall structure of kube-proxy


[Figure: kube-proxy overall sequence diagram]

The sequence diagram above describes the overall logic of kube-proxy. Like the other kube-* components, kube-proxy builds its command-line application on the pflag and cobra libraries, so let's start with a quick look at how these packages are used:

func main() {
	command := &cobra.Command{
		Use: "echo [string to echo]",
		Short: "Echo anything to the screen",
		Long: `echo is for echoing anything back.Echo works a lot like print, except it has a child command.`,
		Args: cobra.MinimumNArgs(1),
		Run: func(cmd *cobra.Command, args []string) {
			fmt.Println("Print: " + strings.Join(args, " "))
		},
	}

	command.Execute()
}

The code above is a minimal example of using the cobra package: first a Command struct is initialized, whose Run field holds the real logic to be executed; once the Command has been set up, command.Execute() starts the application.

With that in mind, the diagram above becomes much easier to read: the whole flow it shows is the implementation of the core logic behind the Command struct's Run field. In other words, Command.Run is where kube-proxy's core logic starts. 😋
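
For orientation, the wiring in cmd/kube-proxy/app/server.go looks roughly like the sketch below. This is an abridged, hedged reconstruction rather than verbatim source: flag registration, config-file loading, and the exact Complete/Validate signatures are simplified.

// Abridged sketch of NewProxyCommand (cmd/kube-proxy/app/server.go, release-1.15).
// Error handling and most flag/config plumbing are omitted.
func NewProxyCommand() *cobra.Command {
	opts := NewOptions()

	cmd := &cobra.Command{
		Use:  "kube-proxy",
		Long: `The Kubernetes network proxy runs on each node.`,
		Run: func(cmd *cobra.Command, args []string) {
			// Fill in defaults / load the config file, validate it, then run the server.
			if err := opts.Complete(); err != nil {
				klog.Fatalf("failed complete: %v", err)
			}
			if err := opts.Validate(); err != nil {
				klog.Fatalf("failed validate: %v", err)
			}
			if err := opts.Run(); err != nil { // Options.Run is shown below
				klog.Exit(err)
			}
		},
	}

	opts.AddFlags(cmd.Flags())
	return cmd
}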

Command.Run mainly does the following things; see the code below:

// Run runs the specified ProxyServer.
func (o *Options) Run() error {
	defer close(o.errCh)

	//....
	proxyServer, err := NewProxyServer(o)
	if err != nil {
		return err
	}

	if o.CleanupAndExit {
		return proxyServer.CleanupAndExit()
	}

	o.proxyServer = proxyServer
	return o.runLoop()
}

1. Initialize the ProxyServer instance.
2. If kube-proxy is started with the CleanupAndExit parameter set to true, remove all rules previously installed by the userspace, iptables, and ipvs modes and then exit.
3. If CleanupAndExit is false, call runLoop to start the ProxyServer (a sketch of runLoop follows below).
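
runLoop itself is small: it starts the ProxyServer in a goroutine and then blocks on the error channel. The sketch below is abridged (the optional config-file watcher handling is left out):

// runLoop runs the proxy server in a goroutine and blocks until an error is
// received on errCh (abridged sketch; config-file watching omitted).
func (o *Options) runLoop() error {
	// run the proxy in goroutine
	go func() {
		err := o.proxyServer.Run()
		o.errCh <- err
	}()

	for {
		err := <-o.errCh
		if err != nil {
			return err
		}
	}
}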

Let's first look at the definition of the ProxyServer struct:

type ProxyServer struct {
	Client clientset.Interface
	EventClient v1core.EventsGetter
	IptInterface utiliptables.Interface
	IpvsInterface utilipvs.Interface
	IpsetInterface utilipset.Interface
	execer exec.Interface
	Proxier proxy.ProxyProvider
	Broadcaster record.EventBroadcaster
	Recorder record.EventRecorder
	ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration
	Conntracker Conntracker // if nil, ignored
	ProxyMode string
	NodeRef *v1.ObjectReference
	CleanupIPVS bool
	MetricsBindAddress string
	EnableProfiling bool
	OOMScoreAdj *int32
	ConfigSyncPeriod time.Duration
	HealthzServer *healthcheck.HealthzServer
}

The ProxyServer struct contains:

the Client used to talk to kube-apiserver;

the IptInterface used to manipulate iptables rules:

type Interface interface {
	// GetVersion returns the "X.Y.Z" version string for iptables.
	GetVersion() (string, error)
	// EnsureChain checks if the specified chain exists and, if not, creates it. If the chain existed, return true.
	EnsureChain(table Table, chain Chain) (bool, error)
	// FlushChain clears the specified chain. If the chain did not exist, return error.
	FlushChain(table Table, chain Chain) error
	// DeleteChain deletes the specified chain. If the chain did not exist, return error.
	DeleteChain(table Table, chain Chain) error
	// EnsureRule checks if the specified rule is present and, if not, creates it. If the rule existed, return true.
	EnsureRule(position RulePosition, table Table, chain Chain, args ...string) (bool, error)
	// DeleteRule checks if the specified rule is present and, if so, deletes it.
	DeleteRule(table Table, chain Chain, args ...string) error
	// IsIpv6 returns true if this is managing ipv6 tables
	IsIpv6() bool
	// SaveInto calls `iptables-save` for table and stores result in a given buffer.
	SaveInto(table Table, buffer *bytes.Buffer) error
	// Restore runs `iptables-restore` passing data through []byte.
	// table is the Table to restore
	// data should be formatted like the output of SaveInto()
	// flush sets the presence of the "--noflush" flag. see: FlushFlag
	// counters sets the "--counters" flag. see: RestoreCountersFlag
	Restore(table Table, data []byte, flush FlushFlag, counters RestoreCountersFlag) error
	// RestoreAll is the same as Restore except that no table is specified.
	RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error
	// AddReloadFunc adds a function to call on iptables reload
	AddReloadFunc(reloadFunc func())
	// Destroy cleans up resources used by the Interface
	Destroy()
}

the IpvsInterface used to manipulate IPVS virtual servers and real servers:

type Interface interface {
	// Flush clears all virtual servers in system. return occurred error immediately.
	Flush() error
	// AddVirtualServer creates the specified virtual server.
	AddVirtualServer(*VirtualServer) error
	// UpdateVirtualServer updates an already existing virtual server. If the virtual server does not exist, return error.
	UpdateVirtualServer(*VirtualServer) error
	// DeleteVirtualServer deletes the specified virtual server. If the virtual server does not exist, return error.
	DeleteVirtualServer(*VirtualServer) error
	// Given a partial virtual server, GetVirtualServer will return the specified virtual server information in the system.
	GetVirtualServer(*VirtualServer) (*VirtualServer, error)
	// GetVirtualServers lists all virtual servers in the system.
	GetVirtualServers() ([]*VirtualServer, error)
	// AddRealServer creates the specified real server for the specified virtual server.
	AddRealServer(*VirtualServer, *RealServer) error
	// GetRealServers returns all real servers for the specified virtual server.
	GetRealServers(*VirtualServer) ([]*RealServer, error)
	// DeleteRealServer deletes the specified real server from the specified virtual server.
	DeleteRealServer(*VirtualServer, *RealServer) error
	// UpdateRealServer updates the specified real server from the specified virtual server.
	UpdateRealServer(*VirtualServer, *RealServer) error
}

the IpsetInterface used to manipulate ipset sets:

// Interface is an injectable interface for running ipset commands.  Implementations must be goroutine-safe.
type Interface interface {
	// FlushSet deletes all entries from a named set.
	FlushSet(set string) error
	// DestroySet deletes a named set.
	DestroySet(set string) error
	// DestroyAllSets deletes all sets.
	DestroyAllSets() error
	// CreateSet creates a new set. It will ignore error when the set already exists if ignoreExistErr=true.
	CreateSet(set *IPSet, ignoreExistErr bool) error
	// AddEntry adds a new entry to the named set. It will ignore error when the entry already exists if ignoreExistErr=true.
	AddEntry(entry string, set *IPSet, ignoreExistErr bool) error
	// DelEntry deletes one entry from the named set
	DelEntry(entry string, set string) error
	// Test test if an entry exists in the named set
	TestEntry(entry string, set string) (bool, error)
	// ListEntries lists all the entries from a named set
	ListEntries(set string) ([]string, error)
	// ListSets list all set names from kernel
	ListSets() ([]string, error)
	// GetVersion returns the "X.Y" version string for ipset.
	GetVersion() (string, error)
}

and the Proxier, selected among the userspace, iptables, and ipvs implementations according to the ProxyMode parameter:

// ProxyProvider is the interface provided by proxier implementations.
type ProxyProvider interface {
	config.EndpointsHandler
	config.ServiceHandler

	// Sync immediately synchronizes the ProxyProvider's current state to proxy rules.
	Sync()
	// SyncLoop runs periodic work.
	// This is expected to run as a goroutine or as the main loop of the app.
	// It does not return.
	SyncLoop()
}

Next we focus on the Proxier implemented for ipvs mode. In ipvs mode the Proxier struct is defined as follows:

Note: the Proxier definition is quite long, so only the fields discussed below are kept here; everything else has been removed to keep the article from getting too long.

type Proxier struct {
	endpointsChanges *proxy.EndpointChangeTracker
	serviceChanges *proxy.ServiceChangeTracker

	//...
	serviceMap proxy.ServiceMap
	endpointsMap proxy.EndpointsMap
	portsMap map[utilproxy.LocalPort]utilproxy.Closeable

	//...
	syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules

	//...
	iptables utiliptables.Interface
	ipvs utilipvs.Interface
	ipset utilipset.Interface
	exec utilexec.Interface

	//...
	ipvsScheduler string
}

Of the Proxier fields, let's first look at async.BoundedFrequencyRunner; the rest will be covered when we walk through ProxyServer.Run:

BoundedFrequencyRunner is defined as follows:

type BoundedFrequencyRunner struct {
	name string // the name of this instance
	minInterval time.Duration // the min time between runs, modulo bursts
	maxInterval time.Duration // the max time between runs

	run chan struct{} // try an async run

	mu sync.Mutex // guards runs of fn and all mutations
	fn func() // function to run
	lastRun time.Time // time of last run
	timer timer // timer for deferred runs
	limiter rateLimiter // rate limiter for on-demand runs
}

The run channel in BoundedFrequencyRunner is used to trigger asynchronous, periodic executions of fn. In kube-proxy, fn is proxier.syncProxyRules, which creates or updates VirtualServers and RealServers and binds each VirtualServer's VIP to the dummy interface (kube-ipvs0).

Here is how the BoundedFrequencyRunner object is initialized in NewProxier:

proxier.syncRunner = async.NewBoundedFrequencyRunner(
	"sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

Where:

  • minSyncPeriod: the minimum interval between two rule syncs
  • syncPeriod: the maximum interval between two rule syncs (a sync is forced at least this often)
  • proxier.syncProxyRules: the function that actually syncs the rules (the core of kube-proxy's IPVS implementation); see the sketch below for how it is driven
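
In the IPVS Proxier, the ProxyProvider methods Sync and SyncLoop shown earlier simply delegate to this runner: Sync requests an on-demand (but rate-limited) run whenever a Service or Endpoints change is observed, while SyncLoop drives the periodic runs. A rough sketch of the two methods from pkg/proxy/ipvs/proxier.go (the healthz bookkeeping in SyncLoop is omitted):

// Sync asks the runner for an on-demand run of syncProxyRules,
// bounded below by minSyncPeriod.
func (proxier *Proxier) Sync() {
	proxier.syncRunner.Run()
}

// SyncLoop runs periodic work. It blocks, so it runs as a goroutine or as the
// main loop of the app, and re-runs syncProxyRules at least every syncPeriod.
func (proxier *Proxier) SyncLoop() {
	proxier.syncRunner.Loop(wait.NeverStop)
}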

Next, let's look at the implementation of ProxyServer.Run:

ProxyServer startup flow


[Figure: ProxyServer startup flow diagram]

The figure above shows the full ProxyServer startup flow. During startup, the following things happen:

1. Start the health-check server, HealthzServer.
2. Start the MetricsServer that exposes monitoring metrics.
3. Tune the system's conntrack parameters if adjustments are required.
4. Create an informerFactory instance, which is later used to fetch the various Kubernetes resources.
5. Create a ServiceConfig instance, whose job is to WATCH Kubernetes Service resources for changes in real time and enqueue them, so that rules can later be synced for the changed Services.
6. Register the Proxier as the service event handler.
7. Start serviceConfig.

Next, let's walk through steps 4-7 in more detail.
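
Abridged, the wiring for these steps inside ProxyServer.Run looks roughly like this (a sketch based on release-1.15; the label-selector tweak options, the Endpoints wiring details, and error handling are omitted):

// Step 4: an informer factory for watching cluster resources.
informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)

// Step 5: ServiceConfig watches Service objects and fans events out to handlers.
serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)

// Step 6: the Proxier itself is registered as the ServiceHandler.
serviceConfig.RegisterEventHandler(s.Proxier)

// Step 7: start delivering events once the informer cache has synced.
go serviceConfig.Run(wait.NeverStop)

// (The Endpoints side is wired up the same way, and informerFactory.Start
// actually kicks off the underlying LIST/WATCH; see steps 8-9 below.)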

The ServiceConfig struct is defined as follows:

type ServiceConfig struct {
	listerSynced cache.InformerSynced
	eventHandlers []ServiceHandler
}

The ServiceHandler interface is defined as follows:

type ServiceHandler interface {
	// OnServiceAdd is called whenever creation of new service object
	// is observed.
	OnServiceAdd(service *v1.Service)
	// OnServiceUpdate is called whenever modification of an existing
	// service object is observed.
	OnServiceUpdate(oldService, service *v1.Service)
	// OnServiceDelete is called whenever deletion of an existing service
	// object is observed.
	OnServiceDelete(service *v1.Service)
	// OnServiceSynced is called once all the initial even handlers were
	// called and the state is fully propagated to local cache.
	OnServiceSynced()
}

Creating a ServiceConfig instance is implemented as follows:

func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
	result := &ServiceConfig{
		listerSynced: serviceInformer.Informer().HasSynced,
	}

	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc: result.handleAddService,
			UpdateFunc: result.handleUpdateService,
			DeleteFunc: result.handleDeleteService,
		},
		resyncPeriod,
	)

	return result
}
  • listerSynced is set to serviceInformer.Informer().HasSynced, which is later used to check whether all Service resources have been synced into the informer's local cache.
  • AddEventHandlerWithResyncPeriod registers the add, update, and delete event handlers for Service objects. When a Service triggers one of these events, the corresponding function is called:
handleAddService
handleUpdateService
handleDeleteService
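
The eventHandlers slice that these functions fan out to is populated by RegisterEventHandler (step 6 of the startup flow), which in pkg/proxy/config/config.go boils down to an append:

// RegisterEventHandler registers a handler which is called on every service change.
func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
	c.eventHandlers = append(c.eventHandlers, handler)
}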

Let's look at the implementation of the handleAddService handler; the code is as follows:

func (c *ServiceConfig) handleAddService(obj interface{}) {
	service, ok := obj.(*v1.Service)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
		return
	}
	for i := range c.eventHandlers {
		klog.V(4).Info("Calling handler.OnServiceAdd")
		c.eventHandlers[i].OnServiceAdd(service)
	}
}

When the watch sees that a new Service has been created in the cluster, handleAddService is triggered; it iterates over eventHandlers and calls each handler's OnServiceAdd, which updates serviceChanges in the Proxier and kicks off a sync of the corresponding rules.

OnServiceAdd is implemented as follows:

// OnServiceAdd is called whenever creation of new service object is observed.
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
	proxier.OnServiceUpdate(nil, service)
}

// OnServiceUpdate is called whenever modification of an existing service object is observed.
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
	if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
		proxier.syncRunner.Run()
	}
}

ServiceChangeTracker is defined as follows:

// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of
// Services, keyed by their namespace and name.
type ServiceChangeTracker struct {
	// lock protects items.
	lock sync.Mutex
	// items maps a service to its serviceChange.
	items map[types.NamespacedName]*serviceChange
	// makeServiceInfo allows proxier to inject customized information when processing service.
	makeServiceInfo makeServicePortFunc
	// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
	isIPv6Mode *bool
	recorder record.EventRecorder
}

serviceChange is defined as follows:

// serviceChange contains all changes to services that happened since proxy rules were synced.  For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes.
type serviceChange struct {
	previous ServiceMap
	current ServiceMap
}
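
For completeness, this is roughly what ServiceChangeTracker.Update does (a simplified sketch; the conversion from *v1.Service to a ServiceMap via serviceToServiceMap is elided): it accumulates a previous/current pair per service, drops entries whose net effect is a no-op, and reports whether any changes are pending.

// Update records a transition of a service from previous to current and reports
// whether there are uncommitted changes (simplified sketch).
func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
	svc := current
	if svc == nil {
		svc = previous
	}
	if svc == nil {
		return false
	}
	namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}

	sct.lock.Lock()
	defer sct.lock.Unlock()

	change, exists := sct.items[namespacedName]
	if !exists {
		change = &serviceChange{}
		change.previous = sct.serviceToServiceMap(previous)
		sct.items[namespacedName] = change
	}
	change.current = sct.serviceToServiceMap(current)
	// If previous and current are identical, the change cancels out and is dropped.
	if reflect.DeepEqual(change.previous, change.current) {
		delete(sct.items, namespacedName)
	}
	return len(sct.items) > 0
}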

At this point the overall flow of the IPVS-based Proxier described above comes together: when ProxyServer.Run starts, it uses the Kubernetes LIST/WATCH mechanism to observe Service changes in real time, keeps updating serviceChanges in the Proxier, and stores the changed Services in the ServiceMaps inside each serviceChange; later, async.BoundedFrequencyRunner consumes this state when it executes the sync function syncProxyRules.

8. endpointsConfig works in exactly the same way as serviceConfig, so it is not described in detail here.
9. All of the preparation above takes effect once informerFactory.Start is called.
10. birthCry simply emits an event telling Kubernetes that kube-proxy has finished all of its preparation and is about to start.

func (s *ProxyServer) birthCry() {
	s.Recorder.Eventf(s.NodeRef, api.EventTypeNormal, "Starting", "Starting kube-proxy.")
}

11. Finally, SyncLoop starts the kube-proxy service and immediately runs syncProxyRules once to do an initial sync; after that, the IPVS, iptables, and ipset rules are synced periodically and asynchronously through the syncProxyRules function:

syncProxyRules is the core of kube-proxy. Its main logic iterates over the ServiceMap and, for each service, over its endpointsMap and the Service's type (ClusterIP, LoadBalancer, NodePort), creating the corresponding IPVS rules for each case.

syncProxyRules is implemented (abridged) as follows:

func (proxier *Proxier) syncProxyRules() {
	//.....

	// Build IPVS rules for each service.
	for svcName, svc := range proxier.serviceMap {
		//......

		// Handle traffic that loops back to the originator with SNAT.
		for _, e := range proxier.endpointsMap[svcName] {
			//....
		}

		// Capture the clusterIP.
		// ipset call
		entry := &utilipset.Entry{
			IP: svcInfo.ClusterIP().String(),
			Port: svcInfo.Port(),
			Protocol: protocol,
			SetType: utilipset.HashIPPort,
		}
		// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
		// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
		if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
			klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name))
			continue
		}
		proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
		// ipvs call
		serv := &utilipvs.VirtualServer{
			Address: svcInfo.ClusterIP(),
			Port: uint16(svcInfo.Port()),
			Protocol: string(svcInfo.Protocol()),
			Scheduler: proxier.ipvsScheduler,
		}
		// Set session affinity flag and timeout for IPVS service
		if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
			serv.Flags |= utilipvs.FlagPersistent
			serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
		}
		// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
		if err := proxier.syncService(svcNameString, serv, true); err == nil {
			activeIPVSServices[serv.String()] = true
			activeBindAddrs[serv.Address.String()] = true
			// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
			// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
			if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
				klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
			}
		} else {
			klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
		}

		// Capture externalIPs.
		for _, externalIP := range svcInfo.ExternalIPStrings() {
			//....
		}

		// Capture load-balancer ingress.
		for _, ingress := range svcInfo.LoadBalancerIPStrings() {
			//.....
		}

		if svcInfo.NodePort() != 0 {
			//....
		}
	}

	// sync ipset entries
	for _, set := range proxier.ipsetList {
		set.syncIPSetEntries()
	}

	// Tail call iptables rules for ipset, make sure only call iptables once
	// in a single loop per ip set.
	proxier.writeIptablesRules()

	// Sync iptables rules.
	// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
	proxier.iptablesData.Reset()
	proxier.iptablesData.Write(proxier.natChains.Bytes())
	proxier.iptablesData.Write(proxier.natRules.Bytes())
	proxier.iptablesData.Write(proxier.filterChains.Bytes())
	proxier.iptablesData.Write(proxier.filterRules.Bytes())

}

Summary

kube-proxy's code is fairly straightforward. The overall idea is that the kube-proxy service watches the cluster's Service and Endpoints objects; whenever either of these resources changes state, the change is recorded in the ServiceMap and EndpointsMap, and async.BoundedFrequencyRunner then asynchronously executes syncProxyRules to program the rules.