mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-01-08 06:15:40 +00:00
Migrate ServerStatusRequest controller and resource to kubebuilder (#2838)
* Convert ServerStatusRequest controller to controller-runtime Signed-off-by: Carlisia <carlisia@vmware.com> * Add select stm Signed-off-by: Carlisia <carlisia@vmware.com> * Fixed status patch bug Signed-off-by: Carlisia <carlisia@vmware.com> * Add mgr start Signed-off-by: Carlisia <carlisia@vmware.com> * Trying to sync Signed-off-by: Carlisia <carlisia@vmware.com> * Clean async now Signed-off-by: Carlisia <carlisia@vmware.com> * Clean up + move context out Signed-off-by: Carlisia <carlisia@vmware.com> * Bug: not closing the channel Signed-off-by: Carlisia <carlisia@vmware.com> * Clean up some tests Signed-off-by: Carlisia <carlisia@vmware.com> * Much better way to fetch an update using a backoff loop Signed-off-by: Carlisia <carlisia@vmware.com> * Even better way to retry: use apimachinery lib Signed-off-by: Carlisia <carlisia@vmware.com> * Refactor controller + add test Signed-off-by: Carlisia <carlisia@vmware.com> * partially fix unit tests Signed-off-by: Ashish Amarnath <ashisham@vmware.com> * Fix and add tests Signed-off-by: Carlisia <carlisia@vmware.com> * Add changelog Signed-off-by: Carlisia <carlisia@vmware.com> * Add ability to disable the controller + cleanups Signed-off-by: Carlisia <carlisia@vmware.com> * Fix bug w/ disabling controllers + fix test + clean up Signed-off-by: Carlisia <carlisia@vmware.com> * Move role.yaml to the correct folder Signed-off-by: Carlisia <carlisia@vmware.com> * Add sample serverstatusrequest.yaml Signed-off-by: Carlisia <carlisia@vmware.com> * Add requeue + better formatting Signed-off-by: Carlisia <carlisia@vmware.com> * Increase # of max concurrent reconciles Signed-off-by: Carlisia <carlisia@vmware.com> Co-authored-by: Ashish Amarnath <ashisham@vmware.com>
This commit is contained in:
@@ -17,104 +17,105 @@ limitations under the License.
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/velero"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
velerov1client "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned/typed/velero/v1"
|
||||
velerov1informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions/velero/v1"
|
||||
velerov1listers "github.com/vmware-tanzu/velero/pkg/generated/listers/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
|
||||
"github.com/vmware-tanzu/velero/pkg/serverstatusrequest"
|
||||
kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
|
||||
"github.com/vmware-tanzu/velero/pkg/plugin/framework"
|
||||
)
|
||||
|
||||
const statusRequestResyncPeriod = 5 * time.Minute
|
||||
const (
|
||||
ttl = time.Minute
|
||||
statusRequestResyncPeriod = 5 * time.Minute
|
||||
)
|
||||
|
||||
type statusRequestController struct {
|
||||
*genericController
|
||||
|
||||
client velerov1client.ServerStatusRequestsGetter
|
||||
lister velerov1listers.ServerStatusRequestLister
|
||||
pluginRegistry clientmgmt.Registry
|
||||
clock clock.Clock
|
||||
type PluginLister interface {
|
||||
// List returns all PluginIdentifiers for kind.
|
||||
List(kind framework.PluginKind) []framework.PluginIdentifier
|
||||
}
|
||||
|
||||
func NewServerStatusRequestController(
|
||||
logger logrus.FieldLogger,
|
||||
client velerov1client.ServerStatusRequestsGetter,
|
||||
informer velerov1informers.ServerStatusRequestInformer,
|
||||
pluginRegistry clientmgmt.Registry,
|
||||
) *statusRequestController {
|
||||
c := &statusRequestController{
|
||||
genericController: newGenericController("serverstatusrequest", logger),
|
||||
client: client,
|
||||
lister: informer.Lister(),
|
||||
pluginRegistry: pluginRegistry,
|
||||
// ServerStatusRequestReconciler reconciles a ServerStatusRequest object
|
||||
type ServerStatusRequestReconciler struct {
|
||||
Scheme *runtime.Scheme
|
||||
Client client.Client
|
||||
Ctx context.Context
|
||||
ServerStatus velero.ServerStatus
|
||||
|
||||
clock: clock.RealClock{},
|
||||
}
|
||||
|
||||
c.syncHandler = c.processItem
|
||||
c.resyncFunc = c.enqueueAllItems
|
||||
c.resyncPeriod = statusRequestResyncPeriod
|
||||
|
||||
informer.Informer().AddEventHandler(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
req := obj.(*velerov1api.ServerStatusRequest)
|
||||
key := kubeutil.NamespaceAndName(req)
|
||||
|
||||
c.logger.WithFields(logrus.Fields{
|
||||
"serverStatusRequest": key,
|
||||
"phase": req.Status.Phase,
|
||||
}).Debug("Enqueueing server status request")
|
||||
|
||||
c.queue.Add(key)
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
return c
|
||||
Log logrus.FieldLogger
|
||||
}
|
||||
|
||||
func (c *statusRequestController) processItem(key string) error {
|
||||
log := c.logger.WithField("key", key)
|
||||
|
||||
log.Debug("Running processItem")
|
||||
ns, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error splitting queue key")
|
||||
}
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=serverstatusrequests,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=velero.io,resources=serverstatusrequests/status,verbs=get;update;patch
|
||||
func (r *ServerStatusRequestReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
|
||||
log := r.Log.WithFields(logrus.Fields{
|
||||
"controller": "serverstatusrequest",
|
||||
"serverStatusRequest": req.NamespacedName,
|
||||
})
|
||||
|
||||
// Fetch the ServerStatusRequest instance.
|
||||
log.Debug("Getting ServerStatusRequest")
|
||||
req, err := c.lister.ServerStatusRequests(ns).Get(name)
|
||||
// server status request no longer exists
|
||||
if apierrors.IsNotFound(err) {
|
||||
log.WithError(err).Debug("ServerStatusRequest not found")
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error getting ServerStatusRequest")
|
||||
statusRequest := &velerov1api.ServerStatusRequest{}
|
||||
if err := r.Client.Get(r.Ctx, req.NamespacedName, statusRequest); err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
log.WithError(err).Error("ServerStatusRequest not found")
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
log.WithError(err).Error("Error getting ServerStatusRequest")
|
||||
// Error reading the object - requeue the request.
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
return serverstatusrequest.Process(req.DeepCopy(), c.client, c.pluginRegistry, c.clock, log)
|
||||
log = r.Log.WithFields(logrus.Fields{
|
||||
"controller": "serverstatusrequest",
|
||||
"serverStatusRequest": req.NamespacedName,
|
||||
"phase": statusRequest.Status.Phase,
|
||||
})
|
||||
|
||||
switch statusRequest.Status.Phase {
|
||||
case "", velerov1api.ServerStatusRequestPhaseNew:
|
||||
log.Info("Processing new ServerStatusRequest")
|
||||
|
||||
if err := r.ServerStatus.PatchStatusProcessed(r.Client, statusRequest, r.Ctx); err != nil {
|
||||
log.WithError(err).Error("Unable to update the request")
|
||||
return ctrl.Result{RequeueAfter: statusRequestResyncPeriod}, err
|
||||
}
|
||||
case velerov1api.ServerStatusRequestPhaseProcessed:
|
||||
log.Debug("Checking whether ServerStatusRequest has expired")
|
||||
expiration := statusRequest.Status.ProcessedTimestamp.Add(ttl)
|
||||
if expiration.After(r.ServerStatus.Clock.Now()) {
|
||||
log.Debug("ServerStatusRequest has not expired")
|
||||
return ctrl.Result{RequeueAfter: statusRequestResyncPeriod}, nil
|
||||
}
|
||||
|
||||
log.Debug("ServerStatusRequest has expired, deleting it")
|
||||
if err := r.Client.Delete(r.Ctx, statusRequest); err != nil {
|
||||
log.WithError(err).Error("Unable to delete the request")
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
default:
|
||||
return ctrl.Result{}, errors.New("unexpected ServerStatusRequest phase")
|
||||
}
|
||||
|
||||
// Requeue is mostly to handle deleting any expired status requests that were not
|
||||
// deleted as part of the normal client flow for whatever reason.
|
||||
return ctrl.Result{RequeueAfter: statusRequestResyncPeriod}, nil
|
||||
}
|
||||
|
||||
func (c *statusRequestController) enqueueAllItems() {
|
||||
items, err := c.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
c.logger.WithError(err).Error("Error listing all server status requests")
|
||||
return
|
||||
}
|
||||
|
||||
for _, req := range items {
|
||||
c.enqueue(req)
|
||||
}
|
||||
func (r *ServerStatusRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&velerov1api.ServerStatusRequest{}).
|
||||
WithOptions(controller.Options{
|
||||
MaxConcurrentReconciles: 10,
|
||||
}).
|
||||
Complete(r)
|
||||
}
|
||||
|
||||
270
pkg/controller/server_status_request_controller_test.go
Normal file
270
pkg/controller/server_status_request_controller_test.go
Normal file
@@ -0,0 +1,270 @@
|
||||
/*
|
||||
Copyright 2020 the Velero contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
|
||||
"github.com/vmware-tanzu/velero/internal/velero"
|
||||
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
|
||||
"github.com/vmware-tanzu/velero/pkg/builder"
|
||||
"github.com/vmware-tanzu/velero/pkg/buildinfo"
|
||||
"github.com/vmware-tanzu/velero/pkg/plugin/framework"
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
)
|
||||
|
||||
func statusRequestBuilder(resourceVersion string) *builder.ServerStatusRequestBuilder {
|
||||
return builder.ForServerStatusRequest(velerov1api.DefaultNamespace, "sr-1", resourceVersion)
|
||||
}
|
||||
|
||||
var _ = Describe("Server Status Request Reconciler", func() {
|
||||
BeforeEach(func() {})
|
||||
AfterEach(func() {})
|
||||
|
||||
It("Should successfully patch a server status request object status phase", func() {
|
||||
// now will be used to set the fake clock's time; capture
|
||||
// it here so it can be referenced in the test case defs.
|
||||
now, err := time.Parse(time.RFC1123, time.RFC1123)
|
||||
Expect(err).To(BeNil())
|
||||
now = now.Local()
|
||||
|
||||
tests := []struct {
|
||||
req *velerov1api.ServerStatusRequest
|
||||
reqPluginLister *fakePluginLister
|
||||
expected *velerov1api.ServerStatusRequest
|
||||
expectedRequeue ctrl.Result
|
||||
expectedErrMsg string
|
||||
}{
|
||||
{
|
||||
// server status request with phase=empty will be processed
|
||||
req: statusRequestBuilder("1").
|
||||
ServerVersion(buildinfo.Version).
|
||||
ProcessedTimestamp(now).
|
||||
Plugins([]velerov1api.PluginInfo{
|
||||
{
|
||||
Name: "custom.io/myown",
|
||||
Kind: "VolumeSnapshotter",
|
||||
},
|
||||
}).
|
||||
Result(),
|
||||
reqPluginLister: &fakePluginLister{
|
||||
plugins: []framework.PluginIdentifier{
|
||||
{
|
||||
Name: "custom.io/myown",
|
||||
Kind: "VolumeSnapshotter",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: statusRequestBuilder("1").
|
||||
ServerVersion(buildinfo.Version).
|
||||
Phase(velerov1api.ServerStatusRequestPhaseProcessed).
|
||||
ProcessedTimestamp(now).
|
||||
Plugins([]velerov1api.PluginInfo{
|
||||
{
|
||||
Name: "custom.io/myown",
|
||||
Kind: "VolumeSnapshotter",
|
||||
},
|
||||
}).
|
||||
Result(),
|
||||
expectedRequeue: ctrl.Result{Requeue: false, RequeueAfter: statusRequestResyncPeriod},
|
||||
},
|
||||
{
|
||||
// server status request with phase=new will be processed
|
||||
req: statusRequestBuilder("1").
|
||||
ServerVersion(buildinfo.Version).
|
||||
Phase(velerov1api.ServerStatusRequestPhaseNew).
|
||||
ProcessedTimestamp(now).
|
||||
Plugins([]velerov1api.PluginInfo{
|
||||
{
|
||||
Name: "custom.io/myown",
|
||||
Kind: "VolumeSnapshotter",
|
||||
},
|
||||
}).
|
||||
Result(),
|
||||
reqPluginLister: &fakePluginLister{
|
||||
plugins: []framework.PluginIdentifier{
|
||||
{
|
||||
Name: "custom.io/myown",
|
||||
Kind: "VolumeSnapshotter",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: statusRequestBuilder("1").
|
||||
ServerVersion(buildinfo.Version).
|
||||
Phase(velerov1api.ServerStatusRequestPhaseProcessed).
|
||||
ProcessedTimestamp(now).
|
||||
Plugins([]velerov1api.PluginInfo{
|
||||
{
|
||||
Name: "custom.io/myown",
|
||||
Kind: "VolumeSnapshotter",
|
||||
},
|
||||
}).
|
||||
Result(),
|
||||
expectedRequeue: ctrl.Result{Requeue: false, RequeueAfter: statusRequestResyncPeriod},
|
||||
},
|
||||
{
|
||||
// server status request with phase=Processed does not get deleted if not expired
|
||||
req: statusRequestBuilder("1").
|
||||
ServerVersion(buildinfo.Version).
|
||||
Phase(velerov1api.ServerStatusRequestPhaseProcessed).
|
||||
ProcessedTimestamp(now). // not yet expired
|
||||
Plugins([]velerov1api.PluginInfo{
|
||||
{
|
||||
Name: "custom.io/myotherown",
|
||||
Kind: "VolumeSnapshotter",
|
||||
},
|
||||
}).
|
||||
Result(),
|
||||
reqPluginLister: &fakePluginLister{
|
||||
plugins: []framework.PluginIdentifier{
|
||||
{
|
||||
Name: "custom.io/myotherown",
|
||||
Kind: "VolumeSnapshotter",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: statusRequestBuilder("1").
|
||||
ServerVersion(buildinfo.Version).
|
||||
Phase(velerov1api.ServerStatusRequestPhaseProcessed).
|
||||
ProcessedTimestamp(now).
|
||||
Plugins([]velerov1api.PluginInfo{
|
||||
{
|
||||
Name: "custom.io/myown",
|
||||
Kind: "VolumeSnapshotter",
|
||||
},
|
||||
}).
|
||||
Result(),
|
||||
expectedRequeue: ctrl.Result{Requeue: false, RequeueAfter: statusRequestResyncPeriod},
|
||||
},
|
||||
{
|
||||
// server status request with phase=Processed gets deleted if expire
|
||||
req: statusRequestBuilder("1").
|
||||
ServerVersion(buildinfo.Version).
|
||||
Phase(velerov1api.ServerStatusRequestPhaseProcessed).
|
||||
ProcessedTimestamp(now.Add(-61 * time.Second)). // expired
|
||||
Plugins([]velerov1api.PluginInfo{
|
||||
{
|
||||
Name: "custom.io/myotherown",
|
||||
Kind: "VolumeSnapshotter",
|
||||
},
|
||||
}).
|
||||
Result(),
|
||||
reqPluginLister: &fakePluginLister{
|
||||
plugins: []framework.PluginIdentifier{
|
||||
{
|
||||
Name: "custom.io/myotherown",
|
||||
Kind: "VolumeSnapshotter",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: nil,
|
||||
expectedRequeue: ctrl.Result{Requeue: false, RequeueAfter: statusRequestResyncPeriod},
|
||||
},
|
||||
{
|
||||
// server status request with invalid phase returns an error and does not requeue
|
||||
req: statusRequestBuilder("1").
|
||||
ServerVersion(buildinfo.Version).
|
||||
Phase("an-invalid-phase").
|
||||
ProcessedTimestamp(now).
|
||||
Plugins([]velerov1api.PluginInfo{
|
||||
{
|
||||
Name: "custom.io/myown",
|
||||
Kind: "VolumeSnapshotter",
|
||||
},
|
||||
}).
|
||||
Result(),
|
||||
reqPluginLister: &fakePluginLister{
|
||||
plugins: []framework.PluginIdentifier{
|
||||
{
|
||||
Name: "custom.io/myown",
|
||||
Kind: "VolumeSnapshotter",
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedErrMsg: "unexpected ServerStatusRequest phase",
|
||||
expectedRequeue: ctrl.Result{Requeue: false, RequeueAfter: 0},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
// Setup reconciler
|
||||
Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed())
|
||||
serverStatusInfo := velero.ServerStatus{
|
||||
PluginRegistry: test.reqPluginLister,
|
||||
Clock: clock.NewFakeClock(now),
|
||||
}
|
||||
r := ServerStatusRequestReconciler{
|
||||
Client: fake.NewFakeClientWithScheme(scheme.Scheme, test.req),
|
||||
ServerStatus: serverStatusInfo,
|
||||
Ctx: context.Background(),
|
||||
Log: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
actualResult, err := r.Reconcile(ctrl.Request{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: velerov1api.DefaultNamespace,
|
||||
Name: test.req.Name,
|
||||
},
|
||||
})
|
||||
|
||||
Expect(actualResult).To(BeEquivalentTo(test.expectedRequeue))
|
||||
if test.expectedErrMsg == "" {
|
||||
Expect(err).To(BeNil())
|
||||
} else {
|
||||
Expect(err.Error()).To(BeEquivalentTo(test.expectedErrMsg))
|
||||
return
|
||||
}
|
||||
|
||||
instance := &velerov1api.ServerStatusRequest{}
|
||||
err = r.Client.Get(ctx, kbclient.ObjectKey{Name: test.req.Name, Namespace: test.req.Namespace}, instance)
|
||||
|
||||
// Assertions
|
||||
if test.expected == nil {
|
||||
Expect(apierrors.IsNotFound(err)).To(BeTrue())
|
||||
} else {
|
||||
Expect(err).To(BeNil())
|
||||
Eventually(instance.Status.Phase == test.expected.Status.Phase, timeout).Should(BeTrue())
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
type fakePluginLister struct {
|
||||
plugins []framework.PluginIdentifier
|
||||
}
|
||||
|
||||
func (l *fakePluginLister) List(kind framework.PluginKind) []framework.PluginIdentifier {
|
||||
var plugins []framework.PluginIdentifier
|
||||
for _, plugin := range l.plugins {
|
||||
if plugin.Kind == kind {
|
||||
plugins = append(plugins, plugin)
|
||||
}
|
||||
}
|
||||
|
||||
return plugins
|
||||
}
|
||||
Reference in New Issue
Block a user