diff --git a/.golangci.yml b/.golangci.yml index 61058df1..b05badbc 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -19,6 +19,7 @@ linters: - gosimple - govet - ineffassign + - lll - misspell issues: exclude-rules: @@ -47,6 +48,10 @@ issues: - text: "G306:" linters: - gosec + # Exlude tests from long line linter + - linters: + - lll + path: _test\.go # always show all issues rather than only showing 50 at a time max-issues-per-linter: 0 # always show all issues of a type rather than showing 3 diff --git a/docs/user-guide.md b/docs/user-guide.md index ac3a3f23..ccdfc8d4 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -48,7 +48,7 @@ Usage of kube-router: --bgp-graceful-restart Enables the BGP Graceful Restart capability so that routes are preserved on unexpected restarts --bgp-graceful-restart-deferral-time duration BGP Graceful restart deferral time according to RFC4724 4.1, maximum 18h. (default 6m0s) --bgp-graceful-restart-time duration BGP Graceful restart time according to RFC4724 3, maximum 4095s. (default 1m30s) - --bgp-holdtime duration This parameter is mainly used to modify the holdtime declared to BGP peer. When Kube-router goes down abnormally, the local saving time of BGP route will be affected.Holdtime must be in the range 3s to 18h12m16s. (default 1m30s) + --bgp-holdtime duration This parameter is mainly used to modify the holdtime declared to BGP peer. When Kube-router goes down abnormally, the local saving time of BGP route will be affected. Holdtime must be in the range 3s to 18h12m16s. (default 1m30s) --bgp-port uint32 The port open for incoming BGP connections and to use for connecting with other BGP peers. (default 179) --cache-sync-timeout duration The timeout for cache synchronization (e.g. '5s', '1m'). Must be greater than 0. (default 1m0s) --cleanup-config Cleanup iptables rules, ipvs, ipset configuration and exit. diff --git a/pkg/cmd/kube-router.go b/pkg/cmd/kube-router.go index 0398a941..f66ba774 100644 --- a/pkg/cmd/kube-router.go +++ b/pkg/cmd/kube-router.go @@ -211,7 +211,8 @@ func (kr *KubeRouter) Run() error { } // CacheSyncOrTimeout performs cache synchronization under timeout limit -func (kr *KubeRouter) CacheSyncOrTimeout(informerFactory informers.SharedInformerFactory, stopCh <-chan struct{}) error { +func (kr *KubeRouter) CacheSyncOrTimeout(informerFactory informers.SharedInformerFactory, + stopCh <-chan struct{}) error { syncOverCh := make(chan struct{}) go func() { informerFactory.WaitForCacheSync(stopCh) diff --git a/pkg/controllers/netpol/network_policy_controller.go b/pkg/controllers/netpol/network_policy_controller.go index 2a0d2b11..71af5bdd 100644 --- a/pkg/controllers/netpol/network_policy_controller.go +++ b/pkg/controllers/netpol/network_policy_controller.go @@ -42,7 +42,11 @@ const ( ) var ( - defaultChains = map[string]string{"INPUT": kubeInputChainName, "FORWARD": kubeForwardChainName, "OUTPUT": kubeOutputChainName} + defaultChains = map[string]string{ + "INPUT": kubeInputChainName, + "FORWARD": kubeForwardChainName, + "OUTPUT": kubeOutputChainName, + } ) // Network policy controller provides both ingress and egress filtering for the pods as per the defined network @@ -146,7 +150,8 @@ type protocol2eps map[string]numericPort2eps type namedPort2eps map[string]protocol2eps // Run runs forever till we receive notification on stopCh -func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) { +func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, + wg *sync.WaitGroup) { t := time.NewTicker(npc.syncPeriod) defer t.Stop() defer wg.Done() @@ -157,7 +162,8 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.Controlle // setup kube-router specific top level custom chains (KUBE-ROUTER-INPUT, KUBE-ROUTER-FORWARD, KUBE-ROUTER-OUTPUT) npc.ensureTopLevelChains() - // setup default network policy chain that is applied to traffic from/to the pods that does not match any network policy + // setup default network policy chain that is applied to traffic from/to the pods that does not match any network + // policy npc.ensureDefaultNetworkPolicyChain() // Full syncs of the network policy controller take a lot of time and can only be processed one at a time, @@ -234,7 +240,8 @@ func (npc *NetworkPolicyController) fullPolicySync() { // ensure kube-router specific top level chains and corresponding rules exist npc.ensureTopLevelChains() - // ensure default network policy chain that is applied to traffic from/to the pods that does not match any network policy + // ensure default network policy chain that is applied to traffic from/to the pods that does not match any network + // policy npc.ensureDefaultNetworkPolicyChain() networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo() @@ -272,7 +279,8 @@ func (npc *NetworkPolicyController) fullPolicySync() { } if err := utils.Restore("filter", npc.filterTableRules.Bytes()); err != nil { - klog.Errorf("Aborting sync. Failed to run iptables-restore: %v\n%s", err.Error(), npc.filterTableRules.String()) + klog.Errorf("Aborting sync. Failed to run iptables-restore: %v\n%s", + err.Error(), npc.filterTableRules.String()) return } @@ -308,7 +316,8 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { return encoded, nil } } - return "", fmt.Errorf("could not find a comment in the ruleSpec string given: %s", strings.Join(*ruleSpec, " ")) + return "", fmt.Errorf("could not find a comment in the ruleSpec string given: %s", + strings.Join(*ruleSpec, " ")) } ensureRuleAtPosition := func(chain string, ruleSpec []string, uuid string, position int) { @@ -363,7 +372,8 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { if !exists { err = iptablesCmdHandler.NewChain("filter", customChain) if err != nil { - klog.Fatalf("failed to run iptables command to create %s chain due to %s", customChain, err.Error()) + klog.Fatalf("failed to run iptables command to create %s chain due to %s", customChain, + err.Error()) } } args := []string{"-m", "comment", "--comment", "kube-router netpol", "-j", customChain} @@ -374,14 +384,16 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { ensureRuleAtPosition(builtinChain, args, uuid, 1) } - whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to cluster IP", "-d", npc.serviceClusterIPRange.String(), "-j", "RETURN"} + whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to cluster IP", "-d", + npc.serviceClusterIPRange.String(), "-j", "RETURN"} uuid, err := addUUIDForRuleSpec(kubeInputChainName, &whitelistServiceVips) if err != nil { klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) } ensureRuleAtPosition(kubeInputChainName, whitelistServiceVips, uuid, serviceVIPPosition) - whitelistTCPNodeports := []string{"-p", "tcp", "-m", "comment", "--comment", "allow LOCAL TCP traffic to node ports", "-m", "addrtype", "--dst-type", "LOCAL", + whitelistTCPNodeports := []string{"-p", "tcp", "-m", "comment", "--comment", + "allow LOCAL TCP traffic to node ports", "-m", "addrtype", "--dst-type", "LOCAL", "-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"} uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistTCPNodeports) if err != nil { @@ -389,7 +401,8 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { } ensureRuleAtPosition(kubeInputChainName, whitelistTCPNodeports, uuid, whitelistTCPNodePortsPosition) - whitelistUDPNodeports := []string{"-p", "udp", "-m", "comment", "--comment", "allow LOCAL UDP traffic to node ports", "-m", "addrtype", "--dst-type", "LOCAL", + whitelistUDPNodeports := []string{"-p", "udp", "-m", "comment", "--comment", + "allow LOCAL UDP traffic to node ports", "-m", "addrtype", "--dst-type", "LOCAL", "-m", "multiport", "--dports", npc.serviceNodePortRange, "-j", "RETURN"} uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistUDPNodeports) if err != nil { @@ -398,7 +411,9 @@ func (npc *NetworkPolicyController) ensureTopLevelChains() { ensureRuleAtPosition(kubeInputChainName, whitelistUDPNodeports, uuid, whitelistUDPNodePortsPosition) for externalIPIndex, externalIPRange := range npc.serviceExternalIPRanges { - whitelistServiceVips := []string{"-m", "comment", "--comment", "allow traffic to external IP range: " + externalIPRange.String(), "-d", externalIPRange.String(), "-j", "RETURN"} + whitelistServiceVips := []string{"-m", "comment", "--comment", + "allow traffic to external IP range: " + externalIPRange.String(), "-d", externalIPRange.String(), + "-j", "RETURN"} uuid, err = addUUIDForRuleSpec(kubeInputChainName, &whitelistServiceVips) if err != nil { klog.Fatalf("Failed to get uuid for rule: %s", err.Error()) @@ -412,7 +427,8 @@ func (npc *NetworkPolicyController) ensureExplicitAccept() { // authoritative entity to ACCEPT the traffic if it complies to network policies for _, chain := range defaultChains { comment := "\"rule to explicitly ACCEPT traffic that comply to network policies\"" - args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", "0x20000/0x20000", "-j", "ACCEPT"} + args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", "0x20000/0x20000", + "-j", "ACCEPT"} npc.filterTableRules = utils.AppendUnique(npc.filterTableRules, chain, args) } } @@ -427,7 +443,8 @@ func (npc *NetworkPolicyController) ensureDefaultNetworkPolicyChain() { markArgs := make([]string, 0) markComment := "rule to mark traffic matching a network policy" - markArgs = append(markArgs, "-j", "MARK", "-m", "comment", "--comment", markComment, "--set-xmark", "0x10000/0x10000") + markArgs = append(markArgs, "-j", "MARK", "-m", "comment", "--comment", markComment, + "--set-xmark", "0x10000/0x10000") exists, err := iptablesCmdHandler.ChainExists("filter", kubeDefaultNetpolChain) if err != nil { @@ -436,7 +453,8 @@ func (npc *NetworkPolicyController) ensureDefaultNetworkPolicyChain() { if !exists { err = iptablesCmdHandler.NewChain("filter", kubeDefaultNetpolChain) if err != nil { - klog.Fatalf("failed to run iptables command to create %s chain due to %s", kubeDefaultNetpolChain, err.Error()) + klog.Fatalf("failed to run iptables command to create %s chain due to %s", + kubeDefaultNetpolChain, err.Error()) } } err = iptablesCmdHandler.AppendUnique("filter", kubeDefaultNetpolChain, markArgs...) @@ -445,7 +463,8 @@ func (npc *NetworkPolicyController) ensureDefaultNetworkPolicyChain() { } } -func (npc *NetworkPolicyController) cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool, deleteDefaultChains bool) error { +func (npc *NetworkPolicyController) cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool, + deleteDefaultChains bool) error { cleanupPodFwChains := make([]string, 0) cleanupPolicyChains := make([]string, 0) @@ -499,7 +518,8 @@ func (npc *NetworkPolicyController) cleanupStaleRules(activePolicyChains, active } } if deleteDefaultChains { - for _, chain := range []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName, kubeDefaultNetpolChain} { + for _, chain := range []string{kubeInputChainName, kubeForwardChainName, kubeOutputChainName, + kubeDefaultNetpolChain} { if strings.Contains(rule, chain) { skipRule = true break @@ -590,7 +610,8 @@ func (npc *NetworkPolicyController) Cleanup() { // Restore (iptables-restore) npc's cleaned up version of the iptables filter chain if err = utils.Restore("filter", npc.filterTableRules.Bytes()); err != nil { klog.Errorf( - "error encountered while loading running iptables-restore: %v\n%s", err, npc.filterTableRules.String()) + "error encountered while loading running iptables-restore: %v\n%s", err, + npc.filterTableRules.String()) } // Cleanup ipsets @@ -606,7 +627,8 @@ func (npc *NetworkPolicyController) Cleanup() { // NewNetworkPolicyController returns new NetworkPolicyController object func NewNetworkPolicyController(clientset kubernetes.Interface, config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer, - npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkPolicyController, error) { + npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer, + ipsetMutex *sync.Mutex) (*NetworkPolicyController, error) { npc := NetworkPolicyController{ipsetMutex: ipsetMutex} // Creating a single-item buffered channel to ensure that we only keep a single full sync request at a time, @@ -630,7 +652,8 @@ func NewNetworkPolicyController(clientset kubernetes.Interface, for _, externalIPRange := range config.ExternalIPCIDRs { _, ipnet, err := net.ParseCIDR(externalIPRange) if err != nil { - return nil, fmt.Errorf("failed to get parse --service-external-ip-range parameter: '%s'. Error: %s", externalIPRange, err.Error()) + return nil, fmt.Errorf("failed to get parse --service-external-ip-range parameter: '%s'. Error: %s", + externalIPRange, err.Error()) } npc.serviceExternalIPRanges = append(npc.serviceExternalIPRanges, *ipnet) } diff --git a/pkg/controllers/netpol/pod.go b/pkg/controllers/netpol/pod.go index df14d591..0846923e 100644 --- a/pkg/controllers/netpol/pod.go +++ b/pkg/controllers/netpol/pod.go @@ -34,8 +34,9 @@ func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHand } // We don't check isNetPolActionable here, because if it is transitioning in or out of the actionable state - // we want to run the full sync so that it can be added or removed from the existing network policy of the host - // For the network policies, we are only interested in some changes, most pod changes aren't relevant to network policy + // we want to run the full sync so that it can be added or removed from the existing network policy of the + // host. For the network policies, we are only interested in some changes, most pod changes aren't relevant + // to network policy if isPodUpdateNetPolRelevant(oldPodObj, newPodObj) { npc.OnPodUpdate(newObj) } @@ -72,15 +73,19 @@ func (npc *NetworkPolicyController) handlePodDelete(obj interface{}) { npc.RequestFullSync() } -func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []networkPolicyInfo, version string) (map[string]bool, error) { +func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo []networkPolicyInfo, + version string) (map[string]bool, error) { activePodFwChains := make(map[string]bool) dropUnmarkedTrafficRules := func(podName, podNamespace, podFwChainName string) error { // add rule to log the packets that will be dropped due to network policy enforcement comment := "\"rule to log dropped traffic POD name:" + podName + " namespace: " + podNamespace + "\"" - args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "NFLOG", "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10", "\n"} - // This used to be AppendUnique when we were using iptables directly, this checks to make sure we didn't drop unmarked for this chain already + args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, + "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "NFLOG", + "--nflog-group", "100", "-m", "limit", "--limit", "10/minute", "--limit-burst", "10", "\n"} + // This used to be AppendUnique when we were using iptables directly, this checks to make sure we didn't drop + // unmarked for this chain already if strings.Contains(npc.filterTableRules.String(), strings.Join(args, " ")) { return nil } @@ -88,7 +93,8 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo [] // add rule to DROP if no applicable network policy permits the traffic comment = "\"rule to REJECT traffic destined for POD name:" + podName + " namespace: " + podNamespace + "\"" - args = []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "REJECT", "\n"} + args = []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, + "-m", "mark", "!", "--mark", "0x10000/0x10000", "-j", "REJECT", "\n"} npc.filterTableRules.WriteString(strings.Join(args, " ")) // reset mark to let traffic pass through rest of the chains @@ -128,7 +134,8 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo [] // set mark to indicate traffic from/to the pod passed network policies. // Mark will be checked to explicitly ACCEPT the traffic comment := "\"set mark to ACCEPT traffic that comply to network policies\"" - args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, "-j", "MARK", "--set-mark", "0x20000/0x20000", "\n"} + args := []string{"-A", podFwChainName, "-m", "comment", "--comment", comment, + "-j", "MARK", "--set-mark", "0x20000/0x20000", "\n"} npc.filterTableRules.WriteString(strings.Join(args, " ")) } @@ -136,7 +143,8 @@ func (npc *NetworkPolicyController) syncPodFirewallChains(networkPoliciesInfo [] } // setup rules to jump to applicable network policy chains for the traffic from/to the pod -func (npc *NetworkPolicyController) setupPodNetpolRules(pod podInfo, podFwChainName string, networkPoliciesInfo []networkPolicyInfo, version string) { +func (npc *NetworkPolicyController) setupPodNetpolRules(pod podInfo, podFwChainName string, + networkPoliciesInfo []networkPolicyInfo, version string) { hasIngressPolicy := false hasEgressPolicy := false @@ -153,13 +161,16 @@ func (npc *NetworkPolicyController) setupPodNetpolRules(pod podInfo, podFwChainN case kubeBothPolicyType: hasIngressPolicy = true hasEgressPolicy = true - args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"} + args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, + "-j", policyChainName, "\n"} case kubeIngressPolicyType: hasIngressPolicy = true - args = []string{"-I", podFwChainName, "1", "-d", pod.ip, "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"} + args = []string{"-I", podFwChainName, "1", "-d", pod.ip, "-m", "comment", "--comment", comment, + "-j", policyChainName, "\n"} case kubeEgressPolicyType: hasEgressPolicy = true - args = []string{"-I", podFwChainName, "1", "-s", pod.ip, "-m", "comment", "--comment", comment, "-j", policyChainName, "\n"} + args = []string{"-I", podFwChainName, "1", "-s", pod.ip, "-m", "comment", "--comment", comment, + "-j", policyChainName, "\n"} } npc.filterTableRules.WriteString(strings.Join(args, " ")) } @@ -168,7 +179,8 @@ func (npc *NetworkPolicyController) setupPodNetpolRules(pod podInfo, podFwChainN // then apply default network policy if !hasIngressPolicy { comment := "\"run through default ingress network policy chain\"" - args := []string{"-I", podFwChainName, "1", "-d", pod.ip, "-m", "comment", "--comment", comment, "-j", kubeDefaultNetpolChain, "\n"} + args := []string{"-I", podFwChainName, "1", "-d", pod.ip, "-m", "comment", "--comment", comment, + "-j", kubeDefaultNetpolChain, "\n"} npc.filterTableRules.WriteString(strings.Join(args, " ")) } @@ -176,17 +188,20 @@ func (npc *NetworkPolicyController) setupPodNetpolRules(pod podInfo, podFwChainN // then apply default network policy if !hasEgressPolicy { comment := "\"run through default egress network policy chain\"" - args := []string{"-I", podFwChainName, "1", "-s", pod.ip, "-m", "comment", "--comment", comment, "-j", kubeDefaultNetpolChain, "\n"} + args := []string{"-I", podFwChainName, "1", "-s", pod.ip, "-m", "comment", "--comment", comment, + "-j", kubeDefaultNetpolChain, "\n"} npc.filterTableRules.WriteString(strings.Join(args, " ")) } comment := "\"rule to permit the traffic traffic to pods when source is the pod's local node\"" - args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT", "\n"} + args := []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, + "-m", "addrtype", "--src-type", "LOCAL", "-d", pod.ip, "-j", "ACCEPT", "\n"} npc.filterTableRules.WriteString(strings.Join(args, " ")) // ensure statefull firewall that permits RELATED,ESTABLISHED traffic from/to the pod comment = "\"rule for stateful firewall for pod\"" - args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", "\n"} + args = []string{"-I", podFwChainName, "1", "-m", "comment", "--comment", comment, + "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", "\n"} npc.filterTableRules.WriteString(strings.Join(args, " ")) } @@ -196,12 +211,14 @@ func (npc *NetworkPolicyController) interceptPodInboundTraffic(pod podInfo, podF // this rule applies to the traffic getting routed (coming for other node pods) comment := "\"rule to jump traffic destined to POD name:" + pod.name + " namespace: " + pod.namespace + " to chain " + podFwChainName + "\"" - args := []string{"-A", kubeForwardChainName, "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"} + args := []string{"-A", kubeForwardChainName, "-m", "comment", "--comment", comment, "-d", pod.ip, + "-j", podFwChainName + "\n"} npc.filterTableRules.WriteString(strings.Join(args, " ")) // ensure there is rule in filter table and OUTPUT chain to jump to pod specific firewall chain // this rule applies to the traffic from a pod getting routed back to another pod on same node by service proxy - args = []string{"-A", kubeOutputChainName, "-m", "comment", "--comment", comment, "-d", pod.ip, "-j", podFwChainName + "\n"} + args = []string{"-A", kubeOutputChainName, "-m", "comment", "--comment", comment, "-d", pod.ip, + "-j", podFwChainName + "\n"} npc.filterTableRules.WriteString(strings.Join(args, " ")) // ensure there is rule in filter table and forward chain to jump to pod specific firewall chain diff --git a/pkg/controllers/netpol/policy.go b/pkg/controllers/netpol/policy.go index d4851d05..bdd2d1c4 100644 --- a/pkg/controllers/netpol/policy.go +++ b/pkg/controllers/netpol/policy.go @@ -68,7 +68,8 @@ func (npc *NetworkPolicyController) handleNetworkPolicyDelete(obj interface{}) { // is used for matching destination ip address. Each ingress rule in the network // policyspec is evaluated to set of matching pods, which are grouped in to a // ipset used for source ip addr matching. -func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo []networkPolicyInfo, version string) (map[string]bool, map[string]bool, error) { +func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo []networkPolicyInfo, + version string) (map[string]bool, map[string]bool, error) { start := time.Now() defer func() { endTime := time.Since(start) @@ -197,7 +198,8 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo // so match on specified source and destination ip with all port and protocol comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, "", "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcPodIPSetName, targetDestPodIPSetName, + "", "", ""); err != nil { return err } } @@ -209,7 +211,8 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo for _, portProtocol := range ingressRule.ports { comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, + portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { return err } } @@ -221,7 +224,8 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", namedPortIPSetName, eps.protocol, eps.port, eps.endport); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", namedPortIPSetName, + eps.protocol, eps.port, eps.endport); err != nil { return err } } @@ -232,7 +236,8 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo if ingressRule.matchAllSource && ingressRule.matchAllPorts { comment := "rule to ACCEPT traffic from all sources to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, "", "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, "", targetDestPodIPSetName, + "", "", ""); err != nil { return err } } @@ -246,7 +251,9 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo for _, portProtocol := range ingressRule.ports { comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, + targetDestPodIPSetName, portProtocol.protocol, portProtocol.port, + portProtocol.endport); err != nil { return err } } @@ -258,7 +265,8 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, namedPortIPSetName, eps.protocol, eps.port, eps.endport); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, + namedPortIPSetName, eps.protocol, eps.port, eps.endport); err != nil { return err } } @@ -266,7 +274,8 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo if ingressRule.matchAllPorts { comment := "rule to ACCEPT traffic from specified ipBlocks to dest pods selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, targetDestPodIPSetName, "", "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, srcIPBlockIPSetName, + targetDestPodIPSetName, "", "", ""); err != nil { return err } } @@ -317,7 +326,8 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, namedPortIPSetName, eps.protocol, eps.port, eps.endport); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, + namedPortIPSetName, eps.protocol, eps.port, eps.endport); err != nil { return err } } @@ -329,7 +339,8 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, // so match on specified source and destination ip with all port and protocol comment := "rule to ACCEPT traffic from source pods to dest pods selected by policy name " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstPodIPSetName, "", "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, + dstPodIPSetName, "", "", ""); err != nil { return err } } @@ -341,14 +352,16 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, for _, portProtocol := range egressRule.ports { comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, + "", portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { return err } } for _, portProtocol := range egressRule.namedPorts { comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, + "", portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { return err } } @@ -359,7 +372,8 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, if egressRule.matchAllDestinations && egressRule.matchAllPorts { comment := "rule to ACCEPT traffic from source pods to all destinations selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, "", "", "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, + "", "", "", ""); err != nil { return err } } @@ -372,7 +386,9 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, for _, portProtocol := range egressRule.ports { comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port, portProtocol.endport); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, + dstIPBlockIPSetName, portProtocol.protocol, portProtocol.port, + portProtocol.endport); err != nil { return err } } @@ -380,7 +396,8 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, if egressRule.matchAllPorts { comment := "rule to ACCEPT traffic from source pods to specified ipBlocks selected by policy name: " + policy.name + " namespace " + policy.namespace - if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, dstIPBlockIPSetName, "", "", ""); err != nil { + if err := npc.appendRuleToPolicyChain(policyChainName, comment, targetSourcePodIPSetName, + dstIPBlockIPSetName, "", "", ""); err != nil { return err } } @@ -389,7 +406,8 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo, return nil } -func (npc *NetworkPolicyController) appendRuleToPolicyChain(policyChainName, comment, srcIPSetName, dstIPSetName, protocol, dPort, endDport string) error { +func (npc *NetworkPolicyController) appendRuleToPolicyChain(policyChainName, comment, srcIPSetName, dstIPSetName, + protocol, dPort, endDport string) error { args := make([]string, 0) args = append(args, "-A", policyChainName) @@ -523,7 +541,8 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI ingressRule.matchAllPorts = true } else { ingressRule.matchAllPorts = false - ingressRule.ports, ingressRule.namedPorts = npc.processNetworkPolicyPorts(specIngressRule.Ports, namedPort2IngressEps) + ingressRule.ports, ingressRule.namedPorts = npc.processNetworkPolicyPorts( + specIngressRule.Ports, namedPort2IngressEps) } newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule) @@ -538,8 +557,9 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI // If this field is empty or missing in the spec, this rule matches all sources if len(specEgressRule.To) == 0 { egressRule.matchAllDestinations = true - // if rule.To is empty but rule.Ports not, we must try to grab NamedPort from pods that in same namespace, - // so that we can design iptables rule to describe "match all dst but match some named dst-port" egress rule + // if rule.To is empty but rule.Ports not, we must try to grab NamedPort from pods that in same + // namespace, so that we can design iptables rule to describe "match all dst but match some named + // dst-port" egress rule if policyRulePortsHasNamedPort(specEgressRule.Ports) { matchingPeerPods, _ := npc.ListPodsByNamespaceAndLabels(policy.Namespace, labels.Everything()) for _, peerPod := range matchingPeerPods { @@ -577,7 +597,8 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI egressRule.matchAllPorts = true } else { egressRule.matchAllPorts = false - egressRule.ports, egressRule.namedPorts = npc.processNetworkPolicyPorts(specEgressRule.Ports, namedPort2EgressEps) + egressRule.ports, egressRule.namedPorts = npc.processNetworkPolicyPorts( + specEgressRule.Ports, namedPort2EgressEps) } newPolicy.egressRules = append(newPolicy.egressRules, egressRule) @@ -588,7 +609,8 @@ func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() ([]networkPolicyI return NetworkPolicies, nil } -func (npc *NetworkPolicyController) evalPodPeer(policy *networking.NetworkPolicy, peer networking.NetworkPolicyPeer) ([]*api.Pod, error) { +func (npc *NetworkPolicyController) evalPodPeer(policy *networking.NetworkPolicy, + peer networking.NetworkPolicyPeer) ([]*api.Pod, error) { var matchingPods []*api.Pod matchingPods = make([]*api.Pod, 0) @@ -620,7 +642,8 @@ func (npc *NetworkPolicyController) evalPodPeer(policy *networking.NetworkPolicy return matchingPods, err } -func (npc *NetworkPolicyController) processNetworkPolicyPorts(npPorts []networking.NetworkPolicyPort, namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) { +func (npc *NetworkPolicyController) processNetworkPolicyPorts(npPorts []networking.NetworkPolicyPort, + namedPort2eps namedPort2eps) (numericPorts []protocolAndPort, namedPorts []endPoints) { numericPorts, namedPorts = make([]protocolAndPort, 0), make([]endPoints, 0) for _, npPort := range npPorts { var protocol string @@ -649,7 +672,8 @@ func (npc *NetworkPolicyController) processNetworkPolicyPorts(npPorts []networki return } -func (npc *NetworkPolicyController) ListPodsByNamespaceAndLabels(namespace string, podSelector labels.Selector) (ret []*api.Pod, err error) { +func (npc *NetworkPolicyController) ListPodsByNamespaceAndLabels(namespace string, + podSelector labels.Selector) (ret []*api.Pod, err error) { podLister := listers.NewPodLister(npc.podLister) allMatchedNameSpacePods, err := podLister.Pods(namespace).List(podSelector) if err != nil { @@ -671,13 +695,15 @@ func (npc *NetworkPolicyController) evalIPBlockPeer(peer networking.NetworkPolic ipBlock := make([][]string, 0) if peer.PodSelector == nil && peer.NamespaceSelector == nil && peer.IPBlock != nil { if cidr := peer.IPBlock.CIDR; strings.HasSuffix(cidr, "/0") { - ipBlock = append(ipBlock, []string{"0.0.0.0/1", utils.OptionTimeout, "0"}, []string{"128.0.0.0/1", utils.OptionTimeout, "0"}) + ipBlock = append(ipBlock, []string{"0.0.0.0/1", utils.OptionTimeout, "0"}, + []string{"128.0.0.0/1", utils.OptionTimeout, "0"}) } else { ipBlock = append(ipBlock, []string{cidr, utils.OptionTimeout, "0"}) } for _, except := range peer.IPBlock.Except { if strings.HasSuffix(except, "/0") { - ipBlock = append(ipBlock, []string{"0.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch}, []string{"128.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch}) + ipBlock = append(ipBlock, []string{"0.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch}, + []string{"128.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch}) } else { ipBlock = append(ipBlock, []string{except, utils.OptionTimeout, "0", utils.OptionNoMatch}) } @@ -757,13 +783,15 @@ func policyIndexedDestinationIPBlockIPSetName(namespace, policyName string, egre } func policyIndexedIngressNamedPortIPSetName(namespace, policyName string, ingressRuleNo, namedPortNo int) string { - hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + strconv.Itoa(namedPortNo) + "namedport")) + hash := sha256.Sum256([]byte(namespace + policyName + "ingressrule" + strconv.Itoa(ingressRuleNo) + + strconv.Itoa(namedPortNo) + "namedport")) encoded := base32.StdEncoding.EncodeToString(hash[:]) return kubeDestinationIPSetPrefix + encoded[:16] } func policyIndexedEgressNamedPortIPSetName(namespace, policyName string, egressRuleNo, namedPortNo int) string { - hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + strconv.Itoa(namedPortNo) + "namedport")) + hash := sha256.Sum256([]byte(namespace + policyName + "egressrule" + strconv.Itoa(egressRuleNo) + + strconv.Itoa(namedPortNo) + "namedport")) encoded := base32.StdEncoding.EncodeToString(hash[:]) return kubeDestinationIPSetPrefix + encoded[:16] } diff --git a/pkg/controllers/netpol/utils.go b/pkg/controllers/netpol/utils.go index 086e2577..bf478505 100644 --- a/pkg/controllers/netpol/utils.go +++ b/pkg/controllers/netpol/utils.go @@ -18,7 +18,8 @@ const ( // finds a relevant change, it returns true otherwise it returns false. The things we care about for NetworkPolicies: // 1) Is the phase of the pod changing? (matters for catching completed, succeeded, or failed jobs) // 2) Is the pod IP changing? (changes how the network policy is applied to the host) -// 3) Is the pod's host IP changing? (should be caught in the above, with the CNI kube-router runs with but we check this as well for sanity) +// 3) Is the pod's host IP changing? (should be caught in the above, with the CNI kube-router runs with but we check +// this as well for sanity) // 4) Is a pod's label changing? (potentially changes which NetworkPolicies select this pod) func isPodUpdateNetPolRelevant(oldPod, newPod *api.Pod) bool { return newPod.Status.Phase != oldPod.Status.Phase || @@ -33,7 +34,8 @@ func isNetPolActionable(pod *api.Pod) bool { } func isFinished(pod *api.Pod) bool { - // nolint:exhaustive // We don't care about PodPending, PodRunning, PodUnknown here as we want those to fall into the false case + // nolint:exhaustive // We don't care about PodPending, PodRunning, PodUnknown here as we want those to fall + // into the false case switch pod.Status.Phase { case api.PodFailed, api.PodSucceeded, PodCompleted: return true @@ -46,7 +48,8 @@ func validateNodePortRange(nodePortOption string) (string, error) { nodePortValidator := regexp.MustCompile(`^([0-9]+)[:-]([0-9]+)$`) if matched := nodePortValidator.MatchString(nodePortOption); !matched { - return "", fmt.Errorf("failed to parse node port range given: '%s' please see specification in help text", nodePortOption) + return "", fmt.Errorf( + "failed to parse node port range given: '%s' please see specification in help text", nodePortOption) } matches := nodePortValidator.FindStringSubmatch(nodePortOption) if len(matches) != 3 { diff --git a/pkg/controllers/proxy/network_service_graceful.go b/pkg/controllers/proxy/network_service_graceful.go index 91417526..f2b2d4e3 100644 --- a/pkg/controllers/proxy/network_service_graceful.go +++ b/pkg/controllers/proxy/network_service_graceful.go @@ -60,9 +60,11 @@ func (nsc *NetworkServicesController) addToGracefulQueue(req *gracefulRequest) { defer nsc.gracefulQueue.mu.Unlock() var alreadyExists bool for _, jobQitem := range nsc.gracefulQueue.queue { - if jobQitem.ipvsSvc.Address.Equal(req.ipvsSvc.Address) && jobQitem.ipvsSvc.Port == req.ipvsSvc.Port && jobQitem.ipvsSvc.Protocol == req.ipvsSvc.Protocol { + if jobQitem.ipvsSvc.Address.Equal(req.ipvsSvc.Address) && + jobQitem.ipvsSvc.Port == req.ipvsSvc.Port && jobQitem.ipvsSvc.Protocol == req.ipvsSvc.Protocol { if jobQitem.ipvsDst.Address.Equal(req.ipvsDst.Address) && jobQitem.ipvsDst.Port == req.ipvsDst.Port { - klog.V(2).Infof("Endpoint already scheduled for removal %+v %+v %s", *req.ipvsSvc, *req.ipvsDst, req.gracefulTerminationPeriod.String()) + klog.V(2).Infof("Endpoint already scheduled for removal %+v %+v %s", + *req.ipvsSvc, *req.ipvsDst, req.gracefulTerminationPeriod.String()) alreadyExists = true break } @@ -72,11 +74,14 @@ func (nsc *NetworkServicesController) addToGracefulQueue(req *gracefulRequest) { // try to get get Termination grace period from the pod, if unsuccesfull use the default timeout podObj, err := nsc.getPodObjectForEndpoint(req.ipvsDst.Address.String()) if err != nil { - klog.V(1).Infof("Failed to find endpoint with ip: %s err: %s", req.ipvsDst.Address.String(), err.Error()) + klog.V(1).Infof("Failed to find endpoint with ip: %s err: %s", + req.ipvsDst.Address.String(), err.Error()) req.gracefulTerminationPeriod = nsc.gracefulPeriod } else { - klog.V(1).Infof("Found pod termination grace period %d for pod %s", *podObj.Spec.TerminationGracePeriodSeconds, podObj.Name) - req.gracefulTerminationPeriod = time.Duration(float64(*podObj.Spec.TerminationGracePeriodSeconds) * float64(time.Second)) + klog.V(1).Infof("Found pod termination grace period %d for pod %s", + *podObj.Spec.TerminationGracePeriodSeconds, podObj.Name) + req.gracefulTerminationPeriod = + time.Duration(float64(*podObj.Spec.TerminationGracePeriodSeconds) * float64(time.Second)) } nsc.gracefulQueue.queue = append(nsc.gracefulQueue.queue, *req) } @@ -86,7 +91,8 @@ func (nsc *NetworkServicesController) gracefulSync() { nsc.gracefulQueue.mu.Lock() defer nsc.gracefulQueue.mu.Unlock() var newQueue []gracefulRequest - // Itterate over our queued destination removals one by one, and don't add them back to the queue if they were processed + // Iterate over our queued destination removals one by one, and don't add them back to the queue if they were + // processed for _, job := range nsc.gracefulQueue.queue { if removed := nsc.gracefulDeleteIpvsDestination(job); removed { continue @@ -117,17 +123,20 @@ func (nsc *NetworkServicesController) gracefulDeleteIpvsDestination(req graceful if deleteDestination { klog.V(2).Infof("Deleting IPVS destination: %s", ipvsDestinationString(req.ipvsDst)) if err := nsc.ln.ipvsDelDestination(req.ipvsSvc, req.ipvsDst); err != nil { - klog.Errorf("Failed to delete IPVS destination: %s, %s", ipvsDestinationString(req.ipvsDst), err.Error()) + klog.Errorf("Failed to delete IPVS destination: %s, %s", + ipvsDestinationString(req.ipvsDst), err.Error()) } } return deleteDestination } // getConnStats returns the number of active & inactive connections for the IPVS destination -func (nsc *NetworkServicesController) getIpvsDestinationConnStats(ipvsSvc *ipvs.Service, dest *ipvs.Destination) (int, int, error) { +func (nsc *NetworkServicesController) getIpvsDestinationConnStats(ipvsSvc *ipvs.Service, + dest *ipvs.Destination) (int, int, error) { destStats, err := nsc.ln.ipvsGetDestinations(ipvsSvc) if err != nil { - return 0, 0, fmt.Errorf("failed to get IPVS destinations for service : %s : %s", ipvsServiceString(ipvsSvc), err.Error()) + return 0, 0, fmt.Errorf("failed to get IPVS destinations for service : %s : %s", + ipvsServiceString(ipvsSvc), err.Error()) } for _, destStat := range destStats { @@ -135,12 +144,14 @@ func (nsc *NetworkServicesController) getIpvsDestinationConnStats(ipvsSvc *ipvs. return destStat.ActiveConnections, destStat.InactiveConnections, nil } } - return 0, 0, fmt.Errorf("destination %s not found on IPVS service %s ", ipvsDestinationString(dest), ipvsServiceString(ipvsSvc)) + return 0, 0, fmt.Errorf("destination %s not found on IPVS service %s ", + ipvsDestinationString(dest), ipvsServiceString(ipvsSvc)) } // flushConntrackUDP flushes UDP conntrack records for the given service destination func (nsc *NetworkServicesController) flushConntrackUDP(svc *ipvs.Service) error { - // Conntrack exits with non zero exit code when exiting if 0 flow entries have been deleted, use regex to check output and don't Error when matching + // Conntrack exits with non zero exit code when exiting if 0 flow entries have been deleted, use regex to + // check output and don't Error when matching re := regexp.MustCompile("([[:space:]]0 flow entries have been deleted.)") // Shell out and flush conntrack records @@ -149,7 +160,8 @@ func (nsc *NetworkServicesController) flushConntrackUDP(svc *ipvs.Service) error "--dport", strconv.Itoa(int(svc.Port))).CombinedOutput() if err != nil { if matched := re.MatchString(string(out)); !matched { - return fmt.Errorf("failed to delete conntrack entry for endpoint: %s:%d due to %s", svc.Address.String(), svc.Port, err.Error()) + return fmt.Errorf("failed to delete conntrack entry for endpoint: %s:%d due to %s", + svc.Address.String(), svc.Port, err.Error()) } } klog.V(1).Infof("Deleted conntrack entry for endpoint: %s:%d", svc.Address.String(), svc.Port) diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go index 49adc898..ffe5dc7c 100644 --- a/pkg/controllers/proxy/network_services_controller.go +++ b/pkg/controllers/proxy/network_services_controller.go @@ -96,7 +96,8 @@ var ( type ipvsCalls interface { ipvsNewService(ipvsSvc *ipvs.Service) error - ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) + ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, persistent bool, + persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) ipvsDelService(ipvsSvc *ipvs.Service) error ipvsUpdateService(ipvsSvc *ipvs.Service) error ipvsGetServices() ([]*ipvs.Service, error) @@ -105,7 +106,8 @@ type ipvsCalls interface { ipvsUpdateDestination(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error ipvsGetDestinations(ipvsSvc *ipvs.Service) ([]*ipvs.Destination, error) ipvsDelDestination(ipvsSvc *ipvs.Service, ipvsDst *ipvs.Destination) error - ipvsAddFWMarkService(vip net.IP, protocol, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) + ipvsAddFWMarkService(vip net.IP, protocol, port uint16, persistent bool, persistentTimeout int32, + scheduler string, flags schedFlags) (*ipvs.Service, error) } type netlinkCalls interface { @@ -115,7 +117,8 @@ type netlinkCalls interface { getKubeDummyInterface() (netlink.Link, error) setupRoutesForExternalIPForDSR(serviceInfoMap) error prepareEndpointForDsrWithCRI(runtimeEndpoint, containerID, endpointIP, vip string) error - configureContainerForDSR(vip, endpointIP, containerID string, pid int, hostNetworkNamespaceHandle netns.NsHandle) error + configureContainerForDSR(vip, endpointIP, containerID string, pid int, + hostNetworkNamespaceHandle netns.NsHandle) error setupPolicyRoutingForDSR() error cleanupMangleTableRule(ip string, protocol string, port string, fwmark string, tcpMSS int) error } @@ -132,7 +135,9 @@ type linuxNetworking struct { } func (ln *linuxNetworking) ipAddrDel(iface netlink.Link, ip string) error { - naddr := &netlink.Addr{IPNet: &net.IPNet{IP: net.ParseIP(ip), Mask: net.IPv4Mask(255, 255, 255, 255)}, Scope: syscall.RT_SCOPE_LINK} + naddr := &netlink.Addr{IPNet: &net.IPNet{ + IP: net.ParseIP(ip), Mask: net.IPv4Mask(255, 255, 255, 255), + }, Scope: syscall.RT_SCOPE_LINK} err := netlink.AddrDel(iface, naddr) if err != nil && err.Error() != IfaceHasNoAddr { klog.Errorf("Failed to verify is external ip %s is assocated with dummy interface %s due to %s", @@ -140,10 +145,12 @@ func (ln *linuxNetworking) ipAddrDel(iface netlink.Link, ip string) error { } // Delete VIP addition to "local" rt table also, fail silently if not found (DSR special case) if err == nil { - out, err := exec.Command("ip", "route", "delete", "local", ip, "dev", KubeDummyIf, "table", "local", "proto", "kernel", "scope", "host", "src", + out, err := exec.Command("ip", "route", "delete", "local", ip, "dev", KubeDummyIf, + "table", "local", "proto", "kernel", "scope", "host", "src", NodeIP.String(), "table", "local").CombinedOutput() if err != nil && !strings.Contains(string(out), "No such process") { - klog.Errorf("Failed to delete route to service VIP %s configured on %s. Error: %v, Output: %s", ip, KubeDummyIf, err, out) + klog.Errorf("Failed to delete route to service VIP %s configured on %s. Error: %v, Output: %s", + ip, KubeDummyIf, err, out) } } return err @@ -153,7 +160,9 @@ func (ln *linuxNetworking) ipAddrDel(iface netlink.Link, ip string) error { // to kube-dummy-if. Also when DSR is used, used to assign VIP to dummy interface // inside the container. func (ln *linuxNetworking) ipAddrAdd(iface netlink.Link, ip string, addRoute bool) error { - naddr := &netlink.Addr{IPNet: &net.IPNet{IP: net.ParseIP(ip), Mask: net.IPv4Mask(255, 255, 255, 255)}, Scope: syscall.RT_SCOPE_LINK} + naddr := &netlink.Addr{IPNet: &net.IPNet{ + IP: net.ParseIP(ip), Mask: net.IPv4Mask(255, 255, 255, 255), + }, Scope: syscall.RT_SCOPE_LINK} err := netlink.AddrAdd(iface, naddr) if err != nil && err.Error() != IfaceHasAddr { klog.Errorf("Failed to assign cluster ip %s to dummy interface: %s", @@ -172,10 +181,12 @@ func (ln *linuxNetworking) ipAddrAdd(iface netlink.Link, ip string, addRoute boo // TODO: netlink.RouteReplace which is replacement for below command is not working as expected. Call succeeds but // route is not replaced. For now do it with command. - out, err := exec.Command("ip", "route", "replace", "local", ip, "dev", KubeDummyIf, "table", "local", "proto", "kernel", "scope", "host", "src", + out, err := exec.Command("ip", "route", "replace", "local", ip, "dev", KubeDummyIf, + "table", "local", "proto", "kernel", "scope", "host", "src", NodeIP.String(), "table", "local").CombinedOutput() if err != nil { - klog.Errorf("Failed to replace route to service VIP %s configured on %s. Error: %v, Output: %s", ip, KubeDummyIf, err, out) + klog.Errorf("Failed to replace route to service VIP %s configured on %s. Error: %v, Output: %s", + ip, KubeDummyIf, err, out) } return nil } @@ -318,7 +329,8 @@ type endpointsInfo struct { type endpointsInfoMap map[string][]endpointsInfo // Run periodically sync ipvs configuration to reflect desired state of services and endpoints -func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) { +func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, + stopCh <-chan struct{}, wg *sync.WaitGroup) { t := time.NewTicker(nsc.syncPeriod) defer t.Stop() defer wg.Done() @@ -657,7 +669,7 @@ func (nsc *NetworkServicesController) cleanupIpvsFirewall() { ipvsFirewallInputChainRule := getIpvsFirewallInputChainRule() exists, err := iptablesCmdHandler.Exists("filter", "INPUT", ipvsFirewallInputChainRule...) if err != nil { - // Changing to level 1 logging as errors occur when ipsets have already been cleaned and needlessly worries users + // Changed to level 1 as errors occur when ipsets have already been cleaned and needlessly worries users klog.V(1).Infof("failed to check if iptables rules exists: %v", err) } else if exists { err = iptablesCmdHandler.Delete("filter", "INPUT", ipvsFirewallInputChainRule...) @@ -867,7 +879,8 @@ func (nsc *NetworkServicesController) publishMetrics(serviceInfoMap serviceInfoM if pushMetric { - klog.V(3).Infof("Publishing metrics for %s/%s (%s:%d/%s)", svc.namespace, svc.name, svcVip, svc.port, svc.protocol) + klog.V(3).Infof("Publishing metrics for %s/%s (%s:%d/%s)", + svc.namespace, svc.name, svcVip, svc.port, svc.protocol) labelValues := []string{ svc.namespace, @@ -950,7 +963,9 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(ep *api.Endpoints) { defer nsc.mu.Unlock() klog.V(1).Infof("Received update to endpoint: %s/%s from watch API", ep.Namespace, ep.Name) if !nsc.readyForUpdates { - klog.V(3).Infof("Skipping update to endpoint: %s/%s as controller is not ready to process service and endpoints updates", ep.Namespace, ep.Name) + klog.V(3).Infof( + "Skipping update to endpoint: %s/%s as controller is not ready to process service and endpoints updates", + ep.Namespace, ep.Name) return } @@ -968,7 +983,8 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(ep *api.Endpoints) { return } if utils.ServiceIsHeadless(svc) { - klog.V(1).Infof("The service associated with endpoint: %s/%s is headless, skipping...", ep.Namespace, ep.Name) + klog.V(1).Infof("The service associated with endpoint: %s/%s is headless, skipping...", + ep.Namespace, ep.Name) return } @@ -982,7 +998,8 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(ep *api.Endpoints) { klog.V(1).Infof("Syncing IPVS services sync for update to endpoint: %s/%s", ep.Namespace, ep.Name) nsc.sync(synctypeIpvs) } else { - klog.V(1).Infof("Skipping IPVS services sync on endpoint: %s/%s update as nothing changed", ep.Namespace, ep.Name) + klog.V(1).Infof("Skipping IPVS services sync on endpoint: %s/%s update as nothing changed", + ep.Namespace, ep.Name) } } @@ -994,7 +1011,9 @@ func (nsc *NetworkServicesController) OnServiceUpdate(svc *api.Service) { klog.V(1).Infof("Received update to service: %s/%s from watch API", svc.Namespace, svc.Name) if !nsc.readyForUpdates { - klog.V(3).Infof("Skipping update to service: %s/%s as controller is not ready to process service and endpoints updates", svc.Namespace, svc.Name) + klog.V(3).Infof( + "Skipping update to service: %s/%s as controller is not ready to process service and endpoints updates", + svc.Namespace, svc.Name) return } @@ -1017,7 +1036,8 @@ func (nsc *NetworkServicesController) OnServiceUpdate(svc *api.Service) { klog.V(1).Infof("Syncing IPVS services sync on update to service: %s/%s", svc.Namespace, svc.Name) nsc.sync(synctypeIpvs) } else { - klog.V(1).Infof("Skipping syncing IPVS services for update to service: %s/%s as nothing changed", svc.Namespace, svc.Name) + klog.V(1).Infof("Skipping syncing IPVS services for update to service: %s/%s as nothing changed", + svc.Namespace, svc.Name) } } @@ -1138,12 +1158,14 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap { svc := obj.(*api.Service) if utils.ClusterIPIsNoneOrBlank(svc.Spec.ClusterIP) { - klog.V(2).Infof("Skipping service name:%s namespace:%s as there is no cluster IP", svc.Name, svc.Namespace) + klog.V(2).Infof("Skipping service name:%s namespace:%s as there is no cluster IP", + svc.Name, svc.Namespace) continue } if svc.Spec.Type == "ExternalName" { - klog.V(2).Infof("Skipping service name:%s namespace:%s due to service Type=%s", svc.Name, svc.Namespace, svc.Spec.Type) + klog.V(2).Infof("Skipping service name:%s namespace:%s due to service Type=%s", + svc.Name, svc.Namespace, svc.Spec.Type) continue } @@ -1195,7 +1217,8 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap { svcInfo.sessionAffinity = svc.Spec.SessionAffinity == api.ServiceAffinityClientIP if svcInfo.sessionAffinity { - // Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP" + // Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity + // type is ClientIP // https://github.com/kubernetes/kubernetes/blob/master/pkg/apis/core/v1/defaults.go#L106 svcInfo.sessionAffinityTimeoutSeconds = *svc.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds } @@ -1274,7 +1297,8 @@ func (nsc *NetworkServicesController) ensureMasqueradeIptablesRule() error { if err != nil { return errors.New("Failed to initialize iptables executor" + err.Error()) } - var args = []string{"-m", "ipvs", "--ipvs", "--vdir", "ORIGINAL", "--vmethod", "MASQ", "-m", "comment", "--comment", "", "-j", "SNAT", "--to-source", nsc.nodeIP.String()} + var args = []string{"-m", "ipvs", "--ipvs", "--vdir", "ORIGINAL", "--vmethod", "MASQ", + "-m", "comment", "--comment", "", "-j", "SNAT", "--to-source", nsc.nodeIP.String()} if iptablesCmdHandler.HasRandomFully() { args = append(args, "--random-fully") } @@ -1299,8 +1323,9 @@ func (nsc *NetworkServicesController) ensureMasqueradeIptablesRule() error { } if len(nsc.podCidr) > 0 { // TODO: ipset should be used for destination podCidr(s) match after multiple podCidr(s) per node get supported - args = []string{"-m", "ipvs", "--ipvs", "--vdir", "ORIGINAL", "--vmethod", "MASQ", "-m", "comment", "--comment", "", - "!", "-s", nsc.podCidr, "!", "-d", nsc.podCidr, "-j", "SNAT", "--to-source", nsc.nodeIP.String()} + args = []string{"-m", "ipvs", "--ipvs", "--vdir", "ORIGINAL", "--vmethod", "MASQ", + "-m", "comment", "--comment", "", "!", "-s", nsc.podCidr, "!", "-d", nsc.podCidr, + "-j", "SNAT", "--to-source", nsc.nodeIP.String()} if iptablesCmdHandler.HasRandomFully() { args = append(args, "--random-fully") } @@ -1322,16 +1347,20 @@ func (nsc *NetworkServicesController) deleteBadMasqueradeIptablesRules() error { } var argsBad = [][]string{ - {"-m", "ipvs", "--ipvs", "--vdir", "ORIGINAL", "--vmethod", "MASQ", "-m", "comment", "--comment", "", "-j", "MASQUERADE"}, - {"-m", "ipvs", "--ipvs", "--vdir", "ORIGINAL", "--vmethod", "MASQ", "-m", "comment", "--comment", "", "!", "-s", nsc.podCidr, "!", "-d", nsc.podCidr, "-j", "MASQUERADE"}, + {"-m", "ipvs", "--ipvs", "--vdir", "ORIGINAL", "--vmethod", "MASQ", "-m", "comment", "--comment", "", + "-j", "MASQUERADE"}, + {"-m", "ipvs", "--ipvs", "--vdir", "ORIGINAL", "--vmethod", "MASQ", "-m", "comment", "--comment", "", + "!", "-s", nsc.podCidr, "!", "-d", nsc.podCidr, "-j", "MASQUERADE"}, } // If random fully is supported remove the original rules as well if iptablesCmdHandler.HasRandomFully() { - argsBad = append(argsBad, []string{"-m", "ipvs", "--ipvs", "--vdir", "ORIGINAL", "--vmethod", "MASQ", "-m", "comment", "--comment", "", "-j", "SNAT", "--to-source", nsc.nodeIP.String()}) + argsBad = append(argsBad, []string{"-m", "ipvs", "--ipvs", "--vdir", "ORIGINAL", "--vmethod", "MASQ", + "-m", "comment", "--comment", "", "-j", "SNAT", "--to-source", nsc.nodeIP.String()}) if len(nsc.podCidr) > 0 { - argsBad = append(argsBad, []string{"-m", "ipvs", "--ipvs", "--vdir", "ORIGINAL", "--vmethod", "MASQ", "-m", "comment", "--comment", "", + argsBad = append(argsBad, []string{"-m", "ipvs", "--ipvs", "--vdir", "ORIGINAL", "--vmethod", "MASQ", + "-m", "comment", "--comment", "", "!", "-s", nsc.podCidr, "!", "-d", nsc.podCidr, "-j", "SNAT", "--to-source", nsc.nodeIP.String()}) } } @@ -1687,7 +1716,8 @@ func changedIpvsSchedFlags(svc *ipvs.Service, s schedFlags) bool { return false } -func (ln *linuxNetworking) ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) { +func (ln *linuxNetworking) ipvsAddService(svcs []*ipvs.Service, vip net.IP, protocol, port uint16, + persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) { var err error for _, svc := range svcs { @@ -1701,7 +1731,8 @@ func (ln *linuxNetworking) ipvsAddService(svcs []*ipvs.Service, vip net.IP, prot if err != nil { return nil, err } - klog.V(2).Infof("Updated persistence/session-affinity for service: %s", ipvsServiceString(svc)) + klog.V(2).Infof("Updated persistence/session-affinity for service: %s", + ipvsServiceString(svc)) } if changedIpvsSchedFlags(svc, flags) { @@ -1764,7 +1795,8 @@ func generateFwmark(ip, protocol, port string) (uint32, error) { } // ipvsAddFWMarkService: creates a IPVS service using FWMARK -func (ln *linuxNetworking) ipvsAddFWMarkService(vip net.IP, protocol, port uint16, persistent bool, persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) { +func (ln *linuxNetworking) ipvsAddFWMarkService(vip net.IP, protocol, port uint16, persistent bool, + persistentTimeout int32, scheduler string, flags schedFlags) (*ipvs.Service, error) { var protocolStr string switch { @@ -1800,7 +1832,8 @@ func (ln *linuxNetworking) ipvsAddFWMarkService(vip net.IP, protocol, port uint1 if err != nil { return nil, err } - klog.V(2).Infof("Updated persistence/session-affinity for service: %s", ipvsServiceString(svc)) + klog.V(2).Infof("Updated persistence/session-affinity for service: %s", + ipvsServiceString(svc)) } if changedIpvsSchedFlags(svc, flags) { @@ -1910,7 +1943,8 @@ func setupMangleTableRule(ip string, protocol string, port string, fwmark string return nil } -func (ln *linuxNetworking) cleanupMangleTableRule(ip string, protocol string, port string, fwmark string, tcpMSS int) error { +func (ln *linuxNetworking) cleanupMangleTableRule(ip string, protocol string, port string, + fwmark string, tcpMSS int) error { iptablesCmdHandler, err := iptables.New() if err != nil { return errors.New("Failed to initialize iptables executor" + err.Error()) @@ -1974,7 +2008,8 @@ func routeVIPTrafficToDirector(fwmark string) error { return errors.New("Failed to verify if `ip rule` exists due to: " + err.Error()) } if !strings.Contains(string(out), fwmark) { - err = exec.Command("ip", "rule", "add", "prio", "32764", "fwmark", fwmark, "table", customDSRRouteTableID).Run() + err = exec.Command("ip", "rule", "add", "prio", "32764", "fwmark", fwmark, "table", + customDSRRouteTableID).Run() if err != nil { return errors.New("Failed to add policy rule to lookup traffic to VIP through the custom " + " routing table due to " + err.Error()) @@ -2036,14 +2071,19 @@ func (ln *linuxNetworking) setupRoutesForExternalIPForDSR(serviceInfoMap service out, err := exec.Command("ip", "rule", "list").Output() if err != nil { - return errors.New("Failed to verify if `ip rule add prio 32765 from all lookup external_ip` exists due to: " + err.Error()) + return fmt.Errorf("failed to verify if `ip rule add prio 32765 from all lookup external_ip` exists due to: %v", + err) } - if !(strings.Contains(string(out), externalIPRouteTableName) || strings.Contains(string(out), externalIPRouteTableID)) { - err = exec.Command("ip", "rule", "add", "prio", "32765", "from", "all", "lookup", externalIPRouteTableID).Run() + if !(strings.Contains(string(out), externalIPRouteTableName) || + strings.Contains(string(out), externalIPRouteTableID)) { + err = exec.Command("ip", "rule", "add", "prio", "32765", "from", "all", "lookup", + externalIPRouteTableID).Run() if err != nil { - klog.Infof("Failed to add policy rule `ip rule add prio 32765 from all lookup external_ip` due to " + err.Error()) - return errors.New("Failed to add policy rule `ip rule add prio 32765 from all lookup external_ip` due to " + err.Error()) + klog.Infof("Failed to add policy rule `ip rule add prio 32765 from all lookup external_ip` due to %v", + err.Error()) + return fmt.Errorf("failed to add policy rule `ip rule add prio 32765 from all lookup external_ip` "+ + "due to %v", err) } } @@ -2054,7 +2094,8 @@ func (ln *linuxNetworking) setupRoutesForExternalIPForDSR(serviceInfoMap service for _, externalIP := range svc.externalIPs { // Verify the DSR annotation exists if !svc.directServerReturn { - klog.V(1).Infof("Skipping service %s/%s as it does not have DSR annotation\n", svc.namespace, svc.name) + klog.V(1).Infof("Skipping service %s/%s as it does not have DSR annotation\n", + svc.namespace, svc.name) continue } @@ -2063,7 +2104,8 @@ func (ln *linuxNetworking) setupRoutesForExternalIPForDSR(serviceInfoMap service if !strings.Contains(outStr, externalIP) { if err = exec.Command("ip", "route", "add", externalIP, "dev", "kube-bridge", "table", externalIPRouteTableID).Run(); err != nil { - klog.Error("Failed to add route for " + externalIP + " in custom route table for external IP's due to: " + err.Error()) + klog.Errorf("Failed to add route for %s in custom route table for external IP's due to: %v", + externalIP, err) continue } } @@ -2080,7 +2122,8 @@ func (ln *linuxNetworking) setupRoutesForExternalIPForDSR(serviceInfoMap service args := []string{"route", "del", "table", externalIPRouteTableID} args = append(args, route...) if err = exec.Command("ip", args...).Run(); err != nil { - klog.Errorf("Failed to del route for %v in custom route table for external IP's due to: %s", ip, err) + klog.Errorf("Failed to del route for %v in custom route table for external IP's due to: %s", + ip, err) continue } } @@ -2143,7 +2186,8 @@ func (ln *linuxNetworking) getKubeDummyInterface() (netlink.Link, error) { var dummyVipInterface netlink.Link dummyVipInterface, err := netlink.LinkByName(KubeDummyIf) if err != nil && err.Error() == IfaceNotFound { - klog.V(1).Infof("Could not find dummy interface: " + KubeDummyIf + " to assign cluster ip's, creating one") + klog.V(1).Infof("Could not find dummy interface: %s to assign cluster ip's, creating one", + KubeDummyIf) err = netlink.LinkAdd(&netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: KubeDummyIf}}) if err != nil { return nil, errors.New("Failed to add dummy interface: " + err.Error()) @@ -2321,7 +2365,8 @@ func (nsc *NetworkServicesController) handleServiceDelete(obj interface{}) { // NewNetworkServicesController returns NetworkServicesController object func NewNetworkServicesController(clientset kubernetes.Interface, config *options.KubeRouterConfig, svcInformer cache.SharedIndexInformer, - epInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkServicesController, error) { + epInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer, + ipsetMutex *sync.Mutex) (*NetworkServicesController, error) { var err error ln, err := newLinuxNetworking() diff --git a/pkg/controllers/proxy/service_endpoints_sync.go b/pkg/controllers/proxy/service_endpoints_sync.go index 8a39fac5..e6e0c406 100644 --- a/pkg/controllers/proxy/service_endpoints_sync.go +++ b/pkg/controllers/proxy/service_endpoints_sync.go @@ -21,7 +21,8 @@ import ( // sync the ipvs service and server details configured to reflect the desired state of Kubernetes services // and endpoints as learned from services and endpoints information from the api server -func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap) error { +func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInfoMap, + endpointsInfoMap endpointsInfoMap) error { start := time.Now() defer func() { endTime := time.Since(start) @@ -51,7 +52,8 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf err = nsc.setupExternalIPServices(serviceInfoMap, endpointsInfoMap, activeServiceEndpointMap) if err != nil { syncErrors = true - klog.Errorf("Error setting up IPVS services for service external IP's and load balancer IP's: %s", err.Error()) + klog.Errorf("Error setting up IPVS services for service external IP's and load balancer IP's: %s", + err.Error()) } err = nsc.cleanupStaleVIPs(activeServiceEndpointMap) if err != nil { @@ -74,11 +76,13 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf err = nsc.setupForDSR(serviceInfoMap) if err != nil { syncErrors = true - klog.Errorf("Error setting up necessary policy based routing configuration needed for direct server return: %s", err.Error()) + klog.Errorf("Error setting up necessary policy based routing configuration needed for "+ + "direct server return: %s", err.Error()) } if syncErrors { - klog.V(1).Info("One or more errors encountered during sync of IPVS services and servers to desired state") + klog.V(1).Info("One or more errors encountered during sync of IPVS services and servers " + + "to desired state") } else { klog.V(1).Info("IPVS servers and services are synced to desired state") } @@ -86,7 +90,8 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf return nil } -func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error { +func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serviceInfoMap, + endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error { ipvsSvcs, err := nsc.ln.ipvsGetServices() if err != nil { return errors.New("Failed get list of IPVS services due to: " + err.Error()) @@ -115,7 +120,8 @@ func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serv } // create IPVS service for the service to be exposed through the cluster ip - ipvsClusterVipSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, svc.clusterIP, protocol, uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) + ipvsClusterVipSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, svc.clusterIP, protocol, uint16(svc.port), + svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) if err != nil { klog.Errorf("Failed to create ipvs service for cluster ip: %s", err.Error()) continue @@ -145,14 +151,16 @@ func (nsc *NetworkServicesController) setupClusterIPServices(serviceInfoMap serv if err != nil { klog.Errorf(err.Error()) } else { - activeServiceEndpointMap[clusterServiceID] = append(activeServiceEndpointMap[clusterServiceID], generateEndpointID(endpoint.ip, strconv.Itoa(endpoint.port))) + activeServiceEndpointMap[clusterServiceID] = append(activeServiceEndpointMap[clusterServiceID], + generateEndpointID(endpoint.ip, strconv.Itoa(endpoint.port))) } } } return nil } -func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error { +func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap serviceInfoMap, + endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error { ipvsSvcs, err := nsc.ln.ipvsGetServices() if err != nil { return errors.New("Failed get list of IPVS services due to: " + err.Error()) @@ -175,7 +183,8 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi } endpoints := endpointsInfoMap[k] if svc.local && !hasActiveEndpoints(endpoints) { - klog.V(1).Infof("Skipping setting up NodePort service %s/%s as it does not have active endpoints\n", svc.namespace, svc.name) + klog.V(1).Infof("Skipping setting up NodePort service %s/%s as it does not have active endpoints", + svc.namespace, svc.name) continue } @@ -202,7 +211,8 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi nodeServiceIds = make([]string, len(addrs)) for i, addr := range addrs { - ipvsNodeportSvcs[i], err = nsc.ln.ipvsAddService(ipvsSvcs, addr.IP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) + ipvsNodeportSvcs[i], err = nsc.ln.ipvsAddService(ipvsSvcs, addr.IP, protocol, uint16(svc.nodePort), + svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) if err != nil { klog.Errorf("Failed to create ipvs service for node port due to: %s", err.Error()) continue @@ -213,7 +223,8 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi } } else { ipvsNodeportSvcs = make([]*ipvs.Service, 1) - ipvsNodeportSvcs[0], err = nsc.ln.ipvsAddService(ipvsSvcs, nsc.nodeIP, protocol, uint16(svc.nodePort), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) + ipvsNodeportSvcs[0], err = nsc.ln.ipvsAddService(ipvsSvcs, nsc.nodeIP, protocol, uint16(svc.nodePort), + svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) if err != nil { klog.Errorf("Failed to create ipvs service for node port due to: %s", err.Error()) continue @@ -237,7 +248,9 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi if err != nil { klog.Errorf(err.Error()) } else { - activeServiceEndpointMap[nodeServiceIds[i]] = append(activeServiceEndpointMap[nodeServiceIds[i]], generateEndpointID(endpoint.ip, strconv.Itoa(endpoint.port))) + activeServiceEndpointMap[nodeServiceIds[i]] = + append(activeServiceEndpointMap[nodeServiceIds[i]], + generateEndpointID(endpoint.ip, strconv.Itoa(endpoint.port))) } } } @@ -246,7 +259,8 @@ func (nsc *NetworkServicesController) setupNodePortServices(serviceInfoMap servi return nil } -func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap serviceInfoMap, endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error { +func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap serviceInfoMap, + endpointsInfoMap endpointsInfoMap, activeServiceEndpointMap map[string][]string) error { ipvsSvcs, err := nsc.ln.ipvsGetServices() if err != nil { return errors.New("Failed get list of IPVS services due to: " + err.Error()) @@ -287,7 +301,8 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser } if svc.local && !hasActiveEndpoints(endpoints) { - klog.V(1).Infof("Skipping setting up IPVS service for external IP and LoadBalancer IP for the service %s/%s as it does not have active endpoints\n", svc.namespace, svc.name) + klog.V(1).Infof("Skipping setting up IPVS service for external IP and LoadBalancer IP "+ + "for the service %s/%s as it does not have active endpoints\n", svc.namespace, svc.name) continue } mangleTableRulesDump := bytes.Buffer{} @@ -300,12 +315,15 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser for _, externalIP := range extIPSet.List() { var externalIPServiceID string if svc.directServerReturn && svc.directServerReturnMethod == tunnelInterfaceType { - ipvsExternalIPSvc, err := nsc.ln.ipvsAddFWMarkService(net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) + ipvsExternalIPSvc, err := nsc.ln.ipvsAddFWMarkService(net.ParseIP(externalIP), protocol, + uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) if err != nil { - klog.Errorf("Failed to create ipvs service for External IP: %s due to: %s", externalIP, err.Error()) + klog.Errorf("Failed to create ipvs service for External IP: %s due to: %s", + externalIP, err.Error()) continue } - externalIPServices = append(externalIPServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, externalIP: externalIP}) + externalIPServices = append(externalIPServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, + externalIP: externalIP}) fwMark, err := generateFwmark(externalIP, svc.protocol, strconv.Itoa(svc.port)) if err != nil { klog.Errorf("Failed to generate Fwmark") @@ -314,7 +332,8 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser externalIPServiceID = fmt.Sprint(fwMark) // ensure there is iptables mangle table rule to FWMARK the packet - err = setupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), externalIPServiceID, nsc.dsrTCPMSS) + err = setupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), externalIPServiceID, + nsc.dsrTCPMSS) if err != nil { klog.Errorf("Failed to setup mangle table rule to FMWARD the traffic to external IP") continue @@ -337,16 +356,20 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser // ensure director with vip assigned err := nsc.ln.ipAddrAdd(dummyVipInterface, externalIP, true) if err != nil && err.Error() != IfaceHasAddr { - klog.Errorf("Failed to assign external ip %s to dummy interface %s due to %s", externalIP, KubeDummyIf, err.Error()) + klog.Errorf("Failed to assign external ip %s to dummy interface %s due to %s", + externalIP, KubeDummyIf, err.Error()) } // create IPVS service for the service to be exposed through the external ip - ipvsExternalIPSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, net.ParseIP(externalIP), protocol, uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) + ipvsExternalIPSvc, err := nsc.ln.ipvsAddService(ipvsSvcs, net.ParseIP(externalIP), protocol, + uint16(svc.port), svc.sessionAffinity, svc.sessionAffinityTimeoutSeconds, svc.scheduler, svc.flags) if err != nil { - klog.Errorf("Failed to create ipvs service for external ip: %s due to %s", externalIP, err.Error()) + klog.Errorf("Failed to create ipvs service for external ip: %s due to %s", + externalIP, err.Error()) continue } - externalIPServices = append(externalIPServices, externalIPService{ipvsSvc: ipvsExternalIPSvc, externalIP: externalIP}) + externalIPServices = append(externalIPServices, externalIPService{ + ipvsSvc: ipvsExternalIPSvc, externalIP: externalIP}) externalIPServiceID = generateIPPortID(externalIP, svc.protocol, strconv.Itoa(svc.port)) // ensure there is NO iptables mangle table rule to FWMARK the packet @@ -358,9 +381,11 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser fwMark := fmt.Sprint(fwmark) for _, mangleTableRule := range mangleTableRules { if strings.Contains(mangleTableRule, externalIP) && strings.Contains(mangleTableRule, fwMark) { - err = nsc.ln.cleanupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), fwMark, nsc.dsrTCPMSS) + err = nsc.ln.cleanupMangleTableRule(externalIP, svc.protocol, strconv.Itoa(svc.port), fwMark, + nsc.dsrTCPMSS) if err != nil { - klog.Errorf("Failed to verify and cleanup any mangle table rule to FMWARD the traffic to external IP due to " + err.Error()) + klog.Errorf("Failed to verify and cleanup any mangle table rule to FMWARD the traffic " + + "to external IP due to " + err.Error()) continue } } @@ -370,7 +395,9 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser activeServiceEndpointMap[externalIPServiceID] = make([]string, 0) for _, endpoint := range endpoints { if !svc.local || (svc.local && endpoint.isLocal) { - activeServiceEndpointMap[externalIPServiceID] = append(activeServiceEndpointMap[externalIPServiceID], generateEndpointID(endpoint.ip, strconv.Itoa(endpoint.port))) + activeServiceEndpointMap[externalIPServiceID] = + append(activeServiceEndpointMap[externalIPServiceID], + generateEndpointID(endpoint.ip, strconv.Itoa(endpoint.port))) } } } @@ -404,7 +431,8 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser podObj, err := nsc.getPodObjectForEndpoint(endpoint.ip) if err != nil { - klog.Errorf("Failed to find endpoint with ip: " + endpoint.ip + ". so skipping peparing endpoint for DSR") + klog.Errorf("Failed to find endpoint with ip: " + endpoint.ip + ". so skipping " + + "preparing endpoint for DSR") continue } @@ -416,27 +444,33 @@ func (nsc *NetworkServicesController) setupExternalIPServices(serviceInfoMap ser containerURL := podObj.Status.ContainerStatuses[0].ContainerID runtime, containerID, err := cri.EndpointParser(containerURL) if err != nil { - klog.Errorf("couldn't get containerID (container=%s, pod=%s). Skipping DSR endpoint set up", podObj.Spec.Containers[0].Name, podObj.Name) + klog.Errorf("couldn't get containerID (container=%s, pod=%s). Skipping DSR endpoint "+ + "set up", podObj.Spec.Containers[0].Name, podObj.Name) continue } if containerID == "" { - klog.Errorf("Failed to find container id for the endpoint with ip: " + endpoint.ip + " so skipping peparing endpoint for DSR") + klog.Errorf("Failed to find container id for the endpoint with ip: %s so skipping "+ + "preparing endpoint for DSR", endpoint.ip) continue } if runtime == "docker" { // WARN: This method is deprecated and will be removed once docker-shim is removed from kubelet. - err = nsc.ln.prepareEndpointForDsrWithDocker(containerID, endpoint.ip, externalIPService.externalIP) + err = nsc.ln.prepareEndpointForDsrWithDocker(containerID, endpoint.ip, + externalIPService.externalIP) if err != nil { - klog.Errorf("Failed to prepare endpoint %s to do direct server return due to %s", endpoint.ip, err.Error()) + klog.Errorf("Failed to prepare endpoint %s to do direct server return due to %s", + endpoint.ip, err.Error()) } } else { // We expect CRI compliant runtimes here // ugly workaround, refactoring of pkg/Proxy is required - err = nsc.ln.(*linuxNetworking).prepareEndpointForDsrWithCRI(nsc.dsr.runtimeEndpoint, containerID, endpoint.ip, externalIPService.externalIP) + err = nsc.ln.(*linuxNetworking).prepareEndpointForDsrWithCRI(nsc.dsr.runtimeEndpoint, + containerID, endpoint.ip, externalIPService.externalIP) if err != nil { - klog.Errorf("Failed to prepare endpoint %s to do DSR due to: %s", endpoint.ip, err.Error()) + klog.Errorf("Failed to prepare endpoint %s to do DSR due to: %s", + endpoint.ip, err.Error()) } } } @@ -452,22 +486,27 @@ func (nsc *NetworkServicesController) setupForDSR(serviceInfoMap serviceInfoMap) if err != nil { return errors.New("Failed setup PBR for DSR due to: " + err.Error()) } - klog.V(1).Infof("Custom routing table " + customDSRRouteTableName + " required for Direct Server Return is setup as expected.") + klog.V(1).Infof("Custom routing table %s required for Direct Server Return is setup as expected.", + customDSRRouteTableName) klog.V(1).Infof("Setting up custom route table required to add routes for external IP's.") err = nsc.ln.setupRoutesForExternalIPForDSR(serviceInfoMap) if err != nil { - klog.Errorf("Failed setup custom routing table required to add routes for external IP's due to: " + err.Error()) - return errors.New("Failed setup custom routing table required to add routes for external IP's due to: " + err.Error()) + klog.Errorf("failed setup custom routing table required to add routes for external IP's due to: %v", + err) + return fmt.Errorf("failed setup custom routing table required to add routes for external IP's due to: %v", + err) } - klog.V(1).Infof("Custom routing table " + externalIPRouteTableName + " required for Direct Server Return is setup as expected.") + klog.V(1).Infof("Custom routing table required for Direct Server Return is setup as expected.", + externalIPRouteTableName) return nil } func (nsc *NetworkServicesController) cleanupStaleVIPs(activeServiceEndpointMap map[string][]string) error { // cleanup stale IPs on dummy interface klog.V(1).Info("Cleaning up if any, old service IPs on dummy interface") - // This represents "ip - protocol - port" that is created as the key to activeServiceEndpointMap in generateIPPortID() + // This represents "ip - protocol - port" that is created as the key to activeServiceEndpointMap in + // generateIPPortID() const expectedServiceIDParts = 3 addrActive := make(map[string]bool) for k := range activeServiceEndpointMap { @@ -568,8 +607,8 @@ func (nsc *NetworkServicesController) cleanupStaleIPVSConfig(activeServiceEndpoi } } if !validEp { - klog.V(1).Infof("Found a destination %s in service %s which is no longer needed so cleaning up", - ipvsDestinationString(dst), ipvsServiceString(ipvsSvc)) + klog.V(1).Infof("Found a destination %s in service %s which is no longer needed so "+ + "cleaning up", ipvsDestinationString(dst), ipvsServiceString(ipvsSvc)) err = nsc.ipvsDeleteDestination(ipvsSvc, dst) if err != nil { klog.Errorf("Failed to delete destination %s from ipvs service %s", diff --git a/pkg/controllers/proxy/utils.go b/pkg/controllers/proxy/utils.go index 7d7a5572..a635b63b 100644 --- a/pkg/controllers/proxy/utils.go +++ b/pkg/controllers/proxy/utils.go @@ -20,14 +20,17 @@ const ( func attemptNamespaceResetAfterError(hostNSHandle netns.NsHandle) { err := netns.Set(hostNSHandle) if err != nil { - klog.Errorf("failed to set hostNetworkNamespace while resetting namespace after a previous error due to " + err.Error()) + klog.Errorf("failed to set hostNetworkNamespace while resetting namespace after a previous error due to %v", + err) } activeNetworkNamespaceHandle, err := netns.Get() if err != nil { - klog.Errorf("failed to confirm activeNetworkNamespace while resetting namespace after a previous error due to " + err.Error()) + klog.Errorf("failed to confirm activeNetworkNamespace while resetting namespace after "+ + "a previous error due to %v", err) return } - klog.V(2).Infof("Current network namespace after revert namespace to host network namespace: " + activeNetworkNamespaceHandle.String()) + klog.V(2).Infof("Current network namespace after revert namespace to host network namespace: %s", + activeNetworkNamespaceHandle.String()) _ = activeNetworkNamespaceHandle.Close() } @@ -86,7 +89,8 @@ func (ln *linuxNetworking) configureContainerForDSR( break } if err.Error() == IfaceNotFound { - klog.V(3).Infof("Waiting for tunnel interface %s to come up in the pod, retrying", KubeTunnelIf) + klog.V(3).Infof("Waiting for tunnel interface %s to come up in the pod, retrying", + KubeTunnelIf) continue } else { break @@ -98,7 +102,8 @@ func (ln *linuxNetworking) configureContainerForDSR( return fmt.Errorf("failed to get %s tunnel interface handle due to %v", KubeTunnelIf, err) } - klog.V(2).Infof("Successfully created tunnel interface %s in endpoint %s.", KubeTunnelIf, endpointIP) + klog.V(2).Infof("Successfully created tunnel interface %s in endpoint %s.", + KubeTunnelIf, endpointIP) } // bring the tunnel interface up @@ -117,7 +122,8 @@ func (ln *linuxNetworking) configureContainerForDSR( klog.Infof("Successfully assigned VIP: %s in endpoint %s.", vip, endpointIP) // disable rp_filter on all interface - err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/kube-tunnel-if/rp_filter", []byte(strconv.Itoa(0)), 0640) + err = ioutil.WriteFile("/proc/sys/net/ipv4/conf/kube-tunnel-if/rp_filter", + []byte(strconv.Itoa(0)), 0640) if err != nil { attemptNamespaceResetAfterError(hostNetworkNamespaceHandle) return fmt.Errorf("failed to disable rp_filter on kube-tunnel-if in the endpoint container") diff --git a/pkg/controllers/routing/aws.go b/pkg/controllers/routing/aws.go index 524c9751..7402a3c7 100644 --- a/pkg/controllers/routing/aws.go +++ b/pkg/controllers/routing/aws.go @@ -62,7 +62,8 @@ func (nrc *NetworkRoutingController) disableSourceDestinationCheck() { awsErr := err.(awserr.Error) if awsErr.Code() == "UnauthorizedOperation" { nrc.ec2IamAuthorized = false - klog.Errorf("Node does not have necessary IAM creds to modify instance attribute. So skipping disabling src-dst check.") + klog.Errorf("Node does not have necessary IAM creds to modify instance attribute. So skipping " + + "disabling src-dst check.") return } klog.Errorf("Failed to disable source destination check due to: %v", err.Error()) diff --git a/pkg/controllers/routing/bgp_peers.go b/pkg/controllers/routing/bgp_peers.go index abafaf10..3d44ae7b 100644 --- a/pkg/controllers/routing/bgp_peers.go +++ b/pkg/controllers/routing/bgp_peers.go @@ -182,9 +182,9 @@ func (nrc *NetworkRoutingController) syncInternalPeers() { } } -// connectToExternalBGPPeers adds all the configured eBGP peers (global or node specific) as neighbours// connectToExternalBGPPeers adds all the configured eBGP peers (global or node specific) as neighbours -func connectToExternalBGPPeers(server *gobgp.BgpServer, peerNeighbors []*gobgpapi.Peer, bgpGracefulRestart bool, bgpGracefulRestartDeferralTime time.Duration, - bgpGracefulRestartTime time.Duration, peerMultihopTTL uint8) error { +// connectToExternalBGPPeers adds all the configured eBGP peers (global or node specific) as neighbours +func connectToExternalBGPPeers(server *gobgp.BgpServer, peerNeighbors []*gobgpapi.Peer, bgpGracefulRestart bool, + bgpGracefulRestartDeferralTime time.Duration, bgpGracefulRestartTime time.Duration, peerMultihopTTL uint8) error { for _, n := range peerNeighbors { if bgpGracefulRestart { @@ -303,7 +303,8 @@ func (nrc *NetworkRoutingController) newNodeEventHandler() cache.ResourceEventHa node := obj.(*v1core.Node) nodeIP, err := utils.GetNodeIP(node) if err != nil { - klog.Errorf("New node received, but we were unable to add it as we were couldn't find it's node IP: %v", err) + klog.Errorf( + "New node received, but we were unable to add it as we were couldn't find it's node IP: %v", err) return } @@ -332,7 +333,8 @@ func (nrc *NetworkRoutingController) newNodeEventHandler() cache.ResourceEventHa if err == nil { klog.Infof("Received node %s removed update from watch API, so remove node from peer", nodeIP) } else { - klog.Infof("Received node (IP unavailable) removed update from watch API, so remove node from peer") + klog.Infof("Received node (IP unavailable) removed update from watch API, so remove node " + + "from peer") } nrc.OnNodeUpdate(obj) diff --git a/pkg/controllers/routing/bgp_policies.go b/pkg/controllers/routing/bgp_policies.go index 8db4d7e0..7fb2bb5f 100644 --- a/pkg/controllers/routing/bgp_policies.go +++ b/pkg/controllers/routing/bgp_policies.go @@ -92,7 +92,8 @@ func (nrc *NetworkRoutingController) addPodCidrDefinedSet() error { }, }, } - return nrc.bgpServer.AddDefinedSet(context.Background(), &gobgpapi.AddDefinedSetRequest{DefinedSet: podCidrDefinedSet}) + return nrc.bgpServer.AddDefinedSet(context.Background(), + &gobgpapi.AddDefinedSetRequest{DefinedSet: podCidrDefinedSet}) } return nil } @@ -111,7 +112,8 @@ func (nrc *NetworkRoutingController) addServiceVIPsDefinedSet() error { advIPPrefixList := make([]*gobgpapi.Prefix, 0) advIps, _, _ := nrc.getAllVIPs() for _, ip := range advIps { - advIPPrefixList = append(advIPPrefixList, &gobgpapi.Prefix{IpPrefix: ip + "/32", MaskLengthMin: 32, MaskLengthMax: 32}) + advIPPrefixList = append(advIPPrefixList, + &gobgpapi.Prefix{IpPrefix: ip + "/32", MaskLengthMin: 32, MaskLengthMax: 32}) } if currentDefinedSet == nil { clusterIPPrefixSet := &gobgpapi.DefinedSet{ @@ -119,7 +121,8 @@ func (nrc *NetworkRoutingController) addServiceVIPsDefinedSet() error { Name: "servicevipsdefinedset", Prefixes: advIPPrefixList, } - return nrc.bgpServer.AddDefinedSet(context.Background(), &gobgpapi.AddDefinedSetRequest{DefinedSet: clusterIPPrefixSet}) + return nrc.bgpServer.AddDefinedSet(context.Background(), + &gobgpapi.AddDefinedSetRequest{DefinedSet: clusterIPPrefixSet}) } if reflect.DeepEqual(advIPPrefixList, currentDefinedSet.Prefixes) { @@ -154,7 +157,8 @@ func (nrc *NetworkRoutingController) addServiceVIPsDefinedSet() error { Name: "servicevipsdefinedset", Prefixes: toAdd, } - err = nrc.bgpServer.AddDefinedSet(context.Background(), &gobgpapi.AddDefinedSetRequest{DefinedSet: clusterIPPrefixSet}) + err = nrc.bgpServer.AddDefinedSet(context.Background(), + &gobgpapi.AddDefinedSetRequest{DefinedSet: clusterIPPrefixSet}) if err != nil { return err } @@ -163,7 +167,8 @@ func (nrc *NetworkRoutingController) addServiceVIPsDefinedSet() error { Name: "servicevipsdefinedset", Prefixes: toDelete, } - err = nrc.bgpServer.DeleteDefinedSet(context.Background(), &gobgpapi.DeleteDefinedSetRequest{DefinedSet: clusterIPPrefixSet, All: false}) + err = nrc.bgpServer.DeleteDefinedSet(context.Background(), + &gobgpapi.DeleteDefinedSetRequest{DefinedSet: clusterIPPrefixSet, All: false}) if err != nil { return err } @@ -195,7 +200,8 @@ func (nrc *NetworkRoutingController) addDefaultRouteDefinedSet() error { }, }, } - return nrc.bgpServer.AddDefinedSet(context.Background(), &gobgpapi.AddDefinedSetRequest{DefinedSet: defaultRouteDefinedSet}) + return nrc.bgpServer.AddDefinedSet(context.Background(), + &gobgpapi.AddDefinedSetRequest{DefinedSet: defaultRouteDefinedSet}) } return nil } @@ -278,7 +284,8 @@ func (nrc *NetworkRoutingController) addiBGPPeersDefinedSet() ([]string, error) Name: "iBGPpeerset", List: toDelete, } - err = nrc.bgpServer.DeleteDefinedSet(context.Background(), &gobgpapi.DeleteDefinedSetRequest{DefinedSet: iBGPPeerNS, All: false}) + err = nrc.bgpServer.DeleteDefinedSet(context.Background(), + &gobgpapi.DeleteDefinedSetRequest{DefinedSet: iBGPPeerNS, All: false}) if err != nil { return iBGPPeerCIDRs, err } @@ -336,7 +343,8 @@ func (nrc *NetworkRoutingController) addAllBGPPeersDefinedSet(iBGPPeerCIDRs, ext if err != nil { return err } - // nolint:gocritic // We intentionally append to a different array here so as to not change the passed in externalBGPPeerCIDRs + // nolint:gocritic // We intentionally append to a different array here so as to not change the passed + // in externalBGPPeerCIDRs allBgpPeers := append(externalBGPPeerCIDRs, iBGPPeerCIDRs...) if currentDefinedSet == nil { allPeerNS := &gobgpapi.DefinedSet{ @@ -385,7 +393,8 @@ func (nrc *NetworkRoutingController) addAllBGPPeersDefinedSet(iBGPPeerCIDRs, ext Name: "allpeerset", List: toDelete, } - err = nrc.bgpServer.DeleteDefinedSet(context.Background(), &gobgpapi.DeleteDefinedSetRequest{DefinedSet: allPeerNS, All: false}) + err = nrc.bgpServer.DeleteDefinedSet(context.Background(), + &gobgpapi.DeleteDefinedSetRequest{DefinedSet: allPeerNS, All: false}) if err != nil { return err } @@ -394,9 +403,10 @@ func (nrc *NetworkRoutingController) addAllBGPPeersDefinedSet(iBGPPeerCIDRs, ext // BGP export policies are added so that following conditions are met: // -// - by default export of all routes from the RIB to the neighbour's is denied, and explicitly statements are added i +// - by default export of all routes from the RIB to the neighbour's is denied, and explicitly statements are added // to permit the desired routes to be exported -// - each node is allowed to advertise its assigned pod CIDR's to all of its iBGP peer neighbours with same ASN if --enable-ibgp=true +// - each node is allowed to advertise its assigned pod CIDR's to all of its iBGP peer neighbours with same +// ASN if --enable-ibgp=true // - each node is allowed to advertise its assigned pod CIDR's to all of its external BGP peer neighbours // only if --advertise-pod-cidr flag is set to true // - each node is NOT allowed to advertise its assigned pod CIDR's to all of its external BGP peer neighbours @@ -540,7 +550,8 @@ func (nrc *NetworkRoutingController) addExportPolicies() error { } } err = nrc.bgpServer.ListPolicyAssignment(context.Background(), - &gobgpapi.ListPolicyAssignmentRequest{Name: "global", Direction: gobgpapi.PolicyDirection_EXPORT}, checkExistingPolicyAssignment) + &gobgpapi.ListPolicyAssignmentRequest{Name: "global", Direction: gobgpapi.PolicyDirection_EXPORT}, + checkExistingPolicyAssignment) if err != nil { return errors.New("Failed to verify if kube-router BGP export policy assignment exists: " + err.Error()) } @@ -552,7 +563,8 @@ func (nrc *NetworkRoutingController) addExportPolicies() error { DefaultAction: gobgpapi.RouteAction_REJECT, } if !policyAssignmentExists { - err = nrc.bgpServer.AddPolicyAssignment(context.Background(), &gobgpapi.AddPolicyAssignmentRequest{Assignment: &policyAssignment}) + err = nrc.bgpServer.AddPolicyAssignment(context.Background(), + &gobgpapi.AddPolicyAssignmentRequest{Assignment: &policyAssignment}) if err != nil { return errors.New("Failed to add policy assignment: " + err.Error()) } @@ -562,7 +574,8 @@ func (nrc *NetworkRoutingController) addExportPolicies() error { } // BGP import policies are added so that the following conditions are met: -// - do not import Service VIPs advertised from any peers, instead each kube-router originates and injects Service VIPs into local rib. +// - do not import Service VIPs advertised from any peers, instead each kube-router originates and injects +// Service VIPs into local rib. func (nrc *NetworkRoutingController) addImportPolicies() error { statements := make([]*gobgpapi.Statement, 0) @@ -629,7 +642,8 @@ func (nrc *NetworkRoutingController) addImportPolicies() error { } } err = nrc.bgpServer.ListPolicyAssignment(context.Background(), - &gobgpapi.ListPolicyAssignmentRequest{Name: "global", Direction: gobgpapi.PolicyDirection_IMPORT}, checkExistingPolicyAssignment) + &gobgpapi.ListPolicyAssignmentRequest{Name: "global", Direction: gobgpapi.PolicyDirection_IMPORT}, + checkExistingPolicyAssignment) if err != nil { return errors.New("Failed to verify if kube-router BGP import policy assignment exists: " + err.Error()) } @@ -641,7 +655,8 @@ func (nrc *NetworkRoutingController) addImportPolicies() error { DefaultAction: gobgpapi.RouteAction_ACCEPT, } if !policyAssignmentExists { - err = nrc.bgpServer.AddPolicyAssignment(context.Background(), &gobgpapi.AddPolicyAssignmentRequest{Assignment: &policyAssignment}) + err = nrc.bgpServer.AddPolicyAssignment(context.Background(), + &gobgpapi.AddPolicyAssignmentRequest{Assignment: &policyAssignment}) if err != nil { return errors.New("Failed to add policy assignment: " + err.Error()) } diff --git a/pkg/controllers/routing/ecmp_vip.go b/pkg/controllers/routing/ecmp_vip.go index d87e0156..e799f76f 100644 --- a/pkg/controllers/routing/ecmp_vip.go +++ b/pkg/controllers/routing/ecmp_vip.go @@ -21,7 +21,8 @@ import ( // bgpAdvertiseVIP advertises the service vip (cluster ip or load balancer ip or external IP) the configured peers func (nrc *NetworkRoutingController) bgpAdvertiseVIP(vip string) error { - klog.V(2).Infof("Advertising route: '%s/%s via %s' to peers", vip, strconv.Itoa(32), nrc.nodeIP.String()) + klog.V(2).Infof("Advertising route: '%s/%s via %s' to peers", + vip, strconv.Itoa(32), nrc.nodeIP.String()) a1, _ := ptypes.MarshalAny(&gobgpapi.OriginAttribute{ Origin: 0, @@ -47,7 +48,8 @@ func (nrc *NetworkRoutingController) bgpAdvertiseVIP(vip string) error { // bgpWithdrawVIP unadvertises the service vip func (nrc *NetworkRoutingController) bgpWithdrawVIP(vip string) error { - klog.V(2).Infof("Withdrawing route: '%s/%s via %s' to peers", vip, strconv.Itoa(32), nrc.nodeIP.String()) + klog.V(2).Infof("Withdrawing route: '%s/%s via %s' to peers", + vip, strconv.Itoa(32), nrc.nodeIP.String()) a1, _ := ptypes.MarshalAny(&gobgpapi.OriginAttribute{ Origin: 0, @@ -114,7 +116,8 @@ func getServiceObject(obj interface{}) (svc *v1core.Service) { func (nrc *NetworkRoutingController) handleServiceUpdate(svc *v1core.Service) { if !nrc.bgpServerStarted { - klog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name) + klog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", + svc.Namespace, svc.Name) return } @@ -137,7 +140,8 @@ func (nrc *NetworkRoutingController) handleServiceUpdate(svc *v1core.Service) { func (nrc *NetworkRoutingController) handleServiceDelete(svc *v1core.Service) { if !nrc.bgpServerStarted { - klog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name) + klog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", + svc.Namespace, svc.Name) return } @@ -172,9 +176,9 @@ func (nrc *NetworkRoutingController) tryHandleServiceUpdate(obj interface{}, log klog.V(1).Infof(logMsgFormat, svc.Namespace, svc.Name) // If the service is headless and the previous version of the service is either non-existent or also headless, - // skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we don't - // need to consider previous versions of the service here as we are guaranteed if is a ClusterIP now, it was a - // ClusterIP before. + // skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we + // don't need to consider previous versions of the service here as we are guaranteed if is a ClusterIP now, + // it was a ClusterIP before. if utils.ServiceIsHeadless(obj) { klog.V(1).Infof("%s/%s is headless, skipping...", svc.Namespace, svc.Name) return @@ -312,7 +316,8 @@ func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) { klog.V(1).Infof("Received update to endpoint: %s/%s from watch API", ep.Namespace, ep.Name) if !nrc.bgpServerStarted { - klog.V(3).Infof("Skipping update to endpoint: %s/%s, controller still performing bootup full-sync", ep.Namespace, ep.Name) + klog.V(3).Infof("Skipping update to endpoint: %s/%s, controller still performing bootup full-sync", + ep.Namespace, ep.Name) return } @@ -406,7 +411,8 @@ OUTER: for _, withdrawVIP := range toWithdrawList { for _, advertiseVIP := range toAdvertiseList { if withdrawVIP == advertiseVIP { - // if there is a VIP that is set to both be advertised and withdrawn, don't add it to the final withdraw list + // if there is a VIP that is set to both be advertised and withdrawn, don't add it to the final + // withdraw list continue OUTER } } @@ -416,7 +422,8 @@ OUTER: return toAdvertiseList, finalToWithdrawList, nil } -func (nrc *NetworkRoutingController) shouldAdvertiseService(svc *v1core.Service, annotation string, defaultValue bool) bool { +func (nrc *NetworkRoutingController) shouldAdvertiseService(svc *v1core.Service, annotation string, + defaultValue bool) bool { returnValue := defaultValue stringValue, exists := svc.Annotations[annotation] if exists { @@ -426,7 +433,8 @@ func (nrc *NetworkRoutingController) shouldAdvertiseService(svc *v1core.Service, return returnValue } -func (nrc *NetworkRoutingController) getVIPsForService(svc *v1core.Service, onlyActiveEndpoints bool) ([]string, []string, error) { +func (nrc *NetworkRoutingController) getVIPsForService(svc *v1core.Service, + onlyActiveEndpoints bool) ([]string, []string, error) { advertise := true @@ -468,7 +476,8 @@ func (nrc *NetworkRoutingController) getAllVIPsForService(svc *v1core.Service) [ // Deprecated: Use service.advertise.loadbalancer=false instead of service.skiplbips. _, skiplbips := svc.Annotations[svcSkipLbIpsAnnotation] - advertiseLoadBalancer := nrc.shouldAdvertiseService(svc, svcAdvertiseLoadBalancerAnnotation, nrc.advertiseLoadBalancerIP) + advertiseLoadBalancer := nrc.shouldAdvertiseService(svc, svcAdvertiseLoadBalancerAnnotation, + nrc.advertiseLoadBalancerIP) if advertiseLoadBalancer && !skiplbips { ipList = append(ipList, nrc.getLoadBalancerIPs(svc)...) } diff --git a/pkg/controllers/routing/network_routes_controller.go b/pkg/controllers/routing/network_routes_controller.go index 5fd0f5f8..4fc835c4 100644 --- a/pkg/controllers/routing/network_routes_controller.go +++ b/pkg/controllers/routing/network_routes_controller.go @@ -141,7 +141,8 @@ type NetworkRoutingController struct { } // Run runs forever until we are notified on stop channel -func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) { +func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, + wg *sync.WaitGroup) { var err error if nrc.enableCNI { nrc.updateCNIConfig() @@ -214,22 +215,26 @@ func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.Controll linkAttrs.Name = "kube-bridge" bridge := &netlink.Bridge{LinkAttrs: linkAttrs} if err = netlink.LinkAdd(bridge); err != nil { - klog.Errorf("Failed to create `kube-router` bridge due to %s. Will be created by CNI bridge plugin when pod is launched.", err.Error()) + klog.Errorf("Failed to create `kube-router` bridge due to %s. Will be created by CNI bridge "+ + "plugin when pod is launched.", err.Error()) } kubeBridgeIf, err = netlink.LinkByName("kube-bridge") if err != nil { - klog.Errorf("Failed to find created `kube-router` bridge due to %s. Will be created by CNI bridge plugin when pod is launched.", err.Error()) + klog.Errorf("Failed to find created `kube-router` bridge due to %s. Will be created by CNI "+ + "bridge plugin when pod is launched.", err.Error()) } err = netlink.LinkSetUp(kubeBridgeIf) if err != nil { - klog.Errorf("Failed to bring `kube-router` bridge up due to %s. Will be created by CNI bridge plugin at later point when pod is launched.", err.Error()) + klog.Errorf("Failed to bring `kube-router` bridge up due to %s. Will be created by CNI bridge "+ + "plugin at later point when pod is launched.", err.Error()) } } if nrc.autoMTU { mtu, err := utils.GetMTUFromNodeIP(nrc.nodeIP, nrc.enableOverlays) if err != nil { - klog.Errorf("Failed to find MTU for node IP: %s for intelligently setting the kube-bridge MTU due to %s.", nrc.nodeIP, err.Error()) + klog.Errorf("Failed to find MTU for node IP: %s for intelligently setting the kube-bridge MTU "+ + "due to %s.", nrc.nodeIP, err.Error()) } if mtu > 0 { klog.Infof("Setting MTU of kube-bridge interface to: %d", mtu) @@ -243,14 +248,20 @@ func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.Controll } // enable netfilter for the bridge if _, err := exec.Command("modprobe", "br_netfilter").CombinedOutput(); err != nil { - klog.Errorf("Failed to enable netfilter for bridge. Network policies and service proxy may not work: %s", err.Error()) + klog.Errorf("Failed to enable netfilter for bridge. Network policies and service proxy may "+ + "not work: %s", err.Error()) } - if err = ioutil.WriteFile("/proc/sys/net/bridge/bridge-nf-call-iptables", []byte(strconv.Itoa(1)), 0640); err != nil { - klog.Errorf("Failed to enable iptables for bridge. Network policies and service proxy may not work: %s", err.Error()) + if err = ioutil.WriteFile( + "/proc/sys/net/bridge/bridge-nf-call-iptables", []byte(strconv.Itoa(1)), 0640); err != nil { + klog.Errorf("Failed to enable iptables for bridge. Network policies and service proxy may "+ + "not work: %s", err.Error()) } if nrc.isIpv6 { - if err = ioutil.WriteFile("/proc/sys/net/bridge/bridge-nf-call-ip6tables", []byte(strconv.Itoa(1)), 0640); err != nil { - klog.Errorf("Failed to enable ip6tables for bridge. Network policies and service proxy may not work: %s", err.Error()) + if err = ioutil.WriteFile( + "/proc/sys/net/bridge/bridge-nf-call-ip6tables", []byte(strconv.Itoa(1)), + 0640); err != nil { + klog.Errorf("Failed to enable ip6tables for bridge. Network policies and service proxy may "+ + "not work: %s", err.Error()) } } @@ -362,7 +373,8 @@ func (nrc *NetworkRoutingController) updateCNIConfig() { } if reflect.DeepEqual(cidr, net.IPNet{}) { - klog.Infof("`subnet` in CNI conf file is empty so populating `subnet` in CNI conf file with pod CIDR assigned to the node obtained from node spec.") + klog.Infof("`subnet` in CNI conf file is empty so populating `subnet` in CNI conf file with pod " + + "CIDR assigned to the node obtained from node spec.") } cidrlen, _ := cidr.Mask.Size() oldCidr := cidr.IP.String() + "/" + strconv.Itoa(cidrlen) @@ -544,7 +556,8 @@ func (nrc *NetworkRoutingController) injectRoute(path *gobgpapi.Path) error { klog.Errorf("encountered error while checking peer status: %v", err) } if err == nil && !peerEstablished { - klog.V(1).Infof("Peer '%s' was not found any longer, removing tunnel and routes", nextHop.String()) + klog.V(1).Infof("Peer '%s' was not found any longer, removing tunnel and routes", + nextHop.String()) nrc.cleanupTunnel(dst, tunnelName) return nil } @@ -696,13 +709,13 @@ func (nrc *NetworkRoutingController) Cleanup() { // Pod egress cleanup err := nrc.deletePodEgressRule() if err != nil { - // Changing to level 1 logging as errors occur when ipsets have already been cleaned and needlessly worries users + // Changed to level 1 logging as errors occur when ipsets have already been cleaned and needlessly worries users klog.V(1).Infof("Error deleting Pod egress iptables rule: %v", err) } err = nrc.deleteBadPodEgressRules() if err != nil { - // Changing to level 1 logging as errors occur when ipsets have already been cleaned and needlessly worries users + // Changed to level 1 logging as errors occur when ipsets have already been cleaned and needlessly worries users klog.V(1).Infof("Error deleting Pod egress iptables rule: %s", err.Error()) } @@ -968,7 +981,8 @@ func (nrc *NetworkRoutingController) startBgpServer(grpcServer bool) error { } if grpcServer { - nrc.bgpServer = gobgp.NewBgpServer(gobgp.GrpcListenAddress(nrc.nodeIP.String() + ":50051" + "," + "127.0.0.1:50051")) + nrc.bgpServer = gobgp.NewBgpServer( + gobgp.GrpcListenAddress(nrc.nodeIP.String() + ":50051" + "," + "127.0.0.1:50051")) } else { nrc.bgpServer = gobgp.NewBgpServer() } @@ -1003,7 +1017,8 @@ func (nrc *NetworkRoutingController) startBgpServer(grpcServer bool) error { // Get Global Peer Router ASN configs nodeBgpPeerAsnsAnnotation, ok := node.ObjectMeta.Annotations[peerASNAnnotation] if !ok { - klog.Infof("Could not find BGP peer info for the node in the node annotations so skipping configuring peer.") + klog.Infof("Could not find BGP peer info for the node in the node annotations so " + + "skipping configuring peer.") return nil } @@ -1020,7 +1035,8 @@ func (nrc *NetworkRoutingController) startBgpServer(grpcServer bool) error { // Get Global Peer Router IP Address configs nodeBgpPeersAnnotation, ok := node.ObjectMeta.Annotations[peerIPAnnotation] if !ok { - klog.Infof("Could not find BGP peer info for the node in the node annotations so skipping configuring peer.") + klog.Infof("Could not find BGP peer info for the node in the node annotations " + + "so skipping configuring peer.") return nil } ipStrings := stringToSlice(nodeBgpPeersAnnotation, ",") @@ -1140,7 +1156,8 @@ func NewNetworkRoutingController(clientset kubernetes.Interface, nrc.bgpHoldtime = kubeRouterConfig.BGPHoldTime.Seconds() if nrc.bgpHoldtime > 65536 || nrc.bgpHoldtime < 3 { - return nil, errors.New("this is an incorrect BGP holdtime range, holdtime must be in the range 3s to 18h12m16s") + return nil, errors.New("this is an incorrect BGP holdtime range, holdtime must be in the range " + + "3s to 18h12m16s") } nrc.hostnameOverride = kubeRouterConfig.HostnameOverride @@ -1182,7 +1199,8 @@ func NewNetworkRoutingController(clientset kubernetes.Interface, cidr, err := utils.GetPodCidrFromNodeSpec(clientset, nrc.hostnameOverride) if err != nil { - klog.Fatalf("Failed to get pod CIDR from node spec. kube-router relies on kube-controller-manager to allocate pod CIDR for the node or an annotation `kube-router.io/pod-cidr`. Error: %v", err) + klog.Fatalf("Failed to get pod CIDR from node spec. kube-router relies on kube-controller-manager to "+ + "allocate pod CIDR for the node or an annotation `kube-router.io/pod-cidr`. Error: %v", err) return nil, fmt.Errorf("failed to get pod CIDR details from Node.spec: %s", err.Error()) } nrc.podCidr = cidr @@ -1259,7 +1277,8 @@ func NewNetworkRoutingController(clientset kubernetes.Interface, } } - nrc.globalPeerRouters, err = newGlobalPeers(kubeRouterConfig.PeerRouters, peerPorts, peerASNs, peerPasswords, nrc.bgpHoldtime) + nrc.globalPeerRouters, err = newGlobalPeers(kubeRouterConfig.PeerRouters, peerPorts, peerASNs, + peerPasswords, nrc.bgpHoldtime) if err != nil { return nil, fmt.Errorf("error processing Global Peer Router configs: %s", err) } @@ -1272,10 +1291,12 @@ func NewNetworkRoutingController(clientset kubernetes.Interface, bgpLocalAddressListAnnotation, ok := node.ObjectMeta.Annotations[bgpLocalAddressAnnotation] if !ok { - klog.Infof("Could not find annotation `kube-router.io/bgp-local-addresses` on node object so BGP will listen on node IP: %s address.", nrc.nodeIP.String()) + klog.Infof("Could not find annotation `kube-router.io/bgp-local-addresses` on node object so BGP "+ + "will listen on node IP: %s address.", nrc.nodeIP.String()) nrc.localAddressList = append(nrc.localAddressList, nrc.nodeIP.String()) } else { - klog.Infof("Found annotation `kube-router.io/bgp-local-addresses` on node object so BGP will listen on local IP's: %s", bgpLocalAddressListAnnotation) + klog.Infof("Found annotation `kube-router.io/bgp-local-addresses` on node object so BGP will listen "+ + "on local IP's: %s", bgpLocalAddressListAnnotation) localAddresses := stringToSlice(bgpLocalAddressListAnnotation, ",") for _, addr := range localAddresses { ip := net.ParseIP(addr) diff --git a/pkg/cri/remote_runtime.go b/pkg/cri/remote_runtime.go index 21f2bca2..2e5673dc 100644 --- a/pkg/cri/remote_runtime.go +++ b/pkg/cri/remote_runtime.go @@ -46,7 +46,8 @@ func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) ( ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout) defer cancel() - conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) + conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithContextDialer(dialer), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) if err != nil { klog.Errorf("Connect remote runtime %s failed: %v", addr, err) return nil, err diff --git a/pkg/healthcheck/health_controller.go b/pkg/healthcheck/health_controller.go index ba44c788..7b77d796 100644 --- a/pkg/healthcheck/health_controller.go +++ b/pkg/healthcheck/health_controller.go @@ -88,7 +88,8 @@ func (hc *HealthController) HandleHeartbeat(beat *ControllerHeartbeat) { defer hc.Status.Unlock() switch { - // The first heartbeat will set the initial gracetime the controller has to report in, A static time is added as well when checking to allow for load variation in sync time + // The first heartbeat will set the initial gracetime the controller has to report in, A static time is added as + // well when checking to allow for load variation in sync time case beat.Component == "NSC": if hc.Status.NetworkServicesControllerAliveTTL == 0 { hc.Status.NetworkServicesControllerAliveTTL = time.Since(hc.Status.NetworkServicesControllerAlive) @@ -118,21 +119,24 @@ func (hc *HealthController) CheckHealth() bool { graceTime := defaultGraceTimeDuration if hc.Config.RunFirewall { - if time.Since(hc.Status.NetworkPolicyControllerAlive) > hc.Config.IPTablesSyncPeriod+hc.Status.NetworkPolicyControllerAliveTTL+graceTime { + if time.Since(hc.Status.NetworkPolicyControllerAlive) > + hc.Config.IPTablesSyncPeriod+hc.Status.NetworkPolicyControllerAliveTTL+graceTime { klog.Error("Network Policy Controller heartbeat missed") health = false } } if hc.Config.RunRouter { - if time.Since(hc.Status.NetworkRoutingControllerAlive) > hc.Config.RoutesSyncPeriod+hc.Status.NetworkRoutingControllerAliveTTL+graceTime { + if time.Since(hc.Status.NetworkRoutingControllerAlive) > + hc.Config.RoutesSyncPeriod+hc.Status.NetworkRoutingControllerAliveTTL+graceTime { klog.Error("Network Routing Controller heartbeat missed") health = false } } if hc.Config.RunServiceProxy { - if time.Since(hc.Status.NetworkServicesControllerAlive) > hc.Config.IpvsSyncPeriod+hc.Status.NetworkServicesControllerAliveTTL+graceTime { + if time.Since(hc.Status.NetworkServicesControllerAlive) > + hc.Config.IpvsSyncPeriod+hc.Status.NetworkServicesControllerAliveTTL+graceTime { klog.Error("NetworkService Controller heartbeat missed") health = false } @@ -176,7 +180,8 @@ func (hc *HealthController) RunServer(stopCh <-chan struct{}, wg *sync.WaitGroup } // RunCheck starts the HealthController's check -func (hc *HealthController) RunCheck(healthChan <-chan *ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) { +func (hc *HealthController) RunCheck(healthChan <-chan *ControllerHeartbeat, stopCh <-chan struct{}, + wg *sync.WaitGroup) { t := time.NewTicker(healthControllerTickTime) defer wg.Done() for { diff --git a/pkg/metrics/metrics_controller.go b/pkg/metrics/metrics_controller.go index bb8445fb..65506c27 100644 --- a/pkg/metrics/metrics_controller.go +++ b/pkg/metrics/metrics_controller.go @@ -169,7 +169,8 @@ type Controller struct { } // Run prometheus metrics controller -func (mc *Controller) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) { +func (mc *Controller) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, + wg *sync.WaitGroup) { t := time.NewTicker(metricsControllerTickTime) defer wg.Done() klog.Info("Starting metrics controller") diff --git a/pkg/options/options.go b/pkg/options/options.go index b8a0b777..6da6cbd2 100644 --- a/pkg/options/options.go +++ b/pkg/options/options.go @@ -98,19 +98,23 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.AdvertiseExternalIP, "advertise-external-ip", false, "Add External IP of service to the RIB so that it gets advertised to the BGP peers.") fs.BoolVar(&s.AdvertiseLoadBalancerIP, "advertise-loadbalancer-ip", false, - "Add LoadbBalancer IP of service status as set by the LB provider to the RIB so that it gets advertised to the BGP peers.") + "Add LoadbBalancer IP of service status as set by the LB provider to the RIB so that it gets "+ + "advertised to the BGP peers.") fs.BoolVar(&s.AdvertiseNodePodCidr, "advertise-pod-cidr", true, "Add Node's POD cidr to the RIB so that it gets advertised to the BGP peers.") fs.BoolVar(&s.AutoMTU, "auto-mtu", true, "Auto detect and set the largest possible MTU for pod interfaces.") fs.BoolVar(&s.BGPGracefulRestart, "bgp-graceful-restart", false, "Enables the BGP Graceful Restart capability so that routes are preserved on unexpected restarts") - fs.DurationVar(&s.BGPGracefulRestartDeferralTime, "bgp-graceful-restart-deferral-time", s.BGPGracefulRestartDeferralTime, + fs.DurationVar(&s.BGPGracefulRestartDeferralTime, "bgp-graceful-restart-deferral-time", + s.BGPGracefulRestartDeferralTime, "BGP Graceful restart deferral time according to RFC4724 4.1, maximum 18h.") fs.DurationVar(&s.BGPGracefulRestartTime, "bgp-graceful-restart-time", s.BGPGracefulRestartTime, "BGP Graceful restart time according to RFC4724 3, maximum 4095s.") fs.DurationVar(&s.BGPHoldTime, "bgp-holdtime", DefaultBgpHoldTime, - "This parameter is mainly used to modify the holdtime declared to BGP peer. When Kube-router goes down abnormally, the local saving time of BGP route will be affected.Holdtime must be in the range 3s to 18h12m16s.") + "This parameter is mainly used to modify the holdtime declared to BGP peer. When Kube-router goes down "+ + "abnormally, the local saving time of BGP route will be affected. "+ + "Holdtime must be in the range 3s to 18h12m16s.") fs.Uint32Var(&s.BGPPort, "bgp-port", DefaultBgpPort, "The port open for incoming BGP connections and to use for connecting with other BGP peers.") fs.DurationVar(&s.CacheSyncTimeout, "cache-sync-timeout", s.CacheSyncTimeout, @@ -120,14 +124,16 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) { fs.UintVar(&s.ClusterAsn, "cluster-asn", s.ClusterAsn, "ASN number under which cluster nodes will run iBGP.") fs.BoolVar(&s.DisableSrcDstCheck, "disable-source-dest-check", true, - "Disable the source-dest-check attribute for AWS EC2 instances. When this option is false, it must be set some other way.") + "Disable the source-dest-check attribute for AWS EC2 instances. When this option is false, it must be "+ + "set some other way.") fs.BoolVar(&s.EnableCNI, "enable-cni", true, "Enable CNI plugin. Disable if you want to use kube-router features alongside another CNI plugin.") fs.BoolVar(&s.EnableiBGP, "enable-ibgp", true, "Enables peering with nodes with the same ASN, if disabled will only peer with external BGP peers") fs.BoolVar(&s.EnableOverlay, "enable-overlay", true, - "When enable-overlay is set to true, IP-in-IP tunneling is used for pod-to-pod networking across nodes in different subnets. "+ - "When set to false no tunneling is used and routing infrastructure is expected to route traffic for pod-to-pod networking across nodes in different subnets") + "When enable-overlay is set to true, IP-in-IP tunneling is used for pod-to-pod networking across "+ + "nodes in different subnets. When set to false no tunneling is used and routing infrastructure is "+ + "expected to route traffic for pod-to-pod networking across nodes in different subnets") fs.BoolVar(&s.EnablePodEgress, "enable-pod-egress", true, "SNAT traffic from Pods to destinations outside the cluster.") fs.BoolVar(&s.EnablePprof, "enable-pprof", false, @@ -140,11 +146,13 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) { fs.BoolVarP(&s.HelpRequested, "help", "h", false, "Print usage information.") fs.StringVar(&s.HostnameOverride, "hostname-override", s.HostnameOverride, - "Overrides the NodeName of the node. Set this if kube-router is unable to determine your NodeName automatically.") + "Overrides the NodeName of the node. Set this if kube-router is unable to determine your NodeName "+ + "automatically.") fs.DurationVar(&s.IPTablesSyncPeriod, "iptables-sync-period", s.IPTablesSyncPeriod, "The delay between iptables rule synchronizations (e.g. '5s', '1m'). Must be greater than 0.") fs.DurationVar(&s.IpvsGracefulPeriod, "ipvs-graceful-period", s.IpvsGracefulPeriod, - "The graceful period before removing destinations from IPVS services (e.g. '5s', '1m', '2h22m'). Must be greater than 0.") + "The graceful period before removing destinations from IPVS services (e.g. '5s', '1m', '2h22m'). Must "+ + "be greater than 0.") fs.BoolVar(&s.IpvsGracefulTermination, "ipvs-graceful-termination", false, "Enables the experimental IPVS graceful terminaton capability") fs.BoolVar(&s.IpvsPermitAll, "ipvs-permit-all", true, @@ -166,21 +174,27 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.OverlayType, "overlay-type", s.OverlayType, "Possible values: subnet,full - "+ "When set to \"subnet\", the default, default \"--enable-overlay=true\" behavior is used. "+ - "When set to \"full\", it changes \"--enable-overlay=true\" default behavior so that IP-in-IP tunneling is used for pod-to-pod networking across nodes regardless of the subnet the nodes are in.") - fs.BoolVar(&s.OverrideNextHop, "override-nexthop", false, "Override the next-hop in bgp routes sent to peers with the local ip.") + "When set to \"full\", it changes \"--enable-overlay=true\" default behavior so that IP-in-IP tunneling "+ + "is used for pod-to-pod networking across nodes regardless of the subnet the nodes are in.") + fs.BoolVar(&s.OverrideNextHop, "override-nexthop", false, "Override the next-hop in bgp "+ + "routes sent to peers with the local ip.") fs.UintSliceVar(&s.PeerASNs, "peer-router-asns", s.PeerASNs, "ASN numbers of the BGP peer to which cluster nodes will advertise cluster ip and node's pod cidr.") fs.IPSliceVar(&s.PeerRouters, "peer-router-ips", s.PeerRouters, - "The ip address of the external router to which all nodes will peer and advertise the cluster ip and pod cidr's.") + "The ip address of the external router to which all nodes will peer and advertise the cluster ip and "+ + "pod cidr's.") fs.Uint8Var(&s.PeerMultihopTTL, "peer-router-multihop-ttl", s.PeerMultihopTTL, "Enable eBGP multihop supports -- sets multihop-ttl. (Relevant only if ttl >= 2)") fs.StringSliceVar(&s.PeerPasswords, "peer-router-passwords", s.PeerPasswords, "Password for authenticating against the BGP peer defined with \"--peer-router-ips\".") fs.StringVar(&s.PeerPasswordsFile, "peer-router-passwords-file", s.PeerPasswordsFile, - "Path to file containing password for authenticating against the BGP peer defined with \"--peer-router-ips\". --peer-router-passwords will be preferred if both are set.") + "Path to file containing password for authenticating against the BGP peer defined with "+ + "\"--peer-router-ips\". --peer-router-passwords will be preferred if both are set.") fs.UintSliceVar(&s.PeerPorts, "peer-router-ports", s.PeerPorts, - "The remote port of the external BGP to which all nodes will peer. If not set, default BGP port ("+strconv.Itoa(DefaultBgpPort)+") will be used.") - fs.StringVar(&s.RouterID, "router-id", "", "BGP router-id. Must be specified in a ipv6 only cluster.") + "The remote port of the external BGP to which all nodes will peer. If not set, default BGP "+ + "port ("+strconv.Itoa(DefaultBgpPort)+") will be used.") + fs.StringVar(&s.RouterID, "router-id", "", "BGP router-id. Must be specified in a ipv6 only "+ + "cluster.") fs.DurationVar(&s.RoutesSyncPeriod, "routes-sync-period", s.RoutesSyncPeriod, "The delay between route updates and advertisements (e.g. '5s', '1m', '2h22m'). Must be greater than 0.") fs.BoolVar(&s.RunFirewall, "run-firewall", true, @@ -190,11 +204,13 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.RunServiceProxy, "run-service-proxy", true, "Enables Service Proxy -- sets up IPVS for Kubernetes Services.") fs.StringVar(&s.RuntimeEndpoint, "runtime-endpoint", "", - "Path to CRI compatible container runtime socket (used for DSR mode). Currently known working with containerd.") + "Path to CRI compatible container runtime socket (used for DSR mode). Currently known working with "+ + "containerd.") fs.StringVar(&s.ClusterIPCIDR, "service-cluster-ip-range", s.ClusterIPCIDR, "CIDR value from which service cluster IPs are assigned. Default: 10.96.0.0/12") fs.StringSliceVar(&s.ExternalIPCIDRs, "service-external-ip-range", s.ExternalIPCIDRs, - "Specify external IP CIDRs that are used for inter-cluster communication (can be specified multiple times)") + "Specify external IP CIDRs that are used for inter-cluster communication "+ + "(can be specified multiple times)") fs.StringVar(&s.NodePortRange, "service-node-port-range", s.NodePortRange, "NodePort range specified with either a hyphen or colon") fs.StringVarP(&s.VLevel, "v", "v", "0", "log level for V logs") diff --git a/pkg/utils/ipset.go b/pkg/utils/ipset.go index 35773c72..16b1b5c4 100644 --- a/pkg/utils/ipset.go +++ b/pkg/utils/ipset.go @@ -28,61 +28,125 @@ const ( // DefaultHasSize Defaul OptionHashSize value. DefaultHasSize = "1024" - // TypeHashIP The hash:ip set type uses a hash to store IP host addresses (default) or network addresses. Zero valued IP address cannot be stored in a hash:ip type of set. + // TypeHashIP The hash:ip set type uses a hash to store IP host addresses (default) or network addresses. Zero + // valued IP address cannot be stored in a hash:ip type of set. TypeHashIP = "hash:ip" - // TypeHashMac The hash:mac set type uses a hash to store MAC addresses. Zero valued MAC addresses cannot be stored in a hash:mac type of set. + // TypeHashMac The hash:mac set type uses a hash to store MAC addresses. Zero valued MAC addresses cannot be stored + // in a hash:mac type of set. TypeHashMac = "hash:mac" - // TypeHashNet The hash:net set type uses a hash to store different sized IP network addresses. Network address with zero prefix size cannot be stored in this type of sets. + // TypeHashNet The hash:net set type uses a hash to store different sized IP network addresses. Network address with + // zero prefix size cannot be stored in this type of sets. TypeHashNet = "hash:net" - // TypeHashNetNet The hash:net,net set type uses a hash to store pairs of different sized IP network addresses. Bear in mind that the first parameter has precedence over the second, so a nomatch entry could be potentially be ineffective if a more specific first parameter existed with a suitable second parameter. Network address with zero prefix size cannot be stored in this type of set. + // TypeHashNetNet The hash:net,net set type uses a hash to store pairs of different sized IP network addresses. Bear + // in mind that the first parameter has precedence over the second, so a nomatch entry could be potentially be + // ineffective if a more specific first parameter existed with a suitable second parameter. Network address with + // zero prefix size cannot be stored in this type of set. TypeHashNetNet = "hash:net,net" - // TypeHashIPPort The hash:ip,port set type uses a hash to store IP address and port number pairs. The port number is interpreted together with a protocol (default TCP) and zero protocol number cannot be used. + // TypeHashIPPort The hash:ip,port set type uses a hash to store IP address and port number pairs. The port number + // is interpreted together with a protocol (default TCP) and zero protocol number cannot be used. TypeHashIPPort = "hash:ip,port" - // TypeHashNetPort The hash:net,port set type uses a hash to store different sized IP network address and port pairs. The port number is interpreted together with a protocol (default TCP) and zero protocol number cannot be used. Network address with zero prefix size is not accepted either. + // TypeHashNetPort The hash:net,port set type uses a hash to store different sized IP network address and port + // pairs. The port number is interpreted together with a protocol (default TCP) and zero protocol number cannot be + // used. Network address with zero prefix size is not accepted either. TypeHashNetPort = "hash:net,port" - // TypeHashIPPortIP The hash:ip,port,ip set type uses a hash to store IP address, port number and a second IP address triples. The port number is interpreted together with a protocol (default TCP) and zero protocol number cannot be used. + // TypeHashIPPortIP The hash:ip,port,ip set type uses a hash to store IP address, port number and a second IP + // address triples. The port number is interpreted together with a protocol (default TCP) and zero protocol number + // cannot be used. TypeHashIPPortIP = "hash:ip,port,ip" - // TypeHashIPPortNet The hash:ip,port,net set type uses a hash to store IP address, port number and IP network address triples. The port number is interpreted together with a protocol (default TCP) and zero protocol number cannot be used. Network address with zero prefix size cannot be stored either. + // TypeHashIPPortNet The hash:ip,port,net set type uses a hash to store IP address, port number and IP network + // address triples. The port number is interpreted together with a protocol (default TCP) and zero protocol number + // cannot be used. Network address with zero prefix size cannot be stored either. TypeHashIPPortNet = "hash:ip,port,net" // TypeHashIPMark The hash:ip,mark set type uses a hash to store IP address and packet mark pairs. TypeHashIPMark = "hash:ip,mark" - // TypeHashIPNetPortNet The hash:net,port,net set type behaves similarly to hash:ip,port,net but accepts a cidr value for both the first and last parameter. Either subnet is permitted to be a /0 should you wish to match port between all destinations. + // TypeHashIPNetPortNet The hash:net,port,net set type behaves similarly to hash:ip,port,net but accepts a cidr + // value for both the first and last parameter. Either subnet is permitted to be a /0 should you wish to match port + // between all destinations. TypeHashIPNetPortNet = "hash:net,port,net" - // TypeHashNetIface The hash:net,iface set type uses a hash to store different sized IP network address and interface name pairs. + // TypeHashNetIface The hash:net,iface set type uses a hash to store different sized IP network address and + // interface name pairs. TypeHashNetIface = "hash:net,iface" // TypeListSet The list:set type uses a simple list in which you can store set names. TypeListSet = "list:set" - // OptionTimeout All set types supports the optional timeout parameter when creating a set and adding entries. The value of the timeout parameter for the create command means the default timeout value (in seconds) for new entries. If a set is created with timeout support, then the same timeout option can be used to specify non-default timeout values when adding entries. Zero timeout value means the entry is added permanent to the set. The timeout value of already added elements can be changed by readding the element using the -exist option. When listing the set, the number of entries printed in the header might be larger than the listed number of entries for sets with the timeout extensions: the number of entries in the set is updated when elements added/deleted to the set and periodically when the garbage colletor evicts the timed out entries.` + // OptionTimeout All set types supports the optional timeout parameter when creating a set and adding entries. The + // value of the timeout parameter for the create command means the default timeout value (in seconds) for new + // entries. If a set is created with timeout support, then the same timeout option can be used to specify + // non-default timeout values when adding entries. Zero timeout value means the entry is added permanent to the + // set. The timeout value of already added elements can be changed by readding the element using the -exist option. + // When listing the set, the number of entries printed in the header might be larger than the listed number of + // entries for sets with the timeout extensions: the number of entries in the set is updated when elements + // added/deleted to the set and periodically when the garbage colletor evicts the timed out entries. OptionTimeout = "timeout" - // OptionCounters All set types support the optional counters option when creating a set. If the option is specified then the set is created with packet and byte counters per element support. The packet and byte counters are initialized to zero when the elements are (re-)added to the set, unless the packet and byte counter values are explicitly specified by the packets and bytes options. An example when an element is added to a set with non-zero counter values. + // OptionCounters All set types support the optional counters option when creating a set. If the option is specified + // then the set is created with packet and byte counters per element support. The packet and byte counters are + // initialized to zero when the elements are (re-)added to the set, unless the packet and byte counter values are + // explicitly specified by the packets and bytes options. An example when an element is added to a set with non-zero + // counter values. OptionCounters = "counters" - // OptionPackets All set types support the optional counters option when creating a set. If the option is specified then the set is created with packet and byte counters per element support. The packet and byte counters are initialized to zero when the elements are (re-)added to the set, unless the packet and byte counter values are explicitly specified by the packets and bytes options. An example when an element is added to a set with non-zero counter values. + // OptionPackets All set types support the optional counters option when creating a set. If the option is specified + // then the set is created with packet and byte counters per element support. The packet and byte counters are + // initialized to zero when the elements are (re-)added to the set, unless the packet and byte counter values are + // explicitly specified by the packets and bytes options. An example when an element is added to a set with non-zero + // counter values. OptionPackets = "packets" - // OptionBytes All set types support the optional counters option when creating a set. If the option is specified then the set is created with packet and byte counters per element support. The packet and byte counters are initialized to zero when the elements are (re-)added to the set, unless the packet and byte counter values are explicitly specified by the packets and bytes options. An example when an element is added to a set with non-zero counter values. + // OptionBytes All set types support the optional counters option when creating a set. If the option is specified + // then the set is created with packet and byte counters per element support. The packet and byte counters are + // initialized to zero when the elements are (re-)added to the set, unless the packet and byte counter values are + // explicitly specified by the packets and bytes options. An example when an element is added to a set with non-zero + // counter values. OptionBytes = "bytes" - // OptionComment All set types support the optional comment extension. Enabling this extension on an ipset enables you to annotate an ipset entry with an arbitrary string. This string is completely ignored by both the kernel and ipset itself and is purely for providing a convenient means to document the reason for an entry's existence. Comments must not contain any quotation marks and the usual escape character (\) has no meaning + // OptionComment All set types support the optional comment extension. Enabling this extension on an ipset enables + // you to annotate an ipset entry with an arbitrary string. This string is completely ignored by both the kernel and + // ipset itself and is purely for providing a convenient means to document the reason for an entry's existence. + // Comments must not contain any quotation marks and the usual escape character (\) has no meaning. OptionComment = "comment" - // OptionSkbinfo All set types support the optional skbinfo extension. This extension allow to store the metainfo (firewall mark, tc class and hardware queue) with every entry and map it to packets by usage of SET netfilter target with --map-set option. skbmark option format: MARK or MARK/MASK, where MARK and MASK are 32bit hex numbers with 0x prefix. If only mark is specified mask 0xffffffff are used. skbprio option has tc class format: MAJOR:MINOR, where major and minor numbers are hex without 0x prefix. skbqueue option is just decimal number. + // OptionSkbinfo All set types support the optional skbinfo extension. This extension allow to store the metainfo + // (firewall mark, tc class and hardware queue) with every entry and map it to packets by usage of SET netfilter + // target with --map-set option. skbmark option format: MARK or MARK/MASK, where MARK and MASK are 32bit hex numbers + // with 0x prefix. If only mark is specified mask 0xffffffff are used. skbprio option has tc class format: + // MAJOR:MINOR, where major and minor numbers are hex without 0x prefix. skbqueue option is just decimal number. OptionSkbinfo = "skbinfo" - // OptionSkbmark All set types support the optional skbinfo extension. This extension allow to store the metainfo (firewall mark, tc class and hardware queue) with every entry and map it to packets by usage of SET netfilter target with --map-set option. skbmark option format: MARK or MARK/MASK, where MARK and MASK are 32bit hex numbers with 0x prefix. If only mark is specified mask 0xffffffff are used. skbprio option has tc class format: MAJOR:MINOR, where major and minor numbers are hex without 0x prefix. skbqueue option is just decimal number. + // OptionSkbmark All set types support the optional skbinfo extension. This extension allow to store the metainfo + // (firewall mark, tc class and hardware queue) with every entry and map it to packets by usage of SET netfilter + // target with --map-set option. skbmark option format: MARK or MARK/MASK, where MARK and MASK are 32bit hex numbers + // with 0x prefix. If only mark is specified mask 0xffffffff are used. skbprio option has tc class format: + //MAJOR:MINOR, where major and minor numbers are hex without 0x prefix. skbqueue option is just decimal number. OptionSkbmark = "skbmark" - // OptionSkbprio All set types support the optional skbinfo extension. This extension allow to store the metainfo (firewall mark, tc class and hardware queue) with every entry and map it to packets by usage of SET netfilter target with --map-set option. skbmark option format: MARK or MARK/MASK, where MARK and MASK are 32bit hex numbers with 0x prefix. If only mark is specified mask 0xffffffff are used. skbprio option has tc class format: MAJOR:MINOR, where major and minor numbers are hex without 0x prefix. skbqueue option is just decimal number. + // OptionSkbprio All set types support the optional skbinfo extension. This extension allow to store the metainfo + // (firewall mark, tc class and hardware queue) with every entry and map it to packets by usage of SET netfilter + // target with --map-set option. skbmark option format: MARK or MARK/MASK, where MARK and MASK are 32bit hex numbers + // with 0x prefix. If only mark is specified mask 0xffffffff are used. skbprio option has tc class format: + // MAJOR:MINOR, where major and minor numbers are hex without 0x prefix. skbqueue option is just decimal number. OptionSkbprio = "skbprio" - // OptionSkbqueue All set types support the optional skbinfo extension. This extension allow to store the metainfo (firewall mark, tc class and hardware queue) with every entry and map it to packets by usage of SET netfilter target with --map-set option. skbmark option format: MARK or MARK/MASK, where MARK and MASK are 32bit hex numbers with 0x prefix. If only mark is specified mask 0xffffffff are used. skbprio option has tc class format: MAJOR:MINOR, where major and minor numbers are hex without 0x prefix. skbqueue option is just decimal number. + // OptionSkbqueue All set types support the optional skbinfo extension. This extension allow to store the metainfo + // (firewall mark, tc class and hardware queue) with every entry and map it to packets by usage of SET netfilter + // target with --map-set option. skbmark option format: MARK or MARK/MASK, where MARK and MASK are 32bit hex numbers + // with 0x prefix. If only mark is specified mask 0xffffffff are used. skbprio option has tc class format: + // MAJOR:MINOR, where major and minor numbers are hex without 0x prefix. skbqueue option is just decimal number. OptionSkbqueue = "skbqueue" - // OptionHashSize This parameter is valid for the create command of all hash type sets. It defines the initial hash size for the set, default is 1024. The hash size must be a power of two, the kernel automatically rounds up non power of two hash sizes to the first correct value. + // OptionHashSize This parameter is valid for the create command of all hash type sets. It defines the initial hash + // size for the set, default is 1024. The hash size must be a power of two, the kernel automatically rounds up non + // power of two hash sizes to the first correct value. OptionHashSize = "hashsize" - // OptionMaxElem This parameter is valid for the create command of all hash type sets. It does define the maximal number of elements which can be stored in the set, default 65536. + // OptionMaxElem This parameter is valid for the create command of all hash type sets. It does define the maximal + // number of elements which can be stored in the set, default 65536. OptionMaxElem = "maxelem" - // OptionFamilly This parameter is valid for the create command of all hash type sets except for hash:mac. It defines the protocol family of the IP addresses to be stored in the set. The default is inet, i.e IPv4. + // OptionFamilly This parameter is valid for the create command of all hash type sets except for hash:mac. It + // defines the protocol family of the IP addresses to be stored in the set. The default is inet, i.e IPv4. OptionFamilly = "family" - // OptionNoMatch The hash set types which can store net type of data (i.e. hash:*net*) support the optional nomatch option when adding entries. When matching elements in the set, entries marked as nomatch are skipped as if those were not added to the set, which makes possible to build up sets with exceptions. See the example at hash type hash:net below. When elements are tested by ipset, the nomatch flags are taken into account. If one wants to test the existence of an element marked with nomatch in a set, then the flag must be specified too. + // OptionNoMatch The hash set types which can store net type of data (i.e. hash:*net*) support the optional nomatch + // option when adding entries. When matching elements in the set, entries marked as nomatch are skipped as if those + // were not added to the set, which makes possible to build up sets with exceptions. See the example at hash type + // hash:net below. When elements are tested by ipset, the nomatch flags are taken into account. If one wants to test + // the existence of an element marked with nomatch in a set, then the flag must be specified too. OptionNoMatch = "nomatch" - // OptionForceAdd All hash set types support the optional forceadd parameter when creating a set. When sets created with this option become full the next addition to the set may succeed and evict a random entry from the set. + // OptionForceAdd All hash set types support the optional forceadd parameter when creating a set. When sets created + // with this option become full the next addition to the set may succeed and evict a random entry from the set. OptionForceAdd = "forceadd" - // tmpIPSetPrefix Is the prefix added to temporary ipset names used in the atomic swap operations during ipset restore. You should never see these on your system because they only exist during the restore. + // tmpIPSetPrefix Is the prefix added to temporary ipset names used in the atomic swap operations during ipset + // restore. You should never see these on your system because they only exist during the restore. tmpIPSetPrefix = "TMP-" ) diff --git a/pkg/utils/iptables.go b/pkg/utils/iptables.go index f1b71e36..2078f311 100644 --- a/pkg/utils/iptables.go +++ b/pkg/utils/iptables.go @@ -72,7 +72,8 @@ func Restore(table string, data []byte) error { return nil } -// AppendUnique ensures that rule is in chain only once in the buffer and that the occurrence is at the end of the buffer +// AppendUnique ensures that rule is in chain only once in the buffer and that the occurrence is at the end of the +// buffer func AppendUnique(buffer bytes.Buffer, chain string, rule []string) bytes.Buffer { var desiredBuffer bytes.Buffer