SharedInformerFactory provides a shared informer for every Kubernetes resource (across all API group versions). All informers used in a controller are therefore obtained from the SharedInformerFactory, looked up by GroupVersionResource.
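For orientation, here is a minimal sketch (not from the original source) of pulling informers out of a factory, both through the typed accessors and by GroupVersionResource; the kubeconfig path is hypothetical and error handling is elided:

package main

import (
    "time"

    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Hypothetical kubeconfig path; real code should check the errors.
    config, _ := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
    clientset := kubernetes.NewForConfigOrDie(config)

    // One factory is shared by every controller in the process; the 30s
    // default resync period is an arbitrary choice for this sketch.
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)

    // Typed path: group -> version -> resource accessors.
    podInformer := factory.Core().V1().Pods()
    _ = podInformer

    // Dynamic path: look the informer up by GroupVersionResource.
    gvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
    genericInformer, _ := factory.ForResource(gvr)
    _ = genericInformer
}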
// Interface provides access to all the informers in this group version.
type Interface interface {
    // Events returns a EventInformer.
    Events() EventInformer
}
The declaration of EventInformer:
// EventInformer provides access to a shared informer and lister for
// Events.
type EventInformer interface {
    Informer() cache.SharedIndexInformer
    Lister() v1beta1.EventLister
}
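As a sketch of consuming the two accessors, reusing the factory from the example above (the "default" namespace is arbitrary; assumed import: k8s.io/apimachinery/pkg/labels):

eventInformer := factory.Events().V1beta1().Events()

// Informer() hands back the shared cache.SharedIndexInformer for Events.
si := eventInformer.Informer()
_ = si

// Lister() answers reads from the informer's local cache, not the API server.
events, err := eventInformer.Lister().Events("default").List(labels.Everything())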
    informers map[reflect.Type]cache.SharedIndexInformer
    // startedInformers is used for tracking which informers have been started.
    // This allows Start() to be called multiple times safely.
    startedInformers map[reflect.Type]bool
}
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
// WaitForCacheSync waits for all started informers' caches to be synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
    informers := func() map[reflect.Type]cache.SharedIndexInformer {
        f.lock.Lock()
        defer f.lock.Unlock()

        informers := map[reflect.Type]cache.SharedIndexInformer{}
        for informerType, informer := range f.informers {
            if f.startedInformers[informerType] {
                informers[informerType] = informer
            }
        }
        return informers
    }()

    res := map[reflect.Type]bool{}
    for informType, informer := range informers {
        res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
    }
    return res
}
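In practice Start() and WaitForCacheSync() are used together: start everything registered so far, then block until every cache has synced before doing any work. A minimal sketch:

stopCh := make(chan struct{})
defer close(stopCh)

factory.Start(stopCh) // idempotent: informers already started are skipped

for informerType, synced := range factory.WaitForCacheSync(stopCh) {
    if !synced {
        // Bail out: a controller must not read from a cache that never synced.
        klog.Fatalf("cache failed to sync: %v", informerType)
    }
}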
    // This block is tracked to handle late initialization of the controller
    listerWatcher ListerWatcher
    objectType    runtime.Object

    // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    // shouldResync to check if any of our listeners need a resync.
    resyncCheckPeriod time.Duration
    // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's
    // default value).
    defaultEventHandlerResyncPeriod time.Duration
    // clock allows for testability
    clock clock.Clock

    started, stopped bool
    startedLock      sync.Mutex

    // blockDeltas gives a way to stop all event distribution so that a late event handler
    // can safely join the shared informer.
    blockDeltas sync.Mutex
}
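The two resync fields above are what AddEventHandler and AddEventHandlerWithResyncPeriod feed into. A sketch (onPodChange is a hypothetical handler, not from client-go):

informer := factory.Core().V1().Pods().Informer()

// Uses defaultEventHandlerResyncPeriod (the factory's defaultResync).
informer.AddEventHandler(onPodChange)

// Asks for a 10-minute resync; the shared informer may round this up so it
// aligns with resyncCheckPeriod.
informer.AddEventHandlerWithResyncPeriod(onPodChange, 10*time.Minute)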
// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
    Store
    // Retrieve list of objects that match on the named indexing function
    Index(indexName string, obj interface{}) ([]interface{}, error)
    // IndexKeys returns the set of keys that match on the named indexing function.
    IndexKeys(indexName, indexKey string) ([]string, error)
    // ListIndexFuncValues returns the list of generated values of an Index func
    ListIndexFuncValues(indexName string) []string
    // ByIndex lists object that match on the named indexing function with the exact key
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    // GetIndexers return the indexers
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store. If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
}
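A sketch of a custom index in use: the index name "byNode" and the somePod variable are ours, not from client-go (assumed imports: fmt, k8s.io/api/core/v1, k8s.io/client-go/tools/cache):

// An IndexFunc that keys each pod by the node it is scheduled on.
byNode := func(obj interface{}) ([]string, error) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        return nil, fmt.Errorf("expected *v1.Pod, got %T", obj)
    }
    return []string{pod.Spec.NodeName}, nil
}

indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"byNode": byNode})
_ = indexer.Add(somePod)

// ByIndex returns every stored object whose index function emitted "node-1".
podsOnNode, _ := indexer.ByIndex("byNode", "node-1")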
    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)
}
    // We depend on the property that items in the set are in
    // the queue and vice versa, and that all Deltas in this
    // map have at least one Delta.
    items map[string]Deltas
    queue []string

    // populated is true if the first batch of items inserted by Replace() has been populated
    // or Delete/Add/Update was called first.
    populated bool
    // initialPopulationCount is the number of items inserted by the first call of Replace()
    initialPopulationCount int

    // keyFunc is used to make the key used for queued item
    // insertion and retrieval, and should be deterministic.
    keyFunc KeyFunc

    // knownObjects list keys that are "known", for the
    // purpose of figuring out which items have been deleted
    // when Replace() or Delete() is called.
    knownObjects KeyListerGetter

    // Indication the queue is closed.
    // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
    // Currently, not used to gate any of CRUD operations.
    closed     bool
    closedLock sync.Mutex
}
const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
    // The other types are obvious. You'll get Sync deltas when:
    //  * A watch expires/errors out and a new list/watch cycle is started.
    //  * You've turned on periodic syncs.
    // (Anything that triggers DeltaFIFO's Replace() method.)
    Sync DeltaType = "Sync"
)
// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
    Type   DeltaType
    Object interface{}
}

// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta
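A hypothetical process function shows how Deltas are consumed, oldest first; this is a sketch of the common pattern, not client-go code:

// Each item handed out by Pop() is the cache.Deltas accumulated for one object.
func processDeltas(obj interface{}) error {
    for _, d := range obj.(cache.Deltas) {
        switch d.Type {
        case cache.Sync, cache.Added, cache.Updated:
            // Treat Sync/Added/Updated uniformly as an upsert into a local store.
        case cache.Deleted:
            // Drop the object; d.Object may be a cache.DeletedFinalStateUnknown tombstone.
        }
    }
    return nil
}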
// Config contains all the settings for a Controller.
type Config struct {
    // The queue for your objects - has to be a DeltaFIFO due to
    // assumptions in the implementation. Your Process() function
    // should accept the output of this Queue's Pop() method.
    Queue

    // Something that can list and watch your objects.
    ListerWatcher

    // Something that can process your objects.
    Process ProcessFunc

    // The type of your objects.
    ObjectType runtime.Object

    // Reprocess everything at least this often.
    // Note that if it takes longer for you to clear the queue than this
    // period, you will end up processing items in the order determined
    // by FIFO.Replace(). Currently, this is random. If this is a
    // problem, we can change that replacement policy to append new
    // things to the end of the queue instead of replacing the entire
    // queue.
    FullResyncPeriod time.Duration

    // ShouldResync, if specified, is invoked when the controller's reflector determines the next
    // periodic sync should occur. If this returns true, it means the reflector should proceed with
    // the resync.
    ShouldResync ShouldResyncFunc

    // If true, when Process() returns an error, re-enqueue the object.
    // TODO: add interface to let you inject a delay/backoff or drop
    //       the object completely if desired. Pass the object in
    //       question to this interface as a parameter.
    RetryOnError bool
}
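Wiring a Config by hand is roughly what sharedIndexInformer.Run does internally. A sketch under stated assumptions: lw is a ListerWatcher, and processDeltas and stopCh come from the earlier examples:

fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil)

cfg := &cache.Config{
    Queue:            fifo,
    ListerWatcher:    lw,
    ObjectType:       &v1.Pod{},
    FullResyncPeriod: 30 * time.Second,
    RetryOnError:     false,
    Process:          processDeltas,
}

ctrl := cache.New(cfg)
go ctrl.Run(stopCh)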
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
    // name identifies this reflector. By default it will be a file:line if possible.
    name string
    // metrics tracks basic metric information about the reflector
    metrics *reflectorMetrics

    // The type of object we expect to place in the store.
    expectedType reflect.Type
    // The destination to sync up with the watch source
    store Store
    // listerWatcher is used to perform lists and watches.
    listerWatcher ListerWatcher
    // period controls timing between one watch ending and
    // the beginning of the next one.
    period       time.Duration
    resyncPeriod time.Duration
    ShouldResync func() bool
    // clock allows tests to manipulate time
    clock clock.Clock
    // lastSyncResourceVersion is the resource version token last
    // observed when doing a sync with the underlying store
    // it is thread safe, but not synchronized with the underlying store
    lastSyncResourceVersion string
    // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
    lastSyncResourceVersionMutex sync.RWMutex
}
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
    r := &Reflector{
        name: name,
        // we need this to be unique per process (some names are still the same) but obvious who it belongs to
        metrics:       newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
        listerWatcher: lw,
        store:         store,
        expectedType:  reflect.TypeOf(expectedType),
        period:        time.Second,
        resyncPeriod:  resyncPeriod,
        clock:         &clock.RealClock{},
    }
    return r
}
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}
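Running a standalone Reflector is a useful mental model for what the controller sets up. A sketch assuming the clientset from the first example (assumed imports: metav1, k8s.io/apimachinery/pkg/fields, k8s.io/api/core/v1):

// List/watch Pods in all namespaces and mirror them into a local store.
lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())
store := cache.NewStore(cache.MetaNamespaceKeyFunc)

r := cache.NewReflector(lw, &v1.Pod{}, store, 30*time.Second)
go r.Run(stopCh)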
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == FIFOClosedError {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.IsClosed() {
                return nil, FIFOClosedError
            }
            f.cond.Wait()
        }
        id := f.queue[0]
        f.queue = f.queue[1:]
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        delete(f.items, id)
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}
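The ErrRequeue branch in Pop() above enables a retry pattern like this sketch, where handle is a hypothetical worker and fifo comes from the earlier Config example:

process := func(obj interface{}) error {
    if err := handle(obj); err != nil {
        // Wrapping the error in cache.ErrRequeue makes Pop() put the item back.
        return cache.ErrRequeue{Err: err}
    }
    return nil
}

item, err := fifo.Pop(cache.PopProcessFunc(process))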
type processorListener struct {
    nextCh chan interface{}
    addCh  chan interface{}

    handler ResourceEventHandler

    // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
    // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
    // added until we OOM.
    // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
    // we should try to do something better.
    pendingNotifications buffer.RingGrowing

    // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
    requestedResyncPeriod time.Duration
    // resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
    // value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
    // informer's overall resync check period.
    resyncPeriod time.Duration
    // nextResync is the earliest time the listener should get a full resync
    nextResync time.Time
    // resyncLock guards access to resyncPeriod and nextResync
    resyncLock sync.Mutex
}
    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}
func (p *processorListener) run() {
    // this call blocks until the channel is closed. When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted. This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}
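run() ultimately dispatches to the listener's ResourceEventHandler: the switch above maps add/update/delete notifications onto the three callbacks. A minimal sketch of the receiving side, with informer as in the earlier examples:

handler := cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        // invoked for addNotification
    },
    UpdateFunc: func(oldObj, newObj interface{}) {
        // invoked for updateNotification
    },
    DeleteFunc: func(obj interface{}) {
        // invoked for deleteNotification
    },
}
informer.AddEventHandler(handler)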