Pass stop channel to informer factory instances

Author: Andrey Lebedev
Date: 2022-01-22 21:11:06 +01:00
parent 98393df742
commit 4f41229820
22 changed files with 79 additions and 61 deletions
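The change is the same throughout: each source constructor gains a leading context.Context parameter, its shared informer factory is started with ctx.Done() instead of wait.NeverStop, and callers (main.go, ByNames, BuildWithConfig, the tests) thread the context through. Below is a minimal sketch of the resulting pattern, with a hypothetical newExampleSource constructor standing in for the real sources touched in this commit (which differ in their arguments and informers):

// Package informerexample is a hypothetical package illustrating the pattern
// applied in this commit; it is not part of external-dns.
package informerexample

import (
	"context"
	"fmt"

	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// newExampleSource mirrors the shape of the real constructors
// (NewIngressSource, NewServiceSource, NewPodSource, ...).
func newExampleSource(ctx context.Context, kubeClient kubernetes.Interface, namespace string) error {
	informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
		kubeClient, 0, kubeinformers.WithNamespace(namespace),
	)

	// Before this commit: informerFactory.Start(wait.NeverStop), which kept the
	// informers running for the life of the process. With the caller's stop
	// channel, cancelling ctx shuts the informers down.
	informerFactory.Start(ctx.Done())

	// Block until the initial cache sync completes; abort if any informer fails.
	for typ, ok := range informerFactory.WaitForCacheSync(ctx.Done()) {
		if !ok {
			return fmt.Errorf("failed to sync cache for %v", typ)
		}
	}
	return nil
}

The tests below pass context.TODO(), which is never cancelled, so their informers behave as before.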

View File

@@ -136,7 +136,7 @@ func main() {
 	}
 	// Lookup all the selected sources by names and pass them the desired configuration.
-	sources, err := source.ByNames(&source.SingletonClientGenerator{
+	sources, err := source.ByNames(ctx, &source.SingletonClientGenerator{
 		KubeConfig:   cfg.KubeConfig,
 		APIServerURL: cfg.APIServerURL,
 		// If update events are enabled, disable timeout.

View File

@@ -30,7 +30,6 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/dynamic/dynamicinformer"
 	"k8s.io/client-go/informers"
@@ -64,6 +63,7 @@ type ambassadorHostSource struct {
 // NewAmbassadorHostSource creates a new ambassadorHostSource with the given config.
 func NewAmbassadorHostSource(
+	ctx context.Context,
 	dynamicKubeClient dynamic.Interface,
 	kubeClient kubernetes.Interface,
 	namespace string) (Source, error) {
@@ -82,8 +82,7 @@ func NewAmbassadorHostSource(
 		},
 	)
-	// TODO informer is not explicitly stopped since controller is not passing in its channel.
-	informerFactory.Start(wait.NeverStop)
+	informerFactory.Start(ctx.Done())
 	if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
 		return nil, err

View File

@@ -28,7 +28,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/dynamic/dynamicinformer"
 	"k8s.io/client-go/informers"
@@ -53,6 +52,7 @@ type httpProxySource struct {
 // NewContourHTTPProxySource creates a new contourHTTPProxySource with the given config.
 func NewContourHTTPProxySource(
+	ctx context.Context,
 	dynamicKubeClient dynamic.Interface,
 	namespace string,
 	annotationFilter string,
@@ -78,8 +78,7 @@ func NewContourHTTPProxySource(
 		},
 	)
-	// TODO informer is not explicitly stopped since controller is not passing in its channel.
-	informerFactory.Start(wait.NeverStop)
+	informerFactory.Start(ctx.Done())
 	// wait for the local cache to be populated.
 	if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {

View File

@@ -88,6 +88,7 @@ func (suite *HTTPProxySuite) SetupTest() {
 	var err error
 	suite.source, err = NewContourHTTPProxySource(
+		context.TODO(),
 		fakeDynamicClient,
 		"default",
 		"",
@@ -184,6 +185,7 @@ func TestNewContourHTTPProxySource(t *testing.T) {
 	fakeDynamicClient, _ := newDynamicKubernetesClient()
 	_, err := NewContourHTTPProxySource(
+		context.TODO(),
 		fakeDynamicClient,
 		"",
 		ti.annotationFilter,
@@ -1033,6 +1035,7 @@ func testHTTPProxyEndpoints(t *testing.T) {
 	}
 	httpProxySource, err := NewContourHTTPProxySource(
+		context.TODO(),
 		fakeDynamicClient,
 		ti.targetNamespace,
 		ti.annotationFilter,
@@ -1059,6 +1062,7 @@ func newTestHTTPProxySource() (*httpProxySource, error) {
 	fakeDynamicClient, _ := newDynamicKubernetesClient()
 	src, err := NewContourHTTPProxySource(
+		context.TODO(),
 		fakeDynamicClient,
 		"default",
 		"",

View File

@@ -26,7 +26,6 @@ import (
 	log "github.com/sirupsen/logrus"
 	networkv1 "k8s.io/api/networking/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/util/wait"
 	kubeinformers "k8s.io/client-go/informers"
 	netinformers "k8s.io/client-go/informers/networking/v1"
 	"k8s.io/client-go/kubernetes"
@@ -64,7 +63,7 @@ type ingressSource struct {
 }
 // NewIngressSource creates a new ingressSource with the given config.
-func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, ignoreHostnameAnnotation bool, ignoreIngressTLSSpec bool, ignoreIngressRulesSpec bool, labelSelector labels.Selector) (Source, error) {
+func NewIngressSource(ctx context.Context, kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, ignoreHostnameAnnotation bool, ignoreIngressTLSSpec bool, ignoreIngressRulesSpec bool, labelSelector labels.Selector) (Source, error) {
 	tmpl, err := parseTemplate(fqdnTemplate)
 	if err != nil {
 		return nil, err
@@ -83,8 +82,7 @@ func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilt
 		},
 	)
-	// TODO informer is not explicitly stopped since controller is not passing in its channel.
-	informerFactory.Start(wait.NeverStop)
+	informerFactory.Start(ctx.Done())
 	// wait for the local cache to be populated.
 	if err := waitForCacheSync(context.Background(), informerFactory); err != nil {

View File

@@ -56,6 +56,7 @@ func (suite *IngressSuite) SetupTest() {
 	suite.NoError(err, "should succeed")
 	suite.sc, err = NewIngressSource(
+		context.TODO(),
 		fakeClient,
 		"",
 		"",
@@ -138,6 +139,7 @@ func TestNewIngressSource(t *testing.T) {
 	t.Parallel()
 	_, err := NewIngressSource(
+		context.TODO(),
 		fake.NewSimpleClientset(),
 		"",
 		ti.annotationFilter,
@@ -1225,6 +1227,7 @@ func testIngressEndpoints(t *testing.T) {
 	}
 	source, _ := NewIngressSource(
+		context.TODO(),
 		fakeClient,
 		ti.targetNamespace,
 		ti.annotationFilter,

View File

@@ -30,7 +30,6 @@ import (
 	networkingv1alpha3informer "istio.io/client-go/pkg/informers/externalversions/networking/v1alpha3"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/util/wait"
 	kubeinformers "k8s.io/client-go/informers"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/kubernetes"
@@ -56,6 +55,7 @@ type gatewaySource struct {
 // NewIstioGatewaySource creates a new gatewaySource with the given config.
 func NewIstioGatewaySource(
+	ctx context.Context,
 	kubeClient kubernetes.Interface,
 	istioClient istioclient.Interface,
 	namespace string,
@@ -93,9 +93,8 @@ func NewIstioGatewaySource(
 		},
 	)
-	// TODO informer is not explicitly stopped since controller is not passing in its channel.
-	informerFactory.Start(wait.NeverStop)
-	istioInformerFactory.Start(wait.NeverStop)
+	informerFactory.Start(ctx.Done())
+	istioInformerFactory.Start(ctx.Done())
 	// wait for the local cache to be populated.
 	if err := waitForCacheSync(context.Background(), informerFactory); err != nil {

View File

@@ -69,6 +69,7 @@ func (suite *GatewaySuite) SetupTest() {
 	}
 	suite.source, err = NewIstioGatewaySource(
+		context.TODO(),
 		fakeKubernetesClient,
 		fakeIstioClient,
 		"",
@@ -142,6 +143,7 @@ func TestNewIstioGatewaySource(t *testing.T) {
 	t.Parallel()
 	_, err := NewIstioGatewaySource(
+		context.TODO(),
 		fake.NewSimpleClientset(),
 		istiofake.NewSimpleClientset(),
 		"",
@@ -1165,6 +1167,7 @@ func testGatewayEndpoints(t *testing.T) {
 	}
 	gatewaySource, err := NewIstioGatewaySource(
+		context.TODO(),
 		fakeKubernetesClient,
 		fakeIstioClient,
 		ti.targetNamespace,
@@ -1201,6 +1204,7 @@ func newTestGatewaySource(loadBalancerList []fakeIngressGatewayService) (*gatewa
 	}
 	src, err := NewIstioGatewaySource(
+		context.TODO(),
 		fakeKubernetesClient,
 		fakeIstioClient,
 		"",

View File

@@ -31,7 +31,6 @@ import (
 	networkingv1alpha3informer "istio.io/client-go/pkg/informers/externalversions/networking/v1alpha3"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/util/wait"
 	kubeinformers "k8s.io/client-go/informers"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/kubernetes"
@@ -60,6 +59,7 @@ type virtualServiceSource struct {
 // NewIstioVirtualServiceSource creates a new virtualServiceSource with the given config.
 func NewIstioVirtualServiceSource(
+	ctx context.Context,
 	kubeClient kubernetes.Interface,
 	istioClient istioclient.Interface,
 	namespace string,
@@ -97,9 +97,8 @@ func NewIstioVirtualServiceSource(
 		},
 	)
-	// TODO informer is not explicitly stopped since controller is not passing in its channel.
-	informerFactory.Start(wait.NeverStop)
-	istioInformerFactory.Start(wait.NeverStop)
+	informerFactory.Start(ctx.Done())
+	istioInformerFactory.Start(ctx.Done())
 	// wait for the local cache to be populated.
 	if err := waitForCacheSync(context.Background(), informerFactory); err != nil {

View File

@@ -89,6 +89,7 @@ func (suite *VirtualServiceSuite) SetupTest() {
 	suite.NoError(err, "should succeed")
 	suite.source, err = NewIstioVirtualServiceSource(
+		context.TODO(),
 		fakeKubernetesClient,
 		fakeIstioClient,
 		"",
@@ -165,6 +166,7 @@ func TestNewIstioVirtualServiceSource(t *testing.T) {
 	t.Parallel()
 	_, err := NewIstioVirtualServiceSource(
+		context.TODO(),
 		fake.NewSimpleClientset(),
 		istiofake.NewSimpleClientset(),
 		"",
@@ -1482,6 +1484,7 @@ func testVirtualServiceEndpoints(t *testing.T) {
 	}
 	virtualServiceSource, err := NewIstioVirtualServiceSource(
+		context.TODO(),
 		fakeKubernetesClient,
 		fakeIstioClient,
 		ti.targetNamespace,
@@ -1557,6 +1560,7 @@ func newTestVirtualServiceSource(loadBalancerList []fakeIngressGatewayService, g
 	}
 	src, err := NewIstioVirtualServiceSource(
+		context.TODO(),
 		fakeKubernetesClient,
 		fakeIstioClient,
 		"",

View File

@@ -29,7 +29,6 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/dynamic/dynamicinformer"
 	"k8s.io/client-go/informers"
@@ -57,7 +56,7 @@ type kongTCPIngressSource struct {
 }
 // NewKongTCPIngressSource creates a new kongTCPIngressSource with the given config.
-func NewKongTCPIngressSource(dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string) (Source, error) {
+func NewKongTCPIngressSource(ctx context.Context, dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string) (Source, error) {
 	var err error
 	// Use shared informer to listen for add/update/delete of Host in the specified namespace.
@@ -73,8 +72,7 @@ func NewKongTCPIngressSource(dynamicKubeClient dynamic.Interface, kubeClient kub
 		},
 	)
-	// TODO informer is not explicitly stopped since controller is not passing in its channel.
-	informerFactory.Start(wait.NeverStop)
+	informerFactory.Start(ctx.Done())
 	// wait for the local cache to be populated.
 	if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {

View File

@@ -241,7 +241,7 @@ func TestKongTCPIngressEndpoints(t *testing.T) {
 	_, err = fakeDynamicClient.Resource(kongGroupdVersionResource).Namespace(defaultKongNamespace).Create(context.Background(), &tcpi, metav1.CreateOptions{})
 	assert.NoError(t, err)
-	source, err := NewKongTCPIngressSource(fakeDynamicClient, fakeKubernetesClient, defaultKongNamespace, "kubernetes.io/ingress.class=kong")
+	source, err := NewKongTCPIngressSource(context.TODO(), fakeDynamicClient, fakeKubernetesClient, defaultKongNamespace, "kubernetes.io/ingress.class=kong")
 	assert.NoError(t, err)
 	assert.NotNil(t, source)

View File

@@ -25,7 +25,6 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/util/wait"
 	kubeinformers "k8s.io/client-go/informers"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/kubernetes"
@@ -42,7 +41,7 @@ type nodeSource struct {
 }
 // NewNodeSource creates a new nodeSource with the given config.
-func NewNodeSource(kubeClient kubernetes.Interface, annotationFilter, fqdnTemplate string) (Source, error) {
+func NewNodeSource(ctx context.Context, kubeClient kubernetes.Interface, annotationFilter, fqdnTemplate string) (Source, error) {
 	tmpl, err := parseTemplate(fqdnTemplate)
 	if err != nil {
 		return nil, err
@@ -62,8 +61,7 @@ func NewNodeSource(kubeClient kubernetes.Interface, annotationFilter, fqdnTempla
 		},
 	)
-	// TODO informer is not explicitly stopped since controller is not passing in its channel.
-	informerFactory.Start(wait.NeverStop)
+	informerFactory.Start(ctx.Done())
 	// wait for the local cache to be populated.
 	if err := waitForCacheSync(context.Background(), informerFactory); err != nil {

View File

@@ -71,6 +71,7 @@ func testNodeSourceNewNodeSource(t *testing.T) {
 	t.Parallel()
 	_, err := NewNodeSource(
+		context.TODO(),
 		fake.NewSimpleClientset(),
 		ti.annotationFilter,
 		ti.fqdnTemplate,
@@ -353,6 +354,7 @@ func testNodeSourceEndpoints(t *testing.T) {
 	// Create our object under test and get the endpoints.
 	client, err := NewNodeSource(
+		context.TODO(),
 		kubernetes,
 		tc.annotationFilter,
 		tc.fqdnTemplate,

View File

@@ -29,7 +29,6 @@ import (
 	log "github.com/sirupsen/logrus"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/cache"
 	"sigs.k8s.io/external-dns/endpoint"
@@ -54,6 +53,7 @@ type ocpRouteSource struct {
 // NewOcpRouteSource creates a new ocpRouteSource with the given config.
 func NewOcpRouteSource(
+	ctx context.Context,
 	ocpClient versioned.Interface,
 	namespace string,
 	annotationFilter string,
@@ -81,8 +81,7 @@ func NewOcpRouteSource(
 		},
 	)
-	// TODO informer is not explicitly stopped since controller is not passing in its channel.
-	informerFactory.Start(wait.NeverStop)
+	informerFactory.Start(ctx.Done())
 	// wait for the local cache to be populated.
 	if err := waitForCacheSync(context.Background(), informerFactory); err != nil {

View File

@@ -43,6 +43,7 @@ func (suite *OCPRouteSuite) SetupTest() {
 	var err error
 	suite.sc, err = NewOcpRouteSource(
+		context.TODO(),
 		fakeClient,
 		"",
 		"",
@@ -141,6 +142,7 @@ func testOcpRouteSourceNewOcpRouteSource(t *testing.T) {
 	t.Parallel()
 	_, err := NewOcpRouteSource(
+		context.TODO(),
 		fake.NewSimpleClientset(),
 		"",
 		ti.annotationFilter,
@@ -439,6 +441,7 @@ func testOcpRouteSourceEndpoints(t *testing.T) {
 	require.NoError(t, err)
 	source, err := NewOcpRouteSource(
+		context.TODO(),
 		fakeClient,
 		"",
 		"",

View File

@@ -24,7 +24,6 @@ import (
 	log "github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/util/wait"
 	kubeinformers "k8s.io/client-go/informers"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/kubernetes"
@@ -40,7 +39,7 @@ type podSource struct {
 }
 // NewPodSource creates a new podSource with the given config.
-func NewPodSource(kubeClient kubernetes.Interface, namespace string, compatibility string) (Source, error) {
+func NewPodSource(ctx context.Context, kubeClient kubernetes.Interface, namespace string, compatibility string) (Source, error) {
 	informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
 	podInformer := informerFactory.Core().V1().Pods()
 	nodeInformer := informerFactory.Core().V1().Nodes()
@@ -58,7 +57,7 @@ func NewPodSource(kubeClient kubernetes.Interface, namespace string, compatibili
 		},
 	)
-	informerFactory.Start(wait.NeverStop)
+	informerFactory.Start(ctx.Done())
 	// wait for the local cache to be populated.
 	if err := waitForCacheSync(context.Background(), informerFactory); err != nil {

View File

@@ -412,7 +412,7 @@ func TestPodSource(t *testing.T) {
 		}
 	}
-	client, err := NewPodSource(kubernetes, tc.targetNamespace, tc.compatibility)
+	client, err := NewPodSource(context.TODO(), kubernetes, tc.targetNamespace, tc.compatibility)
 	require.NoError(t, err)
 	endpoints, err := client.Endpoints(ctx)

View File

@@ -27,7 +27,6 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/util/wait"
 	kubeinformers "k8s.io/client-go/informers"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/kubernetes"
@@ -67,7 +66,7 @@ type serviceSource struct {
 }
 // NewServiceSource creates a new serviceSource with the given config.
-func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, compatibility string, publishInternal bool, publishHostIP bool, alwaysPublishNotReadyAddresses bool, serviceTypeFilter []string, ignoreHostnameAnnotation bool, labelSelector labels.Selector) (Source, error) {
+func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, compatibility string, publishInternal bool, publishHostIP bool, alwaysPublishNotReadyAddresses bool, serviceTypeFilter []string, ignoreHostnameAnnotation bool, labelSelector labels.Selector) (Source, error) {
 	tmpl, err := parseTemplate(fqdnTemplate)
 	if err != nil {
 		return nil, err
@@ -107,8 +106,7 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
 		},
 	)
-	// TODO informer is not explicitly stopped since controller is not passing in its channel.
-	informerFactory.Start(wait.NeverStop)
+	informerFactory.Start(ctx.Done())
 	// wait for the local cache to be populated.
 	if err := waitForCacheSync(context.Background(), informerFactory); err != nil {

View File

@@ -65,6 +65,7 @@ func (suite *ServiceSuite) SetupTest() {
 	suite.NoError(err, "should successfully create service")
 	suite.sc, err = NewServiceSource(
+		context.TODO(),
 		fakeClient,
 		"",
 		"",
@@ -144,6 +145,7 @@ func testServiceSourceNewServiceSource(t *testing.T) {
 	t.Parallel()
 	_, err := NewServiceSource(
+		context.TODO(),
 		fake.NewSimpleClientset(),
 		"",
 		ti.annotationFilter,
@@ -1039,6 +1041,7 @@ func testServiceSourceEndpoints(t *testing.T) {
 	// Create our object under test and get the endpoints.
 	client, err := NewServiceSource(
+		context.TODO(),
 		kubernetes,
 		tc.targetNamespace,
 		tc.annotationFilter,
@@ -1227,6 +1230,7 @@ func testMultipleServicesEndpoints(t *testing.T) {
 	// Create our object under test and get the endpoints.
 	client, err := NewServiceSource(
+		context.TODO(),
 		kubernetes,
 		tc.targetNamespace,
 		tc.annotationFilter,
@@ -1391,6 +1395,7 @@ func TestClusterIpServices(t *testing.T) {
 	}
 	// Create our object under test and get the endpoints.
 	client, _ := NewServiceSource(
+		context.TODO(),
 		kubernetes,
 		tc.targetNamespace,
 		tc.annotationFilter,
@@ -1960,6 +1965,7 @@ func TestServiceSourceNodePortServices(t *testing.T) {
 	// Create our object under test and get the endpoints.
 	client, _ := NewServiceSource(
+		context.TODO(),
 		kubernetes,
 		tc.targetNamespace,
 		tc.annotationFilter,
@@ -2295,6 +2301,7 @@ func TestHeadlessServices(t *testing.T) {
 	// Create our object under test and get the endpoints.
 	client, _ := NewServiceSource(
+		context.TODO(),
 		kubernetes,
 		tc.targetNamespace,
 		"",
@@ -2651,6 +2658,7 @@ func TestHeadlessServicesHostIP(t *testing.T) {
 	// Create our object under test and get the endpoints.
 	client, _ := NewServiceSource(
+		context.TODO(),
 		kubernetes,
 		tc.targetNamespace,
 		"",
@@ -2762,6 +2770,7 @@ func TestExternalServices(t *testing.T) {
 	// Create our object under test and get the endpoints.
 	client, _ := NewServiceSource(
+		context.TODO(),
 		kubernetes,
 		tc.targetNamespace,
 		"",
@@ -2815,6 +2824,7 @@ func BenchmarkServiceEndpoints(b *testing.B) {
 	require.NoError(b, err)
 	client, err := NewServiceSource(
+		context.TODO(),
 		kubernetes,
 		v1.NamespaceAll,
 		"",

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package source
 import (
+	"context"
 	"net/http"
 	"os"
 	"strings"
@@ -158,10 +159,10 @@ func (p *SingletonClientGenerator) OpenShiftClient() (openshift.Interface, error
 }
 // ByNames returns multiple Sources given multiple names.
-func ByNames(p ClientGenerator, names []string, cfg *Config) ([]Source, error) {
+func ByNames(ctx context.Context, p ClientGenerator, names []string, cfg *Config) ([]Source, error) {
 	sources := []Source{}
 	for _, name := range names {
-		source, err := BuildWithConfig(name, p, cfg)
+		source, err := BuildWithConfig(ctx, name, p, cfg)
 		if err != nil {
 			return nil, err
 		}
@@ -172,32 +173,32 @@ func ByNames(p ClientGenerator, names []string, cfg *Config) ([]Source, error) {
 }
 // BuildWithConfig allows to generate a Source implementation from the shared config
-func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, error) {
+func BuildWithConfig(ctx context.Context, source string, p ClientGenerator, cfg *Config) (Source, error) {
 	switch source {
 	case "node":
 		client, err := p.KubeClient()
 		if err != nil {
 			return nil, err
 		}
-		return NewNodeSource(client, cfg.AnnotationFilter, cfg.FQDNTemplate)
+		return NewNodeSource(ctx, client, cfg.AnnotationFilter, cfg.FQDNTemplate)
 	case "service":
 		client, err := p.KubeClient()
 		if err != nil {
 			return nil, err
 		}
-		return NewServiceSource(client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.Compatibility, cfg.PublishInternal, cfg.PublishHostIP, cfg.AlwaysPublishNotReadyAddresses, cfg.ServiceTypeFilter, cfg.IgnoreHostnameAnnotation, cfg.LabelFilter)
+		return NewServiceSource(ctx, client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.Compatibility, cfg.PublishInternal, cfg.PublishHostIP, cfg.AlwaysPublishNotReadyAddresses, cfg.ServiceTypeFilter, cfg.IgnoreHostnameAnnotation, cfg.LabelFilter)
 	case "ingress":
 		client, err := p.KubeClient()
 		if err != nil {
 			return nil, err
 		}
-		return NewIngressSource(client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation, cfg.IgnoreIngressTLSSpec, cfg.IgnoreIngressRulesSpec, cfg.LabelFilter)
+		return NewIngressSource(ctx, client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation, cfg.IgnoreIngressTLSSpec, cfg.IgnoreIngressRulesSpec, cfg.LabelFilter)
 	case "pod":
 		client, err := p.KubeClient()
 		if err != nil {
 			return nil, err
 		}
-		return NewPodSource(client, cfg.Namespace, cfg.Compatibility)
+		return NewPodSource(ctx, client, cfg.Namespace, cfg.Compatibility)
 	case "istio-gateway":
 		kubernetesClient, err := p.KubeClient()
 		if err != nil {
@@ -207,7 +208,7 @@ func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, err
 		if err != nil {
 			return nil, err
 		}
-		return NewIstioGatewaySource(kubernetesClient, istioClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation)
+		return NewIstioGatewaySource(ctx, kubernetesClient, istioClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation)
 	case "istio-virtualservice":
 		kubernetesClient, err := p.KubeClient()
 		if err != nil {
@@ -217,7 +218,7 @@ func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, err
 		if err != nil {
 			return nil, err
 		}
-		return NewIstioVirtualServiceSource(kubernetesClient, istioClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation)
+		return NewIstioVirtualServiceSource(ctx, kubernetesClient, istioClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation)
 	case "cloudfoundry":
 		cfClient, err := p.CloudFoundryClient(cfg.CFAPIEndpoint, cfg.CFUsername, cfg.CFPassword)
 		if err != nil {
@@ -233,13 +234,13 @@ func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, err
 		if err != nil {
 			return nil, err
 		}
-		return NewAmbassadorHostSource(dynamicClient, kubernetesClient, cfg.Namespace)
+		return NewAmbassadorHostSource(ctx, dynamicClient, kubernetesClient, cfg.Namespace)
 	case "contour-httpproxy":
 		dynamicClient, err := p.DynamicKubernetesClient()
 		if err != nil {
 			return nil, err
 		}
-		return NewContourHTTPProxySource(dynamicClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation)
+		return NewContourHTTPProxySource(ctx, dynamicClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation)
 	case "gloo-proxy":
 		kubernetesClient, err := p.KubeClient()
 		if err != nil {
@@ -255,7 +256,7 @@ func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, err
 		if err != nil {
 			return nil, err
 		}
-		return NewOcpRouteSource(ocpClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation, cfg.LabelFilter, cfg.OCPRouterName)
+		return NewOcpRouteSource(ctx, ocpClient, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.IgnoreHostnameAnnotation, cfg.LabelFilter, cfg.OCPRouterName)
 	case "fake":
 		return NewFakeSource(cfg.FQDNTemplate)
 	case "connector":
@@ -290,7 +291,7 @@ func BuildWithConfig(source string, p ClientGenerator, cfg *Config) (Source, err
 		if err != nil {
 			return nil, err
 		}
-		return NewKongTCPIngressSource(dynamicClient, kubernetesClient, cfg.Namespace, cfg.AnnotationFilter)
+		return NewKongTCPIngressSource(ctx, dynamicClient, kubernetesClient, cfg.Namespace, cfg.AnnotationFilter)
 	}
 	return nil, ErrSourceNotFound
 }

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package source
 import (
+	"context"
 	"errors"
 	"testing"
@@ -115,7 +116,7 @@ func (suite *ByNamesTestSuite) TestAllInitialized() {
 		}: "TCPIngressesList",
 	}), nil)
-	sources, err := ByNames(mockClientGenerator, []string{"service", "ingress", "istio-gateway", "contour-httpproxy", "kong-tcpingress", "fake"}, minimalConfig)
+	sources, err := ByNames(context.TODO(), mockClientGenerator, []string{"service", "ingress", "istio-gateway", "contour-httpproxy", "kong-tcpingress", "fake"}, minimalConfig)
 	suite.NoError(err, "should not generate errors")
 	suite.Len(sources, 6, "should generate all six sources")
 }
@@ -124,7 +125,7 @@ func (suite *ByNamesTestSuite) TestOnlyFake() {
 	mockClientGenerator := new(MockClientGenerator)
 	mockClientGenerator.On("KubeClient").Return(fakeKube.NewSimpleClientset(), nil)
-	sources, err := ByNames(mockClientGenerator, []string{"fake"}, minimalConfig)
+	sources, err := ByNames(context.TODO(), mockClientGenerator, []string{"fake"}, minimalConfig)
 	suite.NoError(err, "should not generate errors")
 	suite.Len(sources, 1, "should generate fake source")
 	suite.Nil(mockClientGenerator.kubeClient, "client should not be created")
@@ -134,7 +135,7 @@ func (suite *ByNamesTestSuite) TestSourceNotFound() {
 	mockClientGenerator := new(MockClientGenerator)
 	mockClientGenerator.On("KubeClient").Return(fakeKube.NewSimpleClientset(), nil)
-	sources, err := ByNames(mockClientGenerator, []string{"foo"}, minimalConfig)
+	sources, err := ByNames(context.TODO(), mockClientGenerator, []string{"foo"}, minimalConfig)
 	suite.Equal(err, ErrSourceNotFound, "should return source not found")
 	suite.Len(sources, 0, "should not returns any source")
 }
@@ -143,16 +144,16 @@ func (suite *ByNamesTestSuite) TestKubeClientFails() {
 	mockClientGenerator := new(MockClientGenerator)
 	mockClientGenerator.On("KubeClient").Return(nil, errors.New("foo"))
-	_, err := ByNames(mockClientGenerator, []string{"service"}, minimalConfig)
+	_, err := ByNames(context.TODO(), mockClientGenerator, []string{"service"}, minimalConfig)
 	suite.Error(err, "should return an error if kubernetes client cannot be created")
-	_, err = ByNames(mockClientGenerator, []string{"ingress"}, minimalConfig)
+	_, err = ByNames(context.TODO(), mockClientGenerator, []string{"ingress"}, minimalConfig)
 	suite.Error(err, "should return an error if kubernetes client cannot be created")
-	_, err = ByNames(mockClientGenerator, []string{"istio-gateway"}, minimalConfig)
+	_, err = ByNames(context.TODO(), mockClientGenerator, []string{"istio-gateway"}, minimalConfig)
 	suite.Error(err, "should return an error if kubernetes client cannot be created")
-	_, err = ByNames(mockClientGenerator, []string{"kong-tcpingress"}, minimalConfig)
+	_, err = ByNames(context.TODO(), mockClientGenerator, []string{"kong-tcpingress"}, minimalConfig)
 	suite.Error(err, "should return an error if kubernetes client cannot be created")
 }
@@ -162,10 +163,10 @@ func (suite *ByNamesTestSuite) TestIstioClientFails() {
 	mockClientGenerator.On("IstioClient").Return(nil, errors.New("foo"))
 	mockClientGenerator.On("DynamicKubernetesClient").Return(nil, errors.New("foo"))
-	_, err := ByNames(mockClientGenerator, []string{"istio-gateway"}, minimalConfig)
+	_, err := ByNames(context.TODO(), mockClientGenerator, []string{"istio-gateway"}, minimalConfig)
 	suite.Error(err, "should return an error if istio client cannot be created")
-	_, err = ByNames(mockClientGenerator, []string{"contour-httpproxy"}, minimalConfig)
+	_, err = ByNames(context.TODO(), mockClientGenerator, []string{"contour-httpproxy"}, minimalConfig)
 	suite.Error(err, "should return an error if contour client cannot be created")
 }