kube-router/app/controllers/network_routes_controller.go
Murali Reddy 1478527cc8 setup masquerade rule for traffic destined for outside of cluster and pod network
This change adds an iptables rule in the NAT table POSTROUTING chain to masquerade outbound traffic
from the pods.

Fixes #8
2017-06-01 07:20:04 +05:30

406 lines
12 KiB
Go

package controllers
import (
"fmt"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/cloudnativelabs/kube-router/app/options"
"github.com/cloudnativelabs/kube-router/app/watchers"
"github.com/cloudnativelabs/kube-router/utils"
"github.com/coreos/go-iptables/iptables"
"github.com/golang/glog"
bgpapi "github.com/osrg/gobgp/api"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet/bgp"
gobgp "github.com/osrg/gobgp/server"
"github.com/osrg/gobgp/table"
"github.com/vishvananda/netlink"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// NetworkRoutingController carries the state needed to run the embedded BGP
// server on a node: the node's identity, the Kubernetes API client, the BGP
// server itself and the settings copied from the kube-router configuration.
type NetworkRoutingController struct {
	// nodeIP is the IP of the local node; it is used as the BGP router ID
	// and as the next hop of every route this controller advertises.
	nodeIP net.IP
	// nodeHostName is the name under which the local node was found in the
	// API server (short host name or FQDN).
	nodeHostName string
	// mu is held by OnNodeUpdate while it changes BGP peers and the
	// package-level activeNodes map.
	mu sync.Mutex
	// clientset is the Kubernetes API client used to read node objects.
	clientset *kubernetes.Clientset
	// bgpServer is the embedded GoBGP server instance.
	bgpServer *gobgp.BgpServer
	// syncPeriod is the interval between two iterations of the Run loop.
	syncPeriod time.Duration
	// advertiseClusterIp enables advertising service cluster IPs in Run.
	advertiseClusterIp bool
	// peerRouter is the address of the optional global BGP peer; empty
	// means no global peer is configured.
	peerRouter string
	// asnNumber is the local AS number; it is also used as the peer AS for
	// the other nodes of the cluster.
	asnNumber uint32
	// peerAsnNumber is the AS number of the global peer router.
	peerAsnNumber uint32
	// clusterCIDR, when non-empty, is the range used to build the
	// masquerade rule for traffic leaving the cluster.
	clusterCIDR string
}
// activeNodes records, keyed by node IP string, the nodes that have been added
// as BGP peers. syncPeers and OnNodeUpdate insert an entry when they peer with
// a node and delete it when the peer is removed.
var activeNodes = map[string]bool{}
// Run is the controller's main loop.
//
// Before looping it performs one-time setup:
//   - rewrites the pod CIDR in the CNI conf file when it differs from the one
//     in the node spec,
//   - peers with the global router when one is configured,
//   - installs a NAT POSTROUTING masquerade rule for traffic that originates in
//     clusterCIDR and is destined outside of it.
//
// Each iteration then re-syncs the node peers, optionally advertises the
// cluster IPs of ClusterIP/NodePort services, and advertises this node's pod
// CIDR. The loop waits for the sync ticker between iterations and returns when
// stopCh is closed or receives a value.
//
// wg.Done is called when Run returns. Failures are logged and never abort the
// loop.
func (nrc *NetworkRoutingController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()

	const cniConfFile = "/etc/cni/net.d/10-kuberouter.conf"

	cidr, err := utils.GetPodCidrFromCniSpec(cniConfFile)
	if err != nil {
		glog.Errorf("Failed to get pod CIDR from CNI conf file: %s", err.Error())
	}
	cidrlen, _ := cidr.Mask.Size()
	oldCidr := cidr.IP.String() + "/" + strconv.Itoa(cidrlen)

	currentCidr, err := utils.GetPodCidrFromNodeSpec(nrc.clientset)
	if err != nil {
		// Without a CIDR from the node spec there is nothing valid to write;
		// leave the CNI conf file untouched instead of inserting an empty value.
		glog.Errorf("Failed to get pod CIDR from node spec: %s", err.Error())
	} else if len(cidr.IP) == 0 || oldCidr != currentCidr {
		if err := utils.InsertPodCidrInCniSpec(cniConfFile, currentCidr); err != nil {
			glog.Errorf("Failed to insert pod CIDR into CNI conf file: %s", err.Error())
		}
	}

	t := time.NewTicker(nrc.syncPeriod)
	defer t.Stop()

	glog.Infof("Starting network route controller")

	// if the global routing peer is configured then peer with it
	if len(nrc.peerRouter) != 0 {
		n := &config.Neighbor{
			Config: config.NeighborConfig{
				NeighborAddress: nrc.peerRouter,
				PeerAs:          nrc.peerAsnNumber,
			},
		}
		if err := nrc.bgpServer.AddNeighbor(n); err != nil {
			glog.Errorf("Failed to peer with global peer router %s: %s", nrc.peerRouter, err.Error())
		}
	}

	// masquerade traffic that leaves the cluster network
	if len(nrc.clusterCIDR) != 0 {
		args := []string{"-s", nrc.clusterCIDR, "!", "-d", nrc.clusterCIDR, "-j", "MASQUERADE"}
		iptablesCmdHandler, err := iptables.New()
		if err != nil {
			glog.Errorf("Failed to add iptable rule to masqurade outbound traffic from pods due to %s. External connectivity will not work.", err.Error())
		} else if err = iptablesCmdHandler.AppendUnique("nat", "POSTROUTING", args...); err != nil {
			// Only reached with a usable handler; calling AppendUnique on the
			// nil handler returned alongside an error would panic.
			glog.Errorf("Failed to add iptable rule to masqurade outbound traffic from pods due to %s. External connectivity will not work.", err.Error())
		}
	}

	// loop forever till notified to stop on stopCh
	for {
		// non-blocking check so a stop request is honoured before doing work
		select {
		case <-stopCh:
			glog.Infof("Shutting down network routes controller")
			return
		default:
		}

		// add the current set of nodes (excluding self) as BGP peers. Nodes form full mesh
		nrc.syncPeers()

		// advertise cluster IP for the service to be reachable via host
		if nrc.advertiseClusterIp {
			glog.Infof("Advertising cluster ips")
			for _, svc := range watchers.ServiceWatcher.List() {
				if svc.Spec.Type != "ClusterIP" && svc.Spec.Type != "NodePort" {
					continue
				}
				// skip headless services
				if svc.Spec.ClusterIP == "None" || svc.Spec.ClusterIP == "" {
					continue
				}
				glog.Infof("found a service of cluster ip type")
				if err := nrc.AdvertiseClusterIp(svc.Spec.ClusterIP); err != nil {
					glog.Errorf("Failed to advertise cluster IP %s: %s", svc.Spec.ClusterIP, err.Error())
				}
			}
		}

		glog.Infof("Performing periodic syn of the routes")
		if err := nrc.advertiseRoute(); err != nil {
			glog.Errorf("Failed to advertise route: %s", err.Error())
		}

		// wait for the next tick or a stop request, whichever comes first
		select {
		case <-stopCh:
			glog.Infof("Shutting down network routes controller")
			return
		case <-t.C:
		}
	}
}
// watchBgpUpdates subscribes to best-path events of the embedded BGP server
// and passes every path that is not local to injectRoute. Injection failures
// are logged and the remaining paths are still processed.
//
// The function blocks for as long as the watcher's event channel is open, so
// it is meant to run in its own goroutine. It returns if that channel is ever
// closed.
func (nrc *NetworkRoutingController) watchBgpUpdates() {
	watcher := nrc.bgpServer.Watch(gobgp.WatchBestPath(false))
	for ev := range watcher.Event() {
		msg, ok := ev.(*gobgp.WatchEventBestPath)
		if !ok {
			// only best-path events are of interest here
			continue
		}
		glog.Infof("Processing bgp route advertisement from peer")
		for _, path := range msg.PathList {
			if path.IsLocal() {
				continue
			}
			if err := nrc.injectRoute(path); err != nil {
				// pass the error as an argument so that a '%' in its text is
				// not interpreted as a format verb
				glog.Errorf("Failed to inject routes due to: %s", err.Error())
			}
		}
	}
}
// advertiseRoute announces this node's pod CIDR, as read from the node spec,
// to the BGP peers with the node IP as next hop.
//
// It returns an error when the pod CIDR cannot be fetched, when it is not of
// the form "<address>/<prefix length>" with a prefix length between 0 and 32,
// or when the BGP server rejects the path.
func (nrc *NetworkRoutingController) advertiseRoute() error {
	cidr, err := utils.GetPodCidrFromNodeSpec(nrc.clientset)
	if err != nil {
		return err
	}

	cidrStr := strings.SplitN(cidr, "/", 2)
	if len(cidrStr) != 2 {
		return fmt.Errorf("invalid pod CIDR %q: missing prefix length", cidr)
	}
	subnet := cidrStr[0]
	cidrLen, err := strconv.Atoi(cidrStr[1])
	if err != nil {
		return fmt.Errorf("invalid pod CIDR %q: %s", cidr, err.Error())
	}
	// the prefix length is narrowed to uint8 below; reject values that would
	// silently wrap or are not valid for an IPv4 prefix
	if cidrLen < 0 || cidrLen > 32 {
		return fmt.Errorf("invalid pod CIDR %q: prefix length %d out of range", cidr, cidrLen)
	}

	// NOTE(review): the AS path below is a fixed list of AS numbers that is
	// unrelated to the configured asnNumber - confirm this is intended.
	attrs := []bgp.PathAttributeInterface{
		bgp.NewPathAttributeOrigin(0),
		bgp.NewPathAttributeNextHop(nrc.nodeIP.String()),
		bgp.NewPathAttributeAsPath([]bgp.AsPathParamInterface{bgp.NewAs4PathParam(bgp.BGP_ASPATH_ATTR_TYPE_SEQ, []uint32{4000, 400000, 300000, 40001})}),
	}

	glog.Infof("Advertising route: '%s/%d via %s' to peers", subnet, cidrLen, nrc.nodeIP.String())

	prefix := bgp.NewIPAddrPrefix(uint8(cidrLen), subnet)
	paths := []*table.Path{table.NewPath(nil, prefix, false, attrs, time.Now(), false)}
	if _, err := nrc.bgpServer.AddPath("", paths); err != nil {
		return err
	}
	return nil
}
// AdvertiseClusterIp announces clusterIp to the BGP peers as a /32 route with
// this node's IP as next hop.
//
// It returns an error when clusterIp is not an IPv4 address (the route is
// always built as an IPv4 /32 prefix) or when the BGP server rejects the path.
func (nrc *NetworkRoutingController) AdvertiseClusterIp(clusterIp string) error {
	const hostPrefixLen = 32

	// net.ParseIP yields nil for malformed input and To4 yields nil for
	// anything that is not IPv4, so this single check covers both.
	if net.ParseIP(clusterIp).To4() == nil {
		return fmt.Errorf("invalid cluster IP %q: not an IPv4 address", clusterIp)
	}

	// NOTE(review): the AS path below is a fixed list of AS numbers that is
	// unrelated to the configured asnNumber - confirm this is intended.
	attrs := []bgp.PathAttributeInterface{
		bgp.NewPathAttributeOrigin(0),
		bgp.NewPathAttributeNextHop(nrc.nodeIP.String()),
		bgp.NewPathAttributeAsPath([]bgp.AsPathParamInterface{bgp.NewAs4PathParam(bgp.BGP_ASPATH_ATTR_TYPE_SEQ, []uint32{4000, 400000, 300000, 40001})}),
	}

	glog.Infof("Advertising route: '%s/%d via %s' to peers", clusterIp, hostPrefixLen, nrc.nodeIP.String())

	prefix := bgp.NewIPAddrPrefix(uint8(hostPrefixLen), clusterIp)
	paths := []*table.Path{table.NewPath(nil, prefix, false, attrs, time.Now(), false)}
	if _, err := nrc.bgpServer.AddPath("", paths); err != nil {
		return err
	}
	return nil
}
// injectRoute installs (or replaces) a route in the host routing table for the
// prefix carried by path, using the path's next hop as gateway.
//
// It returns an error when the prefix cannot be parsed or when the netlink
// route replace fails.
//
// NOTE(review): the path is always installed; nothing here checks whether it
// is a withdrawal - confirm whether withdrawn routes should be deleted.
func (nrc *NetworkRoutingController) injectRoute(path *table.Path) error {
	nexthop := path.GetNexthop()
	nlri := path.GetNlri()

	dst, err := netlink.ParseIPNet(nlri.String())
	if err != nil {
		// never hand a nil destination to RouteReplace
		return fmt.Errorf("failed to parse prefix %q: %s", nlri.String(), err.Error())
	}

	route := &netlink.Route{
		Dst: dst,
		Gw:  nexthop,
		// route protocol identifier stamped on routes installed by this controller
		Protocol: 0x11,
	}

	glog.Infof("Inject route: '%s via %s' from peer to routing table", dst, nexthop)
	return netlink.RouteReplace(route)
}
// Cleanup is a placeholder: its body is empty, so calling it currently
// performs no work.
func (nrc *NetworkRoutingController) Cleanup() {
}
// syncPeers refreshes the BGP peer relationships with the rest of the nodes in
// the cluster. Node add/remove events normally keep the peers in line with the
// currently active nodes; this method is called periodically so that, if an
// event from the API server was missed, peers for removed nodes are still
// deleted and peers for new nodes are still added.
//
// The local node is never added as a peer. Errors are logged, not returned.
func (nrc *NetworkRoutingController) syncPeers() {
	// get the current list of the nodes from API server
	nodes, err := nrc.clientset.Core().Nodes().List(metav1.ListOptions{})
	if err != nil {
		glog.Errorf("Failed to list nodes: %s", err.Error())
		return
	}

	// activeNodes is also modified by OnNodeUpdate while holding nrc.mu, so
	// take the same lock here. It is acquired only after the API call to keep
	// the critical section short.
	nrc.mu.Lock()
	defer nrc.mu.Unlock()

	// establish peer with current set of nodes
	currentNodes := make(map[string]bool, len(nodes.Items))
	for i := range nodes.Items {
		nodeIP, err := getNodeIP(&nodes.Items[i])
		if err != nil {
			// without an IP there is no valid neighbor address to peer with
			glog.Errorf("Failed to find IP of node %s, skipping it as peer: %s", nodes.Items[i].Name, err.Error())
			continue
		}
		ip := nodeIP.String()
		if ip == nrc.nodeIP.String() {
			continue
		}
		currentNodes[ip] = true
		activeNodes[ip] = true
		n := &config.Neighbor{
			Config: config.NeighborConfig{
				NeighborAddress: ip,
				PeerAs:          nrc.asnNumber,
			},
		}
		// TODO: check if a node is already added as neighbour in a better way than add and catch error
		if err := nrc.bgpServer.AddNeighbor(n); err != nil {
			if !strings.Contains(err.Error(), "Can't overwrite the existing peer") {
				glog.Errorf("Failed to add node %s as peer due to %s", ip, err)
			}
		}
	}

	// delete the neighbor for every previously known node that is no longer
	// listed (deleting map entries while ranging over the map is safe in Go)
	for ip := range activeNodes {
		if currentNodes[ip] {
			continue
		}
		n := &config.Neighbor{
			Config: config.NeighborConfig{
				NeighborAddress: ip,
				PeerAs:          nrc.asnNumber,
			},
		}
		if err := nrc.bgpServer.DeleteNeighbor(n); err != nil {
			glog.Errorf("Failed to remove node %s as peer due to %s", ip, err)
		}
		delete(activeNodes, ip)
	}
}
// OnNodeUpdate handles updates from the node watcher, which calls this method
// whenever a node is added or removed. On watchers.ADD the node is added as a
// BGP peer and recorded in activeNodes; on watchers.REMOVE the peer is deleted
// and the node is dropped from activeNodes. Other operations are ignored.
//
// Updates for which no node IP can be determined, and updates that concern the
// local node itself, are ignored. Errors are logged, not returned. The whole
// method runs under nrc.mu.
func (nrc *NetworkRoutingController) OnNodeUpdate(nodeUpdate *watchers.NodeUpdate) {
	nrc.mu.Lock()
	defer nrc.mu.Unlock()

	nodeIP, err := getNodeIP(nodeUpdate.Node)
	if err != nil {
		// without an IP there is no valid neighbor address to act on
		glog.Errorf("Failed to find IP of node in update from watch API, ignoring update: %s", err.Error())
		return
	}
	ip := nodeIP.String()
	// same rule as in syncPeers: a node never peers with itself
	if ip == nrc.nodeIP.String() {
		return
	}

	n := &config.Neighbor{
		Config: config.NeighborConfig{
			NeighborAddress: ip,
			PeerAs:          nrc.asnNumber,
		},
	}

	switch nodeUpdate.Op {
	case watchers.ADD:
		glog.Infof("Received node %s added update from watch API so peer with new node", nodeIP)
		if err := nrc.bgpServer.AddNeighbor(n); err != nil {
			// the periodic sync may already have added this peer; as in
			// syncPeers that is not treated as a failure
			if !strings.Contains(err.Error(), "Can't overwrite the existing peer") {
				glog.Errorf("Failed to add node %s as peer due to %s", nodeIP, err)
			}
		}
		activeNodes[ip] = true
	case watchers.REMOVE:
		glog.Infof("Received node %s removed update from watch API, so remove node from peer", nodeIP)
		if err := nrc.bgpServer.DeleteNeighbor(n); err != nil {
			glog.Errorf("Failed to remove node %s as peer due to %s", nodeIP, err)
		}
		delete(activeNodes, ip)
	}
}
// NewNetworkRoutingController builds a NetworkRoutingController from the given
// API client and kube-router configuration, starts the embedded BGP server and
// its gRPC API on ":50051", registers the controller with the node watcher and
// starts the goroutine that processes BGP updates.
//
// Configuration rules:
//   - ClusterAsn, when set, must parse as an integer in the range 64512-65534;
//     when empty, 64512 is used.
//   - PeerRouter, when set, must be a valid IP and PeerAsn must be set to an
//     integer no greater than 65534.
//
// The local node is looked up in the API server by host name first and by FQDN
// second. Any validation, lookup or BGP start failure is returned as an error
// together with a nil controller.
func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConfig *options.KubeRouterConfig) (*NetworkRoutingController, error) {
	nrc := &NetworkRoutingController{}

	nrc.clusterCIDR = kubeRouterConfig.ClusterCIDR
	nrc.syncPeriod = kubeRouterConfig.RoutesSyncPeriod
	nrc.clientset = clientset
	nrc.advertiseClusterIp = kubeRouterConfig.AdvertiseClusterIp

	if len(kubeRouterConfig.ClusterAsn) != 0 {
		asn, err := strconv.ParseUint(kubeRouterConfig.ClusterAsn, 0, 32)
		if err != nil {
			return nil, fmt.Errorf("invalid cluster ASN: %s", err.Error())
		}
		if asn > 65534 || asn < 64512 {
			return nil, fmt.Errorf("invalid ASN number %d for cluster ASN: must be in the range 64512-65534", asn)
		}
		nrc.asnNumber = uint32(asn)
	} else {
		nrc.asnNumber = 64512 // this magic number is first of the private ASN range, use it as default
	}

	if len(kubeRouterConfig.PeerRouter) != 0 {
		if net.ParseIP(kubeRouterConfig.PeerRouter) == nil {
			// report the configured value; nrc.peerRouter is not set yet
			return nil, fmt.Errorf("invalid peer router ip: %s", kubeRouterConfig.PeerRouter)
		}
		nrc.peerRouter = kubeRouterConfig.PeerRouter

		if len(kubeRouterConfig.PeerAsn) == 0 {
			return nil, fmt.Errorf("ASN number for peer router must be specified")
		}
		asn, err := strconv.ParseUint(kubeRouterConfig.PeerAsn, 0, 32)
		if err != nil {
			return nil, fmt.Errorf("invalid BGP peer ASN: %s", err.Error())
		}
		if asn > 65534 {
			return nil, fmt.Errorf("invalid ASN number %d for peer router ASN: must not exceed 65534", asn)
		}
		nrc.peerAsnNumber = uint32(asn)
	}

	// find the node object of the local node: try the host name first and
	// fall back to the FQDN
	nodeHostName, err := os.Hostname()
	if err != nil {
		return nil, fmt.Errorf("failed to get host name: %s", err.Error())
	}
	nrc.nodeHostName = nodeHostName
	node, err := clientset.Core().Nodes().Get(nodeHostName, metav1.GetOptions{})
	if err != nil {
		nodeFqdnHostName := utils.GetFqdn()
		node, err = clientset.Core().Nodes().Get(nodeFqdnHostName, metav1.GetOptions{})
		if err != nil {
			return nil, fmt.Errorf("failed to get node object from API server: %s", err.Error())
		}
		nrc.nodeHostName = nodeFqdnHostName
	}

	nodeIP, err := getNodeIP(node)
	if err != nil {
		return nil, fmt.Errorf("failed to get IP of the node: %s", err.Error())
	}
	nrc.nodeIP = nodeIP

	nrc.bgpServer = gobgp.NewBgpServer()
	go nrc.bgpServer.Serve()

	g := bgpapi.NewGrpcServer(nrc.bgpServer, ":50051")
	go g.Serve()

	global := &config.Global{
		Config: config.GlobalConfig{
			As:       nrc.asnNumber,
			RouterId: nrc.nodeIP.String(),
		},
	}
	if err := nrc.bgpServer.Start(global); err != nil {
		return nil, fmt.Errorf("failed to start BGP server: %s", err.Error())
	}

	watchers.NodeWatcher.RegisterHandler(nrc)
	go nrc.watchBgpUpdates()

	return nrc, nil
}