Merge pull request #148 from cloudnativelabs/goreportcard

fixing gofmt, go_vet, gocyclo, golint errors
Murali Reddy 2017-09-04 14:54:52 +05:30 committed by GitHub
commit c3c5e56c3a
8 changed files with 69 additions and 57 deletions

View File

@@ -36,6 +36,7 @@ import (
// by one or more network policy chains, till there is a match which will accept the packet, or the packet gets
// dropped by the rule in the pod chain if there is no match.
// struct to hold information required by NetworkPolicyController
type NetworkPolicyController struct {
nodeIP net.IP
nodeHostName string
@@ -80,6 +81,7 @@ type protocolAndPort struct {
port string
}
// Run: runs forever till we receive a notification on stopCh
func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
t := time.NewTicker(npc.syncPeriod)
defer t.Stop()
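The ticker plus stop-channel pair above is the usual shape of a periodic controller loop in Go. A minimal standalone sketch of that loop (runLoop and sync are illustrative names, not code from this file; only the standard library time package is assumed):

// Sketch: tick-driven periodic work until the stop channel is closed.
func runLoop(stopCh <-chan struct{}, syncPeriod time.Duration, sync func()) {
    t := time.NewTicker(syncPeriod)
    defer t.Stop()
    for {
        select {
        case <-stopCh:
            return // shutdown requested; the deferred call stops the ticker
        case <-t.C:
            sync() // periodic re-sync
        }
    }
}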
@@ -115,6 +117,7 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro
}
}
// OnPodUpdate: handles updates to pods from the Kubernetes API server
func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
glog.Infof("Received pod update namspace:%s pod name:%s", podUpdate.Pod.Namespace, podUpdate.Pod.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
@@ -127,6 +130,7 @@ func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
}
}
// OnNetworkPolicyUpdate: handles updates to network policy from the Kubernetes API server
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) {
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
err := npc.Sync()
@@ -138,6 +142,7 @@ func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *w
}
}
// OnNamespaceUpdate: handles updates to namespaces from the Kubernetes API server
func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) {
// a namespace (and the annotations on it) has no significance in the GA version of network policy
@@ -511,7 +516,7 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool) er
}
// TODO delete the rule by spec, rather than by rule number, to avoid the extra loop
var realRuleNo int = 0
var realRuleNo int
for i, rule := range forwardChainRules {
if strings.Contains(rule, chain) {
err = iptablesCmdHandler.Delete("filter", "FORWARD", strconv.Itoa(i-realRuleNo))
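The i-realRuleNo offset compensates for renumbering: after each positional delete, iptables shifts every later rule up by one. A standalone sketch of the same bookkeeping, assuming the coreos/go-iptables handle kube-router uses elsewhere (deleteMatching and its parameters are illustrative):

import (
    "strconv"
    "strings"

    "github.com/coreos/go-iptables/iptables"
)

// deleteMatching removes every FORWARD rule whose listing contains substr,
// adjusting the positional index for the renumbering after each delete.
func deleteMatching(ipt *iptables.IPTables, rules []string, substr string) error {
    deleted := 0
    for i, rule := range rules {
        if !strings.Contains(rule, substr) {
            continue
        }
        // The rule that was listed at index i has moved up by `deleted` positions.
        if err := ipt.Delete("filter", "FORWARD", strconv.Itoa(i-deleted)); err != nil {
            return err
        }
        deleted++
    }
    return nil
}

Deleting by full rule spec instead, as the TODO suggests, would avoid the index arithmetic entirely.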
@@ -593,8 +598,8 @@ func (npc *NetworkPolicyController) getFirewallEnabledPods(nodeIp string) (*map[
}
if npc.v1NetworkPolicy {
podNeedsFirewall := false
for _, policy_obj := range watchers.NetworkPolicyWatcher.List() {
policy, _ := policy_obj.(*networking.NetworkPolicy)
for _, policyObj := range watchers.NetworkPolicyWatcher.List() {
policy, _ := policyObj.(*networking.NetworkPolicy)
// we are only interested in the network policies in the same namespace as the pod
if policy.Namespace != pod.ObjectMeta.Namespace {
@@ -627,11 +632,11 @@ func (npc *NetworkPolicyController) getFirewallEnabledPods(nodeIp string) (*map[
continue
}
} else {
default_policy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace)
defaultPolicy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace)
if err != nil {
return nil, fmt.Errorf("Failed to get the namespace default ingress policy %s", err.Error())
}
if strings.Compare(default_policy, "DefaultDeny") != 0 {
if strings.Compare(defaultPolicy, "DefaultDeny") != 0 {
continue
}
}
@@ -647,9 +652,9 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
NetworkPolicies := make([]networkPolicyInfo, 0)
for _, policy_obj := range watchers.NetworkPolicyWatcher.List() {
for _, policyObj := range watchers.NetworkPolicyWatcher.List() {
policy, ok := policy_obj.(*networking.NetworkPolicy)
policy, ok := policyObj.(*networking.NetworkPolicy)
if !ok {
return nil, fmt.Errorf("Failed to convert")
}
@@ -742,9 +747,9 @@ func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
NetworkPolicies := make([]networkPolicyInfo, 0)
for _, policy_obj := range watchers.NetworkPolicyWatcher.List() {
for _, policyObj := range watchers.NetworkPolicyWatcher.List() {
policy, _ := policy_obj.(*apiextensions.NetworkPolicy)
policy, _ := policyObj.(*apiextensions.NetworkPolicy)
newPolicy := networkPolicyInfo{
name: policy.Name,
namespace: policy.Namespace,
@@ -852,6 +857,7 @@ func getNodeIP(node *apiv1.Node) (net.IP, error) {
return nil, errors.New("host IP unknown")
}
// Cleanup: cleans up the configurations done by the controller
func (npc *NetworkPolicyController) Cleanup() {
glog.Infof("Cleaning up iptables configuration permanently done by kube-router")
@@ -869,7 +875,7 @@ func (npc *NetworkPolicyController) Cleanup() {
}
// TODO: need a better way to delete the rule without using a rule number
var realRuleNo int = 0
var realRuleNo int
for i, rule := range forwardChainRules {
if strings.Contains(rule, "KUBE-POD-FW-") {
err = iptablesCmdHandler.Delete("filter", "FORWARD", strconv.Itoa(i-realRuleNo))

View File

@@ -33,6 +33,7 @@ import (
"k8s.io/client-go/kubernetes"
)
// struct to hold information required by the controller
type NetworkRoutingController struct {
nodeIP net.IP
nodeHostName string
@@ -67,6 +68,7 @@ const (
podSubnetIpSetName = "kube-router-pod-subnets"
)
// Run: runs forever until we are notified on the stop channel
func (nrc *NetworkRoutingController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
cidr, err := utils.GetPodCidrFromCniSpec("/etc/cni/net.d/10-kuberouter.conf")
if err != nil {
@@ -308,6 +310,7 @@ func (nrc *NetworkRoutingController) getClusterIps() ([]string, error) {
return clusterIpList, nil
}
// AdvertiseClusterIp: advertises the service cluster IP to the configured peers
func (nrc *NetworkRoutingController) AdvertiseClusterIp(clusterIp string) error {
attrs := []bgp.PathAttributeInterface{
@@ -520,6 +523,7 @@ func (nrc *NetworkRoutingController) injectRoute(path *table.Path) error {
return netlink.RouteReplace(route)
}
// Cleanup: performs the cleanup of the configurations done by the controller
func (nrc *NetworkRoutingController) Cleanup() {
err := deletePodEgressRule()
if err != nil {
@@ -793,7 +797,7 @@ func (nrc *NetworkRoutingController) enablePolicyBasedRouting() error {
return nil
}
// Handle updates from Node watcher. Node watcher calls this method whenever a
// OnNodeUpdate: Handle updates from Node watcher. Node watcher calls this method whenever a
// new node is added or an old node is deleted. So peer up with the new node and drop peering
// from the old node
func (nrc *NetworkRoutingController) OnNodeUpdate(nodeUpdate *watchers.NodeUpdate) {

View File

@@ -27,6 +27,7 @@ import (
)
const (
// name of the dummy interface to which cluster IPs are assigned
KUBE_DUMMY_IF = "kube-dummy-if"
IFACE_NOT_FOUND = "Link not found"
IFACE_HAS_ADDR = "file exists"
@@ -58,13 +59,14 @@ var (
}, []string{"namespace", "service_name", "backend"})
)
// Network services controller enables the local node as a network service proxy through IPVS/LVS.
// NetworkServicesController enables the local node as a network service proxy through IPVS/LVS.
// Supports only Kubernetes network services of type NodePort, ClusterIP, and LoadBalancer. For each service an
// IPVS service is created and for each service endpoint a server is added to the IPVS service.
// As services and endpoints are updated, the network services controller gets the updates from
// the Kubernetes API server and syncs the IPVS configuration to reflect the state of services
// and endpoints
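The comment above is the controller's core invariant: one IPVS virtual service per Kubernetes service, and one IPVS destination per endpoint. Condensed into a sketch built on this file's own helpers, ipvsAddService and ipvsAddServer (both appear later in this diff); exposeService itself and the Destination fields other than Weight are assumptions about the ipvs library used here:

// Sketch: expose a service through IPVS and attach each endpoint as a destination.
func exposeService(svc *serviceInfo, endpoints []endpointsInfo, protocol uint16) error {
    vipSvc, err := ipvsAddService(svc.clusterIP, protocol, uint16(svc.port), svc.sessionAffinity)
    if err != nil {
        return err
    }
    for _, ep := range endpoints {
        dst := ipvs.Destination{Address: net.ParseIP(ep.ip), Port: uint16(ep.port), Weight: 1}
        if err := ipvsAddServer(vipSvc, &dst); err != nil {
            glog.Errorf(err.Error())
        }
    }
    return nil
}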
// struct for storing information needed by the controller
type NetworkServicesController struct {
nodeIP net.IP
nodeHostName string
@@ -102,7 +104,7 @@ type endpointsInfo struct {
// map of all endpoints, with a unique service id (namespace name, service name, port) as key
type endpointsInfoMap map[string][]endpointsInfo
// periodically syncs the ipvs configuration to reflect the desired state of services and endpoints
// Run: periodically syncs the ipvs configuration to reflect the desired state of services and endpoints
func (nsc *NetworkServicesController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) error {
t := time.NewTicker(nsc.syncPeriod)
@@ -170,7 +172,7 @@ func (nsc *NetworkServicesController) sync() {
nsc.publishMetrics(nsc.serviceMap)
}
// handles an endpoints update from the API server
// OnEndpointsUpdate: handles an endpoints update from the API server
func (nsc *NetworkServicesController) OnEndpointsUpdate(endpointsUpdate *watchers.EndpointsUpdate) {
nsc.mu.Lock()
@@ -192,7 +194,7 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(endpointsUpdate *watcher
}
}
// handles a service update from the API server
// OnServiceUpdate: handles a service update from the API server
func (nsc *NetworkServicesController) OnServiceUpdate(serviceUpdate *watchers.ServiceUpdate) {
nsc.mu.Lock()
@@ -241,7 +243,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
}
// assign the cluster IP of the service to the dummy interface so that it's routable from the pods on the node
vip := &netlink.Addr{IPNet: &net.IPNet{svc.clusterIP, net.IPv4Mask(255, 255, 255, 255)}, Scope: syscall.RT_SCOPE_LINK}
vip := &netlink.Addr{IPNet: &net.IPNet{IP: svc.clusterIP, Mask: net.IPv4Mask(255, 255, 255, 255)}, Scope: syscall.RT_SCOPE_LINK}
err := netlink.AddrAdd(dummyVipInterface, vip)
if err != nil && err.Error() != IFACE_HAS_ADDR {
glog.Errorf("Failed to assign cluster ip to dummy interface %s", err)
@@ -249,7 +251,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
}
// create IPVS service for the service to be exposed through the cluster ip
ipvs_cluster_vip_svc, err := ipvsAddService(svc.clusterIP, protocol, uint16(svc.port), svc.sessionAffinity)
ipvsClusterVipSvc, err := ipvsAddService(svc.clusterIP, protocol, uint16(svc.port), svc.sessionAffinity)
if err != nil {
glog.Errorf("Failed to create ipvs service for cluster ip: %s", err.Error())
continue
@@ -258,10 +260,10 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
activeServiceEndpointMap[clusterServiceId] = make([]string, 0)
// create IPVS service for the service to be exposed through the nodeport
var ipvs_nodeport_svc *ipvs.Service
var ipvsNodeportSvc *ipvs.Service
var nodeServiceId string
if svc.nodePort != 0 {
ipvs_nodeport_svc, err = ipvsAddService(nsc.nodeIP, protocol, uint16(svc.nodePort), svc.sessionAffinity)
ipvsNodeportSvc, err = ipvsAddService(nsc.nodeIP, protocol, uint16(svc.nodePort), svc.sessionAffinity)
if err != nil {
glog.Errorf("Failed to create ipvs service for node port")
continue
@@ -280,7 +282,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
Weight: 1,
}
err := ipvsAddServer(ipvs_cluster_vip_svc, &dst)
err := ipvsAddServer(ipvsClusterVipSvc, &dst)
if err != nil {
glog.Errorf(err.Error())
}
@@ -289,7 +291,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
append(activeServiceEndpointMap[clusterServiceId], endpoint.ip)
if svc.nodePort != 0 {
err := ipvsAddServer(ipvs_nodeport_svc, &dst)
err := ipvsAddServer(ipvsNodeportSvc, &dst)
if err != nil {
glog.Errorf(err.Error())
}
@@ -434,11 +436,11 @@ func shuffle(endPoints []endpointsInfo) []endpointsInfo {
func buildEndpointsInfo() endpointsInfoMap {
endpointsMap := make(endpointsInfoMap)
for _, ep := range watchers.EndpointsWatcher.List() {
for _, ep_subset := range ep.Subsets {
for _, port := range ep_subset.Ports {
for _, epSubset := range ep.Subsets {
for _, port := range epSubset.Ports {
svcId := generateServiceId(ep.Namespace, ep.Name, port.Name)
endpoints := make([]endpointsInfo, 0)
for _, addr := range ep_subset.Addresses {
for _, addr := range epSubset.Addresses {
endpoints = append(endpoints, endpointsInfo{ip: addr.IP, port: int(port.Port)})
}
endpointsMap[svcId] = shuffle(endpoints)
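shuffle randomizes endpoint order so IPVS destinations are not always added in the order the API server returns them. Its body is outside this hunk; a standard Fisher-Yates version matching the signature in the hunk header above would look like this (an assumption, not the committed code):

// Assumed Fisher-Yates shuffle; requires math/rand.
func shuffle(endPoints []endpointsInfo) []endpointsInfo {
    for i := len(endPoints) - 1; i > 0; i-- {
        j := rand.Intn(i + 1)
        endPoints[i], endPoints[j] = endPoints[j], endPoints[i]
    }
    return endPoints
}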
@@ -726,7 +728,7 @@ func ipvsAddService(vip net.IP, protocol, port uint16, persistent bool) (*ipvs.S
svc.Timeout = 180 * 60
}
if err := h.NewService(&svc); err != nil {
return nil, fmt.Errorf("Failed to create service: %s:%s:%s", vip.String(), protocol, strconv.Itoa(int(port)))
return nil, fmt.Errorf("Failed to create service: %s:%s:%s", vip.String(), strconv.Itoa(int(protocol)), strconv.Itoa(int(port)))
}
glog.Infof("Successfully added service: %s:%s:%s", vip.String(), protocol, strconv.Itoa(int(port)))
return &svc, nil
@@ -743,10 +745,10 @@ func ipvsAddServer(service *ipvs.Service, dest *ipvs.Destination) error {
if strings.Contains(err.Error(), IPVS_SERVER_EXISTS) {
glog.Infof("ipvs destination %s:%s already exists in the ipvs service %s:%s:%s so not adding destination", dest.Address,
strconv.Itoa(int(dest.Port)), service.Address, service.Protocol, strconv.Itoa(int(service.Port)))
strconv.Itoa(int(dest.Port)), service.Address, strconv.Itoa(int(service.Protocol)), strconv.Itoa(int(service.Port)))
} else {
return fmt.Errorf("Failed to add ipvs destination %s:%s to the ipvs service %s:%s:%s due to : %s", dest.Address,
strconv.Itoa(int(dest.Port)), service.Address, service.Protocol, strconv.Itoa(int(service.Port)), err.Error())
strconv.Itoa(int(dest.Port)), service.Address, strconv.Itoa(int(service.Protocol)), strconv.Itoa(int(service.Port)), err.Error())
}
return nil
}
@@ -779,7 +781,7 @@ func getKubeDummyInterface() (netlink.Link, error) {
return dummyVipInterface, nil
}
// clean up all the configurations (IPVS, iptables, links)
// Cleanup: cleans up all the configurations (IPVS, iptables, links) done by the controller
func (nsc *NetworkServicesController) Cleanup() {
// cleanup ipvs rules by flush
glog.Infof("Cleaning up IPVS configuration permanently")

View File

@@ -76,12 +76,12 @@ func (ew *endpointsWatcher) RegisterHandler(handler EndpointsUpdatesHandler) {
}
func (ew *endpointsWatcher) List() []*api.Endpoints {
obj_list := ew.endpointsLister.List()
ep_instances := make([]*api.Endpoints, len(obj_list))
for i, ins := range obj_list {
ep_instances[i] = ins.(*api.Endpoints)
objList := ew.endpointsLister.List()
epInstances := make([]*api.Endpoints, len(objList))
for i, ins := range objList {
epInstances[i] = ins.(*api.Endpoints)
}
return ep_instances
return epInstances
}
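Each watcher's List converts the []interface{} that the client-go lister returns into a typed slice via type assertions, as above. A defensive variant (a sketch, assuming the lister is a cache.Indexer as in these watchers) would use the two-value assertion to skip unexpected objects instead of panicking:

func listEndpoints(lister cache.Indexer) []*api.Endpoints {
    objList := lister.List()
    eps := make([]*api.Endpoints, 0, len(objList))
    for _, obj := range objList {
        if ep, ok := obj.(*api.Endpoints); ok {
            eps = append(eps, ep)
        }
    }
    return eps
}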
func (ew *endpointsWatcher) HasSynced() bool {

View File

@@ -7,11 +7,11 @@ import (
"github.com/cloudnativelabs/kube-router/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
api "k8s.io/client-go/pkg/api/v1"
cache "k8s.io/client-go/tools/cache"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/apimachinery/pkg/labels"
)
type NamespaceUpdate struct {
@@ -62,12 +62,12 @@ func (nsw *namespaceWatcher) namespaceUpdateEventHandler(oldObj, newObj interfac
}
func (nsw *namespaceWatcher) List() []*api.Namespace {
obj_list := nsw.namespaceLister.List()
namespace_instances := make([]*api.Namespace, len(obj_list))
for i, ins := range obj_list {
namespace_instances[i] = ins.(*api.Namespace)
objList := nsw.namespaceLister.List()
namespaceInstances := make([]*api.Namespace, len(objList))
for i, ins := range objList {
namespaceInstances[i] = ins.(*api.Namespace)
}
return namespace_instances
return namespaceInstances
}
func (nsw *namespaceWatcher) ListByLabels(set labels.Set) ([]*api.Namespace, error) {
@@ -75,9 +75,8 @@ func (nsw *namespaceWatcher) ListByLabels(set labels.Set) ([]*api.Namespace, err
matchedNamespaces, err := namespaceLister.List(set.AsSelector())
if err != nil {
return nil, err
} else {
return matchedNamespaces, nil
}
return matchedNamespaces, nil
}
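The change above is golint's "if block ends with a return statement, so drop this else (and outdent its block)" fix. Schematically:

// before: golint flags the redundant else
if err != nil {
    return nil, err
} else {
    return matchedNamespaces, nil
}

// after: early return; the success path sits at the outer indentation level
if err != nil {
    return nil, err
}
return matchedNamespaces, nil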
func (nsw *namespaceWatcher) RegisterHandler(handler NamespaceUpdatesHandler) {

View File

@@ -59,12 +59,12 @@ func (nw *nodeWatcher) RegisterHandler(handler NodeUpdatesHandler) {
}
func (nw *nodeWatcher) List() []*api.Node {
obj_list := nw.nodeLister.List()
node_instances := make([]*api.Node, len(obj_list))
for i, ins := range obj_list {
node_instances[i] = ins.(*api.Node)
objList := nw.nodeLister.List()
nodeInstances := make([]*api.Node, len(objList))
for i, ins := range objList {
nodeInstances[i] = ins.(*api.Node)
}
return node_instances
return nodeInstances
}
func (nw *nodeWatcher) HasSynced() bool {

View File

@@ -67,12 +67,12 @@ func (pw *podWatcher) RegisterHandler(handler PodUpdatesHandler) {
}
func (pw *podWatcher) List() []*api.Pod {
obj_list := pw.podLister.List()
pod_instances := make([]*api.Pod, len(obj_list))
for i, ins := range obj_list {
pod_instances[i] = ins.(*api.Pod)
objList := pw.podLister.List()
podInstances := make([]*api.Pod, len(objList))
for i, ins := range objList {
podInstances[i] = ins.(*api.Pod)
}
return pod_instances
return podInstances
}
func (pw *podWatcher) ListByNamespaceAndLabels(namespace string, labelsToMatch labels.Set) (ret []*api.Pod, err error) {

View File

@@ -65,12 +65,12 @@ func (svcw *serviceWatcher) RegisterHandler(handler ServiceUpdatesHandler) {
}
func (svcw *serviceWatcher) List() []*api.Service {
obj_list := svcw.serviceLister.List()
svc_instances := make([]*api.Service, len(obj_list))
for i, ins := range obj_list {
svc_instances[i] = ins.(*api.Service)
objList := svcw.serviceLister.List()
svcInstances := make([]*api.Service, len(objList))
for i, ins := range objList {
svcInstances[i] = ins.(*api.Service)
}
return svc_instances
return svcInstances
}
func (svcw *serviceWatcher) HasSynced() bool {
@@ -79,6 +79,7 @@ func (svcw *serviceWatcher) HasSynced() bool {
var servicesStopCh chan struct{}
// StartServiceWatcher: starts watching updates for services from the Kubernetes API server
func StartServiceWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duration) (*serviceWatcher, error) {
svcw := serviceWatcher{}