Merge pull request #2220 from ericrrath/crd-source-event-handler

CRD source: add event-handler support
This commit is contained in:
Kubernetes Prow Robot 2023-04-13 23:20:39 -07:00 committed by GitHub
commit e6ec8ea329
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 63 additions and 5 deletions

View File

@ -140,6 +140,7 @@ func main() {
RequestTimeout: cfg.RequestTimeout,
DefaultTargets: cfg.DefaultTargets,
OCPRouterName: cfg.OCPRouterName,
UpdateEvents: cfg.UpdateEvents,
}
// Lookup all the selected sources by names and pass them the desired configuration.

View File

@ -22,6 +22,10 @@ import (
"os"
"strings"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -44,6 +48,7 @@ type crdSource struct {
codec runtime.ParameterCodec
annotationFilter string
labelSelector labels.Selector
informer *cache.SharedInformer
}
func addKnownTypes(scheme *runtime.Scheme, groupVersion schema.GroupVersion) error {
@ -103,18 +108,55 @@ func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiS
}
// NewCRDSource creates a new crdSource with the given config.
func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme) (Source, error) {
return &crdSource{
func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme, startInformer bool) (Source, error) {
sourceCrd := crdSource{
crdResource: strings.ToLower(kind) + "s",
namespace: namespace,
annotationFilter: annotationFilter,
labelSelector: labelSelector,
crdClient: crdClient,
codec: runtime.NewParameterCodec(scheme),
}, nil
}
if startInformer {
// external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any
// missed or dropped events are handled. specify a resync period 0 to avoid unnecessary sync handler invocations.
informer := cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
return sourceCrd.List(context.TODO(), &lo)
},
WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
return sourceCrd.watch(context.TODO(), &lo)
},
},
&endpoint.DNSEndpoint{},
0)
sourceCrd.informer = &informer
go informer.Run(wait.NeverStop)
}
return &sourceCrd, nil
}
func (cs *crdSource) AddEventHandler(ctx context.Context, handler func()) {
if cs.informer != nil {
log.Debug("Adding event handler for CRD")
// Right now there is no way to remove event handler from informer, see:
// https://github.com/kubernetes/kubernetes/issues/79610
informer := *cs.informer
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handler()
},
UpdateFunc: func(old interface{}, new interface{}) {
handler()
},
DeleteFunc: func(obj interface{}) {
handler()
},
},
)
}
}
// Endpoints returns endpoint objects.
@ -189,6 +231,15 @@ func (cs *crdSource) setResourceLabel(crd *endpoint.DNSEndpoint, endpoints []*en
}
}
func (cs *crdSource) watch(ctx context.Context, opts *metav1.ListOptions) (watch.Interface, error) {
opts.Watch = true
return cs.crdClient.Get().
Namespace(cs.namespace).
Resource(cs.crdResource).
VersionedParams(opts, cs.codec).
Watch(ctx)
}
func (cs *crdSource) List(ctx context.Context, opts *metav1.ListOptions) (result *endpoint.DNSEndpointList, err error) {
result = &endpoint.DNSEndpointList{}
err = cs.crdClient.Get().

View File

@ -398,7 +398,12 @@ func testCRDSourceEndpoints(t *testing.T) {
labelSelector, err := labels.Parse(ti.labelFilter)
require.NoError(t, err)
cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme)
// At present, client-go's fake.RESTClient (used by crd_test.go) is known to cause race conditions when used
// with informers: https://github.com/kubernetes/kubernetes/issues/95372
// So don't start the informer during testing.
startInformer := false
cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme, startInformer)
require.NoError(t, err)
receivedEndpoints, err := cs.Endpoints(context.Background())

View File

@ -72,6 +72,7 @@ type Config struct {
RequestTimeout time.Duration
DefaultTargets []string
OCPRouterName string
UpdateEvents bool
}
// ClientGenerator provides clients
@ -308,7 +309,7 @@ func BuildWithConfig(ctx context.Context, source string, p ClientGenerator, cfg
if err != nil {
return nil, err
}
return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, cfg.AnnotationFilter, cfg.LabelFilter, scheme)
return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, cfg.AnnotationFilter, cfg.LabelFilter, scheme, cfg.UpdateEvents)
case "skipper-routegroup":
apiServerURL := cfg.APIServerURL
tokenPath := ""