funcmain() { command := &cobra.Command{ Use: "echo [string to echo]", Short: "Echo anything to the screen", Long: `echo is for echoing anything back.Echo works a lot like print, except it has a child command.`, Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, args []string) { fmt.Println("Print: " + strings.Join(args, " ")) }, }
type Interface interface { // GetVersion returns the "X.Y.Z" version string for iptables. GetVersion() (string, error) // EnsureChain checks if the specified chain exists and, if not, creates it. If the chain existed, return true. EnsureChain(tableTable, chain Chain) (bool, error) // FlushChain clears the specified chain. If the chain did not exist, return error. FlushChain(tableTable, chain Chain) error // DeleteChain deletes the specified chain. If the chain did not exist, return error. DeleteChain(tableTable, chain Chain) error // EnsureRule checks if the specified rule is present and, if not, creates it. If the rule existed, return true. EnsureRule(position RulePosition, tableTable, chain Chain, args ...string) (bool, error) // DeleteRule checks if the specified rule is present and, if so, deletes it. DeleteRule(tableTable, chain Chain, args ...string) error // IsIpv6 returns true if this is managing ipv6 tables IsIpv6() bool // SaveInto calls `iptables-save` for table and stores result in a given buffer. SaveInto(tableTable, buffer *bytes.Buffer) error // Restore runs `iptables-restore` passing data through []byte. // table is the Table to restore // data should be formatted like the output of SaveInto() // flush sets the presence of the "--noflush" flag. see: FlushFlag // counters sets the "--counters" flag. see: RestoreCountersFlag Restore(tableTable, data []byte, flush FlushFlag, counters RestoreCountersFlag) error // RestoreAll is the same as Restore except that no table is specified. RestoreAll(data []byte, flush FlushFlag, counters RestoreCountersFlag) error // AddReloadFunc adds a function to call on iptables reload AddReloadFunc(reloadFunc func()) // Destroy cleans up resources used by the Interface Destroy() }
typeInterface interface { // Flush clears all virtual servers in system. return occurred error immediately. Flush() error // AddVirtualServer creates the specified virtual server. AddVirtualServer(*VirtualServer) error // UpdateVirtualServer updates an already existing virtual server. If the virtual server does not exist, return error. UpdateVirtualServer(*VirtualServer) error // DeleteVirtualServer deletes the specified virtual server. If the virtual server does not exist, return error. DeleteVirtualServer(*VirtualServer) error // Given a partial virtual server, GetVirtualServer will return the specified virtual server information in the system. GetVirtualServer(*VirtualServer) (*VirtualServer, error) // GetVirtualServers lists all virtual servers in the system. GetVirtualServers() ([]*VirtualServer, error) // AddRealServer creates the specified real server for the specified virtual server. AddRealServer(*VirtualServer, *RealServer) error // GetRealServers returns all real servers for the specified virtual server. GetRealServers(*VirtualServer) ([]*RealServer, error) // DeleteRealServer deletes the specified real server from the specified virtual server. DeleteRealServer(*VirtualServer, *RealServer) error // UpdateRealServer updates the specified real server from the specified virtual server. UpdateRealServer(*VirtualServer, *RealServer) error }
// Interface is an injectable interface for running ipset commands.
// Implementations must be goroutine-safe.
type Interface interface {
	// FlushSet deletes all entries from a named set.
	FlushSet(set string) error
	// DestroySet deletes a named set.
	DestroySet(set string) error
	// DestroyAllSets deletes all sets.
	DestroyAllSets() error
	// CreateSet creates a new set. It will ignore error when the set already
	// exists if ignoreExistErr=true.
	CreateSet(set *IPSet, ignoreExistErr bool) error
	// AddEntry adds a new entry to the named set. It will ignore error when the
	// entry already exists if ignoreExistErr=true.
	AddEntry(entry string, set *IPSet, ignoreExistErr bool) error
	// DelEntry deletes one entry from the named set.
	DelEntry(entry string, set string) error
	// TestEntry tests whether an entry exists in the named set.
	TestEntry(entry string, set string) (bool, error)
	// ListEntries lists all the entries from a named set.
	ListEntries(set string) ([]string, error)
	// ListSets lists all set names from the kernel.
	ListSets() ([]string, error)
	// GetVersion returns the "X.Y" version string for ipset.
	GetVersion() (string, error)
}
// ProxyProvider is the interface provided by proxier implementations.
type ProxyProvider interface {
	// Embedded handler interfaces from the config package; every
	// ProxyProvider must also satisfy both of them.
	config.EndpointsHandler
	config.ServiceHandler

	// Sync immediately synchronizes the ProxyProvider's current state to proxy rules.
	Sync()
	// SyncLoop runs periodic work.
	// This is expected to run as a goroutine or as the main loop of the app.
	// It does not return.
	SyncLoop()
}
// BoundedFrequencyRunner bundles a function (fn) with the interval bounds,
// trigger channel, timer, rate limiter and mutex used to schedule its runs.
type BoundedFrequencyRunner struct {
	name        string        // the name of this instance
	minInterval time.Duration // the min time between runs, modulo bursts
	maxInterval time.Duration // the max time between runs

	run chan struct{} // try an async run

	mu      sync.Mutex  // guards runs of fn and all mutations
	fn      func()      // function to run
	lastRun time.Time   // time of last run
	timer   timer       // timer for deferred runs
	limiter rateLimiter // rate limiter for on-demand runs
}
typeServiceHandler interface { // OnServiceAdd is called whenever creation ofnew service object // is observed. OnServiceAdd(service *v1.Service) // OnServiceUpdate is called whenever modification of an existing // service object is observed. OnServiceUpdate(oldService, service *v1.Service) // OnServiceDelete is called whenever deletion of an existing service // object is observed. OnServiceDelete(service *v1.Service) // OnServiceSynced is called once all the initial even handlers were // called and the state is fully propagated to local cache. OnServiceSynced() }
创建ServiceConfig实例对象的具体实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
funcNewServiceConfig(serviceInformercoreinformers.ServiceInformer, resyncPeriodtime.Duration) *ServiceConfig { result := &ServiceConfig{ listerSynced: serviceInformer.Informer().HasSynced, }
// OnServiceAdd is called whenever creation of new service object is observed. func(proxier *Proxier)OnServiceAdd(service *v1.Service) { proxier.OnServiceUpdate(nil, service) }
// OnServiceUpdate is called whenever modification of an existing service object is observed. func(proxier *Proxier)OnServiceUpdate(oldService, service *v1.Service) { if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { proxier.syncRunner.Run() } }
ServiceChangeTracker的结构定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
// ServiceChangeTracker carries state about uncommitted changestoan arbitrary number of // Services, keyed by their namespace and name. type ServiceChangeTracker struct { // lock protects items. lock sync.Mutex // items maps a service to its serviceChange. itemsmap[types.NamespacedName]*serviceChange // makeServiceInfo allows proxier to inject customized information when processing service. makeServiceInfo makeServicePortFunc // isIPv6Mode indicates ifchange tracker is under IPv6/IPv4 mode. Nil means not applicable. isIPv6Mode *bool recorder record.EventRecorder }
serviceChange的结构定义如下:
1 2 3 4 5 6 7
// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes.
type serviceChange struct {
	previous ServiceMap // state before the accumulated changes
	current  ServiceMap // state after the accumulated changes
}
// Build IPVS rules for each service.
// NOTE(review): this is an excerpt; svcInfo, protocol, svcNameString,
// activeIPVSServices and activeBindAddrs are presumably defined in the elided
// parts or earlier in the enclosing function — confirm against the full source.
for svcName, svc := range proxier.serviceMap {
	//......

	// Handle traffic that loops back to the originator with SNAT.
	for _, e := range proxier.endpointsMap[svcName] {
		//....
	}

	// Capture the clusterIP.
	// ipset call
	entry := &utilipset.Entry{
		IP:       svcInfo.ClusterIP().String(),
		Port:     svcInfo.Port(),
		Protocol: protocol,
		SetType:  utilipset.HashIPPort,
	}
	// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
	// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
	// An invalid entry is logged and the rest of this service's processing is skipped.
	if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
		klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name))
		continue
	}
	proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
	// ipvs call
	serv := &utilipvs.VirtualServer{
		Address:   svcInfo.ClusterIP(),
		Port:      uint16(svcInfo.Port()),
		Protocol:  string(svcInfo.Protocol()),
		Scheduler: proxier.ipvsScheduler,
	}
	// Set session affinity flag and timeout for IPVS service
	if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
		serv.Flags |= utilipvs.FlagPersistent
		serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
	}
	// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
	if err := proxier.syncService(svcNameString, serv, true); err == nil {
		// Record the virtual server and its address as active only after a successful sync.
		activeIPVSServices[serv.String()] = true
		activeBindAddrs[serv.Address.String()] = true
		// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
		// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
		// An endpoint sync failure is logged only; the loop body carries on.
		if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
			klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
		}
	} else {
		klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
	}

	// Capture externalIPs.
	for _, externalIP := range svcInfo.ExternalIPStrings() {
		//....
	}

	// Capture load-balancer ingress.
	for _, ingress := range svcInfo.LoadBalancerIPStrings() {
		//.....
	}

	// Node-port handling (elided) applies only when a node port is set.
	if svcInfo.NodePort() != 0 {
		//....
	}
}
// sync ipset entries
for _, set := range proxier.ipsetList {
	set.syncIPSetEntries()
}

// Tail call iptables rules for ipset, make sure only call iptables once
// in a single loop per ip set.
proxier.writeIptablesRules()

// Sync iptables rules.
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
// The payload is rebuilt from scratch in a fixed order: nat chains, nat rules,
// filter chains, filter rules.
proxier.iptablesData.Reset()
proxier.iptablesData.Write(proxier.natChains.Bytes())
proxier.iptablesData.Write(proxier.natRules.Bytes())
proxier.iptablesData.Write(proxier.filterChains.Bytes())
proxier.iptablesData.Write(proxier.filterRules.Bytes())