The client-go package provides many building blocks that make it convenient to develop custom controllers. I have built several controllers on top of it: a log-collection controller (log-controller), a load-balancer controller (lb-controller), and eventrouter, which collects Kubernetes events and writes them to a sink. But the internal mechanics of client-go always remained somewhat fuzzy to me, so I took some time to analyze how it works internally, and this post records what I found.

The following diagram shows the overall architecture of how the components of the client-go package work together:


(client-go architecture diagram)

Note: the diagram has two parts. The yellow boxes are the pieces the developer has to write; everything else is provided by client-go and can be used as-is.

  1. The Reflector inside the controller does the watching: using Kubernetes' List/Watch mechanism, it writes the resulting events (objects) into the Store (a Delta FIFO), and downstream processing then dispatches them strictly in the order the events occurred.
  2. The events produced by the Reflector are ultimately consumed by the processor. The processor pops events off the Delta FIFO queue, updates the informer's local indexer cache, and distributes the events to all listeners.
  3. The processor's listeners are registered from the outside via AddEventHandler; each listener provides AddFunc, UpdateFunc, and DeleteFunc methods. Internally, a listener adds a layer of buffering that holds pendingNotifications. The listeners perform the actual fan-out: every event eventually reaches the registered handlers.
  4. Depending on the event type (ADD, UPDATE, DELETE), the registered handler can enqueue the event's key (format: namespace/resource_name) into the Workqueue that client-go provides.
  5. The developer implements the controller's syncHandler (the actual business logic): it takes a key from the Workqueue, splits it into namespace and resource_name, and uses the Lister to fetch the corresponding object from the indexer for further processing.

That is the overall flow a developer follows to write a controller (some people also call it an operator).
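
To make the five steps concrete, here is a minimal sketch of that wiring. The function names (runController, syncHandler), the choice of Pods, the 30-second resync, and the panic-free error handling are all arbitrary choices of this post, not client-go APIs:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    listers "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// runController wires the five steps together:
// informer -> event handler -> workqueue -> syncHandler -> lister.
func runController(clientset kubernetes.Interface, stopCh <-chan struct{}) {
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    podInformer := factory.Core().V1().Pods()
    queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")

    // Steps 3/4: the registered handler only enqueues the object's key
    // (namespace/name); the heavy lifting happens in the worker below.
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
                queue.Add(key)
            }
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            if key, err := cache.MetaNamespaceKeyFunc(newObj); err == nil {
                queue.Add(key)
            }
        },
        DeleteFunc: func(obj interface{}) {
            if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
                queue.Add(key)
            }
        },
    })

    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)

    // Step 5: the worker pops keys and reads objects from the lister
    // (i.e. the informer's indexer), never from the apiserver directly.
    lister := podInformer.Lister()
    go func() {
        for {
            key, quit := queue.Get()
            if quit {
                return
            }
            if err := syncHandler(lister, key.(string)); err != nil {
                queue.AddRateLimited(key)
            } else {
                queue.Forget(key)
            }
            queue.Done(key)
        }
    }()

    <-stopCh
    queue.ShutDown()
}

func syncHandler(lister listers.PodLister, key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    pod, err := lister.Pods(namespace).Get(name)
    if err != nil {
        // The object may have been deleted after the key was enqueued.
        return nil
    }
    fmt.Printf("synced pod %s/%s, phase=%s\n", namespace, pod.Name, pod.Status.Phase)
    return nil
}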

SharedInformerFactory

SharedInformerFactory provides shared informers for all resources (all API group versions) in Kubernetes, so every Informer a controller uses is obtained from the SharedInformerFactory, keyed by GroupVersionResource.
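
Besides the typed accessors (Core(), Apps(), ...), the ForResource method offers a generic lookup by GroupVersionResource. A small sketch of ours (the function name and the choice of Deployments are arbitrary):

import (
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/tools/cache"
)

// deploymentInformer fetches an informer by its GroupVersionResource instead
// of going through the typed Apps().V1().Deployments() accessor chain.
func deploymentInformer(factory informers.SharedInformerFactory) (cache.SharedIndexInformer, error) {
    gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
    gi, err := factory.ForResource(gvr)
    if err != nil {
        return nil, err // the factory has no informer for this GVR
    }
    return gi.Informer(), nil
}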

The declaration of SharedInformerFactory:

type SharedInformerFactory interface {
    internalinterfaces.SharedInformerFactory
    ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
    WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

    Admissionregistration() admissionregistration.Interface
    Apps() apps.Interface
    Auditregistration() auditregistration.Interface
    Autoscaling() autoscaling.Interface
    Batch() batch.Interface
    Certificates() certificates.Interface
    Coordination() coordination.Interface
    Core() core.Interface
    Events() events.Interface
    Extensions() extensions.Interface
    Networking() networking.Interface
    Policy() policy.Interface
    Rbac() rbac.Interface
    Scheduling() scheduling.Interface
    Settings() settings.Interface
    Storage() storage.Interface
}

The declaration of the events group's Interface:

// Interface provides access to all the informers in this group version.
type Interface interface {
    // Events returns a EventInformer.
    Events() EventInformer
}

The declaration of EventInformer:

// EventInformer provides access to a shared informer and lister for
// Events.
type EventInformer interface {
    Informer() cache.SharedIndexInformer
    Lister() v1beta1.EventLister
}

So if we want to use an EventInformer, we simply ask the SharedInformerFactory for it. The following two lines are all that is needed:

sharedInformers := informers.NewSharedInformerFactory(clientset, viper.GetDuration("resync-interval"))

eventsInformer := sharedInformers.Core().V1().Events()

Register Informer

Now that we have the EventInformer we want, the next step is to register it with the factory (SharedInformerFactory). In fact, calling eventsInformer.Informer() already performs the registration; afterwards, informerFactory.Start() starts every Informer registered with the factory.
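
Put together, the usual startup order looks like the following sketch (startFactory is our own name, and panicking on a failed sync is only for brevity):

import (
    "fmt"

    "k8s.io/client-go/informers"
)

// startFactory: touch the informers you need (Informer() registers them),
// then Start the factory, then block until every started cache has done
// its initial List.
func startFactory(factory informers.SharedInformerFactory, stopCh <-chan struct{}) {
    factory.Core().V1().Events().Informer() // Informer() registers with the factory

    factory.Start(stopCh) // runs every registered informer in its own goroutine

    // WaitForCacheSync returns one entry per started informer, keyed by object type.
    for informerType, synced := range factory.WaitForCacheSync(stopCh) {
        if !synced {
            panic(fmt.Sprintf("cache for %v failed to sync", informerType))
        }
    }
}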

Here is the implementation logic that registers the EventInformer:

func NewFilteredEventInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Events(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Events(namespace).Watch(options)
            },
        },
        &corev1.Event{},
        resyncPeriod,
        indexers,
    )
}

func (f *eventInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredEventInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *eventInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&corev1.Event{}, f.defaultInformer)
}

The declaration of sharedInformerFactory. The two members to pay attention to here are informers and startedInformers: informers stores the Informers that have been registered with this factory, while startedInformers records which of them have already been started.

type sharedInformerFactory struct {
    client           kubernetes.Interface
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    lock             sync.Mutex
    defaultResync    time.Duration
    customResync     map[reflect.Type]time.Duration

    informers map[reflect.Type]cache.SharedIndexInformer
    // startedInformers is used for tracking which informers have been started.
    // This allows Start() to be called multiple times safely.
    startedInformers map[reflect.Type]bool
}

The following code checks whether an informer has already been registered with the factory; if not, it registers it.

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}

Informer Run

Finally, the informerFactory starts every informer registered with it. An Informer's main job is to watch for events and dispatch them.

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

WaitForCacheSync confirms whether each started Informer has completed its initial synchronization from Kubernetes; once an informer has received and processed its initial LIST, its HasSynced returns true.

// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
    informers := func() map[reflect.Type]cache.SharedIndexInformer {
        f.lock.Lock()
        defer f.lock.Unlock()

        informers := map[reflect.Type]cache.SharedIndexInformer{}
        for informerType, informer := range f.informers {
            if f.startedInformers[informerType] {
                informers[informerType] = informer
            }
        }
        return informers
    }()

    res := map[reflect.Type]bool{}
    for informType, informer := range informers {
        res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
    }
    return res
}

sharedIndexInformer

The informer is an asynchronous event-handling framework that covers two phases: watching for events and dispatching them. The sharedIndexInformer struct below represents a single Informer instance.

type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller

    processor             *sharedProcessor
    cacheMutationDetector CacheMutationDetector

    // This block is tracked to handle late initialization of the controller
    listerWatcher ListerWatcher
    objectType    runtime.Object

    // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    // shouldResync to check if any of our listeners need a resync.
    resyncCheckPeriod time.Duration
    // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
    // value).
    defaultEventHandlerResyncPeriod time.Duration
    // clock allows for testability
    clock clock.Clock

    started, stopped bool
    startedLock      sync.Mutex

    // blockDeltas gives a way to stop all event distribution so that a late event handler
    // can safely join the shared informer.
    blockDeltas sync.Mutex
}

Let's start with the indexer member: it is a Store that caches the full set of objects. The Lister the Informer exposes reads directly from this Store, instead of querying the apiserver (and hence etcd) on every access.

The declaration of Indexer:

// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
    Store
    // Retrieve list of objects that match on the named indexing function
    Index(indexName string, obj interface{}) ([]interface{}, error)
    // IndexKeys returns the set of keys that match on the named indexing function.
    IndexKeys(indexName, indexKey string) ([]string, error)
    // ListIndexFuncValues returns the list of generated values of an Index func
    ListIndexFuncValues(indexName string) []string
    // ByIndex lists object that match on the named indexing function with the exact key
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    // GetIndexer return the indexers
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store. If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
}

The following snippet shows what happens when an Informer instance is started. Next, let's analyze each piece of this logic in detail.

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)
}

DeltaFIFO

Run first creates a DeltaFIFO instance; its declaration:

type DeltaFIFO struct {
    // lock/cond protects access to 'items' and 'queue'.
    lock sync.RWMutex
    cond sync.Cond

    // We depend on the property that items in the set are in
    // the queue and vice versa, and that all Deltas in this
    // map have at least one Delta.
    items map[string]Deltas
    queue []string

    // populated is true if the first batch of items inserted by Replace() has been populated
    // or Delete/Add/Update was called first.
    populated bool
    // initialPopulationCount is the number of items inserted by the first call of Replace()
    initialPopulationCount int

    // keyFunc is used to make the key used for queued item
    // insertion and retrieval, and should be deterministic.
    keyFunc KeyFunc

    // knownObjects list keys that are "known", for the
    // purpose of figuring out which items have been deleted
    // when Replace() or Delete() is called.
    knownObjects KeyListerGetter

    // Indication the queue is closed.
    // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
    // Currently, not used to gate any of CRED operations.
    closed     bool
    closedLock sync.Mutex
}

Inside the Informer, the DeltaFIFO serves as the Reflector's Store: the results of List/Watch are applied to it as ADD, UPDATE, and DELETE operations. In the DeltaFIFO declaration, the two most important members are items and queue: items caches the deltas for every object added to the Store, while queue holds the objects' keys and fixes the FIFO processing order. As for populated and initialPopulationCount, populated is set to true once the Store has received its first batch of items, and initialPopulationCount counts how many of those initial items are still waiting to be popped.

The Delta values stored in items are declared as follows:

const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
    // The other types are obvious. You'll get Sync deltas when:
    //  * A watch expires/errors out and a new list/watch cycle is started.
    //  * You've turned on periodic syncs.
    // (Anything that trigger's DeltaFIFO's Replace() method.)
    Sync DeltaType = "Sync"
)

// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
    Type   DeltaType
    Object interface{}
}

// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta
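
To see how deltas pile up per object, here is a toy example of ours (not informer code) that drives a DeltaFIFO by hand; two writes to the same key are popped together as one ordered Deltas slice:

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func demoDeltaFIFO() {
    fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil)

    pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
    fifo.Add(pod)    // queued under "default/demo" as {Added, pod}
    fifo.Update(pod) // appended to the same key: {Added, pod}, {Updated, pod}

    fifo.Pop(cache.PopProcessFunc(func(obj interface{}) error {
        for _, d := range obj.(cache.Deltas) {
            fmt.Println(d.Type) // prints Added, then Updated
        }
        return nil
    }))
}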

Controller

After the DeltaFIFO instance is created, Run initializes the controller's Config. Its declaration:

// Config contains all the settings for a Controller.
type Config struct {
    // The queue for your objects - has to be a DeltaFIFO due to
    // assumptions in the implementation. Your Process() function
    // should accept the output of this Queue's Pop() method.
    Queue

    // Something that can list and watch your objects.
    ListerWatcher

    // Something that can process your objects.
    Process ProcessFunc

    // The type of your objects.
    ObjectType runtime.Object

    // Reprocess everything at least this often.
    // Note that if it takes longer for you to clear the queue than this
    // period, you will end up processing items in the order determined
    // by FIFO.Replace(). Currently, this is random. If this is a
    // problem, we can change that replacement policy to append new
    // things to the end of the queue instead of replacing the entire
    // queue.
    FullResyncPeriod time.Duration

    // ShouldResync, if specified, is invoked when the controller's reflector determines the next
    // periodic sync should occur. If this returns true, it means the reflector should proceed with
    // the resync.
    ShouldResync ShouldResyncFunc

    // If true, when Process() returns an error, re-enqueue the object.
    // TODO: add interface to let you inject a delay/backoff or drop
    //       the object completely if desired. Pass the object in
    //       question to this interface as a parameter.
    RetryOnError bool
}

A brief introduction to Config's main members, which will make the controller discussion below easier to follow:

  • Queue: the DeltaFIFO event queue; its Pop method continuously hands data from the queue to the Process() function.
  • ListerWatcher: used to List/Watch the Kubernetes resource objects we care about.
  • Process: processes the data popped from the DeltaFIFO; its concrete implementation is covered later.

Once the Config is initialized, it is used to build a controller, which is assigned to the sharedIndexInformer's controller member.
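
Although the shared informer does all of this for you, the same machinery can be driven by hand. The following sketch of ours builds a DeltaFIFO and a Config directly and runs a bare controller that just prints delta types; the function name and the choice of Events are arbitrary:

import (
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

func lowLevelController(clientset kubernetes.Interface, stopCh <-chan struct{}) {
    lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "events", metav1.NamespaceAll, fields.Everything())
    fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil)

    cfg := &cache.Config{
        Queue:            fifo,
        ListerWatcher:    lw,
        ObjectType:       &corev1.Event{},
        FullResyncPeriod: 30 * time.Second,
        RetryOnError:     false,
        Process: func(obj interface{}) error {
            // obj is the Deltas slice popped from the DeltaFIFO.
            for _, d := range obj.(cache.Deltas) {
                fmt.Println(d.Type)
            }
            return nil
        },
    }
    cache.New(cfg).Run(stopCh) // blocks until stopCh is closed
}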

The controller's Run is essentially a producer/consumer pattern: the Reflector is the producer, and the controller's Process function (Process: s.HandleDeltas) is the consumer. processLoop repeatedly pops event data off the Queue (DeltaFIFO) and hands it to s.HandleDeltas.

The controller's Run:

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group
    defer wg.Wait()

    wg.StartWithChannel(stopCh, r.Run)

    wait.Until(c.processLoop, time.Second, stopCh)
}

In the snippet above, a Reflector is started first to List/Watch the resources we care about and add them to the Store (DeltaFIFO). The Reflector is declared as follows:

// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
    // name identifies this reflector. By default it will be a file:line if possible.
    name string
    // metrics tracks basic metric information about the reflector
    metrics *reflectorMetrics

    // The type of object we expect to place in the store.
    expectedType reflect.Type
    // The destination to sync up with the watch source
    store Store
    // listerWatcher is used to perform lists and watches.
    listerWatcher ListerWatcher
    // period controls timing between one watch ending and
    // the beginning of the next one.
    period       time.Duration
    resyncPeriod time.Duration
    ShouldResync func() bool
    // clock allows tests to manipulate time
    clock clock.Clock
    // lastSyncResourceVersion is the resource version token last
    // observed when doing a sync with the underlying store
    // it is thread safe, but not synchronized with the underlying store
    lastSyncResourceVersion string
    // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
    lastSyncResourceVersionMutex sync.RWMutex
}

The Reflector is initialized as follows:

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
    r := &Reflector{
        name: name,
        // we need this to be unique per process (some names are still the same) but obvious who it belongs to
        metrics:       newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
        listerWatcher: lw,
        store:         store,
        expectedType:  reflect.TypeOf(expectedType),
        period:        time.Second,
        resyncPeriod:  resyncPeriod,
        clock:         &clock.RealClock{},
    }
    return r
}

Once the Reflector is initialized, it is started so it can do the List/Watch work for you. The startup logic:

// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}

The core of this code is the r.ListAndWatch call. Since its implementation is quite long, I will only summarize it here; it does two things (a simplified sketch follows the list):

  1. List all the resource objects we care about and store them in the Store.
  2. Watch those resource objects; for each watch event, check whether the object already exists in the Store, then update it if so, or add or delete it accordingly.
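
As a heavily simplified sketch of ours (no resync, no re-list on watch errors, minimal resourceVersion bookkeeping), ListAndWatch boils down to:

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/tools/cache"
)

// listAndWatchSketch: one full LIST to seed the store, then a WATCH that
// keeps it up to date.
func listAndWatchSketch(lw cache.ListerWatcher, store cache.Store, stopCh <-chan struct{}) error {
    // 1. LIST everything and replace the store contents wholesale.
    list, err := lw.List(metav1.ListOptions{})
    if err != nil {
        return err
    }
    listMeta, err := meta.ListAccessor(list)
    if err != nil {
        return err
    }
    items, err := meta.ExtractList(list)
    if err != nil {
        return err
    }
    objs := make([]interface{}, 0, len(items))
    for _, item := range items {
        objs = append(objs, item)
    }
    resourceVersion := listMeta.GetResourceVersion()
    if err := store.Replace(objs, resourceVersion); err != nil {
        return err
    }

    // 2. WATCH from the resourceVersion the LIST returned and mirror every
    // event into the store (the DeltaFIFO, in the informer case).
    w, err := lw.Watch(metav1.ListOptions{ResourceVersion: resourceVersion})
    if err != nil {
        return err
    }
    defer w.Stop()
    for {
        select {
        case <-stopCh:
            return nil
        case event, ok := <-w.ResultChan():
            if !ok {
                return fmt.Errorf("watch channel closed")
            }
            switch event.Type {
            case watch.Added:
                store.Add(event.Object)
            case watch.Modified:
                store.Update(event.Object)
            case watch.Deleted:
                store.Delete(event.Object)
            }
        }
    }
}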

OK: via the Reflector, our producer, the resource objects we care about now land in the Store (DeltaFIFO). Next, the Process function pops event data off the Queue and consumes it.

The processLoop logic:

func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == FIFOClosedError {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

processLoop keeps popping event data from the Queue and feeding it to c.config.Process. If consumption fails and RetryOnError is set, the data is re-added to the queue.

The implementation of the queue's Pop method:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.IsClosed() {
                return nil, FIFOClosedError
            }

            f.cond.Wait()
        }
        id := f.queue[0]
        f.queue = f.queue[1:]
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        delete(f.items, id)
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}

The processing logic of c.config.Process (i.e. HandleDeltas):

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Added, Updated:
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

Every time new event data is popped off the Queue, this method handles it; the concrete logic is chosen per delta, based on the DeltaType constants shown earlier.

For Sync, Added, Updated, and Deleted deltas, the handling is:

1. If the object is not yet in the Store (a fresh add), it is added to the Store, and then the event is distributed.
2. If the object already exists in the Store, the stored copy is updated, and then the event is distributed. (Note that for Sync/Added/Updated deltas the code decides between add and update by checking the indexer, not by the delta type.)
3. For a Deleted delta, the object is removed from the Store, and the event is likewise distributed.

sharedProcessor

Next, let's look at sharedProcessor's event distribution in detail. When events are consumed, the informer's processor distributes them. The handlers the processor dispatches to are registered from the outside: AddEventHandler wraps them into listeners and calls addListener on the processor. addListener merely appends the listener to the set the processor manages; during distribution, the processor iterates over the listeners and sends the event to every one of them.

The declaration of sharedProcessor, together with addListener and distribute:

type sharedProcessor struct {
    listenersStarted bool
    listenersLock    sync.RWMutex
    listeners        []*processorListener
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group
}

func (p *sharedProcessor) addListener(listener *processorListener) {
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()

    p.addListenerLocked(listener)
    if p.listenersStarted {
        p.wg.Start(listener.run)
        p.wg.Start(listener.pop)
    }
}

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    if sync {
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}

The processor's run makes sure every listener is running, and on exit makes sure every listener's channel is closed.

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}

processorListener

The declaration of processorListener:

type processorListener struct {
    nextCh chan interface{}
    addCh  chan interface{}

    handler ResourceEventHandler

    // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
    // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
    // added until we OOM.
    // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
    // we should try to do something better.
    pendingNotifications buffer.RingGrowing

    // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
    requestedResyncPeriod time.Duration
    // resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
    // value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
    // informer's overall resync check period.
    resyncPeriod time.Duration
    // nextResync is the earliest time the listener should get a full resync
    nextResync time.Time
    // resyncLock guards access to resyncPeriod and nextResync
    resyncLock sync.Mutex
}

pendingNotifications buffers all the events that have not yet been dispatched, while handler is the ResourceEventHandler the developer registered with the Informer.

type ResourceEventHandler interface {
    OnAdd(obj interface{})
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}

The ResourceEventHandler interface is already implemented by the ResourceEventHandlerFuncs adapter; a developer only needs to supply the AddFunc, UpdateFunc, and DeleteFunc below and register them with the Informer.

type ResourceEventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}

Continuing on: when the processor distributes an event, it calls each listener's add method, which sends the event onto addCh.

func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}

The listener's pop goroutine continuously receives events from addCh and either buffers them into the local pendingNotifications ring or hands them to nextCh; conversely, nextCh is fed from pendingNotifications or directly from addCh. Finally, the run method consumes events from nextCh and dispatches them to the handler, with exponential-backoff retries; if run exits, it is restarted.

func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

func (p *processorListener) run() {
    // this call blocks until the channel is closed. When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted. This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

This completes the distribution of the event data in the Store. A developer writing a custom controller can, inside the registered handler, write the key of each distributed event into the Workqueue that client-go provides, and then have their own syncHandler continuously take keys from the Workqueue and run their own logic.
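
A sketch of that consumption loop with the usual bounded-retry pattern (processNextItem and maxRetries are our own choices, not client-go names):

import (
    "fmt"

    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/util/workqueue"
)

const maxRetries = 5

// processNextItem pops one key, runs syncHandler on it, and decides whether
// to retry with backoff, forget the key, or give up.
func processNextItem(queue workqueue.RateLimitingInterface, syncHandler func(key string) error) bool {
    key, quit := queue.Get()
    if quit {
        return false
    }
    defer queue.Done(key)

    err := syncHandler(key.(string))
    switch {
    case err == nil:
        queue.Forget(key) // success: reset the rate-limit history for this key
    case queue.NumRequeues(key) < maxRetries:
        queue.AddRateLimited(key) // retry later with exponential backoff
    default:
        queue.Forget(key) // too many failures: drop this key
        utilruntime.HandleError(fmt.Errorf("dropping %q from the queue: %v", key, err))
    }
    return true
}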

Summary

The above walked through client-go's implementation in detail. A deep understanding of it helps whether you go on to develop custom controllers or read the kube-controller-manager source (kube-controller-manager being essentially a collection of controllers that manage the various resources in Kubernetes). And its design approach is well worth learning from :)
