coredns/plugin/kubernetes/controller.go

893 lines
25 KiB
Go

package kubernetes
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/coredns/coredns/plugin/kubernetes/object"
api "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
mcsClientset "sigs.k8s.io/mcs-api/pkg/client/clientset/versioned/typed/apis/v1alpha1"
)
const (
podIPIndex = "PodIP"
svcNameNamespaceIndex = "ServiceNameNamespace"
svcIPIndex = "ServiceIP"
svcExtIPIndex = "ServiceExternalIP"
epNameNamespaceIndex = "EndpointNameNamespace"
epIPIndex = "EndpointsIP"
svcImportNameNamespaceIndex = "ServiceImportNameNamespace"
mcEpNameNamespaceIndex = "MultiClusterEndpointsImportNameNamespace"
)
type ModifiedMode int
const (
ModifiedInternal ModifiedMode = iota
ModifiedExternal
ModifiedMultiCluster
)
type dnsController interface {
ServiceList() []*object.Service
EndpointsList() []*object.Endpoints
ServiceImportList() []*object.ServiceImport
SvcIndex(string) []*object.Service
SvcIndexReverse(string) []*object.Service
SvcExtIndexReverse(string) []*object.Service
SvcImportIndex(string) []*object.ServiceImport
PodIndex(string) []*object.Pod
EpIndex(string) []*object.Endpoints
EpIndexReverse(string) []*object.Endpoints
McEpIndex(string) []*object.MultiClusterEndpoints
GetNodeByName(context.Context, string) (*api.Node, error)
GetNamespaceByName(string) (*object.Namespace, error)
Run()
HasSynced() bool
Stop() error
// Modified returns the timestamp of the most recent changes to services.
Modified(ModifiedMode) int64
}
type dnsControl struct {
// modified tracks timestamp of the most recent changes
// It needs to be first because it is guaranteed to be 8-byte
// aligned ( we use sync.LoadAtomic with this )
modified int64
// multiClusterModified tracks timestamp of the most recent changes to
// multi cluster services
multiClusterModified int64
// extModified tracks timestamp of the most recent changes to
// services with external facing IP addresses
extModified int64
client kubernetes.Interface
mcsClient mcsClientset.MulticlusterV1alpha1Interface
selector labels.Selector
namespaceSelector labels.Selector
svcController cache.Controller
podController cache.Controller
epController cache.Controller
nsController cache.Controller
svcImportController cache.Controller
mcEpController cache.Controller
svcLister cache.Indexer
podLister cache.Indexer
epLister cache.Indexer
nsLister cache.Store
svcImportLister cache.Indexer
mcEpLister cache.Indexer
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
// allowing concurrent stoppers leads to stack traces.
stopLock sync.Mutex
shutdown bool
stopCh chan struct{}
zones []string
endpointNameMode bool
multiclusterZones []string
}
type dnsControlOpts struct {
initPodCache bool
initEndpointsCache bool
ignoreEmptyService bool
// Label handling.
labelSelector *meta.LabelSelector
selector labels.Selector
namespaceLabelSelector *meta.LabelSelector
namespaceSelector labels.Selector
zones []string
endpointNameMode bool
multiclusterZones []string
}
// newdnsController creates a controller for CoreDNS.
func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, mcsClient mcsClientset.MulticlusterV1alpha1Interface, opts dnsControlOpts) *dnsControl {
dns := dnsControl{
client: kubeClient,
mcsClient: mcsClient,
selector: opts.selector,
namespaceSelector: opts.namespaceSelector,
stopCh: make(chan struct{}),
zones: opts.zones,
endpointNameMode: opts.endpointNameMode,
multiclusterZones: opts.multiclusterZones,
}
dns.svcLister, dns.svcController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: serviceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: serviceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&api.Service{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc, svcExtIPIndex: svcExtIPIndexFunc},
object.DefaultProcessor(object.ToService, nil),
)
podLister, podController := object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: podListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: podWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&api.Pod{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{podIPIndex: podIPIndexFunc},
object.DefaultProcessor(object.ToPod, nil),
)
dns.podLister = podLister
if opts.initPodCache {
dns.podController = podController
}
epLister, epController := object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&discovery.EndpointSlice{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()),
)
dns.epLister = epLister
if opts.initEndpointsCache {
dns.epController = epController
}
dns.nsLister, dns.nsController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: namespaceListFunc(ctx, dns.client, dns.namespaceSelector),
WatchFunc: namespaceWatchFunc(ctx, dns.client, dns.namespaceSelector),
},
&api.Namespace{},
cache.ResourceEventHandlerFuncs{},
cache.Indexers{},
object.DefaultProcessor(object.ToNamespace, nil),
)
if len(opts.multiclusterZones) > 0 {
mcsEpReq, _ := labels.NewRequirement(mcs.LabelServiceName, selection.Exists, []string{})
mcsEpSelector := dns.selector
if mcsEpSelector == nil {
mcsEpSelector = labels.NewSelector()
}
mcsEpSelector = mcsEpSelector.Add(*mcsEpReq)
dns.mcEpLister, dns.mcEpController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, mcsEpSelector),
WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, mcsEpSelector),
},
&discovery.EndpointSlice{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{mcEpNameNamespaceIndex: mcEpNameNamespaceIndexFunc},
object.DefaultProcessor(object.EndpointSliceToMultiClusterEndpoints, dns.EndpointSliceLatencyRecorder()),
)
dns.svcImportLister, dns.svcImportController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: serviceImportListFunc(ctx, dns.mcsClient, api.NamespaceAll, dns.namespaceSelector),
WatchFunc: serviceImportWatchFunc(ctx, dns.mcsClient, api.NamespaceAll, dns.namespaceSelector),
},
&mcs.ServiceImport{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{svcImportNameNamespaceIndex: svcImportNameNamespaceIndexFunc},
object.DefaultProcessor(object.ToServiceImport, nil),
)
}
return &dns
}
func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder {
return &object.EndpointLatencyRecorder{
ServiceFunc: func(o meta.Object) []*object.Service {
return dns.SvcIndex(object.ServiceKey(o.GetName(), o.GetNamespace()))
},
}
}
func (dns *dnsControl) EndpointSliceLatencyRecorder() *object.EndpointLatencyRecorder {
return &object.EndpointLatencyRecorder{
ServiceFunc: func(o meta.Object) []*object.Service {
return dns.SvcIndex(object.ServiceKey(o.GetLabels()[discovery.LabelServiceName], o.GetNamespace()))
},
}
}
func podIPIndexFunc(obj interface{}) ([]string, error) {
p, ok := obj.(*object.Pod)
if !ok {
return nil, errObj
}
return []string{p.PodIP}, nil
}
func svcIPIndexFunc(obj interface{}) ([]string, error) {
svc, ok := obj.(*object.Service)
if !ok {
return nil, errObj
}
idx := make([]string, len(svc.ClusterIPs))
copy(idx, svc.ClusterIPs)
return idx, nil
}
func svcExtIPIndexFunc(obj interface{}) ([]string, error) {
svc, ok := obj.(*object.Service)
if !ok {
return nil, errObj
}
idx := make([]string, len(svc.ExternalIPs))
copy(idx, svc.ExternalIPs)
return idx, nil
}
func svcNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
s, ok := obj.(*object.Service)
if !ok {
return nil, errObj
}
return []string{s.Index}, nil
}
func epNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
s, ok := obj.(*object.Endpoints)
if !ok {
return nil, errObj
}
return []string{s.Index}, nil
}
func epIPIndexFunc(obj interface{}) ([]string, error) {
ep, ok := obj.(*object.Endpoints)
if !ok {
return nil, errObj
}
return ep.IndexIP, nil
}
func svcImportNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
s, ok := obj.(*object.ServiceImport)
if !ok {
return nil, errObj
}
return []string{s.Index}, nil
}
func mcEpNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
mcEp, ok := obj.(*object.MultiClusterEndpoints)
if !ok {
return nil, errObj
}
return []string{mcEp.Index}, nil
}
func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
opts.LabelSelector = s.String()
}
return c.CoreV1().Services(ns).List(ctx, opts)
}
}
func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
opts.LabelSelector = s.String()
}
if len(opts.FieldSelector) > 0 {
opts.FieldSelector = opts.FieldSelector + ","
}
opts.FieldSelector = opts.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
return c.CoreV1().Pods(ns).List(ctx, opts)
}
}
func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
opts.LabelSelector = s.String()
}
return c.DiscoveryV1().EndpointSlices(ns).List(ctx, opts)
}
}
func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
opts.LabelSelector = s.String()
}
return c.CoreV1().Namespaces().List(ctx, opts)
}
}
func serviceImportListFunc(ctx context.Context, c mcsClientset.MulticlusterV1alpha1Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
opts.LabelSelector = s.String()
}
return c.ServiceImports(ns).List(ctx, opts)
}
}
func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
return c.CoreV1().Services(ns).Watch(ctx, options)
}
}
func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
if len(options.FieldSelector) > 0 {
options.FieldSelector = options.FieldSelector + ","
}
options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
return c.CoreV1().Pods(ns).Watch(ctx, options)
}
}
func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
return c.DiscoveryV1().EndpointSlices(ns).Watch(ctx, options)
}
}
func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
return c.CoreV1().Namespaces().Watch(ctx, options)
}
}
func serviceImportWatchFunc(ctx context.Context, c mcsClientset.MulticlusterV1alpha1Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
return c.ServiceImports(ns).Watch(ctx, options)
}
}
// Stop stops the controller.
func (dns *dnsControl) Stop() error {
dns.stopLock.Lock()
defer dns.stopLock.Unlock()
// Only try draining the workqueue if we haven't already.
if !dns.shutdown {
close(dns.stopCh)
dns.shutdown = true
return nil
}
return fmt.Errorf("shutdown already in progress")
}
// Run starts the controller.
func (dns *dnsControl) Run() {
go dns.svcController.Run(dns.stopCh)
if dns.epController != nil {
go func() {
dns.epController.Run(dns.stopCh)
}()
}
if dns.podController != nil {
go dns.podController.Run(dns.stopCh)
}
go dns.nsController.Run(dns.stopCh)
if dns.svcImportController != nil {
go dns.svcImportController.Run(dns.stopCh)
}
if dns.mcEpController != nil {
go dns.mcEpController.Run(dns.stopCh)
}
<-dns.stopCh
}
// HasSynced calls on all controllers.
func (dns *dnsControl) HasSynced() bool {
a := dns.svcController.HasSynced()
b := true
if dns.epController != nil {
b = dns.epController.HasSynced()
}
c := true
if dns.podController != nil {
c = dns.podController.HasSynced()
}
d := dns.nsController.HasSynced()
e := true
if dns.svcImportController != nil {
e = dns.svcImportController.HasSynced()
}
f := true
if dns.mcEpController != nil {
f = dns.mcEpController.HasSynced()
}
return a && b && c && d && e && f
}
func (dns *dnsControl) ServiceList() (svcs []*object.Service) {
os := dns.svcLister.List()
for _, o := range os {
s, ok := o.(*object.Service)
if !ok {
continue
}
svcs = append(svcs, s)
}
return svcs
}
func (dns *dnsControl) ServiceImportList() (svcs []*object.ServiceImport) {
os := dns.svcImportLister.List()
for _, o := range os {
s, ok := o.(*object.ServiceImport)
if !ok {
continue
}
svcs = append(svcs, s)
}
return svcs
}
func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) {
os := dns.epLister.List()
for _, o := range os {
ep, ok := o.(*object.Endpoints)
if !ok {
continue
}
eps = append(eps, ep)
}
return eps
}
func (dns *dnsControl) PodIndex(ip string) (pods []*object.Pod) {
os, err := dns.podLister.ByIndex(podIPIndex, ip)
if err != nil {
return nil
}
for _, o := range os {
p, ok := o.(*object.Pod)
if !ok {
continue
}
pods = append(pods, p)
}
return pods
}
func (dns *dnsControl) SvcIndex(idx string) (svcs []*object.Service) {
os, err := dns.svcLister.ByIndex(svcNameNamespaceIndex, idx)
if err != nil {
return nil
}
for _, o := range os {
s, ok := o.(*object.Service)
if !ok {
continue
}
svcs = append(svcs, s)
}
return svcs
}
func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*object.Service) {
os, err := dns.svcLister.ByIndex(svcIPIndex, ip)
if err != nil {
return nil
}
for _, o := range os {
s, ok := o.(*object.Service)
if !ok {
continue
}
svcs = append(svcs, s)
}
return svcs
}
func (dns *dnsControl) SvcExtIndexReverse(ip string) (svcs []*object.Service) {
os, err := dns.svcLister.ByIndex(svcExtIPIndex, ip)
if err != nil {
return nil
}
for _, o := range os {
s, ok := o.(*object.Service)
if !ok {
continue
}
svcs = append(svcs, s)
}
return svcs
}
func (dns *dnsControl) SvcImportIndex(idx string) (svcs []*object.ServiceImport) {
os, err := dns.svcImportLister.ByIndex(svcImportNameNamespaceIndex, idx)
if err != nil {
return nil
}
for _, o := range os {
s, ok := o.(*object.ServiceImport)
if !ok {
continue
}
svcs = append(svcs, s)
}
return svcs
}
func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
if err != nil {
return nil
}
for _, o := range os {
e, ok := o.(*object.Endpoints)
if !ok {
continue
}
ep = append(ep, e)
}
return ep
}
func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
os, err := dns.epLister.ByIndex(epIPIndex, ip)
if err != nil {
return nil
}
for _, o := range os {
e, ok := o.(*object.Endpoints)
if !ok {
continue
}
ep = append(ep, e)
}
return ep
}
func (dns *dnsControl) McEpIndex(idx string) (ep []*object.MultiClusterEndpoints) {
os, err := dns.mcEpLister.ByIndex(mcEpNameNamespaceIndex, idx)
if err != nil {
return nil
}
for _, o := range os {
e, ok := o.(*object.MultiClusterEndpoints)
if !ok {
continue
}
ep = append(ep, e)
}
return ep
}
// GetNodeByName return the node by name. If nothing is found an error is
// returned. This query causes a round trip to the k8s API server, so use
// sparingly. Currently, this is only used for Federation.
func (dns *dnsControl) GetNodeByName(ctx context.Context, name string) (*api.Node, error) {
v1node, err := dns.client.CoreV1().Nodes().Get(ctx, name, meta.GetOptions{})
return v1node, err
}
// GetNamespaceByName returns the namespace by name. If nothing is found an error is returned.
func (dns *dnsControl) GetNamespaceByName(name string) (*object.Namespace, error) {
o, exists, err := dns.nsLister.GetByKey(name)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("namespace not found")
}
ns, ok := o.(*object.Namespace)
if !ok {
return nil, fmt.Errorf("found key but not namespace")
}
return ns, nil
}
func (dns *dnsControl) Add(obj interface{}) { dns.updateModified() }
func (dns *dnsControl) Delete(obj interface{}) { dns.updateModified() }
func (dns *dnsControl) Update(oldObj, newObj interface{}) { dns.detectChanges(oldObj, newObj) }
// detectChanges detects changes in objects, and updates the modified timestamp
func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) {
// If both objects have the same resource version, they are identical.
if newObj != nil && oldObj != nil && (oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion()) {
return
}
obj := newObj
if obj == nil {
obj = oldObj
}
switch ob := obj.(type) {
case *object.Service:
imod, emod := serviceModified(oldObj, newObj)
if imod {
dns.updateModified()
}
if emod {
dns.updateExtModified()
}
case *object.ServiceImport:
if !serviceImportEquivalent(oldObj, newObj) {
dns.updateMultiClusterModified()
}
case *object.Pod:
dns.updateModified()
case *object.Endpoints:
if !endpointsEquivalent(oldObj.(*object.Endpoints), newObj.(*object.Endpoints)) {
dns.updateModified()
}
case *object.MultiClusterEndpoints:
if !multiclusterEndpointsEquivalent(oldObj.(*object.MultiClusterEndpoints), newObj.(*object.MultiClusterEndpoints)) {
dns.updateMultiClusterModified()
}
default:
log.Warningf("Updates for %T not supported.", ob)
}
}
// subsetsEquivalent checks if two endpoint subsets are significantly equivalent
// I.e. that they have the same ready addresses, host names, ports (including protocol
// and service names for SRV)
func subsetsEquivalent(sa, sb object.EndpointSubset) bool {
if len(sa.Addresses) != len(sb.Addresses) {
return false
}
if len(sa.Ports) != len(sb.Ports) {
return false
}
// in Addresses and Ports, we should be able to rely on
// these being sorted and able to be compared
// they are supposed to be in a canonical format
for addr, aaddr := range sa.Addresses {
baddr := sb.Addresses[addr]
if aaddr.IP != baddr.IP {
return false
}
if aaddr.Hostname != baddr.Hostname {
return false
}
}
for port, aport := range sa.Ports {
bport := sb.Ports[port]
if aport.Name != bport.Name {
return false
}
if aport.Port != bport.Port {
return false
}
if aport.Protocol != bport.Protocol {
return false
}
}
return true
}
// endpointsEquivalent checks if the update to an endpoint is something
// that matters to us or if they are effectively equivalent.
func endpointsEquivalent(a, b *object.Endpoints) bool {
if a == nil || b == nil {
return false
}
if len(a.Subsets) != len(b.Subsets) {
return false
}
// we should be able to rely on
// these being sorted and able to be compared
// they are supposed to be in a canonical format
for i, sa := range a.Subsets {
sb := b.Subsets[i]
if !subsetsEquivalent(sa, sb) {
return false
}
}
return true
}
// multiclusterEndpointsEquivalent checks if the update to an endpoint is something
// that matters to us or if they are effectively equivalent.
func multiclusterEndpointsEquivalent(a, b *object.MultiClusterEndpoints) bool {
if a == nil || b == nil {
return false
}
if !endpointsEquivalent(&a.Endpoints, &b.Endpoints) {
return false
}
if a.ClusterId != b.ClusterId {
return false
}
return true
}
// serviceModified checks the services passed for changes that result in changes
// to internal and or external records. It returns two booleans, one for internal
// record changes, and a second for external record changes
func serviceModified(oldObj, newObj interface{}) (intSvc, extSvc bool) {
if oldObj != nil && newObj == nil {
// deleted service only modifies external zone records if it had external ips
return true, len(oldObj.(*object.Service).ExternalIPs) > 0
}
if oldObj == nil && newObj != nil {
// added service only modifies external zone records if it has external ips
return true, len(newObj.(*object.Service).ExternalIPs) > 0
}
newSvc := newObj.(*object.Service)
oldSvc := oldObj.(*object.Service)
// External IPs are mutable, affecting external zone records
if len(oldSvc.ExternalIPs) != len(newSvc.ExternalIPs) {
extSvc = true
} else {
for i := range oldSvc.ExternalIPs {
if oldSvc.ExternalIPs[i] != newSvc.ExternalIPs[i] {
extSvc = true
break
}
}
}
// ExternalName is mutable, affecting internal zone records
intSvc = oldSvc.ExternalName != newSvc.ExternalName
if intSvc && extSvc {
return intSvc, extSvc
}
// All Port fields are mutable, affecting both internal/external zone records
if len(oldSvc.Ports) != len(newSvc.Ports) {
return true, true
}
for i := range oldSvc.Ports {
if oldSvc.Ports[i].Name != newSvc.Ports[i].Name {
return true, true
}
if oldSvc.Ports[i].Port != newSvc.Ports[i].Port {
return true, true
}
if oldSvc.Ports[i].Protocol != newSvc.Ports[i].Protocol {
return true, true
}
}
return intSvc, extSvc
}
// serviceImportEquivalent checks if the update to a ServiceImport is something
// that matters to us or if they are effectively equivalent.
func serviceImportEquivalent(oldObj, newObj interface{}) bool {
if oldObj != nil && newObj == nil {
return false
}
if oldObj == nil && newObj != nil {
return false
}
newSvc := newObj.(*object.ServiceImport)
oldSvc := oldObj.(*object.ServiceImport)
if oldSvc.Type != newSvc.Type {
return false
}
// All Port fields are mutable, affecting both internal/external zone records
if len(oldSvc.Ports) != len(newSvc.Ports) {
return false
}
for i := range oldSvc.Ports {
if oldSvc.Ports[i].Name != newSvc.Ports[i].Name {
return false
}
if oldSvc.Ports[i].Port != newSvc.Ports[i].Port {
return false
}
if oldSvc.Ports[i].Protocol != newSvc.Ports[i].Protocol {
return false
}
}
return true
}
func (dns *dnsControl) Modified(mode ModifiedMode) int64 {
switch mode {
case ModifiedInternal:
return atomic.LoadInt64(&dns.modified)
case ModifiedExternal:
return atomic.LoadInt64(&dns.extModified)
case ModifiedMultiCluster:
return atomic.LoadInt64(&dns.multiClusterModified)
}
return -1
}
// updateModified set dns.modified to the current time.
func (dns *dnsControl) updateModified() {
unix := time.Now().Unix()
atomic.StoreInt64(&dns.modified, unix)
}
// updateMultiClusterModified set dns.modified to the current time.
func (dns *dnsControl) updateMultiClusterModified() {
unix := time.Now().Unix()
atomic.StoreInt64(&dns.multiClusterModified, unix)
}
// updateExtModified set dns.extModified to the current time.
func (dns *dnsControl) updateExtModified() {
unix := time.Now().Unix()
atomic.StoreInt64(&dns.extModified, unix)
}
var errObj = errors.New("obj was not of the correct type")