mirror of https://github.com/cloudnativelabs/kube-router.git
synced 2025-10-07 16:01:08 +02:00
code cleanup (#301)
This commit is contained in:
parent
336989088a
commit
617c773655
@@ -11,16 +11,16 @@ import (
 	"golang.org/x/net/context"
 )
 
-//ControllerHeartbeat is the structure to hold the heartbeats sent by controlers
+//ControllerHeartbeat is the structure to hold the heartbeats sent by controllers
 type ControllerHeartbeat struct {
 	Component     string
-	Lastheartbeat time.Time
+	LastHeartBeat time.Time
 }
 
 //HealthController reports the health of the controller loops as a http endpoint
 type HealthController struct {
 	HealthPort  uint16
-	HTTPenabled bool
+	HTTPEnabled bool
 	Status      HealthStats
 	Config      *options.KubeRouterConfig
 }
@@ -39,7 +39,7 @@ type HealthStats struct {
 func sendHeartBeat(channel chan<- *ControllerHeartbeat, controller string) {
 	heartbeat := ControllerHeartbeat{
 		Component:     controller,
-		Lastheartbeat: time.Now(),
+		LastHeartBeat: time.Now(),
 	}
 	channel <- &heartbeat
 }
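For context on the Lastheartbeat → LastHeartBeat rename, here is a minimal, self-contained sketch of the heartbeat pattern the two hunks above implement; the channel buffer size, ticker interval, and component name are illustrative assumptions, not values from this commit.

package main

import (
	"fmt"
	"time"
)

// ControllerHeartbeat mirrors the struct above, with the renamed field.
type ControllerHeartbeat struct {
	Component     string
	LastHeartBeat time.Time
}

// sendHeartBeat matches the helper above: controller run loops call it to
// report liveness on the shared channel.
func sendHeartBeat(channel chan<- *ControllerHeartbeat, controller string) {
	channel <- &ControllerHeartbeat{Component: controller, LastHeartBeat: time.Now()}
}

func main() {
	healthChan := make(chan *ControllerHeartbeat, 10) // assumed buffer size
	go func() {
		for range time.Tick(3 * time.Second) { // assumed interval
			sendHeartBeat(healthChan, "demo-controller")
		}
	}()
	for beat := range healthChan {
		fmt.Printf("received heartbeat from %s at %s\n", beat.Component, beat.LastHeartBeat)
	}
}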
@@ -66,7 +66,7 @@ func (hc *HealthController) Handler(w http.ResponseWriter, req *http.Request) {
 	}
 }
 
-//HandleHeartbeat handles recevied heartbeats onthe health channel
+//HandleHeartbeat handles received heartbeats on the health channel
 func (hc *HealthController) HandleHeartbeat(beat *ControllerHeartbeat) {
 	glog.V(3).Infof("Received heartbeat from %s", beat.Component)
 
@@ -131,7 +131,7 @@ func (hc *HealthController) Run(healthChan <-chan *ControllerHeartbeat, stopCh <
 	http.HandleFunc("/healthz", hc.Handler)
 
 	if (hc.Config.HealthPort > 0) && (hc.Config.HealthPort <= 65535) {
-		hc.HTTPenabled = true
+		hc.HTTPEnabled = true
 		go func() {
 			if err := srv.ListenAndServe(); err != nil {
 				// cannot panic, because this probably is an intentional close
@@ -141,7 +141,7 @@ func (hc *HealthController) Run(healthChan <-chan *ControllerHeartbeat, stopCh <
 	} else if hc.Config.MetricsPort > 65535 {
 		glog.Errorf("Metrics port must be over 0 and under 65535, given port: %d", hc.Config.MetricsPort)
 	} else {
-		hc.HTTPenabled = false
+		hc.HTTPEnabled = false
 	}
 
 	for {
@@ -152,7 +152,7 @@ func (hc *HealthController) Run(healthChan <-chan *ControllerHeartbeat, stopCh <
 		select {
 		case <-stopCh:
 			glog.Infof("Shutting down health controller")
-			if hc.HTTPenabled {
+			if hc.HTTPEnabled {
 				if err := srv.Shutdown(context.Background()); err != nil {
 					glog.Errorf("could not shutdown: %v", err)
 				}
@@ -167,7 +167,7 @@ func (hc *HealthController) Run(healthChan <-chan *ControllerHeartbeat, stopCh <
 
 }
 
-//NewHealthController creates a new healh controller and returns a reference to it
+//NewHealthController creates a new health controller and returns a reference to it
 func NewHealthController(config *options.KubeRouterConfig) (*HealthController, error) {
 	hc := HealthController{
 		Config: config,
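The HTTPenabled → HTTPEnabled hunks track whether the /healthz server was actually started, so that Shutdown is only attempted on a running server. A hedged sketch of that serve-then-shutdown pattern; the port and the use of log and a Sleep are placeholders (kube-router uses glog, its config struct, and a stop channel):

package main

import (
	"context"
	"log"
	"net/http"
	"time"
)

func main() {
	srv := &http.Server{Addr: ":20244"} // placeholder port
	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("OK\n"))
	})

	httpEnabled := true
	go func() {
		// ListenAndServe returns http.ErrServerClosed after Shutdown;
		// as the diff's comment notes, that is not a reason to panic.
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Printf("server error: %v", err)
		}
	}()

	time.Sleep(time.Second) // stand-in for waiting on the stop channel
	if httpEnabled {
		if err := srv.Shutdown(context.Background()); err != nil {
			log.Printf("could not shutdown: %v", err)
		}
	}
}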
@@ -19,7 +19,7 @@ var (
 	serviceTotalConn = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: namespace,
 		Name:      "service_total_connections",
-		Help:      "Total incoming conntections made",
+		Help:      "Total incoming connections made",
 	}, []string{"namespace", "service_name", "service_vip", "protocol", "port"})
 	servicePacketsIn = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: namespace,
@@ -29,7 +29,7 @@ var (
 	servicePacketsOut = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: namespace,
 		Name:      "service_packets_out",
-		Help:      "Total outoging packets",
+		Help:      "Total outgoing packets",
 	}, []string{"namespace", "service_name", "service_vip", "protocol", "port"})
 	serviceBytesIn = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: namespace,
@@ -49,7 +49,7 @@ var (
 	servicePpsOut = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: namespace,
 		Name:      "service_pps_out",
-		Help:      "Outoging packets per second",
+		Help:      "Outgoing packets per second",
 	}, []string{"namespace", "service_name", "service_vip", "protocol", "port"})
 	serviceCPS = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: namespace,
@@ -64,7 +64,7 @@ var (
 	serviceBpsOut = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: namespace,
 		Name:      "service_bps_out",
-		Help:      "Outoging bytes per second",
+		Help:      "Outgoing bytes per second",
 	}, []string{"namespace", "service_name", "service_vip", "protocol", "port"})
 	controllerIpvsServices = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: namespace,
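The metrics hunks only fix typos in Help strings, but for readers unfamiliar with the API: each variable above is a prometheus.GaugeVec that is registered once and then set per label combination. A minimal sketch; the namespace constant's value and the label values are assumptions:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var serviceTotalConn = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "kube_router", // assumed value of the shared namespace constant
	Name:      "service_total_connections",
	Help:      "Total incoming connections made",
}, []string{"namespace", "service_name", "service_vip", "protocol", "port"})

func main() {
	prometheus.MustRegister(serviceTotalConn)
	// One gauge per unique label combination; these values are made up.
	serviceTotalConn.WithLabelValues("default", "web", "10.96.0.10", "tcp", "80").Set(42)
	fmt.Println("gauge registered and set")
}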
@@ -871,7 +871,7 @@ func (nrc *NetworkRoutingController) Cleanup() {
 }
 
 func (nrc *NetworkRoutingController) disableSourceDestinationCheck() {
-	nodes, err := nrc.clientset.Core().Nodes().List(metav1.ListOptions{})
+	nodes, err := nrc.clientset.CoreV1().Nodes().List(metav1.ListOptions{})
 	if err != nil {
 		glog.Errorf("Failed to list nodes from API server due to: %s. Can not perform BGP peer sync", err.Error())
 		return
@@ -882,8 +882,8 @@ func (nrc *NetworkRoutingController) disableSourceDestinationCheck() {
 			return
 		}
 		providerID := strings.Replace(node.Spec.ProviderID, "///", "//", 1)
-		url, err := url.Parse(providerID)
-		instanceID := url.Path
+		URL, err := url.Parse(providerID)
+		instanceID := URL.Path
 		instanceID = strings.Trim(instanceID, "/")
 
 		sess, _ := session.NewSession(aws.NewConfig().WithMaxRetries(5))
@@ -913,7 +913,7 @@ func (nrc *NetworkRoutingController) disableSourceDestinationCheck() {
 
 func (nrc *NetworkRoutingController) syncNodeIPSets() error {
 	// Get the current list of the nodes from API server
-	nodes, err := nrc.clientset.Core().Nodes().List(metav1.ListOptions{})
+	nodes, err := nrc.clientset.CoreV1().Nodes().List(metav1.ListOptions{})
 	if err != nil {
 		return errors.New("Failed to list nodes from API server: " + err.Error())
 	}
@@ -979,7 +979,7 @@ func (nrc *NetworkRoutingController) syncInternalPeers() {
 	}()
 
 	// get the current list of the nodes from API server
-	nodes, err := nrc.clientset.Core().Nodes().List(metav1.ListOptions{})
+	nodes, err := nrc.clientset.CoreV1().Nodes().List(metav1.ListOptions{})
 	if err != nil {
 		glog.Errorf("Failed to list nodes from API server due to: %s. Can not perform BGP peer sync", err.Error())
 		return
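The url → URL rename in the -882 hunk is a shadowing fix: a local variable named url hides the imported net/url package for the rest of the scope. A minimal illustration; the provider ID string is a plausible AWS-style example, not taken from the commit:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	URL, err := url.Parse("aws:///eu-west-1a/i-0123456789abcdef0")
	if err != nil {
		panic(err)
	}
	// Had the variable been named url, this second use of the package in
	// the same scope would not compile: url would resolve to the *url.URL
	// variable, which has no Parse method.
	other, _ := url.Parse("aws://")
	fmt.Println(URL.Path, other.Scheme)
}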
@@ -1323,7 +1323,7 @@ func (nrc *NetworkRoutingController) startBgpServer() error {
 	}
 
 	// Get Global Peer Router Password configs
-	peerPasswords := []string{}
+	var peerPasswords []string
 	nodeBGPPasswordsAnnotation, ok := node.ObjectMeta.Annotations["kube-router.io/peer.passwords"]
 	if !ok {
 		glog.Infof("Could not find BGP peer password info in the node's annotations. Assuming no passwords.")
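The []string{} → var change follows the Go convention (flagged by golint) of declaring empty slices as nil rather than allocating an empty literal; append, len, and range treat a nil slice exactly like an empty one:

package main

import "fmt"

func main() {
	var peerPasswords []string // nil slice, preferred declaration
	fmt.Println(len(peerPasswords), peerPasswords == nil) // 0 true
	peerPasswords = append(peerPasswords, "example-password")
	fmt.Println(len(peerPasswords), peerPasswords == nil) // 1 false
}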
@@ -336,7 +336,7 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
 	for k, svc := range serviceInfoMap {
 		var protocol uint16
 
-		switch aProtocol := svc.protocol; aProtocol {
+		switch svc.protocol {
 		case "tcp":
 			protocol = syscall.IPPROTO_TCP
 		case "udp":
@@ -663,12 +663,12 @@ func prepareEndpointForDsr(containerId string, endpointIP string, vip string) er
 	glog.V(1).Infof("Current network namespace before netns.Set: " + activeNetworkNamespaceHandle.String())
 	activeNetworkNamespaceHandle.Close()
 
-	client, err := client.NewEnvClient()
+	dockerClient, err := client.NewEnvClient()
 	if err != nil {
 		return errors.New("Failed to get docker client due to " + err.Error())
 	}
 
-	containerSpec, err := client.ContainerInspect(context.Background(), containerId)
+	containerSpec, err := dockerClient.ContainerInspect(context.Background(), containerId)
 	if err != nil {
 		return errors.New("Failed to get docker container spec due to " + err.Error())
 	}
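The client → dockerClient rename in the DSR hunk fixes the same kind of shadowing: the variable previously hid the imported docker "client" package. A hedged sketch against the same API the diff uses (client.NewEnvClient was current at the time; newer releases use client.NewClientWithOpts), with a hypothetical container ID:

package main

import (
	"context"
	"fmt"

	"github.com/docker/docker/client"
)

func inspectContainer(containerID string) error {
	// Naming this variable "client" would shadow the package identifier
	// for the rest of the function; dockerClient keeps both usable.
	dockerClient, err := client.NewEnvClient()
	if err != nil {
		return fmt.Errorf("failed to get docker client: %v", err)
	}
	defer dockerClient.Close()

	containerSpec, err := dockerClient.ContainerInspect(context.Background(), containerID)
	if err != nil {
		return fmt.Errorf("failed to get docker container spec: %v", err)
	}
	fmt.Println("container pid:", containerSpec.State.Pid)
	return nil
}

func main() {
	// Hypothetical ID; requires a reachable docker daemon to succeed.
	if err := inspectContainer("0123456789ab"); err != nil {
		fmt.Println(err)
	}
}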
@@ -842,7 +842,7 @@ func buildServicesInfo() serviceInfoMap {
 				}
 			}
 			copy(svcInfo.externalIPs, svc.Spec.ExternalIPs)
-			svcInfo.sessionAffinity = (svc.Spec.SessionAffinity == "ClientIP")
+			svcInfo.sessionAffinity = svc.Spec.SessionAffinity == "ClientIP"
 			_, svcInfo.hairpin = svc.ObjectMeta.Annotations["kube-router.io/service.hairpin"]
 			_, svcInfo.local = svc.ObjectMeta.Annotations["kube-router.io/service.local"]
 
@@ -1487,12 +1487,12 @@ func setupRoutesForExternalIPForDSR(serviceInfoMap serviceInfoMap) error {
 	return nil
 }
 
-// unique identfier for a load-balanced service (namespace + name + portname)
+// unique identifier for a load-balanced service (namespace + name + portname)
 func generateServiceId(namespace, svcName, port string) string {
 	return namespace + "-" + svcName + "-" + port
 }
 
-// unique identfier for a load-balanced service (namespace + name + portname)
+// unique identifier for a load-balanced service (namespace + name + portname)
 func generateIpPortId(ip, protocol, port string) string {
 	return ip + "-" + protocol + "-" + port
 }
@@ -1532,7 +1532,7 @@ func getKubeDummyInterface() (netlink.Link, error) {
 	dummyVipInterface, err := netlink.LinkByName(KUBE_DUMMY_IF)
 	if err != nil && err.Error() == IFACE_NOT_FOUND {
 		glog.V(1).Infof("Could not find dummy interface: " + KUBE_DUMMY_IF + " to assign cluster ip's, creating one")
-		err = netlink.LinkAdd(&netlink.Dummy{netlink.LinkAttrs{Name: KUBE_DUMMY_IF}})
+		err = netlink.LinkAdd(&netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: KUBE_DUMMY_IF}})
 		if err != nil {
 			return nil, errors.New("Failed to add dummy interface: " + err.Error())
 		}
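The netlink hunk keys the embedded LinkAttrs field, which is what go vet's composite-literal check asks for when initializing struct types from another package. A sketch using the same github.com/vishvananda/netlink types; the literal interface name stands in for the KUBE_DUMMY_IF constant, whose value is assumed here:

package main

import (
	"fmt"

	"github.com/vishvananda/netlink"
)

func main() {
	// The unkeyed form &netlink.Dummy{netlink.LinkAttrs{Name: ...}} compiles,
	// but go vet flags unkeyed fields in literals of imported struct types.
	link := &netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: "kube-dummy-if"}}
	if err := netlink.LinkAdd(link); err != nil { // needs CAP_NET_ADMIN
		fmt.Println("failed to add dummy interface:", err)
	}
}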
@@ -9,7 +9,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/kubernetes"
-	cache "k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/cache"
 )
 
 type Operation int
@@ -103,7 +103,7 @@ func StartEndpointsWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Du
 
 	ew.clientset = clientset
 	ew.broadcaster = utils.NewBroadcaster()
-	lw := cache.NewListWatchFromClient(clientset.Core().RESTClient(), "endpoints", metav1.NamespaceAll, fields.Everything())
+	lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "endpoints", metav1.NamespaceAll, fields.Everything())
 	ew.endpointsLister, ew.endpointsController = cache.NewIndexerInformer(
 		lw,
 		&api.Endpoints{}, resyncPeriod, eventHandler,
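This and the remaining watcher hunks all make the same two cleanups: dropping the redundant self-named import alias (cache "k8s.io/client-go/tools/cache") and replacing the deprecated unversioned client accessors (Core(), Networking(), Extensions()) with their versioned equivalents (CoreV1(), NetworkingV1(), ExtensionsV1beta1()). A hedged sketch of the shared ListWatch-plus-informer wiring, written against current client-go import paths rather than the ones this commit used; the kubeconfig path, resync period, and handler are illustrative:

package main

import (
	"fmt"
	"time"

	api "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/var/lib/kube-router/kubeconfig")
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// Versioned accessor, as in the + lines above.
	lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(),
		"endpoints", metav1.NamespaceAll, fields.Everything())
	_, controller := cache.NewIndexerInformer(
		lw,
		&api.Endpoints{}, 30*time.Second,
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) { fmt.Println("endpoints object added") },
		},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	stopCh := make(chan struct{})
	controller.Run(stopCh)
}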
@@ -11,7 +11,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
 	listers "k8s.io/client-go/listers/core/v1"
-	cache "k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/cache"
 )
 
 type NamespaceUpdate struct {
@@ -97,7 +97,7 @@ func StartNamespaceWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Du
 
 	nsw.clientset = clientset
 	nsw.broadcaster = utils.NewBroadcaster()
-	lw := cache.NewListWatchFromClient(clientset.Core().RESTClient(), "namespaces", metav1.NamespaceAll, fields.Everything())
+	lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "namespaces", metav1.NamespaceAll, fields.Everything())
 	nsw.namespaceLister, nsw.namespaceController = cache.NewIndexerInformer(
 		lw,
 		&api.Namespace{}, resyncPeriod, eventHandler,

@@ -12,7 +12,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/kubernetes"
-	cache "k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/cache"
 )
 
 type NetworkPolicyUpdate struct {
@@ -92,13 +92,13 @@ func StartNetworkPolicyWatcher(clientset *kubernetes.Clientset, resyncPeriod tim
 	npw.broadcaster = utils.NewBroadcaster()
 	var lw *cache.ListWatch
 	if v1NetworkPolicy {
-		lw = cache.NewListWatchFromClient(clientset.Networking().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
+		lw = cache.NewListWatchFromClient(clientset.NetworkingV1().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
 		npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer(
 			lw, &networking.NetworkPolicy{}, resyncPeriod, eventHandler,
 			cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
 		)
 	} else {
-		lw = cache.NewListWatchFromClient(clientset.Extensions().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
+		lw = cache.NewListWatchFromClient(clientset.ExtensionsV1beta1().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
 		npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer(
 			lw, &apiextensions.NetworkPolicy{}, resyncPeriod, eventHandler,
 			cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},

@@ -8,7 +8,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/kubernetes"
-	cache "k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/cache"
 )
 
 type NodeUpdate struct {
@@ -85,7 +85,7 @@ func StartNodeWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duratio
 
 	nw.clientset = clientset
 	nw.broadcaster = utils.NewBroadcaster()
-	lw := cache.NewListWatchFromClient(clientset.Core().RESTClient(), "nodes", metav1.NamespaceAll, fields.Everything())
+	lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fields.Everything())
 	nw.nodeLister, nw.nodeController = cache.NewIndexerInformer(
 		lw,
 		&api.Node{}, resyncPeriod, eventHandler,

@@ -11,7 +11,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
 	listers "k8s.io/client-go/listers/core/v1"
-	cache "k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/cache"
 )
 
 type PodUpdate struct {
@@ -102,7 +102,7 @@ func StartPodWatcher(clientset *kubernetes.Clientset, resyncPeriod time.Duration
 
 	pw.clientset = clientset
 	pw.broadcaster = utils.NewBroadcaster()
-	lw := cache.NewListWatchFromClient(clientset.Core().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())
+	lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())
	pw.podLister, pw.podController = cache.NewIndexerInformer(
 		lw,
 		&api.Pod{}, resyncPeriod, eventHandler,