Controller refactor code modifications.

1. Add some refactored controllers initiation code into enabledRuntimeControllers.
2. Add reconciler struct initiation function for DownloadRequest and ServerStatusRequest controllers.

Signed-off-by: Xun Jiang <blackpiglet@gmail.com>
This commit is contained in:
Xun Jiang
2022-08-22 20:25:15 +08:00
parent b6cca3f7d3
commit 71e5027bfb
7 changed files with 141 additions and 104 deletions

View File

@@ -0,0 +1 @@
Controller refactor code modifications.

View File

@@ -737,9 +737,13 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
controller.Restore: restoreControllerRunInfo,
}
// Note: all runtime type controllers that can be disabled are grouped separately, below:
enabledRuntimeControllers := make(map[string]struct{})
enabledRuntimeControllers[controller.ServerStatusRequest] = struct{}{}
enabledRuntimeControllers[controller.DownloadRequest] = struct{}{}
enabledRuntimeControllers := map[string]struct{}{
controller.ServerStatusRequest: {},
controller.DownloadRequest: {},
controller.Schedule: {},
controller.ResticRepo: {},
controller.BackupDeletion: {},
}
if s.config.restoreOnly {
s.logger.Info("Restore only mode - not starting the backup, schedule, delete-backup, or GC controllers")
@@ -802,50 +806,53 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupStorageLocation)
}
if err := controller.NewScheduleReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.metrics).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.Schedule)
if _, ok := enabledRuntimeControllers[controller.Schedule]; ok {
if err := controller.NewScheduleReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.metrics).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.Schedule)
}
}
if err := controller.NewResticRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.defaultResticMaintenanceFrequency, s.repoManager).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.ResticRepo)
if _, ok := enabledRuntimeControllers[controller.ResticRepo]; ok {
if err := controller.NewResticRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.defaultResticMaintenanceFrequency, s.repoManager).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.ResticRepo)
}
}
if err := controller.NewBackupDeletionReconciler(
s.logger,
s.mgr.GetClient(),
backupTracker,
s.repoManager,
s.metrics,
s.discoveryHelper,
newPluginManager,
backupStoreGetter,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupDeletion)
if _, ok := enabledRuntimeControllers[controller.BackupDeletion]; ok {
if err := controller.NewBackupDeletionReconciler(
s.logger,
s.mgr.GetClient(),
backupTracker,
s.repoManager,
s.metrics,
s.discoveryHelper,
newPluginManager,
backupStoreGetter,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupDeletion)
}
}
if _, ok := enabledRuntimeControllers[controller.ServerStatusRequest]; ok {
r := controller.ServerStatusRequestReconciler{
Scheme: s.mgr.GetScheme(),
Client: s.mgr.GetClient(),
Ctx: s.ctx,
PluginRegistry: s.pluginRegistry,
Clock: clock.RealClock{},
Log: s.logger,
}
if err := r.SetupWithManager(s.mgr); err != nil {
if err := controller.NewServerStatusRequestReconciler(
s.mgr.GetClient(),
s.ctx,
s.pluginRegistry,
clock.RealClock{},
s.logger,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.ServerStatusRequest)
}
}
if _, ok := enabledRuntimeControllers[controller.DownloadRequest]; ok {
r := controller.DownloadRequestReconciler{
Scheme: s.mgr.GetScheme(),
Client: s.mgr.GetClient(),
Clock: clock.RealClock{},
NewPluginManager: newPluginManager,
BackupStoreGetter: backupStoreGetter,
Log: s.logger,
}
r := controller.NewDownloadRequestReconciler(
s.mgr.GetClient(),
clock.RealClock{},
newPluginManager,
backupStoreGetter,
s.logger,
)
if err := r.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.DownloadRequest)
}

View File

@@ -80,14 +80,14 @@ func TestRemoveControllers(t *testing.T) {
errorExpected bool
}{
{
name: "Remove one disabable controller",
name: "Remove one disable controller",
disabledControllers: []string{
controller.Backup,
},
errorExpected: false,
},
{
name: "Remove all disabable controllers",
name: "Remove all disable controllers",
disabledControllers: []string{
controller.Backup,
controller.BackupDeletion,
@@ -102,7 +102,7 @@ func TestRemoveControllers(t *testing.T) {
errorExpected: false,
},
{
name: "Remove with a non-disabable controller included",
name: "Remove with a non-disable controller included",
disabledControllers: []string{
controller.Backup,
controller.BackupStorageLocation,
@@ -110,7 +110,7 @@ func TestRemoveControllers(t *testing.T) {
errorExpected: true,
},
{
name: "Remove with a misspelled/inexisting controller name",
name: "Remove with a misspelled/non-existing controller name",
disabledControllers: []string{
"go",
},
@@ -122,16 +122,16 @@ func TestRemoveControllers(t *testing.T) {
enabledControllers := map[string]func() controllerRunInfo{
controller.BackupSync: func() controllerRunInfo { return controllerRunInfo{} },
controller.Backup: func() controllerRunInfo { return controllerRunInfo{} },
controller.Schedule: func() controllerRunInfo { return controllerRunInfo{} },
controller.GarbageCollection: func() controllerRunInfo { return controllerRunInfo{} },
controller.BackupDeletion: func() controllerRunInfo { return controllerRunInfo{} },
controller.Restore: func() controllerRunInfo { return controllerRunInfo{} },
controller.ResticRepo: func() controllerRunInfo { return controllerRunInfo{} },
controller.DownloadRequest: func() controllerRunInfo { return controllerRunInfo{} },
}
enabledRuntimeControllers := map[string]struct{}{
controller.ServerStatusRequest: {},
controller.Schedule: {},
controller.BackupDeletion: {},
controller.ResticRepo: {},
controller.DownloadRequest: {},
}
totalNumOriginalControllers := len(enabledControllers) + len(enabledRuntimeControllers)

View File

@@ -23,7 +23,6 @@ import (
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
ctrl "sigs.k8s.io/controller-runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
@@ -33,23 +32,39 @@ import (
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
)
// DownloadRequestReconciler reconciles a DownloadRequest object
type DownloadRequestReconciler struct {
Scheme *runtime.Scheme
Client kbclient.Client
Clock clock.Clock
// downloadRequestReconciler reconciles a DownloadRequest object
type downloadRequestReconciler struct {
client kbclient.Client
clock clock.Clock
// use variables to refer to these functions so they can be
// replaced with fakes for testing.
NewPluginManager func(logrus.FieldLogger) clientmgmt.Manager
BackupStoreGetter persistence.ObjectBackupStoreGetter
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupStoreGetter persistence.ObjectBackupStoreGetter
Log logrus.FieldLogger
log logrus.FieldLogger
}
// NewDownloadRequestReconciler initializes and returns downloadRequestReconciler struct.
func NewDownloadRequestReconciler(
client kbclient.Client,
clock clock.Clock,
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
backupStoreGetter persistence.ObjectBackupStoreGetter,
log logrus.FieldLogger,
) *downloadRequestReconciler {
return &downloadRequestReconciler{
client: client,
clock: clock,
newPluginManager: newPluginManager,
backupStoreGetter: backupStoreGetter,
log: log,
}
}
// +kubebuilder:rbac:groups=velero.io,resources=downloadrequests,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=velero.io,resources=downloadrequests/status,verbs=get;update;patch
func (r *DownloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithFields(logrus.Fields{
func (r *downloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.log.WithFields(logrus.Fields{
"controller": "download-request",
"downloadRequest": req.NamespacedName,
})
@@ -57,7 +72,7 @@ func (r *DownloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
// Fetch the DownloadRequest instance.
log.Debug("Getting DownloadRequest")
downloadRequest := &velerov1api.DownloadRequest{}
if err := r.Client.Get(ctx, req.NamespacedName, downloadRequest); err != nil {
if err := r.client.Get(ctx, req.NamespacedName, downloadRequest); err != nil {
if apierrors.IsNotFound(err) {
log.Debug("Unable to find DownloadRequest")
return ctrl.Result{}, nil
@@ -70,19 +85,19 @@ func (r *DownloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
original := downloadRequest.DeepCopy()
defer func() {
// Always attempt to Patch the downloadRequest object and status after each reconciliation.
if err := r.Client.Patch(ctx, downloadRequest, kbclient.MergeFrom(original)); err != nil {
if err := r.client.Patch(ctx, downloadRequest, kbclient.MergeFrom(original)); err != nil {
log.WithError(err).Error("Error updating download request")
return
}
}()
if downloadRequest.Status != (velerov1api.DownloadRequestStatus{}) && downloadRequest.Status.Expiration != nil {
if downloadRequest.Status.Expiration.Time.Before(r.Clock.Now()) {
if downloadRequest.Status.Expiration.Time.Before(r.clock.Now()) {
// Delete any request that is expired, regardless of the phase: it is not
// worth proceeding and trying/retrying to find it.
log.Debug("DownloadRequest has expired - deleting")
if err := r.Client.Delete(ctx, downloadRequest); err != nil {
if err := r.client.Delete(ctx, downloadRequest); err != nil {
log.WithError(err).Error("Error deleting an expired download request")
return ctrl.Result{}, errors.WithStack(err)
}
@@ -103,12 +118,12 @@ func (r *DownloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
if downloadRequest.Status.Phase == "" || downloadRequest.Status.Phase == velerov1api.DownloadRequestPhaseNew {
// Update the expiration.
downloadRequest.Status.Expiration = &metav1.Time{Time: r.Clock.Now().Add(persistence.DownloadURLTTL)}
downloadRequest.Status.Expiration = &metav1.Time{Time: r.clock.Now().Add(persistence.DownloadURLTTL)}
if downloadRequest.Spec.Target.Kind == velerov1api.DownloadTargetKindRestoreLog ||
downloadRequest.Spec.Target.Kind == velerov1api.DownloadTargetKindRestoreResults {
restore := &velerov1api.Restore{}
if err := r.Client.Get(ctx, kbclient.ObjectKey{
if err := r.client.Get(ctx, kbclient.ObjectKey{
Namespace: downloadRequest.Namespace,
Name: downloadRequest.Spec.Target.Name,
}, restore); err != nil {
@@ -118,7 +133,7 @@ func (r *DownloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
backup := &velerov1api.Backup{}
if err := r.Client.Get(ctx, kbclient.ObjectKey{
if err := r.client.Get(ctx, kbclient.ObjectKey{
Namespace: downloadRequest.Namespace,
Name: backupName,
}, backup); err != nil {
@@ -126,17 +141,17 @@ func (r *DownloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
location := &velerov1api.BackupStorageLocation{}
if err := r.Client.Get(ctx, kbclient.ObjectKey{
if err := r.client.Get(ctx, kbclient.ObjectKey{
Namespace: backup.Namespace,
Name: backup.Spec.StorageLocation,
}, location); err != nil {
return ctrl.Result{}, errors.WithStack(err)
}
pluginManager := r.NewPluginManager(log)
pluginManager := r.newPluginManager(log)
defer pluginManager.CleanupClients()
backupStore, err := r.BackupStoreGetter.Get(location, pluginManager, log)
backupStore, err := r.backupStoreGetter.Get(location, pluginManager, log)
if err != nil {
log.WithError(err).Error("Error getting a backup store")
return ctrl.Result{}, errors.WithStack(err)
@@ -149,7 +164,7 @@ func (r *DownloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
downloadRequest.Status.Phase = velerov1api.DownloadRequestPhaseProcessed
// Update the expiration again to extend the time we wait (the TTL) to start after successfully processing the URL.
downloadRequest.Status.Expiration = &metav1.Time{Time: r.Clock.Now().Add(persistence.DownloadURLTTL)}
downloadRequest.Status.Expiration = &metav1.Time{Time: r.clock.Now().Add(persistence.DownloadURLTTL)}
}
// Requeue is mostly to handle deleting any expired requests that were not
@@ -157,7 +172,7 @@ func (r *DownloadRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{Requeue: true}, nil
}
func (r *DownloadRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *downloadRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.DownloadRequest{}).
Complete(r)

View File

@@ -87,7 +87,7 @@ var _ = Describe("Download Request Reconciler", func() {
test.downloadRequest.Status.Expiration = &metav1.Time{Time: rClock.Now().Add(-1 * time.Minute)}
}
fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme)
fakeClient := fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()
err = fakeClient.Create(context.TODO(), test.downloadRequest)
Expect(err).To(BeNil())
@@ -109,13 +109,13 @@ var _ = Describe("Download Request Reconciler", func() {
// Setup reconciler
Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed())
r := DownloadRequestReconciler{
Client: fakeClient,
Clock: rClock,
NewPluginManager: func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
BackupStoreGetter: NewFakeObjectBackupStoreGetter(backupStores),
Log: velerotest.NewLogger(),
}
r := NewDownloadRequestReconciler(
fakeClient,
rClock,
func(logrus.FieldLogger) clientmgmt.Manager { return pluginManager },
NewFakeObjectBackupStoreGetter(backupStores),
velerotest.NewLogger(),
)
if test.backupLocation != nil && test.expectGetsURL {
backupStores[test.backupLocation.Name].On("GetDownloadURL", test.downloadRequest.Spec.Target).Return("a-url", nil)
@@ -136,7 +136,7 @@ var _ = Describe("Download Request Reconciler", func() {
}
instance := &velerov1api.DownloadRequest{}
err = r.Client.Get(ctx, kbclient.ObjectKey{Name: test.downloadRequest.Name, Namespace: test.downloadRequest.Namespace}, instance)
err = r.client.Get(ctx, kbclient.ObjectKey{Name: test.downloadRequest.Name, Namespace: test.downloadRequest.Namespace}, instance)
if test.expired {
Expect(instance).ToNot(Equal(test.downloadRequest))
@@ -153,7 +153,7 @@ var _ = Describe("Download Request Reconciler", func() {
if test.expectGetsURL {
Expect(string(instance.Status.Phase)).To(Equal(string(velerov1api.DownloadRequestPhaseProcessed)))
Expect(instance.Status.DownloadURL).To(Equal("a-url"))
Expect(velerotest.TimesAreEqual(instance.Status.Expiration.Time, r.Clock.Now().Add(signedURLTTL))).To(BeTrue())
Expect(velerotest.TimesAreEqual(instance.Status.Expiration.Time, r.clock.Now().Add(signedURLTTL))).To(BeTrue())
}
},

View File

@@ -24,7 +24,6 @@ import (
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -46,21 +45,36 @@ type PluginLister interface {
List(kind framework.PluginKind) []framework.PluginIdentifier
}
// ServerStatusRequestReconciler reconciles a ServerStatusRequest object
type ServerStatusRequestReconciler struct {
Scheme *runtime.Scheme
Client client.Client
Ctx context.Context
PluginRegistry PluginLister
Clock clock.Clock
// serverStatusRequestReconciler reconciles a ServerStatusRequest object
type serverStatusRequestReconciler struct {
client client.Client
ctx context.Context
pluginRegistry PluginLister
clock clock.Clock
Log logrus.FieldLogger
log logrus.FieldLogger
}
// NewServerStatusRequestReconciler initializes and returns serverStatusRequestReconciler struct.
func NewServerStatusRequestReconciler(
client client.Client,
ctx context.Context,
pluginRegistry PluginLister,
clock clock.Clock,
log logrus.FieldLogger) *serverStatusRequestReconciler {
return &serverStatusRequestReconciler{
client: client,
ctx: ctx,
pluginRegistry: pluginRegistry,
clock: clock,
log: log,
}
}
// +kubebuilder:rbac:groups=velero.io,resources=serverstatusrequests,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=velero.io,resources=serverstatusrequests/status,verbs=get;update;patch
func (r *ServerStatusRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithFields(logrus.Fields{
func (r *serverStatusRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.log.WithFields(logrus.Fields{
"controller": ServerStatusRequest,
"serverStatusRequest": req.NamespacedName,
})
@@ -68,7 +82,7 @@ func (r *ServerStatusRequestReconciler) Reconcile(ctx context.Context, req ctrl.
// Fetch the ServerStatusRequest instance.
log.Debug("Getting ServerStatusRequest")
statusRequest := &velerov1api.ServerStatusRequest{}
if err := r.Client.Get(r.Ctx, req.NamespacedName, statusRequest); err != nil {
if err := r.client.Get(r.ctx, req.NamespacedName, statusRequest); err != nil {
if apierrors.IsNotFound(err) {
log.Debug("Unable to find ServerStatusRequest")
return ctrl.Result{}, nil
@@ -78,7 +92,7 @@ func (r *ServerStatusRequestReconciler) Reconcile(ctx context.Context, req ctrl.
return ctrl.Result{}, err
}
log = r.Log.WithFields(logrus.Fields{
log = r.log.WithFields(logrus.Fields{
"controller": ServerStatusRequest,
"serverStatusRequest": req.NamespacedName,
"phase": statusRequest.Status.Phase,
@@ -90,23 +104,23 @@ func (r *ServerStatusRequestReconciler) Reconcile(ctx context.Context, req ctrl.
original := statusRequest.DeepCopy()
statusRequest.Status.ServerVersion = buildinfo.Version
statusRequest.Status.Phase = velerov1api.ServerStatusRequestPhaseProcessed
statusRequest.Status.ProcessedTimestamp = &metav1.Time{Time: r.Clock.Now()}
statusRequest.Status.Plugins = velero.GetInstalledPluginInfo(r.PluginRegistry)
statusRequest.Status.ProcessedTimestamp = &metav1.Time{Time: r.clock.Now()}
statusRequest.Status.Plugins = velero.GetInstalledPluginInfo(r.pluginRegistry)
if err := r.Client.Patch(r.Ctx, statusRequest, client.MergeFrom(original)); err != nil {
if err := r.client.Patch(r.ctx, statusRequest, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Error updating ServerStatusRequest status")
return ctrl.Result{RequeueAfter: statusRequestResyncPeriod}, err
}
case velerov1api.ServerStatusRequestPhaseProcessed:
log.Debug("Checking whether ServerStatusRequest has expired")
expiration := statusRequest.Status.ProcessedTimestamp.Add(ttl)
if expiration.After(r.Clock.Now()) {
if expiration.After(r.clock.Now()) {
log.Debug("ServerStatusRequest has not expired")
return ctrl.Result{RequeueAfter: statusRequestResyncPeriod}, nil
}
log.Debug("ServerStatusRequest has expired, deleting it")
if err := r.Client.Delete(r.Ctx, statusRequest); err != nil {
if err := r.client.Delete(r.ctx, statusRequest); err != nil {
log.WithError(err).Error("Unable to delete the request")
return ctrl.Result{}, nil
}
@@ -119,7 +133,7 @@ func (r *ServerStatusRequestReconciler) Reconcile(ctx context.Context, req ctrl.
return ctrl.Result{RequeueAfter: statusRequestResyncPeriod}, nil
}
func (r *ServerStatusRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *serverStatusRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1api.ServerStatusRequest{}).
WithOptions(controller.Options{

View File

@@ -62,15 +62,15 @@ var _ = Describe("Server Status Request Reconciler", func() {
func(test request) {
// Setup reconciler
Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed())
r := ServerStatusRequestReconciler{
Client: fake.NewFakeClientWithScheme(scheme.Scheme, test.req),
Ctx: context.Background(),
PluginRegistry: test.reqPluginLister,
Clock: clock.NewFakeClock(now),
Log: velerotest.NewLogger(),
}
r := NewServerStatusRequestReconciler(
fake.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(test.req).Build(),
context.Background(),
test.reqPluginLister,
clock.NewFakeClock(now),
velerotest.NewLogger(),
)
actualResult, err := r.Reconcile(r.Ctx, ctrl.Request{
actualResult, err := r.Reconcile(r.ctx, ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: velerov1api.DefaultNamespace,
Name: test.req.Name,
@@ -86,7 +86,7 @@ var _ = Describe("Server Status Request Reconciler", func() {
}
instance := &velerov1api.ServerStatusRequest{}
err = r.Client.Get(ctx, kbclient.ObjectKey{Name: test.req.Name, Namespace: test.req.Namespace}, instance)
err = r.client.Get(ctx, kbclient.ObjectKey{Name: test.req.Name, Namespace: test.req.Namespace}, instance)
// Assertions
if test.expected == nil {