mirror of
https://github.com/cloudnativelabs/kube-router.git
synced 2025-10-05 06:51:05 +02:00
While it is rare for NodeName to be missing, the Kubernetes API does not guarantee that it exists (see link below). This change still checks NodeName first when it is set, but when it is nil, rather than segfaulting, it falls back to comparing the endpoint address's IP with the node IP. Fixes #781 https://github.com/cloudnativelabs/kube-router/blob/master/vendor/k8s.io/api/core/v1/types.go#L3487
453 lines
13 KiB
Go
453 lines
13 KiB
Go
package routing
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"strings"
|
|
|
|
"github.com/golang/glog"
|
|
"github.com/osrg/gobgp/packet/bgp"
|
|
"github.com/osrg/gobgp/table"
|
|
v1core "k8s.io/api/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
)
|
|
|
|
// bgpAdvertiseVIP advertises the service vip (cluster ip or load balancer ip or external IP) the configured peers
|
|
func (nrc *NetworkRoutingController) bgpAdvertiseVIP(vip string) error {
|
|
|
|
attrs := []bgp.PathAttributeInterface{
|
|
bgp.NewPathAttributeOrigin(0),
|
|
bgp.NewPathAttributeNextHop(nrc.nodeIP.String()),
|
|
}
|
|
|
|
glog.V(2).Infof("Advertising route: '%s/%s via %s' to peers", vip, strconv.Itoa(32), nrc.nodeIP.String())
|
|
|
|
_, err := nrc.bgpServer.AddPath("", []*table.Path{table.NewPath(nil, bgp.NewIPAddrPrefix(uint8(32),
|
|
vip), false, attrs, time.Now(), false)})
|
|
|
|
return err
|
|
}
|
|
|
|
// bgpWithdrawVIP unadvertises the service vip
|
|
func (nrc *NetworkRoutingController) bgpWithdrawVIP(vip string) error {
|
|
glog.V(2).Infof("Withdrawing route: '%s/%s via %s' to peers", vip, strconv.Itoa(32), nrc.nodeIP.String())
|
|
|
|
pathList := []*table.Path{table.NewPath(nil, bgp.NewIPAddrPrefix(uint8(32),
|
|
vip), true, nil, time.Now(), false)}
|
|
|
|
err := nrc.bgpServer.DeletePath([]byte(nil), 0, "", pathList)
|
|
|
|
return err
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) advertiseVIPs(vips []string) {
|
|
for _, vip := range vips {
|
|
err := nrc.bgpAdvertiseVIP(vip)
|
|
if err != nil {
|
|
glog.Errorf("error advertising IP: %q, error: %v", vip, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) withdrawVIPs(vips []string) {
|
|
for _, vip := range vips {
|
|
err := nrc.bgpWithdrawVIP(vip)
|
|
if err != nil {
|
|
glog.Errorf("error withdrawing IP: %q, error: %v", vip, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) newServiceEventHandler() cache.ResourceEventHandler {
|
|
return cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(obj interface{}) {
|
|
nrc.OnServiceCreate(obj)
|
|
},
|
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
|
nrc.OnServiceUpdate(newObj, oldObj)
|
|
},
|
|
DeleteFunc: func(obj interface{}) {
|
|
nrc.OnServiceDelete(obj)
|
|
},
|
|
}
|
|
}
|
|
|
|
func getServiceObject(obj interface{}) (svc *v1core.Service) {
|
|
if svc, _ = obj.(*v1core.Service); svc == nil {
|
|
glog.Errorf("cache indexer returned obj that is not type *v1.Service")
|
|
}
|
|
return
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) handleServiceUpdate(svc *v1core.Service) {
|
|
if !nrc.bgpServerStarted {
|
|
glog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name)
|
|
return
|
|
}
|
|
|
|
toAdvertise, toWithdraw, err := nrc.getVIPsForService(svc, true)
|
|
if err != nil {
|
|
glog.Errorf("error getting routes for service: %s, err: %s", svc.Name, err)
|
|
return
|
|
}
|
|
|
|
// update export policies so that new VIP's gets addedd to clusteripprefixsit and vip gets advertised to peers
|
|
err = nrc.AddPolicies()
|
|
if err != nil {
|
|
glog.Errorf("Error adding BGP policies: %s", err.Error())
|
|
}
|
|
|
|
nrc.advertiseVIPs(toAdvertise)
|
|
nrc.withdrawVIPs(toWithdraw)
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) handleServiceDelete(svc *v1core.Service) {
|
|
|
|
if !nrc.bgpServerStarted {
|
|
glog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name)
|
|
return
|
|
}
|
|
|
|
err := nrc.AddPolicies()
|
|
if err != nil {
|
|
glog.Errorf("Error adding BGP policies: %s", err.Error())
|
|
}
|
|
|
|
activeVIPs, _, err := nrc.getActiveVIPs()
|
|
if err != nil {
|
|
glog.Errorf("Failed to get active VIP's on service delete event due to: %s", err.Error())
|
|
return
|
|
}
|
|
activeVIPsMap := make(map[string]bool)
|
|
for _, activeVIP := range activeVIPs {
|
|
activeVIPsMap[activeVIP] = true
|
|
}
|
|
serviceVIPs := nrc.getAllVIPsForService(svc)
|
|
withdrawVIPs := make([]string, 0)
|
|
for _, serviceVIP := range serviceVIPs {
|
|
// withdraw VIP only if deleted service is the last service using the VIP
|
|
if !activeVIPsMap[serviceVIP] {
|
|
withdrawVIPs = append(withdrawVIPs, serviceVIP)
|
|
}
|
|
}
|
|
nrc.withdrawVIPs(withdrawVIPs)
|
|
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) tryHandleServiceUpdate(obj interface{}, logMsgFormat string) {
|
|
if svc := getServiceObject(obj); svc != nil {
|
|
glog.V(1).Infof(logMsgFormat, svc.Namespace, svc.Name)
|
|
nrc.handleServiceUpdate(svc)
|
|
}
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) tryHandleServiceDelete(obj interface{}, logMsgFormat string) {
|
|
svc, ok := obj.(*v1core.Service)
|
|
if !ok {
|
|
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
|
if !ok {
|
|
glog.Errorf("unexpected object type: %v", obj)
|
|
return
|
|
}
|
|
if svc, ok = tombstone.Obj.(*v1core.Service); !ok {
|
|
glog.Errorf("unexpected object type: %v", obj)
|
|
return
|
|
}
|
|
}
|
|
nrc.handleServiceDelete(svc)
|
|
}
|
|
|
|
// OnServiceCreate handles new service create event from the kubernetes API server
|
|
func (nrc *NetworkRoutingController) OnServiceCreate(obj interface{}) {
|
|
nrc.tryHandleServiceUpdate(obj, "Received new service: %s/%s from watch API")
|
|
}
|
|
|
|
// OnServiceUpdate handles the service relates updates from the kubernetes API server
|
|
func (nrc *NetworkRoutingController) OnServiceUpdate(objNew interface{}, objOld interface{}) {
|
|
nrc.tryHandleServiceUpdate(objNew, "Received update on service: %s/%s from watch API")
|
|
|
|
nrc.withdrawVIPs(nrc.getWithdraw(getServiceObject(objOld), getServiceObject(objNew)))
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) getWithdraw(svcOld, svcNew *v1core.Service) (out []string) {
|
|
if svcOld != nil && svcNew != nil {
|
|
out = getMissingPrevGen(nrc.getExternalIps(svcOld), nrc.getExternalIps(svcNew))
|
|
}
|
|
return
|
|
}
|
|
|
|
// getMissingPrevGen returns the entries of old that are not present in new,
// in the order they appear in old. The result is nil when nothing is missing.
//
// Entries are compared for exact equality via a set lookup. An earlier
// implementation searched a space-joined string, which produced false matches
// for entries containing spaces and for an empty entry against an empty list.
func getMissingPrevGen(old, new []string) (withdrawIPs []string) {
	current := make(map[string]struct{}, len(new))
	for _, ip := range new {
		current[ip] = struct{}{}
	}

	for _, ip := range old {
		if _, present := current[ip]; !present {
			withdrawIPs = append(withdrawIPs, ip)
		}
	}

	return withdrawIPs
}
|
|
|
|
// OnServiceDelete handles the service delete updates from the kubernetes API server
|
|
func (nrc *NetworkRoutingController) OnServiceDelete(obj interface{}) {
|
|
nrc.tryHandleServiceDelete(obj, "Received event to delete service: %s/%s from watch API")
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) newEndpointsEventHandler() cache.ResourceEventHandler {
|
|
return cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(obj interface{}) {
|
|
nrc.OnEndpointsAdd(obj)
|
|
},
|
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
|
nrc.OnEndpointsUpdate(newObj)
|
|
},
|
|
DeleteFunc: func(obj interface{}) {
|
|
// don't do anything if an endpoints resource is deleted since
|
|
// the service delete event handles route withdrawls
|
|
return
|
|
},
|
|
}
|
|
}
|
|
|
|
// OnEndpointsAdd handles endpoint add events from apiserver
|
|
// This method calls OnEndpointsUpdate with the addition of updating BGP export policies
|
|
// Calling AddPolicies here covers the edge case where AddPolicies fails in
|
|
// OnServiceUpdate because the corresponding Endpoint resource for the
|
|
// Service was not created yet.
|
|
func (nrc *NetworkRoutingController) OnEndpointsAdd(obj interface{}) {
|
|
if !nrc.bgpServerStarted {
|
|
glog.V(3).Info("Skipping OnAdd event to endpoint, controller still performing bootup full-sync")
|
|
return
|
|
}
|
|
|
|
err := nrc.AddPolicies()
|
|
if err != nil {
|
|
glog.Errorf("error adding BGP policies: %s", err)
|
|
}
|
|
|
|
nrc.OnEndpointsUpdate(obj)
|
|
}
|
|
|
|
// OnEndpointsUpdate handles the endpoint updates from the kubernetes API server
|
|
func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) {
|
|
ep, ok := obj.(*v1core.Endpoints)
|
|
if !ok {
|
|
glog.Errorf("cache indexer returned obj that is not type *v1.Endpoints")
|
|
return
|
|
}
|
|
|
|
if isEndpointsForLeaderElection(ep) {
|
|
return
|
|
}
|
|
|
|
glog.V(1).Infof("Received update to endpoint: %s/%s from watch API", ep.Namespace, ep.Name)
|
|
if !nrc.bgpServerStarted {
|
|
glog.V(3).Infof("Skipping update to endpoint: %s/%s, controller still performing bootup full-sync", ep.Namespace, ep.Name)
|
|
return
|
|
}
|
|
|
|
svc, err := nrc.serviceForEndpoints(ep)
|
|
if err != nil {
|
|
glog.Errorf("failed to convert endpoints resource to service: %s", err)
|
|
return
|
|
}
|
|
|
|
nrc.tryHandleServiceUpdate(svc, "Updating service %s/%s triggered by endpoint update event")
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) serviceForEndpoints(ep *v1core.Endpoints) (interface{}, error) {
|
|
key, err := cache.MetaNamespaceKeyFunc(ep)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
item, exists, err := nrc.svcLister.GetByKey(key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !exists {
|
|
return nil, fmt.Errorf("service resource doesn't exist for endpoints: %q", ep.Name)
|
|
}
|
|
|
|
return item, nil
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) getClusterIp(svc *v1core.Service) string {
|
|
clusterIp := ""
|
|
if svc.Spec.Type == "ClusterIP" || svc.Spec.Type == "NodePort" || svc.Spec.Type == "LoadBalancer" {
|
|
|
|
// skip headless services
|
|
if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" {
|
|
clusterIp = svc.Spec.ClusterIP
|
|
}
|
|
}
|
|
return clusterIp
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) getExternalIps(svc *v1core.Service) []string {
|
|
externalIpList := make([]string, 0)
|
|
if svc.Spec.Type == "ClusterIP" || svc.Spec.Type == "NodePort" {
|
|
|
|
// skip headless services
|
|
if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" {
|
|
externalIpList = append(externalIpList, svc.Spec.ExternalIPs...)
|
|
}
|
|
}
|
|
return externalIpList
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) getLoadBalancerIps(svc *v1core.Service) []string {
|
|
loadBalancerIpList := make([]string, 0)
|
|
if svc.Spec.Type == "LoadBalancer" {
|
|
// skip headless services
|
|
if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" {
|
|
for _, lbIngress := range svc.Status.LoadBalancer.Ingress {
|
|
if len(lbIngress.IP) > 0 {
|
|
loadBalancerIpList = append(loadBalancerIpList, lbIngress.IP)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return loadBalancerIpList
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) getAllVIPs() ([]string, []string, error) {
|
|
return nrc.getVIPs(false)
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) getActiveVIPs() ([]string, []string, error) {
|
|
return nrc.getVIPs(true)
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) getVIPs(onlyActiveEndpoints bool) ([]string, []string, error) {
|
|
toAdvertiseList := make([]string, 0)
|
|
toWithdrawList := make([]string, 0)
|
|
|
|
for _, obj := range nrc.svcLister.List() {
|
|
svc := obj.(*v1core.Service)
|
|
|
|
toAdvertise, toWithdraw, err := nrc.getVIPsForService(svc, onlyActiveEndpoints)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
if len(toAdvertise) > 0 {
|
|
toAdvertiseList = append(toAdvertiseList, toAdvertise...)
|
|
}
|
|
|
|
if len(toWithdraw) > 0 {
|
|
toWithdrawList = append(toWithdrawList, toWithdraw...)
|
|
}
|
|
}
|
|
|
|
return toAdvertiseList, toWithdrawList, nil
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) shouldAdvertiseService(svc *v1core.Service, annotation string, defaultValue bool) bool {
|
|
returnValue := defaultValue
|
|
stringValue, exists := svc.Annotations[annotation]
|
|
if exists {
|
|
// Service annotations overrides defaults.
|
|
returnValue, _ = strconv.ParseBool(stringValue)
|
|
}
|
|
return returnValue
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) getVIPsForService(svc *v1core.Service, onlyActiveEndpoints bool) ([]string, []string, error) {
|
|
|
|
advertise := true
|
|
|
|
_, hasLocalAnnotation := svc.Annotations[svcLocalAnnotation]
|
|
hasLocalTrafficPolicy := svc.Spec.ExternalTrafficPolicy == v1core.ServiceExternalTrafficPolicyTypeLocal
|
|
isLocal := hasLocalAnnotation || hasLocalTrafficPolicy
|
|
|
|
if onlyActiveEndpoints && isLocal {
|
|
var err error
|
|
advertise, err = nrc.nodeHasEndpointsForService(svc)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
ipList := nrc.getAllVIPsForService(svc)
|
|
|
|
if !advertise {
|
|
return nil, ipList, nil
|
|
}
|
|
|
|
return ipList, nil, nil
|
|
}
|
|
|
|
func (nrc *NetworkRoutingController) getAllVIPsForService(svc *v1core.Service) []string {
|
|
|
|
ipList := make([]string, 0)
|
|
|
|
if nrc.shouldAdvertiseService(svc, svcAdvertiseClusterAnnotation, nrc.advertiseClusterIP) {
|
|
clusterIP := nrc.getClusterIp(svc)
|
|
if clusterIP != "" {
|
|
ipList = append(ipList, clusterIP)
|
|
}
|
|
}
|
|
|
|
if nrc.shouldAdvertiseService(svc, svcAdvertiseExternalAnnotation, nrc.advertiseExternalIP) {
|
|
ipList = append(ipList, nrc.getExternalIps(svc)...)
|
|
}
|
|
|
|
// Deprecated: Use service.advertise.loadbalancer=false instead of service.skiplbips.
|
|
_, skiplbips := svc.Annotations[svcSkipLbIpsAnnotation]
|
|
advertiseLoadBalancer := nrc.shouldAdvertiseService(svc, svcAdvertiseLoadBalancerAnnotation, nrc.advertiseLoadBalancerIP)
|
|
if advertiseLoadBalancer && !skiplbips {
|
|
ipList = append(ipList, nrc.getLoadBalancerIps(svc)...)
|
|
}
|
|
|
|
return ipList
|
|
|
|
}
|
|
|
|
func isEndpointsForLeaderElection(ep *v1core.Endpoints) bool {
|
|
_, isLeaderElection := ep.Annotations[LeaderElectionRecordAnnotationKey]
|
|
return isLeaderElection
|
|
}
|
|
|
|
// nodeHasEndpointsForService will get the corresponding Endpoints resource for a given Service
|
|
// return true if any endpoint addresses has NodeName matching the node name of the route controller
|
|
func (nrc *NetworkRoutingController) nodeHasEndpointsForService(svc *v1core.Service) (bool, error) {
|
|
// listers for endpoints and services should use the same keys since
|
|
// endpoint and service resources share the same object name and namespace
|
|
key, err := cache.MetaNamespaceKeyFunc(svc)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
item, exists, err := nrc.epLister.GetByKey(key)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if !exists {
|
|
return false, fmt.Errorf("endpoint resource doesn't exist for service: %q", svc.Name)
|
|
}
|
|
|
|
ep, ok := item.(*v1core.Endpoints)
|
|
if !ok {
|
|
return false, errors.New("failed to convert cache item to Endpoints type")
|
|
}
|
|
|
|
for _, subset := range ep.Subsets {
|
|
for _, address := range subset.Addresses {
|
|
if address.NodeName != nil {
|
|
if *address.NodeName == nrc.nodeName {
|
|
return true, nil
|
|
}
|
|
} else {
|
|
if address.IP == nrc.nodeIP.String() {
|
|
return true, nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return false, nil
|
|
}
|