mirror of
https://github.com/vmware-tanzu/pinniped.git
synced 2026-01-05 13:07:14 +00:00
Add initial controller boilerplate and example controller
Signed-off-by: Monis Khan <mok@vmware.com>
This commit is contained in:
216
internal/controller/controller.go
Normal file
216
internal/controller/controller.go
Normal file
@@ -0,0 +1,216 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/events"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// Controller interface represents a runnable Kubernetes controller.
|
||||
// Cancelling the context passed will cause the controller to shutdown.
|
||||
// Number of workers determine how much parallel the job processing should be.
|
||||
type Controller interface {
|
||||
// Run runs the controller and blocks until the controller is finished.
|
||||
// Number of workers can be specified via workers parameter.
|
||||
// This function will return when all internal loops are finished.
|
||||
// Note that having more than one worker usually means handing parallelization of Sync().
|
||||
Run(ctx context.Context, workers int)
|
||||
|
||||
// Name returns the controller name string.
|
||||
Name() string
|
||||
|
||||
// The methods below should only be called during tests via the Test* functions.
|
||||
|
||||
// sync contains the main controller logic.
|
||||
// This can be used in unit tests to exercise the Syncer by directly calling it.
|
||||
sync(ctx Context) error
|
||||
|
||||
// wrap wraps the main controller logic provided via the Syncer.
|
||||
// This can be used in tests to synchronize asynchronous events as seen by a running controller.
|
||||
// The wrapping must be done after New is called and before Run is called.
|
||||
wrap(wrapper SyncWrapperFunc)
|
||||
}
|
||||
|
||||
var _ Controller = &controller{}
|
||||
|
||||
type Config struct {
|
||||
Name string
|
||||
Syncer Syncer
|
||||
}
|
||||
|
||||
func New(config Config, opts ...Option) Controller {
|
||||
c := &controller{
|
||||
config: config,
|
||||
}
|
||||
|
||||
// set up defaults
|
||||
WithRateLimiter(workqueue.DefaultControllerRateLimiter())(c)
|
||||
WithRecorder(klogRecorder{})(c)
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(c)
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
type controller struct {
|
||||
config Config
|
||||
|
||||
queue workqueue.RateLimitingInterface
|
||||
queueWrapper Queue
|
||||
maxRetries int
|
||||
recorder events.EventRecorder
|
||||
|
||||
run bool
|
||||
runOpts []Option
|
||||
|
||||
cacheSyncs []cache.InformerSynced
|
||||
}
|
||||
|
||||
func (c *controller) Run(ctx context.Context, workers int) {
|
||||
defer utilruntime.HandleCrash(crash) // prevent panics from killing the process
|
||||
|
||||
klog.InfoS("starting controller", "controller", c.Name(), "workers", workers)
|
||||
|
||||
c.run = true
|
||||
for _, opt := range c.runOpts {
|
||||
opt(c)
|
||||
}
|
||||
|
||||
if !c.waitForCacheSyncWithTimeout() {
|
||||
panic(die(fmt.Sprintf("%s: timed out waiting for caches to sync", c.Name())))
|
||||
}
|
||||
|
||||
var workerWg sync.WaitGroup
|
||||
|
||||
// workerContext is used to track and initiate worker shutdown
|
||||
workerContext, workerContextCancel := context.WithCancel(context.Background())
|
||||
|
||||
defer func() {
|
||||
c.queue.ShutDown() // shutdown the controller queue first
|
||||
workerContextCancel() // cancel the worker context, which tell workers to initiate shutdown
|
||||
|
||||
// Wait for all workers to finish their job.
|
||||
// at this point the Run() can hang and callers have to implement the logic that will kill
|
||||
// this controller (SIGKILL).
|
||||
workerWg.Wait()
|
||||
klog.InfoS("all workers have been terminated, shutting down", "controller", c.Name(), "workers", workers)
|
||||
}()
|
||||
|
||||
for i := 1; i <= workers; i++ {
|
||||
idx := i
|
||||
klog.InfoS("starting worker", "controller", c.Name(), "worker", idx)
|
||||
workerWg.Add(1)
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash(crash) // prevent panics from killing the process
|
||||
defer func() {
|
||||
klog.InfoS("shutting down worker", "controller", c.Name(), "worker", idx)
|
||||
workerWg.Done()
|
||||
}()
|
||||
c.runWorker(workerContext)
|
||||
}()
|
||||
}
|
||||
|
||||
<-ctx.Done() // wait for controller context to be cancelled
|
||||
}
|
||||
|
||||
func (c *controller) Name() string {
|
||||
return c.config.Name
|
||||
}
|
||||
|
||||
func (c *controller) sync(ctx Context) error {
|
||||
return c.config.Syncer.Sync(ctx)
|
||||
}
|
||||
|
||||
func (c *controller) wrap(wrapper SyncWrapperFunc) {
|
||||
c.runOpts = append(c.runOpts, toRunOpt(func(c *controller) {
|
||||
c.config.Syncer = wrapper(c.config.Syncer)
|
||||
}))
|
||||
}
|
||||
|
||||
func (c *controller) waitForCacheSyncWithTimeout() bool {
|
||||
// prevent us from blocking forever due to a broken informer
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
return cache.WaitForCacheSync(ctx.Done(), c.cacheSyncs...)
|
||||
}
|
||||
|
||||
func (c *controller) add(filter Filter, object metav1.Object) {
|
||||
key := filter.Parent(object)
|
||||
c.queueWrapper.Add(key)
|
||||
}
|
||||
|
||||
// runWorker runs a single worker
|
||||
// The worker is asked to terminate when the passed context is cancelled.
|
||||
func (c *controller) runWorker(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
c.processNextWorkItem(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *controller) processNextWorkItem(ctx context.Context) {
|
||||
queueKey, quit := c.queue.Get()
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
|
||||
key := queueKey.(Key)
|
||||
defer c.queue.Done(key)
|
||||
|
||||
syncCtx := Context{
|
||||
Context: ctx,
|
||||
Name: c.Name(),
|
||||
Key: key,
|
||||
Queue: c.queueWrapper,
|
||||
Recorder: c.recorder,
|
||||
}
|
||||
|
||||
err := c.sync(syncCtx)
|
||||
c.handleKey(key, err)
|
||||
}
|
||||
|
||||
func (c *controller) handleKey(key Key, err error) {
|
||||
if err == nil {
|
||||
c.queue.Forget(key)
|
||||
return
|
||||
}
|
||||
|
||||
retryForever := c.maxRetries <= 0
|
||||
shouldRetry := retryForever || c.queue.NumRequeues(key) < c.maxRetries
|
||||
|
||||
if !shouldRetry {
|
||||
utilruntime.HandleError(fmt.Errorf("%s: dropping key %v out of the queue: %w", c.Name(), key, err))
|
||||
c.queue.Forget(key)
|
||||
return
|
||||
}
|
||||
|
||||
if errors.Is(err, ErrSyntheticRequeue) {
|
||||
// logging this helps detecting wedged controllers with missing pre-requirements
|
||||
klog.V(4).InfoS("requested synthetic requeue", "controller", c.Name(), "key", key)
|
||||
} else {
|
||||
utilruntime.HandleError(fmt.Errorf("%s: %v failed with: %w", c.Name(), key, err))
|
||||
}
|
||||
|
||||
c.queue.AddRateLimited(key)
|
||||
}
|
||||
15
internal/controller/die.go
Normal file
15
internal/controller/die.go
Normal file
@@ -0,0 +1,15 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
type die string
|
||||
|
||||
func crash(i interface{}) {
|
||||
mustDie, ok := i.(die)
|
||||
if ok {
|
||||
panic(string(mustDie))
|
||||
}
|
||||
}
|
||||
18
internal/controller/error.go
Normal file
18
internal/controller/error.go
Normal file
@@ -0,0 +1,18 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
// ErrSyntheticRequeue can be returned from a Syncer to force a retry artificially for the current key.
|
||||
// This can also be done by re-adding the key to queue, but this is more convenient and has better logging.
|
||||
const ErrSyntheticRequeue = constErr("synthetic requeue request")
|
||||
|
||||
var _ error = constErr("")
|
||||
|
||||
type constErr string
|
||||
|
||||
func (e constErr) Error() string {
|
||||
return string(e)
|
||||
}
|
||||
76
internal/controller/filter.go
Normal file
76
internal/controller/filter.go
Normal file
@@ -0,0 +1,76 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
)
|
||||
|
||||
type Filter interface {
|
||||
Add(obj metav1.Object) bool
|
||||
Update(oldObj, newObj metav1.Object) bool
|
||||
Delete(obj metav1.Object) bool
|
||||
|
||||
Parent(obj metav1.Object) Key
|
||||
}
|
||||
|
||||
var _ Filter = FilterFuncs{}
|
||||
|
||||
type ParentFunc func(obj metav1.Object) Key
|
||||
|
||||
type FilterFuncs struct {
|
||||
ParentFunc ParentFunc
|
||||
AddFunc func(obj metav1.Object) bool
|
||||
UpdateFunc func(oldObj, newObj metav1.Object) bool
|
||||
DeleteFunc func(obj metav1.Object) bool
|
||||
}
|
||||
|
||||
func (f FilterFuncs) Parent(obj metav1.Object) Key {
|
||||
if f.ParentFunc == nil {
|
||||
return Key{
|
||||
Namespace: obj.GetNamespace(),
|
||||
Name: obj.GetName(),
|
||||
}
|
||||
}
|
||||
return f.ParentFunc(obj)
|
||||
}
|
||||
|
||||
func (f FilterFuncs) Add(obj metav1.Object) bool {
|
||||
if f.AddFunc == nil {
|
||||
return false
|
||||
}
|
||||
return f.AddFunc(obj)
|
||||
}
|
||||
|
||||
func (f FilterFuncs) Update(oldObj, newObj metav1.Object) bool {
|
||||
if f.UpdateFunc == nil {
|
||||
return false
|
||||
}
|
||||
return f.UpdateFunc(oldObj, newObj)
|
||||
}
|
||||
|
||||
func (f FilterFuncs) Delete(obj metav1.Object) bool {
|
||||
if f.DeleteFunc == nil {
|
||||
return false
|
||||
}
|
||||
return f.DeleteFunc(obj)
|
||||
}
|
||||
|
||||
func FilterByNames(parentFunc ParentFunc, names ...string) Filter {
|
||||
set := sets.NewString(names...)
|
||||
has := func(obj metav1.Object) bool {
|
||||
return set.Has(obj.GetName())
|
||||
}
|
||||
return FilterFuncs{
|
||||
ParentFunc: parentFunc,
|
||||
AddFunc: has,
|
||||
UpdateFunc: func(oldObj, newObj metav1.Object) bool {
|
||||
return has(newObj)
|
||||
},
|
||||
DeleteFunc: has,
|
||||
}
|
||||
}
|
||||
25
internal/controller/informer.go
Normal file
25
internal/controller/informer.go
Normal file
@@ -0,0 +1,25 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import "k8s.io/client-go/tools/cache"
|
||||
|
||||
type InformerGetter interface {
|
||||
Informer() cache.SharedIndexInformer
|
||||
}
|
||||
|
||||
type InformerOption struct {
|
||||
SkipSync bool
|
||||
SkipEvents bool
|
||||
|
||||
// TODO maybe add a field like:
|
||||
// ResyncPeriod time.Duration
|
||||
// to support using AddEventHandlerWithResyncPeriod
|
||||
// this field would be mutually exclusive with SkipEvents
|
||||
// I suspect we do not need this level of flexibility and resyncs can mask bugs in controller logic
|
||||
// A related change could be an Option such as WithResyncSchedule to allow for cron style control loops
|
||||
// It is unclear to me if we would ever need that since we assume that all events come from a Kube watch
|
||||
}
|
||||
59
internal/controller/manager.go
Normal file
59
internal/controller/manager.go
Normal file
@@ -0,0 +1,59 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
type Manager interface {
|
||||
Start(ctx context.Context)
|
||||
WithController(controller Controller, workers int) Manager
|
||||
}
|
||||
|
||||
func NewManager() Manager {
|
||||
return &controllerManager{}
|
||||
}
|
||||
|
||||
// runnableController represents single controller runnable configuration.
|
||||
type runnableController struct {
|
||||
controller Controller
|
||||
workers int
|
||||
}
|
||||
|
||||
type controllerManager struct {
|
||||
controllers []runnableController
|
||||
}
|
||||
|
||||
var _ Manager = &controllerManager{}
|
||||
|
||||
func (c *controllerManager) WithController(controller Controller, workers int) Manager {
|
||||
c.controllers = append(c.controllers, runnableController{
|
||||
controller: controller,
|
||||
workers: workers,
|
||||
})
|
||||
return c
|
||||
}
|
||||
|
||||
// Start will run all managed controllers and block until all controllers shutdown.
|
||||
// When the context passed is cancelled, all controllers are signalled to shutdown.
|
||||
func (c *controllerManager) Start(ctx context.Context) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(c.controllers))
|
||||
for i := range c.controllers {
|
||||
idx := i
|
||||
go func() {
|
||||
r := c.controllers[idx]
|
||||
defer klog.InfoS("controller terminated", "controller", r.controller.Name())
|
||||
defer wg.Done()
|
||||
r.controller.Run(ctx, r.workers)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
154
internal/controller/option.go
Normal file
154
internal/controller/option.go
Normal file
@@ -0,0 +1,154 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/events"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
type Option func(*controller)
|
||||
|
||||
func WithMaxRetries(maxRetries int) Option {
|
||||
return func(c *controller) {
|
||||
c.maxRetries = maxRetries
|
||||
}
|
||||
}
|
||||
|
||||
func WithInitialEvent(key Key) Option {
|
||||
return toNaiveRunOpt(func(c *controller) {
|
||||
c.queueWrapper.Add(key)
|
||||
})
|
||||
}
|
||||
|
||||
func WithRateLimiter(limiter workqueue.RateLimiter) Option {
|
||||
return func(c *controller) {
|
||||
c.queue = workqueue.NewNamedRateLimitingQueue(limiter, c.Name())
|
||||
c.queueWrapper = &queueWrapper{queue: c.queue}
|
||||
}
|
||||
}
|
||||
|
||||
func WithRecorder(recorder events.EventRecorder) Option {
|
||||
return func(c *controller) {
|
||||
c.recorder = recorder
|
||||
}
|
||||
}
|
||||
|
||||
func WithInformer(getter InformerGetter, filter Filter, opt InformerOption) Option {
|
||||
informer := getter.Informer() // immediately signal that we intend to use this informer in case it is lazily initialized
|
||||
return toRunOpt(func(c *controller) {
|
||||
if opt.SkipSync && opt.SkipEvents {
|
||||
panic(die("cannot skip syncing and event handlers at the same time"))
|
||||
}
|
||||
|
||||
if !opt.SkipSync {
|
||||
c.cacheSyncs = append(c.cacheSyncs, informer.HasSynced)
|
||||
}
|
||||
|
||||
if opt.SkipEvents {
|
||||
return
|
||||
}
|
||||
|
||||
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
object := metaOrDie(obj)
|
||||
if filter.Add(object) {
|
||||
klog.V(4).InfoS("handling add",
|
||||
"controller", c.Name(),
|
||||
"namespace", object.GetNamespace(),
|
||||
"name", object.GetName(),
|
||||
"selfLink", object.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?)
|
||||
"kind", fmt.Sprintf("%T", object),
|
||||
)
|
||||
c.add(filter, object)
|
||||
}
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
oldObject := metaOrDie(oldObj)
|
||||
newObject := metaOrDie(newObj)
|
||||
if filter.Update(oldObject, newObject) {
|
||||
klog.V(4).InfoS("handling update",
|
||||
"controller", c.Name(),
|
||||
"namespace", newObject.GetNamespace(),
|
||||
"name", newObject.GetName(),
|
||||
"selfLink", newObject.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?)
|
||||
"kind", fmt.Sprintf("%T", newObject),
|
||||
)
|
||||
c.add(filter, newObject)
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
//nolint: goerr113
|
||||
utilruntime.HandleError(fmt.Errorf("%s: could not get object from tombstone: %+v", c.Name(), obj))
|
||||
return
|
||||
}
|
||||
accessor, err = meta.Accessor(tombstone.Obj)
|
||||
if err != nil {
|
||||
//nolint: goerr113
|
||||
utilruntime.HandleError(fmt.Errorf("%s: tombstone contained object that is not an accessor: %+v", c.Name(), obj))
|
||||
return
|
||||
}
|
||||
}
|
||||
if filter.Delete(accessor) {
|
||||
klog.V(4).InfoS("handling delete",
|
||||
"controller", c.Name(),
|
||||
"namespace", accessor.GetNamespace(),
|
||||
"name", accessor.GetName(),
|
||||
"selfLink", accessor.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?)
|
||||
"kind", fmt.Sprintf("%T", accessor),
|
||||
)
|
||||
c.add(filter, accessor)
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// toRunOpt guarantees that an Option only runs once on the first call to Run (and not New), even if a controller is stopped and restarted.
|
||||
func toRunOpt(opt Option) Option {
|
||||
return toOnceOpt(toNaiveRunOpt(opt))
|
||||
}
|
||||
|
||||
// toNaiveRunOpt guarantees that an Option only runs on calls to Run (and not New), even if a controller is stopped and restarted.
|
||||
func toNaiveRunOpt(opt Option) Option {
|
||||
return func(c *controller) {
|
||||
if c.run {
|
||||
opt(c)
|
||||
return
|
||||
}
|
||||
c.runOpts = append(c.runOpts, opt)
|
||||
}
|
||||
}
|
||||
|
||||
// toOnceOpt guarantees that an Option only runs once.
|
||||
func toOnceOpt(opt Option) Option {
|
||||
var once sync.Once
|
||||
return func(c *controller) {
|
||||
once.Do(func() {
|
||||
opt(c)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func metaOrDie(obj interface{}) metav1.Object {
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
panic(err) // this should never happen
|
||||
}
|
||||
return accessor
|
||||
}
|
||||
27
internal/controller/option_test.go
Normal file
27
internal/controller/option_test.go
Normal file
@@ -0,0 +1,27 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
type getter bool
|
||||
|
||||
func (g *getter) Informer() cache.SharedIndexInformer {
|
||||
*g = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestInformerCalled(t *testing.T) {
|
||||
g := getter(false)
|
||||
_ = New(Config{}, WithInformer(&g, FilterByNames(nil), InformerOption{}))
|
||||
if !g {
|
||||
t.Error("expected InformerGetter.Informer() to be called")
|
||||
}
|
||||
}
|
||||
41
internal/controller/queue.go
Normal file
41
internal/controller/queue.go
Normal file
@@ -0,0 +1,41 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
)
|
||||
|
||||
type Queue interface {
|
||||
// Add immediately adds a key to the queue and marks it as needing processing.
|
||||
Add(key Key)
|
||||
|
||||
// AddRateLimited adds a key to the queue after the rate limiter says it is ok.
|
||||
AddRateLimited(key Key)
|
||||
|
||||
// AddAfter adds a key to the queue after the indicated duration has passed.
|
||||
AddAfter(key Key, duration time.Duration)
|
||||
}
|
||||
|
||||
var _ Queue = &queueWrapper{}
|
||||
|
||||
type queueWrapper struct {
|
||||
queue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
func (q *queueWrapper) Add(key Key) {
|
||||
q.queue.Add(key)
|
||||
}
|
||||
|
||||
func (q *queueWrapper) AddRateLimited(key Key) {
|
||||
q.queue.AddRateLimited(key)
|
||||
}
|
||||
|
||||
func (q *queueWrapper) AddAfter(key Key, duration time.Duration) {
|
||||
q.queue.AddAfter(key, duration)
|
||||
}
|
||||
29
internal/controller/recorder.go
Normal file
29
internal/controller/recorder.go
Normal file
@@ -0,0 +1,29 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/tools/events"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
var _ events.EventRecorder = klogRecorder{}
|
||||
|
||||
type klogRecorder struct{}
|
||||
|
||||
func (n klogRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
|
||||
klog.V(4).InfoS("recording event",
|
||||
"regarding", regarding,
|
||||
"related", related,
|
||||
"eventtype", eventtype,
|
||||
"reason", reason,
|
||||
"action", action,
|
||||
"message", fmt.Sprintf(note, args...),
|
||||
)
|
||||
}
|
||||
44
internal/controller/sync.go
Normal file
44
internal/controller/sync.go
Normal file
@@ -0,0 +1,44 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"k8s.io/client-go/tools/events"
|
||||
)
|
||||
|
||||
var _ Syncer = SyncFunc(nil)
|
||||
|
||||
type Syncer interface {
|
||||
Sync(ctx Context) error
|
||||
}
|
||||
|
||||
type SyncFunc func(ctx Context) error
|
||||
|
||||
func (s SyncFunc) Sync(ctx Context) error {
|
||||
return s(ctx)
|
||||
}
|
||||
|
||||
type Context struct {
|
||||
Context context.Context
|
||||
Name string
|
||||
Key Key
|
||||
Queue Queue
|
||||
Recorder events.EventRecorder
|
||||
}
|
||||
|
||||
type Key struct {
|
||||
Namespace string
|
||||
Name string
|
||||
|
||||
// TODO determine if it makes sense to add a field like:
|
||||
// Extra interface{}
|
||||
// This would allow a custom ParentFunc to pass extra data through to the Syncer
|
||||
// The boxed type would have to be comparable (i.e. usable as a map key)
|
||||
}
|
||||
|
||||
type SyncWrapperFunc func(syncer Syncer) Syncer
|
||||
18
internal/controller/testing.go
Normal file
18
internal/controller/testing.go
Normal file
@@ -0,0 +1,18 @@
|
||||
/*
|
||||
Copyright 2020 VMware, Inc.
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestSync(t *testing.T, controller Controller, ctx Context) error {
|
||||
t.Helper() // force testing import to discourage external use
|
||||
return controller.sync(ctx)
|
||||
}
|
||||
|
||||
func TestWrap(t *testing.T, controller Controller, wrapper SyncWrapperFunc) {
|
||||
t.Helper() // force testing import to discourage external use
|
||||
controller.wrap(wrapper)
|
||||
}
|
||||
Reference in New Issue
Block a user