// StatefulSetController controls statefulsets.
//
// It reads Pods and StatefulSets through shared-informer listers, waits for
// the pod, set, PVC and revision informers to sync (the *ListerSynced
// fields), pulls StatefulSets to reconcile from a rate-limited work queue,
// and delegates the actual per-set sync to control.
type StatefulSetController struct {
	// kubeClient is the client interface used to talk to the API server.
	kubeClient clientset.Interface
	// control returns an interface capable of syncing a stateful set.
	// Abstracted out for testing.
	control StatefulSetControlInterface
	// podControl is used for patching pods.
	podControl controller.PodControlInterface
	// podLister is able to list/get pods from a shared informer's store.
	podLister corelisters.PodLister
	// podListerSynced returns true if the pod shared informer has synced at least once.
	podListerSynced cache.InformerSynced
	// setLister is able to list/get stateful sets from a shared informer's store.
	setLister appslisters.StatefulSetLister
	// setListerSynced returns true if the stateful set shared informer has synced at least once.
	setListerSynced cache.InformerSynced
	// pvcListerSynced returns true if the pvc shared informer has synced at least once.
	pvcListerSynced cache.InformerSynced
	// revListerSynced returns true if the revision shared informer has synced at least once
	// (presumably the ControllerRevision informer — confirm against the constructor).
	revListerSynced cache.InformerSynced
	// queue holds the StatefulSets that need to be synced; it is rate limited.
	queue workqueue.RateLimitingInterface
}
// list all revisions and sort them revisions, err := ssc.ListRevisions(set) iferr != nil { returnerr } history.SortControllerRevisions(revisions)
// get the current, and update revisions currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions) iferr != nil { returnerr }
// perform the main update function and get the status status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods) iferr != nil { returnerr }
// update the set's status err = ssc.updateStatefulSetStatus(set, status) iferr != nil { returnerr }
// maintain the set's revision history limit returnssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision) }
// Reconcile every replica slot in ordinal order. replicas, status, monotonic,
// currentSet/updateSet and the revisions are declared by the enclosing
// function, which is not part of this excerpt. When monotonic is true, the
// loop stops at the first Pod it creates, or at the first Pod it has to wait for.
for i := range replicas {
	// delete and recreate failed pods
	if isFailed(replicas[i]) {
		ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
			"StatefulSet %s/%s is recreating failed Pod %s",
			set.Namespace,
			set.Name,
			replicas[i].Name)
		if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
			return &status, err
		}
		// Remove the deleted Pod from the counters of the revision(s) it was on;
		// both branches run when the two revisions share a name.
		if getPodRevision(replicas[i]) == currentRevision.Name {
			status.CurrentReplicas--
		}
		if getPodRevision(replicas[i]) == updateRevision.Name {
			status.UpdatedReplicas--
		}
		status.Replicas--
		// Rebuild the in-memory Pod for ordinal i. The !isCreated check below
		// decides whether it is created in this same iteration.
		replicas[i] = newVersionedStatefulSetPod(
			currentSet,
			updateSet,
			currentRevision.Name,
			updateRevision.Name,
			i)
	}
	// If we find a Pod that has not been created we create the Pod
	if !isCreated(replicas[i]) {
		if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
			return &status, err
		}
		status.Replicas++
		if getPodRevision(replicas[i]) == currentRevision.Name {
			status.CurrentReplicas++
		}
		if getPodRevision(replicas[i]) == updateRevision.Name {
			status.UpdatedReplicas++
		}

		// if the set does not allow bursting, return immediately
		if monotonic {
			return &status, nil
		}
		// pod created, no more work possible for this round
		continue
	}
	// If we find a Pod that is currently terminating, we must wait until graceful deletion
	// completes before we continue to make progress.
	if isTerminating(replicas[i]) && monotonic {
		klog.V(4).Infof(
			"StatefulSet %s/%s is waiting for Pod %s to Terminate",
			set.Namespace,
			set.Name,
			replicas[i].Name)
		return &status, nil
	}
	// If we have a Pod that has been created but is not running and ready we can not make progress.
	// We must ensure that, for each Pod we create, all of its predecessors (with respect to its
	// ordinal) are Running and Ready.
	if !isRunningAndReady(replicas[i]) && monotonic {
		klog.V(4).Infof(
			"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
			set.Namespace,
			set.Name,
			replicas[i].Name)
		return &status, nil
	}
	// Enforce the StatefulSet invariants
	if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
		continue
	}
	// Make a deep copy so we don't mutate the shared cache
	replica := replicas[i].DeepCopy()
	if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
		return &status, err
	}
}
7.condemned中存储的是需要删除的pod。这部分pod从condemned的末尾开始逆序删除(即从序号ordinal最高的pod开始)。具体实现如下: 7.1 如果condemned[target]位置的pod正处于删除中的状态(DeletionTimestamp != nil):若monotonic为true(意味着串行地处理pod),则直接返回statefulset的status;若monotonic为false,则跳过该pod,继续处理下一个。 7.2 如果condemned[target]位置的pod不处于Running and Ready状态,并且monotonic为true,同时它又不是第一个不健康的pod(firstUnhealthyPod),则直接返回statefulset的status,等待firstUnhealthyPod变为Running and Ready之后再继续删除。 7.3 如果上面的条件都通过了,则调用DeleteStatefulPod删除该pod,并相应地更新status中的CurrentReplicas/UpdatedReplicas;若monotonic为true,每一轮只删除一个pod后即返回status。
// Scale down: walk condemned from its last index to its first and delete
// those Pods. status, monotonic, firstUnhealthyPod and the revisions are
// declared by the enclosing function (not part of this excerpt). When
// monotonic is true, at most one Pod is deleted per pass, and the loop returns
// early on any Pod it has to wait for.
for target := len(condemned) - 1; target >= 0; target-- {
	// wait for terminating pods to expire
	if isTerminating(condemned[target]) {
		klog.V(4).Infof(
			"StatefulSet %s/%s is waiting for Pod %s to Terminate prior to scale down",
			set.Namespace,
			set.Name,
			condemned[target].Name)
		// block if we are in monotonic mode
		if monotonic {
			return &status, nil
		}
		// otherwise skip this Pod and keep scaling down the remaining ones
		continue
	}
	// if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
	// The log line names firstUnhealthyPod, i.e. the Pod being waited on, not the target.
	if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
		klog.V(4).Infof(
			"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to scale down",
			set.Namespace,
			set.Name,
			firstUnhealthyPod.Name)
		return &status, nil
	}
	klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down",
		set.Namespace,
		set.Name,
		condemned[target].Name)

	if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
		return &status, err
	}
	// Drop the deleted Pod from the counters of the revision(s) it was on;
	// both branches run when currentRevision and updateRevision share a name.
	if getPodRevision(condemned[target]) == currentRevision.Name {
		status.CurrentReplicas--
	}
	if getPodRevision(condemned[target]) == updateRevision.Name {
		status.UpdatedReplicas--
	}
	// in monotonic mode only one Pod is deleted per pass
	if monotonic {
		return &status, nil
	}
}
10.对下标范围[updateMin, len(replicas))内的pod以index逆序的方式(即下标从len(replicas)-1递减到updateMin)进行更新操作。 10.1 首先检查replicas[target]的revision是否等于updateRevision:如果不相等,并且replicas[target]未处于删除中的状态,则调用DeleteStatefulPod方法删除该pod,将status中的CurrentReplicas减1,然后返回status。 10.2 检查replicas[target]的pod是否处于健康状态(代码中调用isHealthy):如果不健康,则直接返回statefulset的status,等待该pod恢复健康后再在下一轮继续更新。
// delete the Pod if it isnot already terminating and does not match the update revision. if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) { klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update", set.Namespace, set.Name, replicas[target].Name) err := ssc.podControl.DeleteStatefulPod(set, replicas[target]) status.CurrentReplicas-- return &status, err }
// wait for unhealthy Pods onupdate if !isHealthy(replicas[target]) { klog.V(4).Infof( "StatefulSet %s/%s is waiting for Pod %s to update", set.Namespace, set.Name, replicas[target].Name) return &status, nil }