diff --git a/pkg/controllers/network_policy_controller.go b/pkg/controllers/network_policy_controller.go index e8375bed..b9cd62f4 100644 --- a/pkg/controllers/network_policy_controller.go +++ b/pkg/controllers/network_policy_controller.go @@ -49,6 +49,7 @@ type NetworkPolicyController struct { syncPeriod time.Duration MetricsEnabled bool v1NetworkPolicy bool + readyForUpdates bool // list of all active network policies expressed as networkPolicyInfo networkPoliciesInfo *[]networkPolicyInfo @@ -137,7 +138,7 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *ControllerHeartbeat, } else { sendHeartBeat(healthChan, "NPC") } - + npc.readyForUpdates = true select { case <-stopCh: glog.Infof("Shutting down network policies controller") @@ -152,6 +153,11 @@ func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) { pod := obj.(*api.Pod) glog.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name) + if !npc.readyForUpdates { + glog.V(3).Infof("Skipping update to pod: %s/%s, controller still performing bootup full-sync", pod.Namespace, pod.Name) + return + } + err := npc.Sync() if err != nil { glog.Errorf("Error syncing network policy for the update to pod: %s/%s Error: %s", pod.Namespace, pod.Name, err) @@ -162,6 +168,12 @@ func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) { func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) { netpol := obj.(*networking.NetworkPolicy) glog.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name) + + if !npc.readyForUpdates { + glog.V(3).Infof("Skipping update to network policy: %s/%s, controller still performing bootup full-sync", netpol.Namespace, netpol.Name) + return + } + err := npc.Sync() if err != nil { glog.Errorf("Error syncing network policy for the update to network policy: %s/%s Error: %s", netpol.Namespace, netpol.Name, err) @@ -196,7 +208,7 @@ func (npc *NetworkPolicyController) Sync() error { if npc.MetricsEnabled { controllerIptablesSyncTime.WithLabelValues().Set(float64(endTime)) } - glog.V(2).Infof("sync iptables took %v", endTime) + glog.V(1).Infof("sync iptables took %v", endTime) }() glog.V(1).Info("Starting periodic sync of iptables") @@ -1414,8 +1426,12 @@ func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHand }, UpdateFunc: func(oldObj, newObj interface{}) { - npc.OnPodUpdate(newObj) - + newPoObj := newObj.(*api.Pod) + oldPoObj := oldObj.(*api.Pod) + if newPoObj.Status.Phase != oldPoObj.Status.Phase || newPoObj.Status.PodIP != oldPoObj.Status.PodIP { + // for the network policies, we are only interested in pod status phase change or IP change + npc.OnPodUpdate(newObj) + } }, DeleteFunc: func(obj interface{}) { npc.OnPodUpdate(obj) diff --git a/pkg/controllers/network_routes_controller.go b/pkg/controllers/network_routes_controller.go index a5b27665..24f6a33f 100644 --- a/pkg/controllers/network_routes_controller.go +++ b/pkg/controllers/network_routes_controller.go @@ -1429,10 +1429,6 @@ func (nrc *NetworkRoutingController) OnNodeUpdate(obj interface{}) { } func (nrc *NetworkRoutingController) OnServiceUpdate(obj interface{}) { - if !nrc.bgpServerStarted { - return - } - svc, ok := obj.(*v1core.Service) if !ok { glog.Errorf("cache indexer returned obj that is not type *v1.Service") @@ -1440,6 +1436,11 @@ func (nrc *NetworkRoutingController) OnServiceUpdate(obj interface{}) { } glog.V(1).Infof("Received update to service: %s/%s from watch API", svc.Namespace, svc.Name) + if !nrc.bgpServerStarted { + glog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name) + return + } + toAdvertise, toWithdraw, err := nrc.getVIPsForService(svc, true) if err != nil { glog.Errorf("error getting routes for service: %s, err: %s", svc.Name, err) @@ -1483,10 +1484,6 @@ func (nrc *NetworkRoutingController) OnServiceDelete(obj interface{}) { } func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) { - if !nrc.bgpServerStarted { - return - } - ep, ok := obj.(*v1core.Endpoints) if !ok { glog.Errorf("cache indexer returned obj that is not type *v1.Endpoints") @@ -1497,6 +1494,12 @@ func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) { return } + glog.V(1).Infof("Received update to endpoint: %s/%s from watch API", ep.Namespace, ep.Name) + if !nrc.bgpServerStarted { + glog.V(3).Infof("Skipping update to endpoint: %s/%s, controller still performing bootup full-sync", ep.Namespace, ep.Name) + return + } + svc, err := nrc.serviceForEndpoints(ep) if err != nil { glog.Errorf("failed to convert endpoints resource to service: %s", err) diff --git a/pkg/controllers/network_services_controller.go b/pkg/controllers/network_services_controller.go index 6757fddf..25fd31dc 100644 --- a/pkg/controllers/network_services_controller.go +++ b/pkg/controllers/network_services_controller.go @@ -169,6 +169,7 @@ type NetworkServicesController struct { nodeportBindOnAllIp bool MetricsEnabled bool ln LinuxNetworking + readyForUpdates bool svcLister cache.Indexer epLister cache.Indexer @@ -246,7 +247,7 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *ControllerHeartbeat } else { sendHeartBeat(healthChan, "NSC") } - + nsc.readyForUpdates = true select { case <-stopCh: glog.Info("Shutting down network services controller") @@ -350,9 +351,6 @@ func (nsc *NetworkServicesController) publishMetrics(serviceInfoMap serviceInfoM // OnEndpointsUpdate handle change in endpoints update from the API server func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) { - nsc.mu.Lock() - defer nsc.mu.Unlock() - ep, ok := obj.(*api.Endpoints) if !ok { glog.Error("could not convert endpoints update object to *v1.Endpoints") @@ -364,6 +362,12 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) { } glog.V(1).Infof("Received update to endpoint: %s/%s from watch API", ep.Namespace, ep.Name) + if !nsc.readyForUpdates { + glog.V(3).Infof("Skipping update to endpoint: %s/%s, controller still performing bootup full-sync", ep.Namespace, ep.Name) + return + } + nsc.mu.Lock() + defer nsc.mu.Unlock() // build new service and endpoints map to reflect the change newServiceMap := nsc.buildServicesInfo() @@ -381,9 +385,6 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) { // OnServiceUpdate handle change in service update from the API server func (nsc *NetworkServicesController) OnServiceUpdate(obj interface{}) { - nsc.mu.Lock() - defer nsc.mu.Unlock() - svc, ok := obj.(*api.Service) if !ok { glog.Error("could not convert service update object to *v1.Service") @@ -391,6 +392,12 @@ func (nsc *NetworkServicesController) OnServiceUpdate(obj interface{}) { } glog.V(1).Infof("Received update to service: %s/%s from watch API", svc.Namespace, svc.Name) + if !nsc.readyForUpdates { + glog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name) + return + } + nsc.mu.Lock() + defer nsc.mu.Unlock() // build new service and endpoints map to reflect the change newServiceMap := nsc.buildServicesInfo()