Merge pull request #687 from jlamillan/jlamillan/add_watch_flag

Add --watchers flag to allow controller to respond automatically to Ingress or Service updates
This commit is contained in:
Kubernetes Prow Robot 2020-02-04 02:23:25 -08:00 committed by GitHub
commit d49d901d8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 205 additions and 9 deletions

View File

@ -21,6 +21,7 @@ import (
"errors"
"reflect"
"testing"
"time"
"sigs.k8s.io/external-dns/endpoint"
"sigs.k8s.io/external-dns/internal/testutils"
@ -151,3 +152,44 @@ func TestRunOnce(t *testing.T) {
// Validate that the mock source was called.
source.AssertExpectations(t)
}
// TestSourceEventHandler tests that the Controller can use a Source's registered handler as a callback.
func TestSourceEventHandler(t *testing.T) {
source := new(testutils.MockSource)
handlerCh := make(chan bool)
timeoutCh := make(chan bool, 1)
stopChan := make(chan struct{}, 1)
ctrl := &Controller{
Source: source,
Registry: nil,
Policy: &plan.SyncPolicy{},
}
// Define and register a simple handler that sends a message to a channel to show it was called.
handler := func() error {
handlerCh <- true
return nil
}
// Example of preventing handler from being called more than once every 5 seconds.
ctrl.Source.AddEventHandler(handler, stopChan, 5*time.Second)
// Send timeout message after 10 seconds to fail test if handler is not called.
go func() {
time.Sleep(10 * time.Second)
timeoutCh <- true
}()
// Wait until we either receive a message from handlerCh or timeoutCh channel after 10 seconds.
select {
case msg := <-handlerCh:
assert.True(t, msg)
case <-timeoutCh:
assert.Fail(t, "timed out waiting for event handler to be called")
}
close(stopChan)
close(handlerCh)
close(timeoutCh)
}

1
go.mod
View File

@ -57,6 +57,7 @@ require (
k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
k8s.io/client-go v10.0.0+incompatible
k8s.io/kubernetes v1.14.1
)
replace (

2
go.sum
View File

@ -795,6 +795,8 @@ k8s.io/helm v2.13.1+incompatible/go.mod h1:LZzlS4LQBHfciFOurYBFkCMTaZ0D1l+p0teMg
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
k8s.io/kubernetes v1.13.1/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/kubernetes v1.14.1 h1:I9F52h5sqVxBmoSsBlNQ0YygNcukDilkpGxUbJRoBoY=
k8s.io/kubernetes v1.14.1/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0=
k8s.io/utils v0.0.0-20190607212802-c55fbcfc754a/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
mvdan.cc/unparam v0.0.0-20190720180237-d51796306d8f/go.mod h1:4G1h5nDURzA3bwVMZIVpwbkw+04kSxk3rAtzlimaUJw=

View File

@ -17,8 +17,9 @@ limitations under the License.
package testutils
import (
"github.com/stretchr/testify/mock"
"time"
"github.com/stretchr/testify/mock"
"sigs.k8s.io/external-dns/endpoint"
)
@ -38,3 +39,23 @@ func (m *MockSource) Endpoints() ([]*endpoint.Endpoint, error) {
return endpoints.([]*endpoint.Endpoint), args.Error(1)
}
// AddEventHandler adds an event handler function that's called when sources that support such a thing have changed.
func (m *MockSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
// Execute callback handler no more than once per minInterval, until a message on stopChan is received.
go func() {
var lastCallbackTime time.Time
for {
select {
case <-stopChan:
return
default:
now := time.Now()
if now.After(lastCallbackTime.Add(minInterval)) {
handler()
lastCallbackTime = time.Now()
}
}
}
}()
}

20
main.go
View File

@ -22,6 +22,7 @@ import (
"os"
"os/signal"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
@ -93,9 +94,15 @@ func main() {
// Lookup all the selected sources by names and pass them the desired configuration.
sources, err := source.ByNames(&source.SingletonClientGenerator{
KubeConfig: cfg.KubeConfig,
KubeMaster: cfg.Master,
RequestTimeout: cfg.RequestTimeout,
KubeConfig: cfg.KubeConfig,
KubeMaster: cfg.Master,
// If update events are enabled, disable timeout.
RequestTimeout: func() time.Duration {
if cfg.UpdateEvents {
return 0
}
return cfg.RequestTimeout
}(),
}, cfg.Sources, sourceCfg)
if err != nil {
log.Fatal(err)
@ -281,6 +288,13 @@ func main() {
Interval: cfg.Interval,
}
if cfg.UpdateEvents {
// Add RunOnce as the handler function that will be called when ingress/service sources have changed.
// Note that k8s Informers will perform an initial list operation, which results in the handler
// function initially being called for every Service/Ingress that exists limted by minInterval.
ctrl.Source.AddEventHandler(func() error { return ctrl.RunOnce(ctx) }, stopChan, 1*time.Minute)
}
if cfg.Once {
err := ctrl.RunOnce(ctx)
if err != nil {

View File

@ -108,6 +108,7 @@ type Config struct {
Interval time.Duration
Once bool
DryRun bool
UpdateEvents bool
LogFormat string
MetricsAddress string
LogLevel string
@ -201,6 +202,7 @@ var defaultConfig = &Config{
Interval: time.Minute,
Once: false,
DryRun: false,
UpdateEvents: false,
LogFormat: "text",
MetricsAddress: ":7979",
LogLevel: logrus.InfoLevel.String(),
@ -382,6 +384,7 @@ func (cfg *Config) ParseFlags(args []string) error {
app.Flag("interval", "The interval between two consecutive synchronizations in duration format (default: 1m)").Default(defaultConfig.Interval.String()).DurationVar(&cfg.Interval)
app.Flag("once", "When enabled, exits the synchronization loop after the first iteration (default: disabled)").BoolVar(&cfg.Once)
app.Flag("dry-run", "When enabled, prints DNS record changes rather than actually performing them (default: disabled)").BoolVar(&cfg.DryRun)
app.Flag("events", "When enabled, in addition to running every interval, the reconciliation loop will get triggered when supported sources change (default: disabled)").BoolVar(&cfg.UpdateEvents)
// Miscellaneous flags
app.Flag("log-format", "The format in which log messages are printed (default: text, options: text, json)").Default(defaultConfig.LogFormat).EnumVar(&cfg.LogFormat, "text", "json")

View File

@ -83,6 +83,7 @@ var (
Interval: time.Minute,
Once: false,
DryRun: false,
UpdateEvents: false,
LogFormat: "text",
MetricsAddress: ":7979",
LogLevel: logrus.InfoLevel.String(),
@ -157,6 +158,7 @@ var (
Interval: 10 * time.Minute,
Once: true,
DryRun: true,
UpdateEvents: true,
LogFormat: "json",
MetricsAddress: "127.0.0.1:9099",
LogLevel: logrus.DebugLevel.String(),
@ -257,6 +259,7 @@ func TestParseFlags(t *testing.T) {
"--interval=10m",
"--once",
"--dry-run",
"--events",
"--log-format=json",
"--metrics-address=127.0.0.1:9099",
"--log-level=debug",
@ -338,6 +341,7 @@ func TestParseFlags(t *testing.T) {
"EXTERNAL_DNS_INTERVAL": "10m",
"EXTERNAL_DNS_ONCE": "1",
"EXTERNAL_DNS_DRY_RUN": "1",
"EXTERNAL_DNS_EVENTS": "1",
"EXTERNAL_DNS_LOG_FORMAT": "json",
"EXTERNAL_DNS_METRICS_ADDRESS": "127.0.0.1:9099",
"EXTERNAL_DNS_LOG_LEVEL": "debug",

View File

@ -18,6 +18,7 @@ package source
import (
"net/url"
"time"
cfclient "github.com/cloudfoundry-community/go-cfclient"
@ -35,6 +36,9 @@ func NewCloudFoundrySource(cfClient *cfclient.Client) (Source, error) {
}, nil
}
func (rs *cloudfoundrySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}
// Endpoints returns endpoint objects
func (rs *cloudfoundrySource) Endpoints() ([]*endpoint.Endpoint, error) {
endpoints := []*endpoint.Endpoint{}

View File

@ -64,3 +64,6 @@ func (cs *connectorSource) Endpoints() ([]*endpoint.Endpoint, error) {
return endpoints, nil
}
func (cs *connectorSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}

View File

@ -20,6 +20,7 @@ import (
"fmt"
"os"
"strings"
"time"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -108,6 +109,9 @@ func NewCRDSource(crdClient rest.Interface, namespace, kind string, scheme *runt
}, nil
}
func (cs *crdSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}
// Endpoints returns endpoint objects.
func (cs *crdSource) Endpoints() ([]*endpoint.Endpoint, error) {
endpoints := []*endpoint.Endpoint{}

View File

@ -17,6 +17,8 @@ limitations under the License.
package source
import (
"time"
log "github.com/sirupsen/logrus"
"sigs.k8s.io/external-dns/endpoint"
@ -56,3 +58,7 @@ func (ms *dedupSource) Endpoints() ([]*endpoint.Endpoint, error) {
return result, nil
}
func (ms *dedupSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
ms.source.AddEventHandler(handler, stopChan, minInterval)
}

View File

@ -16,11 +16,18 @@ limitations under the License.
package source
import "sigs.k8s.io/external-dns/endpoint"
import (
"time"
"sigs.k8s.io/external-dns/endpoint"
)
// emptySource is a Source that returns no endpoints.
type emptySource struct{}
func (e *emptySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}
// Endpoints collects endpoints of all nested Sources and returns them in a single slice.
func (e *emptySource) Endpoints() ([]*endpoint.Endpoint, error) {
return []*endpoint.Endpoint{}, nil

View File

@ -54,6 +54,9 @@ func NewFakeSource(fqdnTemplate string) (Source, error) {
}, nil
}
func (sc *fakeSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}
// Endpoints returns endpoint objects.
func (sc *fakeSource) Endpoints() ([]*endpoint.Endpoint, error) {
endpoints := make([]*endpoint.Endpoint, 10)

View File

@ -173,6 +173,9 @@ func (sc *gatewaySource) Endpoints() ([]*endpoint.Endpoint, error) {
return endpoints, nil
}
func (sc *gatewaySource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}
func (sc *gatewaySource) endpointsFromTemplate(config *istiomodel.Config) ([]*endpoint.Endpoint, error) {
// Process the whole template string
var buf bytes.Buffer

View File

@ -33,6 +33,7 @@ import (
extinformers "k8s.io/client-go/informers/extensions/v1beta1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/util/async"
"sigs.k8s.io/external-dns/endpoint"
)
@ -56,6 +57,7 @@ type ingressSource struct {
combineFQDNAnnotation bool
ignoreHostnameAnnotation bool
ingressInformer extinformers.IngressInformer
runner *async.BoundedFrequencyRunner
}
// NewIngressSource creates a new ingressSource with the given config.
@ -303,3 +305,32 @@ func targetsFromIngressStatus(status v1beta1.IngressStatus) endpoint.Targets {
return targets
}
func (sc *ingressSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
// Add custom resource event handler
log.Debug("Adding (bounded) event handler for ingress")
maxInterval := 24 * time.Hour // handler will be called if it has not run in 24 hours
burst := 2 // allow up to two handler burst calls
log.Debugf("Adding handler to BoundedFrequencyRunner with minInterval: %v, syncPeriod: %v, bursts: %d",
minInterval, maxInterval, burst)
sc.runner = async.NewBoundedFrequencyRunner("ingress-handler", func() {
_ = handler()
}, minInterval, maxInterval, burst)
go sc.runner.Loop(stopChan)
// run the handler function as soon as the BoundedFrequencyRunner will allow when an update occurs
sc.ingressInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
sc.runner.Run()
},
UpdateFunc: func(old interface{}, new interface{}) {
sc.runner.Run()
},
DeleteFunc: func(obj interface{}) {
sc.runner.Run()
},
},
)
}

View File

@ -332,3 +332,6 @@ func parseContourLoadBalancerService(service string) (namespace, name string, er
return
}
func (sc *ingressRouteSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}

View File

@ -16,7 +16,11 @@ limitations under the License.
package source
import "sigs.k8s.io/external-dns/endpoint"
import (
"time"
"sigs.k8s.io/external-dns/endpoint"
)
// multiSource is a Source that merges the endpoints of its nested Sources.
type multiSource struct {
@ -39,6 +43,12 @@ func (ms *multiSource) Endpoints() ([]*endpoint.Endpoint, error) {
return result, nil
}
func (ms *multiSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
for _, s := range ms.children {
s.AddEventHandler(handler, stopChan, minInterval)
}
}
// NewMultiSource creates a new multiSource.
func NewMultiSource(children []Source) Source {
return &multiSource{children: children}

View File

@ -167,6 +167,9 @@ func (ns *nodeSource) Endpoints() ([]*endpoint.Endpoint, error) {
return endpointsSlice, nil
}
func (ns *nodeSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
}
// nodeAddress returns node's externalIP and if that's not found, node's internalIP
// basically what k8s.io/kubernetes/pkg/util/node.GetPreferredNodeAddress does
func (ns *nodeSource) nodeAddresses(node *v1.Node) ([]string, error) {

View File

@ -33,6 +33,7 @@ import (
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/util/async"
"sigs.k8s.io/external-dns/endpoint"
)
@ -61,6 +62,7 @@ type serviceSource struct {
podInformer coreinformers.PodInformer
nodeInformer coreinformers.NodeInformer
serviceTypeFilter map[string]struct{}
runner *async.BoundedFrequencyRunner
}
// NewServiceSource creates a new serviceSource with the given config.
@ -89,21 +91,18 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
serviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("service added")
},
},
)
podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("pod added")
},
},
)
nodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("node added")
},
},
)
@ -546,3 +545,32 @@ func (sc *serviceSource) extractNodePortEndpoints(svc *v1.Service, nodeTargets e
return endpoints
}
func (sc *serviceSource) AddEventHandler(handler func() error, stopChan <-chan struct{}, minInterval time.Duration) {
// Add custom resource event handler
log.Debug("Adding (bounded) event handler for service")
maxInterval := 24 * time.Hour // handler will be called if it has not run in 24 hours
burst := 2 // allow up to two handler burst calls
log.Debugf("Adding handler to BoundedFrequencyRunner with minInterval: %v, syncPeriod: %v, bursts: %d",
minInterval, maxInterval, burst)
sc.runner = async.NewBoundedFrequencyRunner("service-handler", func() {
_ = handler()
}, minInterval, maxInterval, burst)
go sc.runner.Loop(stopChan)
// run the handler function as soon as the BoundedFrequencyRunner will allow when an update occurs
sc.serviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
sc.runner.Run()
},
UpdateFunc: func(old interface{}, new interface{}) {
sc.runner.Run()
},
DeleteFunc: func(obj interface{}) {
sc.runner.Run()
},
},
)
}

View File

@ -22,6 +22,7 @@ import (
"net"
"strconv"
"strings"
"time"
"sigs.k8s.io/external-dns/endpoint"
)
@ -57,6 +58,9 @@ const (
// Source defines the interface Endpoint sources should implement.
type Source interface {
Endpoints() ([]*endpoint.Endpoint, error)
// AddEventHandler adds an event handler function that's called when (supported) sources have changed.
// The handler should not be called more than than once per time.Duration and not again after stop channel is closed.
AddEventHandler(func() error, <-chan struct{}, time.Duration)
}
func getTTLFromAnnotations(annotations map[string]string) (endpoint.TTL, error) {