prevent processing updates leading to sync when controller doing full sync at boot time (#400)

This commit is contained in:
Murali Reddy 2018-04-18 13:42:37 +05:30 committed by GitHub
parent 041c05570a
commit a1ecedf802
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 19 deletions

View File

@ -49,6 +49,7 @@ type NetworkPolicyController struct {
syncPeriod time.Duration
MetricsEnabled bool
v1NetworkPolicy bool
readyForUpdates bool
// list of all active network policies expressed as networkPolicyInfo
networkPoliciesInfo *[]networkPolicyInfo
@ -137,7 +138,7 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *ControllerHeartbeat,
} else {
sendHeartBeat(healthChan, "NPC")
}
npc.readyForUpdates = true
select {
case <-stopCh:
glog.Infof("Shutting down network policies controller")
@ -152,6 +153,11 @@ func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
pod := obj.(*api.Pod)
glog.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name)
if !npc.readyForUpdates {
glog.V(3).Infof("Skipping update to pod: %s/%s, controller still performing bootup full-sync", pod.Namespace, pod.Name)
return
}
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing network policy for the update to pod: %s/%s Error: %s", pod.Namespace, pod.Name, err)
@ -162,6 +168,12 @@ func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) {
netpol := obj.(*networking.NetworkPolicy)
glog.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name)
if !npc.readyForUpdates {
glog.V(3).Infof("Skipping update to network policy: %s/%s, controller still performing bootup full-sync", netpol.Namespace, netpol.Name)
return
}
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing network policy for the update to network policy: %s/%s Error: %s", netpol.Namespace, netpol.Name, err)
@ -196,7 +208,7 @@ func (npc *NetworkPolicyController) Sync() error {
if npc.MetricsEnabled {
controllerIptablesSyncTime.WithLabelValues().Set(float64(endTime))
}
glog.V(2).Infof("sync iptables took %v", endTime)
glog.V(1).Infof("sync iptables took %v", endTime)
}()
glog.V(1).Info("Starting periodic sync of iptables")
@ -1414,8 +1426,12 @@ func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHand
},
UpdateFunc: func(oldObj, newObj interface{}) {
npc.OnPodUpdate(newObj)
newPoObj := newObj.(*api.Pod)
oldPoObj := oldObj.(*api.Pod)
if newPoObj.Status.Phase != oldPoObj.Status.Phase || newPoObj.Status.PodIP != oldPoObj.Status.PodIP {
// for the network policies, we are only interested in pod status phase change or IP change
npc.OnPodUpdate(newObj)
}
},
DeleteFunc: func(obj interface{}) {
npc.OnPodUpdate(obj)

View File

@ -1429,10 +1429,6 @@ func (nrc *NetworkRoutingController) OnNodeUpdate(obj interface{}) {
}
func (nrc *NetworkRoutingController) OnServiceUpdate(obj interface{}) {
if !nrc.bgpServerStarted {
return
}
svc, ok := obj.(*v1core.Service)
if !ok {
glog.Errorf("cache indexer returned obj that is not type *v1.Service")
@ -1440,6 +1436,11 @@ func (nrc *NetworkRoutingController) OnServiceUpdate(obj interface{}) {
}
glog.V(1).Infof("Received update to service: %s/%s from watch API", svc.Namespace, svc.Name)
if !nrc.bgpServerStarted {
glog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name)
return
}
toAdvertise, toWithdraw, err := nrc.getVIPsForService(svc, true)
if err != nil {
glog.Errorf("error getting routes for service: %s, err: %s", svc.Name, err)
@ -1483,10 +1484,6 @@ func (nrc *NetworkRoutingController) OnServiceDelete(obj interface{}) {
}
func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) {
if !nrc.bgpServerStarted {
return
}
ep, ok := obj.(*v1core.Endpoints)
if !ok {
glog.Errorf("cache indexer returned obj that is not type *v1.Endpoints")
@ -1497,6 +1494,12 @@ func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) {
return
}
glog.V(1).Infof("Received update to endpoint: %s/%s from watch API", ep.Namespace, ep.Name)
if !nrc.bgpServerStarted {
glog.V(3).Infof("Skipping update to endpoint: %s/%s, controller still performing bootup full-sync", ep.Namespace, ep.Name)
return
}
svc, err := nrc.serviceForEndpoints(ep)
if err != nil {
glog.Errorf("failed to convert endpoints resource to service: %s", err)

View File

@ -169,6 +169,7 @@ type NetworkServicesController struct {
nodeportBindOnAllIp bool
MetricsEnabled bool
ln LinuxNetworking
readyForUpdates bool
svcLister cache.Indexer
epLister cache.Indexer
@ -246,7 +247,7 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *ControllerHeartbeat
} else {
sendHeartBeat(healthChan, "NSC")
}
nsc.readyForUpdates = true
select {
case <-stopCh:
glog.Info("Shutting down network services controller")
@ -350,9 +351,6 @@ func (nsc *NetworkServicesController) publishMetrics(serviceInfoMap serviceInfoM
// OnEndpointsUpdate handle change in endpoints update from the API server
func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) {
nsc.mu.Lock()
defer nsc.mu.Unlock()
ep, ok := obj.(*api.Endpoints)
if !ok {
glog.Error("could not convert endpoints update object to *v1.Endpoints")
@ -364,6 +362,12 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) {
}
glog.V(1).Infof("Received update to endpoint: %s/%s from watch API", ep.Namespace, ep.Name)
if !nsc.readyForUpdates {
glog.V(3).Infof("Skipping update to endpoint: %s/%s, controller still performing bootup full-sync", ep.Namespace, ep.Name)
return
}
nsc.mu.Lock()
defer nsc.mu.Unlock()
// build new service and endpoints map to reflect the change
newServiceMap := nsc.buildServicesInfo()
@ -381,9 +385,6 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) {
// OnServiceUpdate handle change in service update from the API server
func (nsc *NetworkServicesController) OnServiceUpdate(obj interface{}) {
nsc.mu.Lock()
defer nsc.mu.Unlock()
svc, ok := obj.(*api.Service)
if !ok {
glog.Error("could not convert service update object to *v1.Service")
@ -391,6 +392,12 @@ func (nsc *NetworkServicesController) OnServiceUpdate(obj interface{}) {
}
glog.V(1).Infof("Received update to service: %s/%s from watch API", svc.Namespace, svc.Name)
if !nsc.readyForUpdates {
glog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name)
return
}
nsc.mu.Lock()
defer nsc.mu.Unlock()
// build new service and endpoints map to reflect the change
newServiceMap := nsc.buildServicesInfo()