kube-router/app/controllers/network_routes_controller.go
Murali Reddy d042dbb21e Add new Node api watcher which watches for add/remove nodes events.
On add/remove node events, perform refresh of peers to the peers as per the
current set of active nodes. If a node is removed, delete the BGP nieghbor relation.

Fixes #14
2017-05-30 20:35:23 +05:30

385 lines
11 KiB
Go

package controllers
import (
"fmt"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/cloudnativelabs/kube-router/app/options"
"github.com/cloudnativelabs/kube-router/app/watchers"
"github.com/cloudnativelabs/kube-router/utils"
"github.com/golang/glog"
bgpapi "github.com/osrg/gobgp/api"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet/bgp"
gobgp "github.com/osrg/gobgp/server"
"github.com/osrg/gobgp/table"
"github.com/vishvananda/netlink"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
type NetworkRoutingController struct {
nodeIP net.IP
nodeHostName string
mu sync.Mutex
clientset *kubernetes.Clientset
bgpServer *gobgp.BgpServer
syncPeriod time.Duration
advertiseClusterIp bool
peerRouter string
asnNumber uint32
peerAsnNumber uint32
}
var(
activeNodes = make(map[string]bool)
)
func (nrc *NetworkRoutingController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
cidr, err := utils.GetPodCidrFromCniSpec("/etc/cni/net.d/10-kuberouter.conf")
if err != nil {
glog.Errorf("Failed to get pod CIDR from CNI conf file: %s", err.Error())
}
cidrlen, _ := cidr.Mask.Size()
oldCidr := cidr.IP.String() + "/" + strconv.Itoa(cidrlen)
currentCidr, err := utils.GetPodCidrFromNodeSpec(nrc.clientset)
if err != nil {
glog.Errorf("Failed to get pod CIDR from node spec: %s", err.Error())
}
if len(cidr.IP) == 0 || strings.Compare(oldCidr, currentCidr) != 0 {
err = utils.InsertPodCidrInCniSpec("/etc/cni/net.d/10-kuberouter.conf", currentCidr)
if err != nil {
glog.Errorf("Failed to insert pod CIDR into CNI conf file: %s", err.Error())
}
}
t := time.NewTicker(nrc.syncPeriod)
defer t.Stop()
defer wg.Done()
glog.Infof("Starting network route controller")
// if the global routing peer is configured then peer with it
if len(nrc.peerRouter) != 0 {
n := &config.Neighbor{
Config: config.NeighborConfig{
NeighborAddress: nrc.peerRouter,
PeerAs: nrc.peerAsnNumber,
},
}
if err := nrc.bgpServer.AddNeighbor(n); err != nil {
glog.Errorf("Failed to peer with global peer routeri: %s", nrc.peerRouter)
}
}
// loop forever till notified to stop on stopCh
for {
select {
case <-stopCh:
glog.Infof("Shutting down network routes controller")
return
default:
}
// add the current set of nodes (excluding self) as BGP peers. Nodes form full mesh
nrc.syncPeers()
// advertise cluster IP for the service to be reachable via host
if nrc.advertiseClusterIp {
glog.Infof("Advertising cluster ips")
for _, svc := range watchers.ServiceWatcher.List() {
if svc.Spec.Type == "ClusterIP" || svc.Spec.Type == "NodePort" {
glog.Infof("found a service of cluster ip type")
nrc.AdvertiseClusterIp(svc.Spec.ClusterIP)
}
}
}
glog.Infof("Performing periodic syn of the routes")
err := nrc.advertiseRoute()
if err != nil {
glog.Errorf("Failed to advertise route: %s", err.Error())
}
select {
case <-stopCh:
glog.Infof("Shutting down network routes controller")
return
case <-t.C:
}
}
}
func (nrc *NetworkRoutingController) watchBgpUpdates() {
watcher := nrc.bgpServer.Watch(gobgp.WatchBestPath(false))
for {
select {
case ev := <-watcher.Event():
switch msg := ev.(type) {
case *gobgp.WatchEventBestPath:
glog.Infof("Processing bgp route advertisement from peer")
for _, path := range msg.PathList {
if path.IsLocal() {
continue
}
if err := nrc.injectRoute(path); err != nil {
glog.Errorf("Failed to inject routes due to: " + err.Error())
continue
}
}
}
}
}
}
func (nrc *NetworkRoutingController) advertiseRoute() error {
cidr, err := utils.GetPodCidrFromNodeSpec(nrc.clientset)
if err != nil {
return err
}
cidrStr := strings.Split(cidr, "/")
subnet := cidrStr[0]
cidrLen, err := strconv.Atoi(cidrStr[1])
attrs := []bgp.PathAttributeInterface{
bgp.NewPathAttributeOrigin(0),
bgp.NewPathAttributeNextHop(nrc.nodeIP.String()),
bgp.NewPathAttributeAsPath([]bgp.AsPathParamInterface{bgp.NewAs4PathParam(bgp.BGP_ASPATH_ATTR_TYPE_SEQ, []uint32{4000, 400000, 300000, 40001})}),
}
glog.Infof("Advertising route: '%s/%s via %s' to peers", subnet, strconv.Itoa(cidrLen), nrc.nodeIP.String())
if _, err := nrc.bgpServer.AddPath("", []*table.Path{table.NewPath(nil, bgp.NewIPAddrPrefix(uint8(cidrLen),
subnet), false, attrs, time.Now(), false)}); err != nil {
return fmt.Errorf(err.Error())
}
return nil
}
func (nrc *NetworkRoutingController) AdvertiseClusterIp(clusterIp string) error {
attrs := []bgp.PathAttributeInterface{
bgp.NewPathAttributeOrigin(0),
bgp.NewPathAttributeNextHop(nrc.nodeIP.String()),
bgp.NewPathAttributeAsPath([]bgp.AsPathParamInterface{bgp.NewAs4PathParam(bgp.BGP_ASPATH_ATTR_TYPE_SEQ, []uint32{4000, 400000, 300000, 40001})}),
}
glog.Infof("Advertising route: '%s/%s via %s' to peers", clusterIp, strconv.Itoa(32), nrc.nodeIP.String())
if _, err := nrc.bgpServer.AddPath("", []*table.Path{table.NewPath(nil, bgp.NewIPAddrPrefix(uint8(32),
clusterIp), false, attrs, time.Now(), false)}); err != nil {
return fmt.Errorf(err.Error())
}
return nil
}
func (nrc *NetworkRoutingController) injectRoute(path *table.Path) error {
nexthop := path.GetNexthop()
nlri := path.GetNlri()
dst, _ := netlink.ParseIPNet(nlri.String())
route := &netlink.Route{
Dst: dst,
Gw: nexthop,
Protocol: 0x11,
}
glog.Infof("Inject route: '%s via %s' from peer to routing table", dst, nexthop)
return netlink.RouteReplace(route)
}
func (nrc *NetworkRoutingController) Cleanup() {
}
// Refresh the peer relationship rest of the nodes in the cluster. Node add/remove
// events should ensure peer relationship with only currently active nodes. In case
// we miss any events from API server this method which is called periodically
// ensure peer relationship with removed nodes is deleted.
func (nrc *NetworkRoutingController) syncPeers() {
// get the current list of the nodes from API server
nodes, err := nrc.clientset.Core().Nodes().List(metav1.ListOptions{})
if err != nil {
glog.Errorf("Failed to list nodes: %s", err.Error())
return
}
// establish peer with current set of nodes
currentNodes := make([]string, 0)
for _, node := range nodes.Items {
nodeIP, _ := getNodeIP(&node)
if nodeIP.String() == nrc.nodeIP.String() {
continue
}
currentNodes = append(currentNodes, nodeIP.String())
activeNodes[nodeIP.String()] = true
n := &config.Neighbor{
Config: config.NeighborConfig{
NeighborAddress: nodeIP.String(),
PeerAs: nrc.asnNumber,
},
}
// TODO: check if a node is alredy added as nieighbour in a better way that add and catch error
if err := nrc.bgpServer.AddNeighbor(n); err != nil {
if !strings.Contains(err.Error(), "Can't overwrite the existing peer") {
glog.Errorf("Failed to add node %s as peer due to %s", nodeIP.String(), err)
}
}
}
// find the list of the node removed, from the last known list of active nodes
removedNodes := make([]string, 0)
for ip, _ := range activeNodes {
stillActive := false
for _, node := range currentNodes {
if ip == node {
stillActive = true
break
}
}
if !stillActive {
removedNodes = append(removedNodes, ip)
}
}
// delete the neighbor for the node that is removed
for _, ip := range removedNodes {
n := &config.Neighbor{
Config: config.NeighborConfig{
NeighborAddress: ip,
PeerAs: nrc.asnNumber,
},
}
if err := nrc.bgpServer.DeleteNeighbor(n); err != nil {
glog.Errorf("Failed to remove node %s as peer due to %s", ip, err)
}
delete(activeNodes, ip)
}
}
// Handle updates from Node watcher. Node watcher calls this method whenever there is
// new node is added or old node is deleted. So peer up with new node and drop peering
// from old node
func (nrc *NetworkRoutingController) OnNodeUpdate(nodeUpdate *watchers.NodeUpdate) {
nrc.mu.Lock()
defer nrc.mu.Unlock()
node := nodeUpdate.Node
nodeIP, _ := getNodeIP(node)
if nodeUpdate.Op == watchers.ADD {
glog.Infof("Received node %s added update from watch API so peer with new node", nodeIP)
n := &config.Neighbor{
Config: config.NeighborConfig{
NeighborAddress: nodeIP.String(),
PeerAs: nrc.asnNumber,
},
}
if err := nrc.bgpServer.AddNeighbor(n); err != nil {
glog.Errorf("Failed to add node %s as peer due to %s", nodeIP, err)
}
activeNodes[nodeIP.String()] = true
} else if nodeUpdate.Op == watchers.REMOVE {
glog.Infof("Received node %s removed update from watch API, so remove node from peer", nodeIP)
n := &config.Neighbor{
Config: config.NeighborConfig{
NeighborAddress: nodeIP.String(),
PeerAs: nrc.asnNumber,
},
}
if err := nrc.bgpServer.DeleteNeighbor(n); err != nil {
glog.Errorf("Failed to remove node %s as peer due to %s", nodeIP, err)
}
delete(activeNodes, nodeIP.String())
}
}
func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConfig *options.KubeRouterConfig) (*NetworkRoutingController, error) {
nrc := NetworkRoutingController{}
nrc.syncPeriod = kubeRouterConfig.RoutesSyncPeriod
nrc.clientset = clientset
if len(kubeRouterConfig.ClusterAsn) != 0 {
asn, err := strconv.ParseUint(kubeRouterConfig.ClusterAsn, 0, 32)
if err != nil {
panic("Invalid cluster ASN: " + err.Error())
}
if asn > 65534 || asn < 64512 {
panic("Invalid ASN number for cluster ASN")
}
nrc.asnNumber = uint32(asn)
} else {
nrc.asnNumber = 64512 // this magic number is first of the private ASN range, use it as default
}
nrc.advertiseClusterIp = kubeRouterConfig.AdvertiseClusterIp
if len(kubeRouterConfig.PeerRouter) != 0 {
if net.ParseIP(kubeRouterConfig.PeerRouter) == nil {
panic("Invalid peer router ip: " + nrc.peerRouter)
}
nrc.peerRouter = kubeRouterConfig.PeerRouter
if len(kubeRouterConfig.PeerAsn) == 0 {
panic("ASN number for peer router must be specified")
}
asn, err := strconv.ParseUint(kubeRouterConfig.PeerAsn, 0, 32)
if err != nil {
panic("Invalid BGP peer ASN: " + err.Error())
}
if asn > 65534 {
panic("Invalid ASN number for cluster ASN")
}
nrc.peerAsnNumber = uint32(asn)
}
nodeHostName, err := os.Hostname()
if err != nil {
panic(err.Error())
}
nodeFqdnHostName := utils.GetFqdn()
node, err := clientset.Core().Nodes().Get(nodeHostName, metav1.GetOptions{})
if err != nil {
node, err = clientset.Core().Nodes().Get(nodeFqdnHostName, metav1.GetOptions{})
if err != nil {
panic(err.Error())
}
nrc.nodeHostName = nodeFqdnHostName
} else {
nrc.nodeHostName = nodeHostName
}
nodeIP, err := getNodeIP(node)
if err != nil {
panic(err.Error())
}
nrc.nodeIP = nodeIP
nrc.bgpServer = gobgp.NewBgpServer()
go nrc.bgpServer.Serve()
g := bgpapi.NewGrpcServer(nrc.bgpServer, ":50051")
go g.Serve()
global := &config.Global{
Config: config.GlobalConfig{
As: nrc.asnNumber,
RouterId: nrc.nodeIP.String(),
},
}
if err := nrc.bgpServer.Start(global); err != nil {
panic(err)
}
watchers.NodeWatcher.RegisterHandler(&nrc)
go nrc.watchBgpUpdates()
return &nrc, nil
}