Yaroslav Verbin 2019-04-17 15:01:31 +03:00
parent 256f4d612c
commit 2b1e1d9eff

View File

@ -30,7 +30,6 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -163,12 +162,6 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
services = sc.filterByServiceType(services) services = sc.filterByServiceType(services)
} }
// get the ip addresses of all the nodes and cache them for this run
nodeTargets, err := sc.extractNodeTargets()
if err != nil {
return nil, err
}
endpoints := []*endpoint.Endpoint{} endpoints := []*endpoint.Endpoint{}
for _, svc := range services { for _, svc := range services {
@ -180,7 +173,7 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
continue continue
} }
svcEndpoints := sc.endpoints(svc, nodeTargets) svcEndpoints := sc.endpoints(svc)
// process legacy annotations if no endpoints were returned and compatibility mode is enabled. // process legacy annotations if no endpoints were returned and compatibility mode is enabled.
if len(svcEndpoints) == 0 && sc.compatibility != "" { if len(svcEndpoints) == 0 && sc.compatibility != "" {
@ -189,7 +182,7 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
// apply template if none of the above is found // apply template if none of the above is found
if (sc.combineFQDNAnnotation || len(svcEndpoints) == 0) && sc.fqdnTemplate != nil { if (sc.combineFQDNAnnotation || len(svcEndpoints) == 0) && sc.fqdnTemplate != nil {
sEndpoints, err := sc.endpointsFromTemplate(svc, nodeTargets) sEndpoints, err := sc.endpointsFromTemplate(svc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -280,7 +273,7 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
return endpoints return endpoints
} }
func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service, nodeTargets endpoint.Targets) ([]*endpoint.Endpoint, error) { func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.Endpoint, error) {
var endpoints []*endpoint.Endpoint var endpoints []*endpoint.Endpoint
// Process the whole template string // Process the whole template string
@ -293,21 +286,21 @@ func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service, nodeTargets endp
providerSpecific := getProviderSpecificAnnotations(svc.Annotations) providerSpecific := getProviderSpecificAnnotations(svc.Annotations)
hostnameList := strings.Split(strings.Replace(buf.String(), " ", "", -1), ",") hostnameList := strings.Split(strings.Replace(buf.String(), " ", "", -1), ",")
for _, hostname := range hostnameList { for _, hostname := range hostnameList {
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, nodeTargets, providerSpecific)...) endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific)...)
} }
return endpoints, nil return endpoints, nil
} }
// endpointsFromService extracts the endpoints from a service object // endpointsFromService extracts the endpoints from a service object
func (sc *serviceSource) endpoints(svc *v1.Service, nodeTargets endpoint.Targets) []*endpoint.Endpoint { func (sc *serviceSource) endpoints(svc *v1.Service) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint var endpoints []*endpoint.Endpoint
// Skip endpoints if we do not want entries from annotations // Skip endpoints if we do not want entries from annotations
if !sc.ignoreHostnameAnnotation { if !sc.ignoreHostnameAnnotation {
providerSpecific := getProviderSpecificAnnotations(svc.Annotations) providerSpecific := getProviderSpecificAnnotations(svc.Annotations)
hostnameList := getHostnamesFromAnnotations(svc.Annotations) hostnameList := getHostnamesFromAnnotations(svc.Annotations)
for _, hostname := range hostnameList { for _, hostname := range hostnameList {
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, nodeTargets, providerSpecific)...) endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific)...)
} }
} }
return endpoints return endpoints
@ -363,7 +356,7 @@ func (sc *serviceSource) setResourceLabel(service *v1.Service, endpoints []*endp
} }
} }
func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string, nodeTargets endpoint.Targets, providerSpecific endpoint.ProviderSpecific) []*endpoint.Endpoint { func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string, providerSpecific endpoint.ProviderSpecific) []*endpoint.Endpoint {
hostname = strings.TrimSuffix(hostname, ".") hostname = strings.TrimSuffix(hostname, ".")
ttl, err := getTTLFromAnnotations(svc.Annotations) ttl, err := getTTLFromAnnotations(svc.Annotations)
if err != nil { if err != nil {
@ -403,8 +396,12 @@ func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string, nod
} }
case v1.ServiceTypeNodePort: case v1.ServiceTypeNodePort:
// add the nodeTargets and extract an SRV endpoint // add the nodeTargets and extract an SRV endpoint
targets = append(targets, nodeTargets...) targets, err = sc.extractNodePortTargets(svc)
endpoints = append(endpoints, sc.extractNodePortEndpoints(svc, nodeTargets, hostname, ttl)...) if err != nil {
log.Errorf("Unable to extract targets from service %s/%s error: %v", svc.Namespace, svc.Name, err)
return endpoints
}
endpoints = append(endpoints, sc.extractNodePortEndpoints(svc, targets, hostname, ttl)...)
} }
for _, t := range targets { for _, t := range targets {
@ -449,21 +446,45 @@ func extractLoadBalancerTargets(svc *v1.Service) endpoint.Targets {
return targets return targets
} }
func (sc *serviceSource) extractNodeTargets() (endpoint.Targets, error) { func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targets, error) {
var ( var (
internalIPs endpoint.Targets internalIPs endpoint.Targets
externalIPs endpoint.Targets externalIPs endpoint.Targets
nodes []*v1.Node
err error
) )
nodes, err := sc.nodeInformer.Lister().List(labels.Everything()) switch svc.Spec.ExternalTrafficPolicy {
case v1.ServiceExternalTrafficPolicyTypeLocal:
labelSelector, err := metav1.ParseToLabelSelector(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
if err != nil { if err != nil {
if errors.IsForbidden(err) {
// Return an empty list because it makes sense to continue and try other sources.
log.Debugf("Unable to list nodes (Forbidden), returning empty list of targets (NodePort services will be skipped)")
return endpoint.Targets{}, nil
}
return nil, err return nil, err
} }
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
return nil, err
}
pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
if err != nil {
return nil, err
}
for _, v := range pods {
if v.Status.Phase == v1.PodRunning {
node, err := sc.nodeInformer.Lister().Get(v.Spec.NodeName)
if err != nil {
log.Debugf("Unable to find node where Pod %s is running", v.Spec.Hostname)
continue
}
nodes = append(nodes, node)
}
}
default:
nodes, err = sc.nodeInformer.Lister().List(labels.Everything())
if err != nil {
return nil, err
}
}
for _, node := range nodes { for _, node := range nodes {
for _, address := range node.Status.Addresses { for _, address := range node.Status.Addresses {