mirror of
				https://github.com/kubernetes-sigs/external-dns.git
				synced 2025-10-26 16:20:59 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			721 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			721 lines
		
	
	
		
			24 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2017 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package source
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"net"
 | |
| 	"sort"
 | |
| 	"strings"
 | |
| 	"text/template"
 | |
| 
 | |
| 	log "github.com/sirupsen/logrus"
 | |
| 	v1 "k8s.io/api/core/v1"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/labels"
 | |
| 	kubeinformers "k8s.io/client-go/informers"
 | |
| 	coreinformers "k8s.io/client-go/informers/core/v1"
 | |
| 	"k8s.io/client-go/kubernetes"
 | |
| 	"k8s.io/client-go/tools/cache"
 | |
| 
 | |
| 	"sigs.k8s.io/external-dns/endpoint"
 | |
| )
 | |
| 
 | |
| // serviceSource is an implementation of Source for Kubernetes service objects.
 | |
| // It will find all services that are under our jurisdiction, i.e. annotated
 | |
| // desired hostname and matching or no controller annotation. For each of the
 | |
| // matched services' entrypoints it will return a corresponding
 | |
| // Endpoint object.
 | |
| type serviceSource struct {
 | |
| 	client           kubernetes.Interface
 | |
| 	namespace        string
 | |
| 	annotationFilter string
 | |
| 
 | |
| 	// process Services with legacy annotations
 | |
| 	compatibility                  string
 | |
| 	fqdnTemplate                   *template.Template
 | |
| 	combineFQDNAnnotation          bool
 | |
| 	ignoreHostnameAnnotation       bool
 | |
| 	publishInternal                bool
 | |
| 	publishHostIP                  bool
 | |
| 	alwaysPublishNotReadyAddresses bool
 | |
| 	resolveLoadBalancerHostname    bool
 | |
| 	serviceInformer                coreinformers.ServiceInformer
 | |
| 	endpointsInformer              coreinformers.EndpointsInformer
 | |
| 	podInformer                    coreinformers.PodInformer
 | |
| 	nodeInformer                   coreinformers.NodeInformer
 | |
| 	serviceTypeFilter              map[string]struct{}
 | |
| 	labelSelector                  labels.Selector
 | |
| }
 | |
| 
 | |
| // NewServiceSource creates a new serviceSource with the given config.
 | |
| func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, compatibility string, publishInternal bool, publishHostIP bool, alwaysPublishNotReadyAddresses bool, serviceTypeFilter []string, ignoreHostnameAnnotation bool, labelSelector labels.Selector, resolveLoadBalancerHostname bool) (Source, error) {
 | |
| 	tmpl, err := parseTemplate(fqdnTemplate)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace.
 | |
| 	// Set resync period to 0, to prevent processing when nothing has changed
 | |
| 	informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
 | |
| 	serviceInformer := informerFactory.Core().V1().Services()
 | |
| 	endpointsInformer := informerFactory.Core().V1().Endpoints()
 | |
| 	podInformer := informerFactory.Core().V1().Pods()
 | |
| 	nodeInformer := informerFactory.Core().V1().Nodes()
 | |
| 
 | |
| 	// Add default resource event handlers to properly initialize informer.
 | |
| 	serviceInformer.Informer().AddEventHandler(
 | |
| 		cache.ResourceEventHandlerFuncs{
 | |
| 			AddFunc: func(obj interface{}) {
 | |
| 			},
 | |
| 		},
 | |
| 	)
 | |
| 	endpointsInformer.Informer().AddEventHandler(
 | |
| 		cache.ResourceEventHandlerFuncs{
 | |
| 			AddFunc: func(obj interface{}) {
 | |
| 			},
 | |
| 		},
 | |
| 	)
 | |
| 	podInformer.Informer().AddEventHandler(
 | |
| 		cache.ResourceEventHandlerFuncs{
 | |
| 			AddFunc: func(obj interface{}) {
 | |
| 			},
 | |
| 		},
 | |
| 	)
 | |
| 	nodeInformer.Informer().AddEventHandler(
 | |
| 		cache.ResourceEventHandlerFuncs{
 | |
| 			AddFunc: func(obj interface{}) {
 | |
| 			},
 | |
| 		},
 | |
| 	)
 | |
| 
 | |
| 	informerFactory.Start(ctx.Done())
 | |
| 
 | |
| 	// wait for the local cache to be populated.
 | |
| 	if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// Transform the slice into a map so it will
 | |
| 	// be way much easier and fast to filter later
 | |
| 	serviceTypes := make(map[string]struct{})
 | |
| 	for _, serviceType := range serviceTypeFilter {
 | |
| 		serviceTypes[serviceType] = struct{}{}
 | |
| 	}
 | |
| 
 | |
| 	return &serviceSource{
 | |
| 		client:                         kubeClient,
 | |
| 		namespace:                      namespace,
 | |
| 		annotationFilter:               annotationFilter,
 | |
| 		compatibility:                  compatibility,
 | |
| 		fqdnTemplate:                   tmpl,
 | |
| 		combineFQDNAnnotation:          combineFqdnAnnotation,
 | |
| 		ignoreHostnameAnnotation:       ignoreHostnameAnnotation,
 | |
| 		publishInternal:                publishInternal,
 | |
| 		publishHostIP:                  publishHostIP,
 | |
| 		alwaysPublishNotReadyAddresses: alwaysPublishNotReadyAddresses,
 | |
| 		serviceInformer:                serviceInformer,
 | |
| 		endpointsInformer:              endpointsInformer,
 | |
| 		podInformer:                    podInformer,
 | |
| 		nodeInformer:                   nodeInformer,
 | |
| 		serviceTypeFilter:              serviceTypes,
 | |
| 		labelSelector:                  labelSelector,
 | |
| 		resolveLoadBalancerHostname:    resolveLoadBalancerHostname,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // Endpoints returns endpoint objects for each service that should be processed.
 | |
| func (sc *serviceSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
 | |
| 	services, err := sc.serviceInformer.Lister().Services(sc.namespace).List(sc.labelSelector)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	services, err = sc.filterByAnnotations(services)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// filter on service types if at least one has been provided
 | |
| 	if len(sc.serviceTypeFilter) > 0 {
 | |
| 		services = sc.filterByServiceType(services)
 | |
| 	}
 | |
| 
 | |
| 	endpoints := []*endpoint.Endpoint{}
 | |
| 
 | |
| 	for _, svc := range services {
 | |
| 		// Check controller annotation to see if we are responsible.
 | |
| 		controller, ok := svc.Annotations[controllerAnnotationKey]
 | |
| 		if ok && controller != controllerAnnotationValue {
 | |
| 			log.Debugf("Skipping service %s/%s because controller value does not match, found: %s, required: %s",
 | |
| 				svc.Namespace, svc.Name, controller, controllerAnnotationValue)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		svcEndpoints := sc.endpoints(svc)
 | |
| 
 | |
| 		// process legacy annotations if no endpoints were returned and compatibility mode is enabled.
 | |
| 		if len(svcEndpoints) == 0 && sc.compatibility != "" {
 | |
| 			svcEndpoints, err = legacyEndpointsFromService(svc, sc)
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// apply template if none of the above is found
 | |
| 		if (sc.combineFQDNAnnotation || len(svcEndpoints) == 0) && sc.fqdnTemplate != nil {
 | |
| 			sEndpoints, err := sc.endpointsFromTemplate(svc)
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			if sc.combineFQDNAnnotation {
 | |
| 				svcEndpoints = append(svcEndpoints, sEndpoints...)
 | |
| 			} else {
 | |
| 				svcEndpoints = sEndpoints
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if len(svcEndpoints) == 0 {
 | |
| 			log.Debugf("No endpoints could be generated from service %s/%s", svc.Namespace, svc.Name)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		log.Debugf("Endpoints generated from service: %s/%s: %v", svc.Namespace, svc.Name, svcEndpoints)
 | |
| 		sc.setResourceLabel(svc, svcEndpoints)
 | |
| 		endpoints = append(endpoints, svcEndpoints...)
 | |
| 	}
 | |
| 
 | |
| 	// this sorting is required to make merging work.
 | |
| 	// after we merge endpoints that have same DNS, we want to ensure that we end up with the same service being an "owner"
 | |
| 	// of all those records, as otherwise each time we update, we will end up with a different service that gets data merged in
 | |
| 	// and that will cause external-dns to recreate dns record due to different service owner in TXT record.
 | |
| 	// if new service is added or old one removed, that might cause existing record to get re-created due to potentially new
 | |
| 	// owner being selected. Which is fine, since it shouldn't happen often and shouldn't cause any disruption.
 | |
| 	if len(endpoints) > 1 {
 | |
| 		sort.Slice(endpoints, func(i, j int) bool {
 | |
| 			return endpoints[i].Labels[endpoint.ResourceLabelKey] < endpoints[j].Labels[endpoint.ResourceLabelKey]
 | |
| 		})
 | |
| 		// Use stable sort to not disrupt the order of services
 | |
| 		sort.SliceStable(endpoints, func(i, j int) bool {
 | |
| 			if endpoints[i].DNSName != endpoints[j].DNSName {
 | |
| 				return endpoints[i].DNSName < endpoints[j].DNSName
 | |
| 			}
 | |
| 			return endpoints[i].RecordType < endpoints[j].RecordType
 | |
| 		})
 | |
| 		mergedEndpoints := []*endpoint.Endpoint{}
 | |
| 		mergedEndpoints = append(mergedEndpoints, endpoints[0])
 | |
| 		for i := 1; i < len(endpoints); i++ {
 | |
| 			lastMergedEndpoint := len(mergedEndpoints) - 1
 | |
| 			if mergedEndpoints[lastMergedEndpoint].DNSName == endpoints[i].DNSName &&
 | |
| 				mergedEndpoints[lastMergedEndpoint].RecordType == endpoints[i].RecordType &&
 | |
| 				mergedEndpoints[lastMergedEndpoint].SetIdentifier == endpoints[i].SetIdentifier &&
 | |
| 				mergedEndpoints[lastMergedEndpoint].RecordTTL == endpoints[i].RecordTTL {
 | |
| 				mergedEndpoints[lastMergedEndpoint].Targets = append(mergedEndpoints[lastMergedEndpoint].Targets, endpoints[i].Targets[0])
 | |
| 			} else {
 | |
| 				mergedEndpoints = append(mergedEndpoints, endpoints[i])
 | |
| 			}
 | |
| 		}
 | |
| 		endpoints = mergedEndpoints
 | |
| 	}
 | |
| 
 | |
| 	for _, ep := range endpoints {
 | |
| 		sort.Sort(ep.Targets)
 | |
| 	}
 | |
| 
 | |
| 	return endpoints, nil
 | |
| }
 | |
| 
 | |
| // extractHeadlessEndpoints extracts endpoints from a headless service using the "Endpoints" Kubernetes API resource
 | |
| func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname string, ttl endpoint.TTL) []*endpoint.Endpoint {
 | |
| 	var endpoints []*endpoint.Endpoint
 | |
| 
 | |
| 	labelSelector, err := metav1.ParseToLabelSelector(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
 | |
| 	if err != nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 	selector, err := metav1.LabelSelectorAsSelector(labelSelector)
 | |
| 	if err != nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	endpointsObject, err := sc.endpointsInformer.Lister().Endpoints(svc.Namespace).Get(svc.GetName())
 | |
| 	if err != nil {
 | |
| 		log.Errorf("Get endpoints of service[%s] error:%v", svc.GetName(), err)
 | |
| 		return endpoints
 | |
| 	}
 | |
| 
 | |
| 	pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
 | |
| 	if err != nil {
 | |
| 		log.Errorf("List pods of service[%s] error: %v", svc.GetName(), err)
 | |
| 		return endpoints
 | |
| 	}
 | |
| 
 | |
| 	endpointsType := getEndpointsTypeFromAnnotations(svc.Annotations)
 | |
| 
 | |
| 	targetsByHeadlessDomainAndType := make(map[endpoint.EndpointKey]endpoint.Targets)
 | |
| 	for _, subset := range endpointsObject.Subsets {
 | |
| 		addresses := subset.Addresses
 | |
| 		if svc.Spec.PublishNotReadyAddresses || sc.alwaysPublishNotReadyAddresses {
 | |
| 			addresses = append(addresses, subset.NotReadyAddresses...)
 | |
| 		}
 | |
| 
 | |
| 		for _, address := range addresses {
 | |
| 			// find pod for this address
 | |
| 			if address.TargetRef == nil || address.TargetRef.APIVersion != "" || address.TargetRef.Kind != "Pod" {
 | |
| 				log.Debugf("Skipping address because its target is not a pod: %v", address)
 | |
| 				continue
 | |
| 			}
 | |
| 			var pod *v1.Pod
 | |
| 			for _, v := range pods {
 | |
| 				if v.Name == address.TargetRef.Name {
 | |
| 					pod = v
 | |
| 					break
 | |
| 				}
 | |
| 			}
 | |
| 			if pod == nil {
 | |
| 				log.Errorf("Pod %s not found for address %v", address.TargetRef.Name, address)
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			headlessDomains := []string{hostname}
 | |
| 			if pod.Spec.Hostname != "" {
 | |
| 				headlessDomains = append(headlessDomains, fmt.Sprintf("%s.%s", pod.Spec.Hostname, hostname))
 | |
| 			}
 | |
| 
 | |
| 			for _, headlessDomain := range headlessDomains {
 | |
| 				targets := getTargetsFromTargetAnnotation(pod.Annotations)
 | |
| 				if len(targets) == 0 {
 | |
| 					if endpointsType == EndpointsTypeNodeExternalIP {
 | |
| 						node, err := sc.nodeInformer.Lister().Get(pod.Spec.NodeName)
 | |
| 						if err != nil {
 | |
| 							log.Errorf("Get node[%s] of pod[%s] error: %v; not adding any NodeExternalIP endpoints", pod.Spec.NodeName, pod.GetName(), err)
 | |
| 							return endpoints
 | |
| 						}
 | |
| 						for _, address := range node.Status.Addresses {
 | |
| 							if address.Type == v1.NodeExternalIP || (address.Type == v1.NodeInternalIP && suitableType(address.Address) == endpoint.RecordTypeAAAA) {
 | |
| 								targets = append(targets, address.Address)
 | |
| 								log.Debugf("Generating matching endpoint %s with NodeExternalIP %s", headlessDomain, address.Address)
 | |
| 							}
 | |
| 						}
 | |
| 					} else if endpointsType == EndpointsTypeHostIP || sc.publishHostIP {
 | |
| 						targets = endpoint.Targets{pod.Status.HostIP}
 | |
| 						log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, pod.Status.HostIP)
 | |
| 					} else {
 | |
| 						targets = endpoint.Targets{address.IP}
 | |
| 						log.Debugf("Generating matching endpoint %s with EndpointAddress IP %s", headlessDomain, address.IP)
 | |
| 					}
 | |
| 				}
 | |
| 				for _, target := range targets {
 | |
| 					key := endpoint.EndpointKey{
 | |
| 						DNSName:    headlessDomain,
 | |
| 						RecordType: suitableType(target),
 | |
| 					}
 | |
| 					targetsByHeadlessDomainAndType[key] = append(targetsByHeadlessDomainAndType[key], target)
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	headlessKeys := []endpoint.EndpointKey{}
 | |
| 	for headlessKey := range targetsByHeadlessDomainAndType {
 | |
| 		headlessKeys = append(headlessKeys, headlessKey)
 | |
| 	}
 | |
| 	sort.Slice(headlessKeys, func(i, j int) bool {
 | |
| 		if headlessKeys[i].DNSName != headlessKeys[j].DNSName {
 | |
| 			return headlessKeys[i].DNSName < headlessKeys[j].DNSName
 | |
| 		}
 | |
| 		return headlessKeys[i].RecordType < headlessKeys[j].RecordType
 | |
| 	})
 | |
| 	for _, headlessKey := range headlessKeys {
 | |
| 		allTargets := targetsByHeadlessDomainAndType[headlessKey]
 | |
| 		targets := []string{}
 | |
| 
 | |
| 		deduppedTargets := map[string]struct{}{}
 | |
| 		for _, target := range allTargets {
 | |
| 			if _, ok := deduppedTargets[target]; ok {
 | |
| 				log.Debugf("Removing duplicate target %s", target)
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			deduppedTargets[target] = struct{}{}
 | |
| 			targets = append(targets, target)
 | |
| 		}
 | |
| 
 | |
| 		var ep *endpoint.Endpoint
 | |
| 		if ttl.IsConfigured() {
 | |
| 			ep = endpoint.NewEndpointWithTTL(headlessKey.DNSName, headlessKey.RecordType, ttl, targets...)
 | |
| 		} else {
 | |
| 			ep = endpoint.NewEndpoint(headlessKey.DNSName, headlessKey.RecordType, targets...)
 | |
| 		}
 | |
| 
 | |
| 		if ep != nil {
 | |
| 			endpoints = append(endpoints, ep)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return endpoints
 | |
| }
 | |
| 
 | |
| func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.Endpoint, error) {
 | |
| 	hostnames, err := execTemplate(sc.fqdnTemplate, svc)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	providerSpecific, setIdentifier := getProviderSpecificAnnotations(svc.Annotations)
 | |
| 
 | |
| 	var endpoints []*endpoint.Endpoint
 | |
| 	for _, hostname := range hostnames {
 | |
| 		endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific, setIdentifier, false)...)
 | |
| 	}
 | |
| 
 | |
| 	return endpoints, nil
 | |
| }
 | |
| 
 | |
| // endpointsFromService extracts the endpoints from a service object
 | |
| func (sc *serviceSource) endpoints(svc *v1.Service) []*endpoint.Endpoint {
 | |
| 	var endpoints []*endpoint.Endpoint
 | |
| 	// Skip endpoints if we do not want entries from annotations
 | |
| 	if !sc.ignoreHostnameAnnotation {
 | |
| 		providerSpecific, setIdentifier := getProviderSpecificAnnotations(svc.Annotations)
 | |
| 		var hostnameList []string
 | |
| 		var internalHostnameList []string
 | |
| 
 | |
| 		hostnameList = getHostnamesFromAnnotations(svc.Annotations)
 | |
| 		for _, hostname := range hostnameList {
 | |
| 			endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific, setIdentifier, false)...)
 | |
| 		}
 | |
| 
 | |
| 		internalHostnameList = getInternalHostnamesFromAnnotations(svc.Annotations)
 | |
| 		for _, hostname := range internalHostnameList {
 | |
| 			endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific, setIdentifier, true)...)
 | |
| 		}
 | |
| 	}
 | |
| 	return endpoints
 | |
| }
 | |
| 
 | |
| // filterByAnnotations filters a list of services by a given annotation selector.
 | |
| func (sc *serviceSource) filterByAnnotations(services []*v1.Service) ([]*v1.Service, error) {
 | |
| 	labelSelector, err := metav1.ParseToLabelSelector(sc.annotationFilter)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	selector, err := metav1.LabelSelectorAsSelector(labelSelector)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// empty filter returns original list
 | |
| 	if selector.Empty() {
 | |
| 		return services, nil
 | |
| 	}
 | |
| 
 | |
| 	filteredList := []*v1.Service{}
 | |
| 
 | |
| 	for _, service := range services {
 | |
| 		// convert the service's annotations to an equivalent label selector
 | |
| 		annotations := labels.Set(service.Annotations)
 | |
| 
 | |
| 		// include service if its annotations match the selector
 | |
| 		if selector.Matches(annotations) {
 | |
| 			filteredList = append(filteredList, service)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return filteredList, nil
 | |
| }
 | |
| 
 | |
| // filterByServiceType filters services according their types
 | |
| func (sc *serviceSource) filterByServiceType(services []*v1.Service) []*v1.Service {
 | |
| 	filteredList := []*v1.Service{}
 | |
| 	for _, service := range services {
 | |
| 		// Check if the service is of the given type or not
 | |
| 		if _, ok := sc.serviceTypeFilter[string(service.Spec.Type)]; ok {
 | |
| 			filteredList = append(filteredList, service)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return filteredList
 | |
| }
 | |
| 
 | |
| func (sc *serviceSource) setResourceLabel(service *v1.Service, endpoints []*endpoint.Endpoint) {
 | |
| 	for _, ep := range endpoints {
 | |
| 		ep.Labels[endpoint.ResourceLabelKey] = fmt.Sprintf("service/%s/%s", service.Namespace, service.Name)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string, providerSpecific endpoint.ProviderSpecific, setIdentifier string, useClusterIP bool) (endpoints []*endpoint.Endpoint) {
 | |
| 	hostname = strings.TrimSuffix(hostname, ".")
 | |
| 
 | |
| 	resource := fmt.Sprintf("service/%s/%s", svc.Namespace, svc.Name)
 | |
| 
 | |
| 	ttl := getTTLFromAnnotations(svc.Annotations, resource)
 | |
| 
 | |
| 	targets := getTargetsFromTargetAnnotation(svc.Annotations)
 | |
| 
 | |
| 	if len(targets) == 0 {
 | |
| 		switch svc.Spec.Type {
 | |
| 		case v1.ServiceTypeLoadBalancer:
 | |
| 			if useClusterIP {
 | |
| 				targets = extractServiceIps(svc)
 | |
| 			} else {
 | |
| 				targets = extractLoadBalancerTargets(svc, sc.resolveLoadBalancerHostname)
 | |
| 			}
 | |
| 		case v1.ServiceTypeClusterIP:
 | |
| 			if svc.Spec.ClusterIP == v1.ClusterIPNone {
 | |
| 				endpoints = append(endpoints, sc.extractHeadlessEndpoints(svc, hostname, ttl)...)
 | |
| 			} else if useClusterIP || sc.publishInternal {
 | |
| 				targets = extractServiceIps(svc)
 | |
| 			}
 | |
| 		case v1.ServiceTypeNodePort:
 | |
| 			// add the nodeTargets and extract an SRV endpoint
 | |
| 			var err error
 | |
| 			targets, err = sc.extractNodePortTargets(svc)
 | |
| 			if err != nil {
 | |
| 				log.Errorf("Unable to extract targets from service %s/%s error: %v", svc.Namespace, svc.Name, err)
 | |
| 				return endpoints
 | |
| 			}
 | |
| 			endpoints = append(endpoints, sc.extractNodePortEndpoints(svc, hostname, ttl)...)
 | |
| 		case v1.ServiceTypeExternalName:
 | |
| 			targets = extractServiceExternalName(svc)
 | |
| 		}
 | |
| 
 | |
| 		for _, endpoint := range endpoints {
 | |
| 			endpoint.ProviderSpecific = providerSpecific
 | |
| 			endpoint.SetIdentifier = setIdentifier
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	endpoints = append(endpoints, endpointsForHostname(hostname, targets, ttl, providerSpecific, setIdentifier, resource)...)
 | |
| 
 | |
| 	return endpoints
 | |
| }
 | |
| 
 | |
| func extractServiceIps(svc *v1.Service) endpoint.Targets {
 | |
| 	if svc.Spec.ClusterIP == v1.ClusterIPNone {
 | |
| 		log.Debugf("Unable to associate %s headless service with a Cluster IP", svc.Name)
 | |
| 		return endpoint.Targets{}
 | |
| 	}
 | |
| 	return endpoint.Targets{svc.Spec.ClusterIP}
 | |
| }
 | |
| 
 | |
| func extractServiceExternalName(svc *v1.Service) endpoint.Targets {
 | |
| 	if len(svc.Spec.ExternalIPs) > 0 {
 | |
| 		return svc.Spec.ExternalIPs
 | |
| 	}
 | |
| 	return endpoint.Targets{svc.Spec.ExternalName}
 | |
| }
 | |
| 
 | |
| func extractLoadBalancerTargets(svc *v1.Service, resolveLoadBalancerHostname bool) endpoint.Targets {
 | |
| 	if len(svc.Spec.ExternalIPs) > 0 {
 | |
| 		return svc.Spec.ExternalIPs
 | |
| 	}
 | |
| 
 | |
| 	// Create a corresponding endpoint for each configured external entrypoint.
 | |
| 	var targets endpoint.Targets
 | |
| 	for _, lb := range svc.Status.LoadBalancer.Ingress {
 | |
| 		if lb.IP != "" {
 | |
| 			targets = append(targets, lb.IP)
 | |
| 		}
 | |
| 		if lb.Hostname != "" {
 | |
| 			if resolveLoadBalancerHostname {
 | |
| 				ips, err := net.LookupIP(lb.Hostname)
 | |
| 				if err != nil {
 | |
| 					log.Errorf("Unable to resolve %q: %v", lb.Hostname, err)
 | |
| 					continue
 | |
| 				}
 | |
| 				for _, ip := range ips {
 | |
| 					targets = append(targets, ip.String())
 | |
| 				}
 | |
| 			} else {
 | |
| 				targets = append(targets, lb.Hostname)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return targets
 | |
| }
 | |
| 
 | |
| func isPodStatusReady(status v1.PodStatus) bool {
 | |
| 	_, condition := getPodCondition(&status, v1.PodReady)
 | |
| 	return condition != nil && condition.Status == v1.ConditionTrue
 | |
| }
 | |
| 
 | |
| func getPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
 | |
| 	if status == nil {
 | |
| 		return -1, nil
 | |
| 	}
 | |
| 	return getPodConditionFromList(status.Conditions, conditionType)
 | |
| }
 | |
| 
 | |
| func getPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
 | |
| 	if conditions == nil {
 | |
| 		return -1, nil
 | |
| 	}
 | |
| 	for i := range conditions {
 | |
| 		if conditions[i].Type == conditionType {
 | |
| 			return i, &conditions[i]
 | |
| 		}
 | |
| 	}
 | |
| 	return -1, nil
 | |
| }
 | |
| 
 | |
| func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targets, error) {
 | |
| 	var (
 | |
| 		internalIPs endpoint.Targets
 | |
| 		externalIPs endpoint.Targets
 | |
| 		ipv6IPs     endpoint.Targets
 | |
| 		nodes       []*v1.Node
 | |
| 		err         error
 | |
| 	)
 | |
| 
 | |
| 	switch svc.Spec.ExternalTrafficPolicy {
 | |
| 	case v1.ServiceExternalTrafficPolicyTypeLocal:
 | |
| 		nodesMap := map[*v1.Node]struct{}{}
 | |
| 		labelSelector, err := metav1.ParseToLabelSelector(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		selector, err := metav1.LabelSelectorAsSelector(labelSelector)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		var nodesReady []*v1.Node
 | |
| 		var nodesRunning []*v1.Node
 | |
| 		for _, v := range pods {
 | |
| 			if v.Status.Phase == v1.PodRunning {
 | |
| 				node, err := sc.nodeInformer.Lister().Get(v.Spec.NodeName)
 | |
| 				if err != nil {
 | |
| 					log.Debugf("Unable to find node where Pod %s is running", v.Spec.Hostname)
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				if _, ok := nodesMap[node]; !ok {
 | |
| 					nodesMap[node] = *new(struct{})
 | |
| 					nodesRunning = append(nodesRunning, node)
 | |
| 
 | |
| 					if isPodStatusReady(v.Status) {
 | |
| 						nodesReady = append(nodesReady, node)
 | |
| 						// Check pod not terminating
 | |
| 						if v.GetDeletionTimestamp() == nil {
 | |
| 							nodes = append(nodes, node)
 | |
| 						}
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if len(nodes) > 0 {
 | |
| 			// Works same as service endpoints
 | |
| 		} else if len(nodesReady) > 0 {
 | |
| 			// 2 level of panic modes as safe guard, because old wrong behavior can be used by someone
 | |
| 			// Publish all endpoints not always a bad thing
 | |
| 			log.Debugf("All pods in terminating state, use ready")
 | |
| 			nodes = nodesReady
 | |
| 		} else {
 | |
| 			log.Debugf("All pods not ready, use all running")
 | |
| 			nodes = nodesRunning
 | |
| 		}
 | |
| 	default:
 | |
| 		nodes, err = sc.nodeInformer.Lister().List(labels.Everything())
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	for _, node := range nodes {
 | |
| 		for _, address := range node.Status.Addresses {
 | |
| 			switch address.Type {
 | |
| 			case v1.NodeExternalIP:
 | |
| 				externalIPs = append(externalIPs, address.Address)
 | |
| 			case v1.NodeInternalIP:
 | |
| 				internalIPs = append(internalIPs, address.Address)
 | |
| 				if suitableType(address.Address) == endpoint.RecordTypeAAAA {
 | |
| 					ipv6IPs = append(ipv6IPs, address.Address)
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	access := getAccessFromAnnotations(svc.Annotations)
 | |
| 	if access == "public" {
 | |
| 		return append(externalIPs, ipv6IPs...), nil
 | |
| 	}
 | |
| 	if access == "private" {
 | |
| 		return internalIPs, nil
 | |
| 	}
 | |
| 	if len(externalIPs) > 0 {
 | |
| 		return append(externalIPs, ipv6IPs...), nil
 | |
| 	}
 | |
| 	return internalIPs, nil
 | |
| }
 | |
| 
 | |
| func (sc *serviceSource) extractNodePortEndpoints(svc *v1.Service, hostname string, ttl endpoint.TTL) []*endpoint.Endpoint {
 | |
| 	var endpoints []*endpoint.Endpoint
 | |
| 
 | |
| 	for _, port := range svc.Spec.Ports {
 | |
| 		if port.NodePort > 0 {
 | |
| 			// following the RFC 2782, SRV record must have a following format
 | |
| 			// _service._proto.name. TTL class SRV priority weight port
 | |
| 			// see https://en.wikipedia.org/wiki/SRV_record
 | |
| 
 | |
| 			// build a target with a priority of 0, weight of 50, and pointing the given port on the given host
 | |
| 			target := fmt.Sprintf("0 50 %d %s", port.NodePort, hostname)
 | |
| 
 | |
| 			// take the service name from the K8s Service object
 | |
| 			// it is safe to use since it is DNS compatible
 | |
| 			// see https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-label-names
 | |
| 			serviceName := svc.ObjectMeta.Name
 | |
| 
 | |
| 			// figure out the protocol
 | |
| 			protocol := strings.ToLower(string(port.Protocol))
 | |
| 			if protocol == "" {
 | |
| 				protocol = "tcp"
 | |
| 			}
 | |
| 
 | |
| 			recordName := fmt.Sprintf("_%s._%s.%s", serviceName, protocol, hostname)
 | |
| 
 | |
| 			var ep *endpoint.Endpoint
 | |
| 			if ttl.IsConfigured() {
 | |
| 				ep = endpoint.NewEndpointWithTTL(recordName, endpoint.RecordTypeSRV, ttl, target)
 | |
| 			} else {
 | |
| 				ep = endpoint.NewEndpoint(recordName, endpoint.RecordTypeSRV, target)
 | |
| 			}
 | |
| 
 | |
| 			if ep != nil {
 | |
| 				endpoints = append(endpoints, ep)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return endpoints
 | |
| }
 | |
| 
 | |
| func (sc *serviceSource) AddEventHandler(ctx context.Context, handler func()) {
 | |
| 	log.Debug("Adding event handler for service")
 | |
| 
 | |
| 	// Right now there is no way to remove event handler from informer, see:
 | |
| 	// https://github.com/kubernetes/kubernetes/issues/79610
 | |
| 	sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
 | |
| }
 |