diff --git a/app/controllers/network_routes_controller.go b/app/controllers/network_routes_controller.go index 2d89a088..daae4206 100644 --- a/app/controllers/network_routes_controller.go +++ b/app/controllers/network_routes_controller.go @@ -39,6 +39,7 @@ type NetworkRoutingController struct { nodeHostName string nodeSubnet net.IPNet nodeInterface string + activeNodes map[string]bool mu sync.Mutex clientset kubernetes.Interface bgpServer *gobgp.BgpServer @@ -61,7 +62,6 @@ type NetworkRoutingController struct { } var ( - activeNodes = make(map[string]bool) podEgressArgs = []string{"-m", "set", "--match-set", podSubnetsIPSetName, "src", "-m", "set", "!", "--match-set", podSubnetsIPSetName, "dst", "-m", "set", "!", "--match-set", nodeAddrsIPSetName, "dst", @@ -1004,7 +1004,9 @@ func (nrc *NetworkRoutingController) syncInternalPeers() { } currentNodes = append(currentNodes, nodeIP.String()) - activeNodes[nodeIP.String()] = true + nrc.mu.Lock() + nrc.activeNodes[nodeIP.String()] = true + nrc.mu.Unlock() n := &config.Neighbor{ Config: config.NeighborConfig{ NeighborAddress: nodeIP.String(), @@ -1047,7 +1049,9 @@ func (nrc *NetworkRoutingController) syncInternalPeers() { // find the list of the node removed, from the last known list of active nodes removedNodes := make([]string, 0) - for ip := range activeNodes { + nrc.mu.Lock() + defer nrc.mu.Unlock() + for ip := range nrc.activeNodes { stillActive := false for _, node := range currentNodes { if ip == node { @@ -1071,7 +1075,7 @@ func (nrc *NetworkRoutingController) syncInternalPeers() { if err := nrc.bgpServer.DeleteNeighbor(n); err != nil { glog.Errorf("Failed to remove node %s as peer due to %s", ip, err) } - delete(activeNodes, ip) + delete(nrc.activeNodes, ip) } } @@ -1220,7 +1224,7 @@ func (nrc *NetworkRoutingController) OnNodeUpdate(nodeUpdate *watchers.NodeUpdat if err := nrc.bgpServer.AddNeighbor(n); err != nil { glog.Errorf("Failed to add node %s as peer due to %s", nodeIP, err) } - activeNodes[nodeIP.String()] = true + nrc.activeNodes[nodeIP.String()] = true } else if nodeUpdate.Op == watchers.REMOVE { glog.Infof("Received node %s removed update from watch API, so remove node from peer", nodeIP) n := &config.Neighbor{ @@ -1232,7 +1236,7 @@ func (nrc *NetworkRoutingController) OnNodeUpdate(nodeUpdate *watchers.NodeUpdat if err := nrc.bgpServer.DeleteNeighbor(n); err != nil { glog.Errorf("Failed to remove node %s as peer due to %s", nodeIP, err) } - delete(activeNodes, nodeIP.String()) + delete(nrc.activeNodes, nodeIP.String()) } nrc.disableSourceDestinationCheck() } @@ -1414,6 +1418,7 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, nrc.enablePodEgress = kubeRouterConfig.EnablePodEgress nrc.syncPeriod = kubeRouterConfig.RoutesSyncPeriod nrc.clientset = clientset + nrc.activeNodes = make(map[string]bool) nrc.ipSetHandler, err = utils.NewIPSet() if err != nil { diff --git a/app/controllers/network_routes_controller_test.go b/app/controllers/network_routes_controller_test.go index d49616e2..70afb449 100644 --- a/app/controllers/network_routes_controller_test.go +++ b/app/controllers/network_routes_controller_test.go @@ -525,6 +525,173 @@ func Test_advertiseRoute(t *testing.T) { } } +func Test_OnNodeUpdate(t *testing.T) { + testcases := []struct { + name string + nrc *NetworkRoutingController + nodeEvents []*watchers.NodeUpdate + activeNodes map[string]bool + }{ + { + "node add event", + &NetworkRoutingController{ + activeNodes: make(map[string]bool), + bgpServer: gobgp.NewBgpServer(), + defaultNodeAsnNumber: 1, + clientset: fake.NewSimpleClientset(), + }, + []*watchers.NodeUpdate{ + { + Node: &v1core.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Status: v1core.NodeStatus{ + Addresses: []v1core.NodeAddress{ + { + Type: v1core.NodeInternalIP, + Address: "10.0.0.1", + }, + }, + }, + }, + Op: watchers.ADD, + }, + }, + map[string]bool{ + "10.0.0.1": true, + }, + }, + { + "add multiple nodes", + &NetworkRoutingController{ + activeNodes: make(map[string]bool), + bgpServer: gobgp.NewBgpServer(), + defaultNodeAsnNumber: 1, + clientset: fake.NewSimpleClientset(), + }, + []*watchers.NodeUpdate{ + { + Node: &v1core.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Status: v1core.NodeStatus{ + Addresses: []v1core.NodeAddress{ + { + Type: v1core.NodeInternalIP, + Address: "10.0.0.1", + }, + }, + }, + }, + Op: watchers.ADD, + }, + { + Node: &v1core.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-2", + }, + Status: v1core.NodeStatus{ + Addresses: []v1core.NodeAddress{ + { + Type: v1core.NodeExternalIP, + Address: "1.1.1.1", + }, + }, + }, + }, + Op: watchers.ADD, + }, + }, + map[string]bool{ + "10.0.0.1": true, + "1.1.1.1": true, + }, + }, + { + "add and then delete nodes", + &NetworkRoutingController{ + activeNodes: make(map[string]bool), + bgpServer: gobgp.NewBgpServer(), + defaultNodeAsnNumber: 1, + clientset: fake.NewSimpleClientset(), + }, + []*watchers.NodeUpdate{ + { + Node: &v1core.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Status: v1core.NodeStatus{ + Addresses: []v1core.NodeAddress{ + { + Type: v1core.NodeInternalIP, + Address: "10.0.0.1", + }, + }, + }, + }, + Op: watchers.ADD, + }, + { + Node: &v1core.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Status: v1core.NodeStatus{ + Addresses: []v1core.NodeAddress{ + { + Type: v1core.NodeInternalIP, + Address: "10.0.0.1", + }, + }, + }, + }, + Op: watchers.REMOVE, + }, + }, + map[string]bool{}, + }, + } + + for _, testcase := range testcases { + t.Run(testcase.name, func(t *testing.T) { + t.Log(testcase.name) + go testcase.nrc.bgpServer.Serve() + err := testcase.nrc.bgpServer.Start(&config.Global{ + Config: config.GlobalConfig{ + As: 1, + RouterId: "10.0.0.0", + Port: 10000, + }, + }) + if err != nil { + t.Fatalf("failed to start BGP server: %v", err) + } + defer testcase.nrc.bgpServer.Stop() + + for _, nodeEvent := range testcase.nodeEvents { + testcase.nrc.OnNodeUpdate(nodeEvent) + } + + neighbors := testcase.nrc.bgpServer.GetNeighbor("", false) + for _, neighbor := range neighbors { + _, exists := testcase.activeNodes[neighbor.Config.NeighborAddress] + if !exists { + t.Errorf("expected neighbor: %v doesn't exist", neighbor.Config.NeighborAddress) + } + } + + if !reflect.DeepEqual(testcase.nrc.activeNodes, testcase.activeNodes) { + t.Logf("actual active nodes: %v", testcase.nrc.activeNodes) + t.Logf("expected active nodes: %v", testcase.activeNodes) + t.Errorf("did not get expected activeNodes") + } + }) + } +} + func createServices(clientset kubernetes.Interface, svcs []*v1core.Service) error { for _, svc := range svcs { _, err := clientset.CoreV1().Services("default").Create(svc)