kube-router/pkg/controllers/proxy/service_endpoints_sync.go
Manuel Rüger 12674d5f8b
Add golangci-lint support (#895)
* Makefile: Add lint using golangci-lint

* build/travis-test.sh: Run lint step

* metrics_controller: Lint

pkg/metrics/metrics_controller.go:150:2: `mu` is unused (structcheck)
        mu          sync.Mutex
        ^
pkg/metrics/metrics_controller.go:151:2: `nodeIP` is unused (structcheck)
        nodeIP      net.IP
        ^

* network_service_graceful: Lint

pkg/controllers/proxy/network_service_graceful.go:21:6: `gracefulQueueItem` is unused (deadcode)
type gracefulQueueItem struct {
     ^
pkg/controllers/proxy/network_service_graceful.go:22:2: `added` is unused (structcheck)
        added   time.Time
        ^
pkg/controllers/proxy/network_service_graceful.go:23:2: `service` is unused (structcheck)
        service *ipvs.Service
        ^

* network_services_controller_test: Lint

pkg/controllers/proxy/network_services_controller_test.go:80:6: func `logf` is unused (unused)

* ecmp_vip: Lint

pkg/controllers/routing/ecmp_vip.go:208:4: S1023: redundant `return` statement (gosimple)
                        return
                        ^

* bgp_peers: Lint

pkg/controllers/routing/bgp_peers.go:331:4: S1023: redundant `return` statement (gosimple)
                        return
                        ^

* bgp_policies: Lint

pkg/controllers/routing/bgp_policies.go:80:3: S1011: should replace loop with `externalBgpPeers = append(externalBgpPeers, nrc.nodePeerRouters...)` (gosimple)
                for _, peer := range nrc.nodePeerRouters {
                ^
pkg/controllers/routing/bgp_policies.go:23:20: ineffectual assignment to `err` (ineffassign)
        podCidrPrefixSet, err := table.NewPrefixSet(config.PrefixSet{
                          ^
pkg/controllers/routing/bgp_policies.go:42:22: ineffectual assignment to `err` (ineffassign)
        clusterIPPrefixSet, err := table.NewPrefixSet(config.PrefixSet{
                            ^
pkg/controllers/routing/bgp_policies.go:33:30: Error return value of `nrc.bgpServer.AddDefinedSet` is not checked (errcheck)
                nrc.bgpServer.AddDefinedSet(podCidrPrefixSet)
                                           ^
pkg/controllers/routing/bgp_policies.go:48:30: Error return value of `nrc.bgpServer.AddDefinedSet` is not checked (errcheck)
                nrc.bgpServer.AddDefinedSet(clusterIPPrefixSet)
                                           ^
pkg/controllers/routing/bgp_policies.go:69:31: Error return value of `nrc.bgpServer.AddDefinedSet` is not checked (errcheck)
                        nrc.bgpServer.AddDefinedSet(iBGPPeerNS)
                                                   ^
pkg/controllers/routing/bgp_policies.go:108:31: Error return value of `nrc.bgpServer.AddDefinedSet` is not checked (errcheck)
                        nrc.bgpServer.AddDefinedSet(ns)
                                                   ^
pkg/controllers/routing/bgp_policies.go:120:30: Error return value of `nrc.bgpServer.AddDefinedSet` is not checked (errcheck)
                nrc.bgpServer.AddDefinedSet(ns)
                                           ^
                                                   ^

* network_policy_controller: Lint

pkg/controllers/netpol/network_policy_controller.go:35:2: `networkPolicyAnnotation` is unused (deadcode)
        networkPolicyAnnotation      = "net.beta.kubernetes.io/network-policy"
        ^
pkg/controllers/netpol/network_policy_controller.go:1047:4: SA9003: empty branch (staticcheck)
                        if err != nil {
                        ^
pkg/controllers/netpol/network_policy_controller.go:969:10: SA4006: this value of `err` is never used (staticcheck)
        chains, err := iptablesCmdHandler.ListChains("filter")
                ^
pkg/controllers/netpol/network_policy_controller.go:1568:4: SA4006: this value of `err` is never used (staticcheck)
                        err = iptablesCmdHandler.Delete("filter", "FORWARD", strconv.Itoa(i-realRuleNo))
                        ^
pkg/controllers/netpol/network_policy_controller.go:1584:4: SA4006: this value of `err` is never used (staticcheck)
                        err = iptablesCmdHandler.Delete("filter", "OUTPUT", strconv.Itoa(i-realRuleNo))
                        ^

* network_services_controller: Lint

pkg/controllers/proxy/network_services_controller.go:66:2: `h` is unused (deadcode)
        h      *ipvs.Handle
        ^
pkg/controllers/proxy/network_services_controller.go:879:23: SA1019: client.NewEnvClient is deprecated: use NewClientWithOpts(FromEnv)  (staticcheck)
        dockerClient, err := client.NewEnvClient()
                             ^
pkg/controllers/proxy/network_services_controller.go:944:5: unreachable: unreachable code (govet)
                                glog.V(3).Infof("Waiting for tunnel interface %s to come up in the pod, retrying", KUBE_TUNNEL_IF)
                                ^
pkg/controllers/proxy/network_services_controller.go:1289:5: S1002: should omit comparison to bool constant, can be simplified to `!hasHairpinChain` (gosimple)
        if hasHairpinChain != true {
           ^
pkg/controllers/proxy/network_services_controller.go:1237:43: S1019: should use make(map[string][]string) instead (gosimple)
        rulesNeeded := make(map[string][]string, 0)
                                                 ^
pkg/controllers/proxy/network_services_controller.go:1111:4: S1023: redundant break statement (gosimple)
                        break
                        ^
pkg/controllers/proxy/network_services_controller.go:1114:4: S1023: redundant break statement (gosimple)
                        break
                        ^
pkg/controllers/proxy/network_services_controller.go:1117:4: S1023: redundant break statement (gosimple)
                        break
                        ^
pkg/controllers/proxy/network_services_controller.go:445:21: Error return value of `nsc.publishMetrics` is not checked (errcheck)
                nsc.publishMetrics(nsc.serviceMap)
                                  ^
pkg/controllers/proxy/network_services_controller.go:1609:9: Error return value of `h.Write` is not checked (errcheck)
        h.Write([]byte(ip + "-" + protocol + "-" + port))
               ^
pkg/controllers/proxy/network_services_controller.go:912:13: Error return value of `netns.Set` is not checked (errcheck)
                        netns.Set(hostNetworkNamespaceHandle)
                                 ^
pkg/controllers/proxy/network_services_controller.go:926:13: Error return value of `netns.Set` is not checked (errcheck)
                        netns.Set(hostNetworkNamespaceHandle)
                                 ^
pkg/controllers/proxy/network_services_controller.go:950:13: Error return value of `netns.Set` is not checked (errcheck)
                        netns.Set(hostNetworkNamespaceHandle)
                                 ^
pkg/controllers/proxy/network_services_controller.go:641:9: SA4006: this value of `err` is never used (staticcheck)
        addrs, err := getAllLocalIPs()
               ^

* network_routes_controller: Lint

pkg/controllers/routing/network_routes_controller.go:340:2: S1000: should use for range instead of for { select {} } (gosimple)
        for {
        ^
pkg/controllers/routing/network_routes_controller.go:757:22: Error return value of `nrc.bgpServer.Stop` is not checked (errcheck)
                        nrc.bgpServer.Stop()
                                          ^
pkg/controllers/routing/network_routes_controller.go:770:22: Error return value of `nrc.bgpServer.Stop` is not checked (errcheck)
                        nrc.bgpServer.Stop()
                                          ^
pkg/controllers/routing/network_routes_controller.go:782:23: Error return value of `nrc.bgpServer.Stop` is not checked (errcheck)
                                nrc.bgpServer.Stop()
                                                  ^
pkg/controllers/routing/network_routes_controller.go:717:12: Error return value of `g.Serve` is not checked (errcheck)
        go g.Serve()

* ipset: Lint

pkg/utils/ipset.go:243:23: Error return value of `entry.Set.Parent.Save` is not checked (errcheck)
        entry.Set.Parent.Save()
                             ^

* pkg/cmd/kube-router: Lint

pkg/cmd/kube-router.go:214:26: SA1006: printf-style function with dynamic format string and no further arguments should use print-style function instead (staticcheck)
                fmt.Fprintf(os.Stderr, output)
                                       ^
pkg/cmd/kube-router.go:184:15: SA1017: the channel used with signal.Notify should be buffered (staticcheck)
        signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
                     ^
pkg/cmd/kube-router.go:94:17: Error return value of `hc.RunServer` is not checked (errcheck)
        go hc.RunServer(stopCh, &wg)
                       ^
pkg/cmd/kube-router.go:112:16: Error return value of `hc.RunCheck` is not checked (errcheck)
        go hc.RunCheck(healthChan, stopCh, &wg)
                      ^
pkg/cmd/kube-router.go:121:12: Error return value of `mc.Run` is not checked (errcheck)
                go mc.Run(healthChan, stopCh, &wg)
                         ^

* cmd/kube-router/kube-router: Lint

cmd/kube-router/kube-router.go:31:24: Error return value of `flag.CommandLine.Parse` is not checked (errcheck)
        flag.CommandLine.Parse([]string{})
                              ^
cmd/kube-router/kube-router.go:33:10: Error return value of `flag.Set` is not checked (errcheck)
        flag.Set("logtostderr", "true")
                ^
cmd/kube-router/kube-router.go:34:10: Error return value of `flag.Set` is not checked (errcheck)
        flag.Set("v", config.VLevel)
                ^
cmd/kube-router/kube-router.go:62:27: SA1006: printf-style function with dynamic format string and no further arguments should use print-style function instead (staticcheck)
                        fmt.Fprintf(os.Stdout, http.ListenAndServe("0.0.0.0:6060", nil).Error())
                                               ^

* kube-router_test: Lint

cmd/kube-router/kube-router_test.go:21:10: Error return value of `io.Copy` is not checked (errcheck)
                io.Copy(stderrBuf, stderrR)
                       ^
cmd/kube-router/kube-router_test.go:40:17: Error return value of `docBuf.ReadFrom` is not checked (errcheck)
        docBuf.ReadFrom(docF)
                       ^

* service_endpoints_sync: Lint

pkg/controllers/proxy/service_endpoints_sync.go:460:2: ineffectual assignment to `ipvsSvcs` (ineffassign)
        ipvsSvcs, err := nsc.ln.ipvsGetServices()
        ^
pkg/controllers/proxy/service_endpoints_sync.go:311:5: SA4006: this value of `err` is never used (staticcheck)
                                err = nsc.ln.ipAddrDel(dummyVipInterface, externalIP)
                                ^

* node: Lint

pkg/utils/node.go:19:16: SA1019: clientset.Core is deprecated: please explicitly pick a version if possible.  (staticcheck)
                node, err := clientset.Core().Nodes().Get(nodeName, metav1.GetOptions{})
                             ^
pkg/utils/node.go:27:15: SA1019: clientset.Core is deprecated: please explicitly pick a version if possible.  (staticcheck)
        node, err := clientset.Core().Nodes().Get(hostName, metav1.GetOptions{})
                     ^
pkg/utils/node.go:34:15: SA1019: clientset.Core is deprecated: please explicitly pick a version if possible.  (staticcheck)
                node, err = clientset.Core().Nodes().Get(hostnameOverride, metav1.GetOptions{})
                            ^

* aws: Lint

pkg/controllers/routing/aws.go:31:8: SA4006: this value of `err` is never used (staticcheck)
                URL, err := url.Parse(providerID)
                     ^

* health_controller: Lint

pkg/healthcheck/health_controller.go:54:10: Error return value of `w.Write` is not checked (errcheck)
                w.Write([]byte("OK\n"))
                       ^
pkg/healthcheck/health_controller.go:68:10: Error return value of `w.Write` is not checked (errcheck)
                w.Write([]byte("Unhealthy"))
                       ^
pkg/healthcheck/health_controller.go:159:2: S1000: should use a simple channel send/receive instead of `select` with a single case (gosimple)
        select {
        ^

* network_routes_controller_test: Lint

pkg/controllers/routing/network_routes_controller_test.go:1113:37: Error return value of `testcase.nrc.bgpServer.Stop` is not checked (errcheck)
                        defer testcase.nrc.bgpServer.Stop()
                                                         ^
pkg/controllers/routing/network_routes_controller_test.go:1314:37: Error return value of `testcase.nrc.bgpServer.Stop` is not checked (errcheck)
                        defer testcase.nrc.bgpServer.Stop()
                                                         ^
pkg/controllers/routing/network_routes_controller_test.go:2327:37: Error return value of `testcase.nrc.bgpServer.Stop` is not checked (errcheck)
                        defer testcase.nrc.bgpServer.Stop()
                                                         ^

* .golangci.yml: Increase timeout

Default is 1m, increase to 5m otherwise travis might fail

* Makefile: Update golangci-lint to 1.27.0

* kube-router_test.go: defer waitgroup

Co-authored-by: Aaron U'Ren <aauren@users.noreply.github.com>

* network_routes_controller: Incorporate review

* bgp_policies: Incorporate review

* network_routes_controller: Incorporate review

* bgp_policies: Log error instead

* network_services_controller: Incorporate review

Co-authored-by: Aaron U'Ren <aauren@users.noreply.github.com>
2020-06-03 22:29:06 +02:00

546 lines
20 KiB
Go

package proxy
import (
"errors"
"fmt"
"net"
"strconv"
"strings"
"syscall"
"time"
"github.com/cloudnativelabs/kube-router/pkg/metrics"
"github.com/docker/libnetwork/ipvs"
"github.com/golang/glog"
"github.com/vishvananda/netlink"
"k8s.io/apimachinery/pkg/util/sets"
)
// sync the ipvs service and server details configured to reflect the desired state of Kubernetes services
// and endpoints as learned from services and endpoints information from the api server
func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap) error {
start := time.Now()
defer func() {
endTime := time.Since(start)
if nsc.MetricsEnabled {
metrics.ControllerIpvsServicesSyncTime.Observe(endTime.Seconds())
}
glog.V(1).Infof("sync ipvs services took %v", endTime)
}()
var err error
var syncErrors bool
// map to track all active IPVS services and servers that are setup during sync of
// cluster IP, nodeport and external IP services
activeServiceEndpointMap := make(map[string][]string)
err = nsc.setupClusterIPServices(serviceInfoMap, endpointsInfoMap, activeServiceEndpointMap)
if err != nil {
syncErrors = true
glog.Errorf("Error setting up IPVS services for service cluster IP's: %s", err.Error())
}
err = nsc.setupNodePortServices(serviceInfoMap, endpointsInfoMap, activeServiceEndpointMap)
if err != nil {
syncErrors = true
glog.Errorf("Error setting up IPVS services for service nodeport's: %s", err.Error())
}
err = nsc.setupExternalIPServices(serviceInfoMap, endpointsInfoMap, activeServiceEndpointMap)
if err != nil {
syncErrors = true
glog.Errorf("Error setting up IPVS services for service external IP's and load balancer IP's: %s", err.Error())
}
err = nsc.cleanupStaleVIPs(activeServiceEndpointMap)
if err != nil {
syncErrors = true
glog.Errorf("Error cleaning up stale VIP's configured on the dummy interface: %s", err.Error())
}
err = nsc.cleanupStaleIPVSConfig(activeServiceEndpointMap)
if err != nil {
syncErrors = true
glog.Errorf("Error cleaning up stale IPVS services and servers: %s", err.Error())
}
err = nsc.syncIpvsFirewall()
if err != nil {
syncErrors = true
glog.Errorf("Error syncing ipvs svc iptables rules to permit traffic to service VIP's: %s", err.Error())
}
err = nsc.setupForDSR(serviceInfoMap)
if err != nil {
syncErrors = true
glog.Errorf("Error setting up necessary policy based routing configuration needed for direct server return: %s", err.Error())
}
if syncErrors {
glog.V(1).Info("One or more errors encountered during sync of IPVS services and servers to desired state")
} else {
glog.V(1).Info("IPVS servers and services are synced to desired state")
}
return nil
}
func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error {
ipvsSvcs, err := nsc.ln.ipvsGetServices()
if err != nil {
return errors.New("Failed get list of IPVS services due to: " + err.Error())
}
for k, svc := range serviceInfoMap {
var protocol uint16
switch svc.protocol {
case "tcp":
protocol = syscall.IPPROTO_TCP
case "udp":
protocol = syscall.IPPROTO_UDP
default:
protocol = syscall.IPPROTO_NONE
}
endpoints := endpointsInfoMap[k]
dummyVipInterface, err := nsc.ln.getKubeDummyInterface()
if err != nil {
return errors.New("Failed creating dummy interface: " + err.Error())
}
// assign cluster IP of the service to the dummy interface so that its routable from the pod's on the node
err = nsc.ln.ipAddrAdd(dummyVipInterface, svc.clusterIP.String(), true)
if err != nil {
continue
}
// create IPVS service for the service to be exposed through the cluster ip
ipvsClusterVipSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, svc.clusterIP, protocol, uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags)
if err != nil {
glog.Errorf("Failed to create ipvs service for cluster ip: %s", err.Error())
continue
}
var clusterServiceId = generateIpPortId(svc.clusterIP.String(), svc.protocol, strconv.Itoa(svc.port))
activeServiceEndpointMap[clusterServiceId] = make([]string, 0)
// add IPVS remote server to the IPVS service
for _, endpoint := range endpoints {
dst := ipvs.Destination{
Address: net.ParseIP(endpoint.ip),
AddressFamily: syscall.AF_INET,
Port: uint16(endpoint.port),
Weight: 1,
}
// Conditions on which to add an endpoint on this node:
// 1) Service is not a local service
// 2) Service is a local service, but has no active endpoints on this node
// 3) Service is a local service, has active endpoints on this node, and this endpoint is one of them
if svc.local {
if hasActiveEndpoints(svc, endpoints) && !endpoint.isLocal {
continue
}
}
err := nsc.ln.ipvsAddServer(ipvsClusterVipSvc, &dst)
if err != nil {
glog.Errorf(err.Error())
} else {
activeServiceEndpointMap[clusterServiceId] = append(activeServiceEndpointMap[clusterServiceId], generateEndpointId(endpoint.ip, strconv.Itoa(endpoint.port)))
}
}
}
return nil
}
func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error {
ipvsSvcs, err := nsc.ln.ipvsGetServices()
if err != nil {
return errors.New("Failed get list of IPVS services due to: " + err.Error())
}
for k, svc := range serviceInfoMap {
var protocol uint16
switch svc.protocol {
case "tcp":
protocol = syscall.IPPROTO_TCP
case "udp":
protocol = syscall.IPPROTO_UDP
default:
protocol = syscall.IPPROTO_NONE
}
if svc.nodePort == 0 {
// service is not NodePort type
continue
}
endpoints := endpointsInfoMap[k]
if svc.local && !hasActiveEndpoints(svc, endpoints) {
glog.V(1).Infof("Skipping setting up NodePort service %s/%s as it does not have active endpoints\n", svc.namespace, svc.name)
continue
}
// create IPVS service for the service to be exposed through the nodeport
var ipvsNodeportSvcs []*ipvs.Service
var nodeServiceIds []string
if nsc.nodeportBindOnAllIp {
// bind on all interfaces instead
addrs, err := getAllLocalIPs()
if err != nil {
glog.Errorf("Could not get list of system addresses for ipvs services: %s", err.Error())
continue
}
if len(addrs) == 0 {
glog.Errorf("No IP addresses returned for nodeport service creation!")
continue
}
ipvsNodeportSvcs = make([]*ipvs.Service, len(addrs))
nodeServiceIds = make([]string, len(addrs))
for i, addr := range addrs {
ipvsNodeportSvcs[i], err = nsc.ln.ipvsAddService(ipvsSvcs, addr.IP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags)
if err != nil {
glog.Errorf("Failed to create ipvs service for node port due to: %s", err.Error())
continue
}
nodeServiceIds[i] = generateIpPortId(addr.IP.String(), svc.protocol, strconv.Itoa(svc.nodePort))
activeServiceEndpointMap[nodeServiceIds[i]] = make([]string, 0)
}
} else {
ipvsNodeportSvcs = make([]*ipvs.Service, 1)
ipvsNodeportSvcs[0], err = nsc.ln.ipvsAddService(ipvsSvcs, nsc.nodeIP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags)
if err != nil {
glog.Errorf("Failed to create ipvs service for node port due to: %s", err.Error())
continue
}
nodeServiceIds = make([]string, 1)
nodeServiceIds[0] = generateIpPortId(nsc.nodeIP.String(), svc.protocol, strconv.Itoa(svc.nodePort))
activeServiceEndpointMap[nodeServiceIds[0]] = make([]string, 0)
}
for _, endpoint := range endpoints {
dst := ipvs.Destination{
Address: net.ParseIP(endpoint.ip),
AddressFamily: syscall.AF_INET,
Port: uint16(endpoint.port),
Weight: 1,
}
for i := 0; i < len(ipvsNodeportSvcs); i++ {
if !svc.local || (svc.local && endpoint.isLocal) {
err := nsc.ln.ipvsAddServer(ipvsNodeportSvcs[i], &dst)
if err != nil {
glog.Errorf(err.Error())
} else {
activeServiceEndpointMap[nodeServiceIds[i]] = append(activeServiceEndpointMap[nodeServiceIds[i]], generateEndpointId(endpoint.ip, strconv.Itoa(endpoint.port)))
}
}
}
}
}
return nil
}
func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error {
ipvsSvcs, err := nsc.ln.ipvsGetServices()
if err != nil {
return errors.New("Failed get list of IPVS services due to: " + err.Error())
}
for k, svc := range serviceInfoMap {
var protocol uint16
switch svc.protocol {
case "tcp":
protocol = syscall.IPPROTO_TCP
case "udp":
protocol = syscall.IPPROTO_UDP
default:
protocol = syscall.IPPROTO_NONE
}
endpoints := endpointsInfoMap[k]
dummyVipInterface, err := nsc.ln.getKubeDummyInterface()
if err != nil {
return errors.New("Failed creating dummy interface: " + err.Error())
}
externalIpServices := make([]externalIPService, 0)
// create IPVS service for the service to be exposed through the external IP's
// For external IP (which are meant for ingress traffic) Kube-router setsup IPVS services
// based on FWMARK to enable Direct server return functionality. DSR requires a director
// without a VIP http://www.austintek.com/LVS/LVS-HOWTO/HOWTO/LVS-HOWTO.routing_to_VIP-less_director.html
// to avoid martian packets
extIPSet := sets.NewString(svc.externalIPs...)
if !svc.skipLbIps {
extIPSet = extIPSet.Union(sets.NewString(svc.loadBalancerIPs...))
}
if extIPSet.Len() == 0 {
// service is not LoadBalancer type and no external IP's are configured
continue
}
if svc.local && !hasActiveEndpoints(svc, endpoints) {
glog.V(1).Infof("Skipping setting up IPVS service for external IP and LoadBalancer IP for the service %s/%s as it does not have active endpoints\n", svc.namespace, svc.name)
continue
}
for _, externalIP := range extIPSet.List() {
var externalIpServiceId string
if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" {
ipvsExternalIPSvc, err := nsc.ln.ipvsAddFWMarkService(net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags)
if err != nil {
glog.Errorf("Failed to create ipvs service for External IP: %s due to: %s", externalIP, err.Error())
continue
}
externalIpServices = append(externalIpServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, externalIp: externalIP})
fwMark, err := generateFwmark(externalIP, svc.protocol, strconv.Itoa(svc.port))
if err != nil {
glog.Errorf("Failed to generate Fwmark")
continue
}
externalIpServiceId = fmt.Sprint(fwMark)
// ensure there is iptables mangle table rule to FWMARK the packet
err = setupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), externalIpServiceId)
if err != nil {
glog.Errorf("Failed to setup mangle table rule to FMWARD the traffic to external IP")
continue
}
// ensure VIP less director. we dont assign VIP to any interface
err = nsc.ln.ipAddrDel(dummyVipInterface, externalIP)
if err != nil {
glog.Errorf("Failed to delete external ip adress from dummyVipInterface due to %s", err)
continue
}
// do policy routing to deliver the packet locally so that IPVS can pick the packet
err = routeVIPTrafficToDirector("0x" + fmt.Sprintf("%x", fwMark))
if err != nil {
glog.Errorf("Failed to setup ip rule to lookup traffic to external IP: %s through custom "+
"route table due to %s", externalIP, err.Error())
continue
}
} else {
// ensure director with vip assigned
err := nsc.ln.ipAddrAdd(dummyVipInterface, externalIP, true)
if err != nil && err.Error() != IFACE_HAS_ADDR {
glog.Errorf("Failed to assign external ip %s to dummy interface %s due to %s", externalIP, KUBE_DUMMY_IF, err.Error())
}
// create IPVS service for the service to be exposed through the external ip
ipvsExternalIPSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags)
if err != nil {
glog.Errorf("Failed to create ipvs service for external ip: %s due to %s", externalIP, err.Error())
continue
}
externalIpServices = append(externalIpServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, externalIp: externalIP})
externalIpServiceId = generateIpPortId(externalIP, svc.protocol, strconv.Itoa(svc.port))
// ensure there is NO iptables mangle table rule to FWMARK the packet
fwMark := fmt.Sprint(generateFwmark(externalIP, svc.protocol, strconv.Itoa(svc.port)))
err = nsc.ln.cleanupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), fwMark)
if err != nil {
glog.Errorf("Failed to verify and cleanup any mangle table rule to FMWARD the traffic to external IP due to " + err.Error())
continue
}
}
activeServiceEndpointMap[externalIpServiceId] = make([]string, 0)
for _, endpoint := range endpoints {
if !svc.local || (svc.local && endpoint.isLocal) {
activeServiceEndpointMap[externalIpServiceId] = append(activeServiceEndpointMap[externalIpServiceId], generateEndpointId(endpoint.ip, strconv.Itoa(endpoint.port)))
}
}
}
// add IPVS remote server to the IPVS service
for _, endpoint := range endpoints {
dst := ipvs.Destination{
Address: net.ParseIP(endpoint.ip),
AddressFamily: syscall.AF_INET,
Port: uint16(endpoint.port),
Weight: 1,
}
for _, externalIpService := range externalIpServices {
if svc.local && !endpoint.isLocal {
continue
}
if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" {
dst.ConnectionFlags = ipvs.ConnectionFlagTunnel
}
// add server to IPVS service
err := nsc.ln.ipvsAddServer(externalIpService.ipvsSvc, &dst)
if err != nil {
glog.Errorf(err.Error())
}
// For now just support IPVS tunnel mode, we can add other ways of DSR in future
if svc.directServerReturn && svc.directServerReturnMethod == "tunnel" {
podObj, err := nsc.getPodObjectForEndpoint(endpoint.ip)
if err != nil {
glog.Errorf("Failed to find endpoint with ip: " + endpoint.ip + ". so skipping peparing endpoint for DSR")
continue
}
// we are only concerned with endpoint pod running on current node
if strings.Compare(podObj.Status.HostIP, nsc.nodeIP.String()) != 0 {
continue
}
containerID := strings.TrimPrefix(podObj.Status.ContainerStatuses[0].ContainerID, "docker://")
if containerID == "" {
glog.Errorf("Failed to find container id for the endpoint with ip: " + endpoint.ip + " so skipping peparing endpoint for DSR")
continue
}
err = nsc.ln.prepareEndpointForDsr(containerID, endpoint.ip, externalIpService.externalIp)
if err != nil {
glog.Errorf("Failed to prepare endpoint %s to do direct server return due to %s", endpoint.ip, err.Error())
}
}
}
}
}
return nil
}
func (nsc *NetworkServicesController) setupForDSR(serviceInfoMap serviceInfoMap) error {
glog.V(1).Infof("Setting up policy routing required for Direct Server Return functionality.")
err := nsc.ln.setupPolicyRoutingForDSR()
if err != nil {
return errors.New("Failed setup PBR for DSR due to: " + err.Error())
}
glog.V(1).Infof("Custom routing table " + customDSRRouteTableName + " required for Direct Server Return is setup as expected.")
glog.V(1).Infof("Setting up custom route table required to add routes for external IP's.")
err = nsc.ln.setupRoutesForExternalIPForDSR(serviceInfoMap)
if err != nil {
glog.Errorf("Failed setup custom routing table required to add routes for external IP's due to: " + err.Error())
return errors.New("Failed setup custom routing table required to add routes for external IP's due to: " + err.Error())
}
glog.V(1).Infof("Custom routing table " + externalIPRouteTableName + " required for Direct Server Return is setup as expected.")
return nil
}
func (nsc *NetworkServicesController) cleanupStaleVIPs(activeServiceEndpointMap map[string][]string) error {
// cleanup stale IPs on dummy interface
glog.V(1).Info("Cleaning up if any, old service IPs on dummy interface")
addrActive := make(map[string]bool)
for k := range activeServiceEndpointMap {
// verify active and its a generateIpPortId() type service
if strings.Contains(k, "-") {
parts := strings.SplitN(k, "-", 3)
addrActive[parts[0]] = true
}
}
dummyVipInterface, err := nsc.ln.getKubeDummyInterface()
if err != nil {
return errors.New("Failed creating dummy interface: " + err.Error())
}
var addrs []netlink.Addr
addrs, err = netlink.AddrList(dummyVipInterface, netlink.FAMILY_V4)
if err != nil {
return errors.New("Failed to list dummy interface IPs: " + err.Error())
}
for _, addr := range addrs {
isActive := addrActive[addr.IP.String()]
if !isActive {
glog.V(1).Infof("Found an IP %s which is no longer needed so cleaning up", addr.IP.String())
err := nsc.ln.ipAddrDel(dummyVipInterface, addr.IP.String())
if err != nil {
glog.Errorf("Failed to delete stale IP %s due to: %s",
addr.IP.String(), err.Error())
continue
}
}
}
return nil
}
func (nsc *NetworkServicesController) cleanupStaleIPVSConfig(activeServiceEndpointMap map[string][]string) error {
ipvsSvcs, err := nsc.ln.ipvsGetServices()
if err != nil {
return errors.New("Failed get list of IPVS services due to: " + err.Error())
}
// cleanup stale ipvs service and servers
glog.V(1).Info("Cleaning up if any, old ipvs service and servers which are no longer needed")
if err != nil {
return errors.New("Failed to list IPVS services: " + err.Error())
}
var protocol string
for _, ipvsSvc := range ipvsSvcs {
if ipvsSvc.Protocol == syscall.IPPROTO_TCP {
protocol = "tcp"
} else {
protocol = "udp"
}
var key string
if ipvsSvc.Address != nil {
key = generateIpPortId(ipvsSvc.Address.String(), protocol, strconv.Itoa(int(ipvsSvc.Port)))
} else if ipvsSvc.FWMark != 0 {
key = fmt.Sprint(ipvsSvc.FWMark)
} else {
continue
}
endpointIds, ok := activeServiceEndpointMap[key]
// Only delete the service if it's not there anymore to prevent flapping
// old: if !ok || len(endpointIds) == 0 {
if !ok {
excluded := false
for _, excludedCidr := range nsc.excludedCidrs {
if excludedCidr.Contains(ipvsSvc.Address) {
excluded = true
break
}
}
if excluded {
glog.V(1).Infof("Ignoring deletion of an IPVS service %s in an excluded cidr",
ipvsServiceString(ipvsSvc))
continue
}
glog.V(1).Infof("Found a IPVS service %s which is no longer needed so cleaning up",
ipvsServiceString(ipvsSvc))
err := nsc.ln.ipvsDelService(ipvsSvc)
if err != nil {
glog.Errorf("Failed to delete stale IPVS service %s due to: %s",
ipvsServiceString(ipvsSvc), err.Error())
continue
}
} else {
dsts, err := nsc.ln.ipvsGetDestinations(ipvsSvc)
if err != nil {
glog.Errorf("Failed to get list of servers from ipvs service")
}
for _, dst := range dsts {
validEp := false
for _, epId := range endpointIds {
if epId == generateEndpointId(dst.Address.String(), strconv.Itoa(int(dst.Port))) {
validEp = true
break
}
}
if !validEp {
glog.V(1).Infof("Found a destination %s in service %s which is no longer needed so cleaning up",
ipvsDestinationString(dst), ipvsServiceString(ipvsSvc))
err = nsc.ipvsDeleteDestination(ipvsSvc, dst)
if err != nil {
glog.Errorf("Failed to delete destination %s from ipvs service %s",
ipvsDestinationString(dst), ipvsServiceString(ipvsSvc))
}
}
}
}
}
return nil
}