type LeaderElectionConfig struct { // Lock is the resource that will be used for locking Lock rl.Interface
// LeaseDuration is the duration that non-leader candidates will // wait to force acquire leadership. This is measured against time of // last observed ack. LeaseDuration time.Duration // RenewDeadline is the duration that the acting master will retry // refreshing leadership before giving up. RenewDeadline time.Duration // RetryPeriod is the duration the LeaderElector clients should wait // between tries of actions. RetryPeriod time.Duration
// Callbacks are callbacks that are triggered during certain lifecycle // events of the LeaderElector Callbacks LeaderCallbacks }
1 2 3 4 5 6 7 8 9 10
// LeaderCallbacks groups the hooks a LeaderElector invokes on leadership
// lifecycle events.
type LeaderCallbacks struct {
	// OnStartedLeading is called when a LeaderElector client starts leading.
	// It receives a stop channel from the caller.
	OnStartedLeading func(stop <-chan struct{})

	// OnStoppedLeading is called when a LeaderElector client stops leading.
	OnStoppedLeading func()

	// OnNewLeader is called when the client observes a leader that is
	// not the previously observed leader. This includes the first observed
	// leader when the client starts.
	OnNewLeader func(identity string)
}
// RunOrDie starts a client with the provided config or panics if the config // fails to validate. funcRunOrDie(lec LeaderElectionConfig) { le, err := NewLeaderElector(lec) if err != nil { panic(err) } le.Run() }
RunOrDie 主要做两件事:NewLeaderElector() 根据配置创建一个 LeaderElector 实例;
随后通过该 LeaderElector 实例调用 Run() 进行选举(抢锁)。
Run() 方法的定义如下:
1 2 3 4 5 6 7 8 9 10 11 12
// Run starts the leader election loop func(le *LeaderElector)Run() { deferfunc() { runtime.HandleCrash() le.config.Callbacks.OnStoppedLeading() }() le.acquire() stop := make(chanstruct{}) go le.config.Callbacks.OnStartedLeading(stop) le.renew() close(stop) }
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, // else it tries to renew the lease if it has already been acquired. Returns true // on success else returns false. func (le *LeaderElector) tryAcquireOrRenew() bool { now := metav1.Now() leaderElectionRecord := rl.LeaderElectionRecord{ HolderIdentity: le.config.Lock.Identity(), LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), RenewTime: now, AcquireTime: now, }
// 2. Record obtained, check the Identity & Time if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) { le.observedRecord = *oldLeaderElectionRecord le.observedTime = time.Now() } if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && oldLeaderElectionRecord.HolderIdentity != le.config.Lock.Identity() { glog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) return false }
// 3. We're going to try to update. The leaderElectionRecord is set to it's default // here. Let's correct it before updating. if oldLeaderElectionRecord.HolderIdentity == le.config.Lock.Identity() { leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions } else { leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1 }
for controllerName, initFn := range controllers { if !ctx.IsControllerEnabled(controllerName) { glog.Warningf("%q is disabled", controllerName) continue }