Refactor schedule controller with kubebuilder

Refactor schedule controller with kubebuilder

fixes #4671

Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>
This commit is contained in:
Wenkai Yin(尹文开)
2022-03-11 10:16:41 +08:00
parent 20c2073428
commit 016ac129c6
11 changed files with 337 additions and 324 deletions

View File

@@ -18,27 +18,22 @@ package controller
import (
"context"
"encoding/json"
"fmt"
"time"
jsonpatch "github.com/evanphx/json-patch"
"github.com/pkg/errors"
"github.com/robfig/cron"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/util/kube"
kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
)
@@ -46,160 +41,100 @@ const (
scheduleSyncPeriod = time.Minute
)
type scheduleController struct {
*genericController
namespace string
schedulesClient velerov1client.SchedulesGetter
backupsClient velerov1client.BackupsGetter
schedulesLister velerov1listers.ScheduleLister
clock clock.Clock
metrics *metrics.ServerMetrics
type scheduleReconciler struct {
client.Client
namespace string
logger logrus.FieldLogger
clock clock.Clock
metrics *metrics.ServerMetrics
}
func NewScheduleController(
func NewScheduleReconciler(
namespace string,
schedulesClient velerov1client.SchedulesGetter,
backupsClient velerov1client.BackupsGetter,
schedulesInformer velerov1informers.ScheduleInformer,
logger logrus.FieldLogger,
client client.Client,
metrics *metrics.ServerMetrics,
) *scheduleController {
c := &scheduleController{
genericController: newGenericController(Schedule, logger),
namespace: namespace,
schedulesClient: schedulesClient,
backupsClient: backupsClient,
schedulesLister: schedulesInformer.Lister(),
clock: clock.RealClock{},
metrics: metrics,
}
c.syncHandler = c.processSchedule
c.resyncFunc = c.enqueueAllEnabledSchedules
c.resyncPeriod = scheduleSyncPeriod
schedulesInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
schedule := obj.(*api.Schedule)
switch schedule.Status.Phase {
case "", api.SchedulePhaseNew, api.SchedulePhaseEnabled:
// add to work queue
default:
c.logger.WithFields(logrus.Fields{
"schedule": kubeutil.NamespaceAndName(schedule),
"phase": schedule.Status.Phase,
}).Debug("Schedule is not new, skipping")
return
}
key, err := cache.MetaNamespaceKeyFunc(schedule)
if err != nil {
c.logger.WithError(errors.WithStack(err)).WithField("schedule", schedule).Error("Error creating queue key, item not added to queue")
return
}
c.queue.Add(key)
scheduleName := schedule.GetName()
c.logger.Info("Creating schedule ", scheduleName)
//Init Prometheus metrics to 0 to have them flowing up
metrics.InitSchedule(scheduleName)
},
},
)
return c
}
func (c *scheduleController) enqueueAllEnabledSchedules() {
schedules, err := c.schedulesLister.Schedules(c.namespace).List(labels.NewSelector())
if err != nil {
c.logger.WithError(errors.WithStack(err)).Error("Error listing Schedules")
return
}
for _, schedule := range schedules {
if schedule.Status.Phase != api.SchedulePhaseEnabled {
continue
}
key, err := cache.MetaNamespaceKeyFunc(schedule)
if err != nil {
c.logger.WithError(errors.WithStack(err)).WithField("schedule", schedule).Error("Error creating queue key, item not added to queue")
continue
}
c.queue.Add(key)
) *scheduleReconciler {
return &scheduleReconciler{
Client: client,
namespace: namespace,
logger: logger,
clock: clock.RealClock{},
metrics: metrics,
}
}
func (c *scheduleController) processSchedule(key string) error {
log := c.logger.WithField("key", key)
func (c *scheduleReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(c.logger, mgr.GetClient(), &velerov1.ScheduleList{}, scheduleSyncPeriod)
return ctrl.NewControllerManagedBy(mgr).
For(&velerov1.Schedule{}).
Watches(s, nil).
Complete(c)
}
log.Debug("Running processSchedule")
ns, name, err := cache.SplitMetaNamespaceKey(key)
// +kubebuilder:rbac:groups=velero.io,resources=schedules,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=velero.io,resources=schedules/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=velero.io,resources=backups,verbs=create
func (c *scheduleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := c.logger.WithField("schedule", req.String())
log.Debug("Getting schedule")
schedule := &velerov1.Schedule{}
if err := c.Get(ctx, req.NamespacedName, schedule); err != nil {
log.WithError(err).Error("error getting schedule")
return ctrl.Result{}, nil
}
if schedule.Status.Phase != "" &&
schedule.Status.Phase != velerov1.SchedulePhaseNew &&
schedule.Status.Phase != velerov1.SchedulePhaseEnabled {
log.Debugf("the schedule phase is %s, isn't %s or %s, skip", schedule.Status.Phase, velerov1.SchedulePhaseNew, velerov1.SchedulePhaseEnabled)
return ctrl.Result{}, nil
}
c.metrics.InitSchedule(schedule.Name)
patchHelper, err := patch.NewHelper(schedule, c.Client)
if err != nil {
return errors.Wrap(err, "error splitting queue key")
log.WithError(err).Error("error new patch helper")
return ctrl.Result{}, nil
}
log.Debug("Getting Schedule")
schedule, err := c.schedulesLister.Schedules(ns).Get(name)
if err != nil {
// schedule no longer exists
if apierrors.IsNotFound(err) {
log.WithError(err).Debug("Schedule not found")
return nil
}
return errors.Wrap(err, "error getting Schedule")
}
switch schedule.Status.Phase {
case "", api.SchedulePhaseNew, api.SchedulePhaseEnabled:
// valid phase for processing
default:
return nil
}
log.Debug("Cloning schedule")
// store ref to original for creating patch
original := schedule
// don't modify items in the cache
schedule = schedule.DeepCopy()
// validation - even if the item is Enabled, we can't trust it
// so re-validate
currentPhase := schedule.Status.Phase
cronSchedule, errs := parseCronSchedule(schedule, c.logger)
if len(errs) > 0 {
schedule.Status.Phase = api.SchedulePhaseFailedValidation
schedule.Status.Phase = velerov1.SchedulePhaseFailedValidation
schedule.Status.ValidationErrors = errs
} else {
schedule.Status.Phase = api.SchedulePhaseEnabled
schedule.Status.Phase = velerov1.SchedulePhaseEnabled
}
// update status if it's changed
if currentPhase != schedule.Status.Phase {
updatedSchedule, err := patchSchedule(original, schedule, c.schedulesClient)
if err != nil {
return errors.Wrapf(err, "error updating Schedule phase to %s", schedule.Status.Phase)
if err = patchHelper.Patch(ctx, schedule); err != nil {
log.WithError(err).Errorf("error updating schedule phase to %s", schedule.Status.Phase)
return ctrl.Result{}, nil
}
schedule = updatedSchedule
}
if schedule.Status.Phase != api.SchedulePhaseEnabled {
return nil
if schedule.Status.Phase != velerov1.SchedulePhaseEnabled {
log.Debugf("the schedule's phase is %s, isn't %s, skip", schedule.Status.Phase, velerov1.SchedulePhaseEnabled)
return ctrl.Result{}, nil
}
// check for the schedule being due to run, and submit a Backup if so
if err := c.submitBackupIfDue(schedule, cronSchedule); err != nil {
return err
if err := c.submitBackupIfDue(ctx, schedule, cronSchedule); err != nil {
log.WithError(err).Error("error running submitBackupIfDue")
}
return nil
return ctrl.Result{}, nil
}
func parseCronSchedule(itm *api.Schedule, logger logrus.FieldLogger) (cron.Schedule, []string) {
func parseCronSchedule(itm *velerov1.Schedule, logger logrus.FieldLogger) (cron.Schedule, []string) {
var validationErrors []string
var schedule cron.Schedule
@@ -239,7 +174,7 @@ func parseCronSchedule(itm *api.Schedule, logger logrus.FieldLogger) (cron.Sched
return schedule, nil
}
func (c *scheduleController) submitBackupIfDue(item *api.Schedule, cronSchedule cron.Schedule) error {
func (c *scheduleReconciler) submitBackupIfDue(ctx context.Context, item *velerov1.Schedule, cronSchedule cron.Schedule) error {
var (
now = c.clock.Now()
isDue, nextRunTime = getNextRunTime(item, cronSchedule, now)
@@ -259,23 +194,24 @@ func (c *scheduleController) submitBackupIfDue(item *api.Schedule, cronSchedule
// lead to performance issues).
log.WithField("nextRunTime", nextRunTime).Info("Schedule is due, submitting Backup")
backup := getBackup(item, now)
if _, err := c.backupsClient.Backups(backup.Namespace).Create(context.TODO(), backup, metav1.CreateOptions{}); err != nil {
if err := c.Create(ctx, backup); err != nil {
return errors.Wrap(err, "error creating Backup")
}
original := item
schedule := item.DeepCopy()
patchHelper, err := patch.NewHelper(item, c.Client)
if err != nil {
return errors.Wrap(err, "error creating patch helper")
}
item.Status.LastBackup = &metav1.Time{Time: now}
schedule.Status.LastBackup = &metav1.Time{Time: now}
if _, err := patchSchedule(original, schedule, c.schedulesClient); err != nil {
return errors.Wrapf(err, "error updating Schedule's LastBackup time to %v", schedule.Status.LastBackup)
if err := patchHelper.Patch(ctx, item); err != nil {
return errors.Wrapf(err, "error updating Schedule's LastBackup time to %v", item.Status.LastBackup)
}
return nil
}
func getNextRunTime(schedule *api.Schedule, cronSchedule cron.Schedule, asOf time.Time) (bool, time.Time) {
func getNextRunTime(schedule *velerov1.Schedule, cronSchedule cron.Schedule, asOf time.Time) (bool, time.Time) {
var lastBackupTime time.Time
if schedule.Status.LastBackup != nil {
lastBackupTime = schedule.Status.LastBackup.Time
@@ -288,7 +224,7 @@ func getNextRunTime(schedule *api.Schedule, cronSchedule cron.Schedule, asOf tim
return asOf.After(nextRunTime), nextRunTime
}
func getBackup(item *api.Schedule, timestamp time.Time) *api.Backup {
func getBackup(item *velerov1.Schedule, timestamp time.Time) *velerov1.Backup {
name := item.TimestampedName(timestamp)
backup := builder.
ForBackup(item.Namespace, name).
@@ -297,27 +233,3 @@ func getBackup(item *api.Schedule, timestamp time.Time) *api.Backup {
return backup
}
func patchSchedule(original, updated *api.Schedule, client velerov1client.SchedulesGetter) (*api.Schedule, error) {
origBytes, err := json.Marshal(original)
if err != nil {
return nil, errors.Wrap(err, "error marshalling original schedule")
}
updatedBytes, err := json.Marshal(updated)
if err != nil {
return nil, errors.Wrap(err, "error marshalling updated schedule")
}
patchBytes, err := jsonpatch.CreateMergePatch(origBytes, updatedBytes)
if err != nil {
return nil, errors.Wrap(err, "error creating json merge patch for schedule")
}
res, err := client.Schedules(original.Namespace).Patch(context.TODO(), original.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return nil, errors.Wrap(err, "error patching schedule")
}
return res, nil
}

View File

@@ -17,7 +17,6 @@ limitations under the License.
package controller
import (
"encoding/json"
"testing"
"time"
@@ -25,21 +24,22 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/fake"
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions"
"github.com/vmware-tanzu/velero/pkg/metrics"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
func TestProcessSchedule(t *testing.T) {
func TestReconcileOfSchedule(t *testing.T) {
require.Nil(t, velerov1.AddToScheme(scheme.Scheme))
newScheduleBuilder := func(phase velerov1api.SchedulePhase) *builder.ScheduleBuilder {
return builder.ForSchedule("ns", "name").Phase(phase)
}
@@ -49,45 +49,34 @@ func TestProcessSchedule(t *testing.T) {
scheduleKey string
schedule *velerov1api.Schedule
fakeClockTime string
expectedErr bool
expectedPhase string
expectedValidationErrors []string
expectedBackupCreate *velerov1api.Backup
expectedLastBackup string
}{
{
name: "invalid key returns error",
scheduleKey: "invalid/key/value",
expectedErr: true,
},
{
name: "missing schedule returns early without an error",
name: "missing schedule triggers no backup",
scheduleKey: "foo/bar",
expectedErr: false,
},
{
name: "schedule with phase FailedValidation does not get processed",
schedule: newScheduleBuilder(velerov1api.SchedulePhaseFailedValidation).Result(),
expectedErr: false,
name: "schedule with phase FailedValidation triggers no backup",
schedule: newScheduleBuilder(velerov1api.SchedulePhaseFailedValidation).Result(),
},
{
name: "schedule with phase New gets validated and failed if invalid",
schedule: newScheduleBuilder(velerov1api.SchedulePhaseNew).Result(),
expectedErr: false,
expectedPhase: string(velerov1api.SchedulePhaseFailedValidation),
expectedValidationErrors: []string{"Schedule must be a non-empty valid Cron expression"},
},
{
name: "schedule with phase <blank> gets validated and failed if invalid",
schedule: newScheduleBuilder(velerov1api.SchedulePhase("")).Result(),
expectedErr: false,
expectedPhase: string(velerov1api.SchedulePhaseFailedValidation),
expectedValidationErrors: []string{"Schedule must be a non-empty valid Cron expression"},
},
{
name: "schedule with phase Enabled gets re-validated and failed if invalid",
schedule: newScheduleBuilder(velerov1api.SchedulePhaseEnabled).Result(),
expectedErr: false,
expectedPhase: string(velerov1api.SchedulePhaseFailedValidation),
expectedValidationErrors: []string{"Schedule must be a non-empty valid Cron expression"},
},
@@ -95,7 +84,6 @@ func TestProcessSchedule(t *testing.T) {
name: "schedule with phase New gets validated and triggers a backup",
schedule: newScheduleBuilder(velerov1api.SchedulePhaseNew).CronSchedule("@every 5m").Result(),
fakeClockTime: "2017-01-01 12:00:00",
expectedErr: false,
expectedPhase: string(velerov1api.SchedulePhaseEnabled),
expectedBackupCreate: builder.ForBackup("ns", "name-20170101120000").ObjectMeta(builder.WithLabels(velerov1api.ScheduleNameLabel, "name")).Result(),
expectedLastBackup: "2017-01-01 12:00:00",
@@ -104,7 +92,7 @@ func TestProcessSchedule(t *testing.T) {
name: "schedule with phase Enabled gets re-validated and triggers a backup if valid",
schedule: newScheduleBuilder(velerov1api.SchedulePhaseEnabled).CronSchedule("@every 5m").Result(),
fakeClockTime: "2017-01-01 12:00:00",
expectedErr: false,
expectedPhase: string(velerov1api.SchedulePhaseEnabled),
expectedBackupCreate: builder.ForBackup("ns", "name-20170101120000").ObjectMeta(builder.WithLabels(velerov1api.ScheduleNameLabel, "name")).Result(),
expectedLastBackup: "2017-01-01 12:00:00",
},
@@ -112,7 +100,6 @@ func TestProcessSchedule(t *testing.T) {
name: "schedule that's already run gets LastBackup updated",
schedule: newScheduleBuilder(velerov1api.SchedulePhaseEnabled).CronSchedule("@every 5m").LastBackupTime("2000-01-01 00:00:00").Result(),
fakeClockTime: "2017-01-01 12:00:00",
expectedErr: false,
expectedBackupCreate: builder.ForBackup("ns", "name-20170101120000").ObjectMeta(builder.WithLabels(velerov1api.ScheduleNameLabel, "name")).Result(),
expectedLastBackup: "2017-01-01 12:00:00",
},
@@ -121,134 +108,48 @@ func TestProcessSchedule(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
client = fake.NewSimpleClientset()
sharedInformers = informers.NewSharedInformerFactory(client, 0)
logger = velerotest.NewLogger()
)
c := NewScheduleController(
"namespace",
client.VeleroV1(),
client.VeleroV1(),
sharedInformers.Velero().V1().Schedules(),
logger,
metrics.NewServerMetrics(),
)
var (
client = (&fake.ClientBuilder{}).Build()
logger = velerotest.NewLogger()
testTime time.Time
err error
)
reconciler := NewScheduleReconciler("namespace", logger, client, metrics.NewServerMetrics())
if test.fakeClockTime != "" {
testTime, err = time.Parse("2006-01-02 15:04:05", test.fakeClockTime)
require.NoError(t, err, "unable to parse test.fakeClockTime: %v", err)
}
c.clock = clock.NewFakeClock(testTime)
reconciler.clock = clock.NewFakeClock(testTime)
if test.schedule != nil {
sharedInformers.Velero().V1().Schedules().Informer().GetStore().Add(test.schedule)
// this is necessary so the Patch() call returns the appropriate object
client.PrependReactor("patch", "schedules", func(action core.Action) (bool, runtime.Object, error) {
var (
patch = action.(core.PatchAction).GetPatch()
patchMap = make(map[string]interface{})
res = test.schedule.DeepCopy()
)
if err := json.Unmarshal(patch, &patchMap); err != nil {
t.Logf("error unmarshalling patch: %s\n", err)
return false, nil, err
}
// these are the fields that may be updated by the controller
phase, found, err := unstructured.NestedString(patchMap, "status", "phase")
if err == nil && found {
res.Status.Phase = velerov1api.SchedulePhase(phase)
}
lastBackupStr, found, err := unstructured.NestedString(patchMap, "status", "lastBackup")
if err == nil && found {
parsed, err := time.Parse(time.RFC3339, lastBackupStr)
if err != nil {
t.Logf("error parsing status.lastBackup: %s\n", err)
return false, nil, err
}
res.Status.LastBackup = &metav1.Time{Time: parsed}
}
return true, res, nil
})
require.Nil(t, client.Create(ctx, test.schedule))
}
key := test.scheduleKey
if key == "" && test.schedule != nil {
key, err = cache.MetaNamespaceKeyFunc(test.schedule)
require.NoError(t, err, "error getting key from test.schedule: %v", err)
_, err = reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "ns", Name: "name"}})
require.Nil(t, err)
schedule := &velerov1api.Schedule{}
err = client.Get(ctx, types.NamespacedName{"ns", "name"}, schedule)
if len(test.expectedPhase) > 0 {
require.Nil(t, err)
assert.Equal(t, test.expectedPhase, string(schedule.Status.Phase))
}
if len(test.expectedValidationErrors) > 0 {
require.Nil(t, err)
assert.EqualValues(t, test.expectedValidationErrors, schedule.Status.ValidationErrors)
}
if len(test.expectedLastBackup) > 0 {
require.Nil(t, err)
assert.Equal(t, parseTime(test.expectedLastBackup).Unix(), schedule.Status.LastBackup.Unix())
}
err = c.processSchedule(key)
assert.Equal(t, test.expectedErr, err != nil, "got error %v", err)
actions := client.Actions()
index := 0
type PatchStatus struct {
ValidationErrors []string `json:"validationErrors"`
Phase velerov1api.SchedulePhase `json:"phase"`
LastBackup time.Time `json:"lastBackup"`
}
type Patch struct {
Status PatchStatus `json:"status"`
}
decode := func(decoder *json.Decoder) (interface{}, error) {
actual := new(Patch)
err := decoder.Decode(actual)
return *actual, err
}
if test.expectedPhase != "" {
require.True(t, len(actions) > index, "len(actions) is too small")
expected := Patch{
Status: PatchStatus{
ValidationErrors: test.expectedValidationErrors,
Phase: velerov1api.SchedulePhase(test.expectedPhase),
},
}
velerotest.ValidatePatch(t, actions[index], expected, decode)
index++
}
if created := test.expectedBackupCreate; created != nil {
require.True(t, len(actions) > index, "len(actions) is too small")
action := core.NewCreateAction(
velerov1api.SchemeGroupVersion.WithResource("backups"),
created.Namespace,
created)
assert.Equal(t, action, actions[index])
index++
}
if test.expectedLastBackup != "" {
require.True(t, len(actions) > index, "len(actions) is too small")
expected := Patch{
Status: PatchStatus{
LastBackup: parseTime(test.expectedLastBackup),
},
}
velerotest.ValidatePatch(t, actions[index], expected, decode)
backups := &velerov1api.BackupList{}
require.Nil(t, client.List(ctx, backups))
if test.expectedBackupCreate == nil {
assert.Equal(t, 0, len(backups.Items))
} else {
assert.Equal(t, 1, len(backups.Items))
}
})
}