commit 2fe501f527
Author: Andy Goldstein
Date:   2017-08-02 13:27:17 -04:00

Initial commit

Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>

2024 changed files with 948288 additions and 0 deletions


@@ -0,0 +1,343 @@
/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"sync"
"time"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
kuberrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/generated/clientset/scheme"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
"github.com/heptio/ark/pkg/util/collections"
"github.com/heptio/ark/pkg/util/encode"
)
const backupVersion = 1
type backupController struct {
backupper backup.Backupper
backupService cloudprovider.BackupService
bucket string
lister listers.BackupLister
listerSynced cache.InformerSynced
client arkv1client.BackupsGetter
syncHandler func(backupName string) error
queue workqueue.RateLimitingInterface
clock clock.Clock
}
func NewBackupController(
backupInformer informers.BackupInformer,
client arkv1client.BackupsGetter,
backupper backup.Backupper,
backupService cloudprovider.BackupService,
bucket string,
) Interface {
c := &backupController{
backupper: backupper,
backupService: backupService,
bucket: bucket,
lister: backupInformer.Lister(),
listerSynced: backupInformer.Informer().HasSynced,
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"),
clock: &clock.RealClock{},
}
c.syncHandler = c.processBackup
backupInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
backup := obj.(*api.Backup)
switch backup.Status.Phase {
case "", api.BackupPhaseNew:
// only process new backups
default:
glog.V(4).Infof("Backup %s/%s has phase %s - skipping", backup.Namespace, backup.Name, backup.Status.Phase)
return
}
key, err := cache.MetaNamespaceKeyFunc(backup)
if err != nil {
glog.Errorf("error creating queue key for %#v: %v", backup, err)
return
}
c.queue.Add(key)
},
},
)
return c
}
// Run is a blocking function that runs the specified number of worker goroutines
// to process items in the work queue. It will return when it receives on the
// ctx.Done() channel.
func (controller *backupController) Run(ctx context.Context, numWorkers int) error {
var wg sync.WaitGroup
defer func() {
glog.Infof("Waiting for workers to finish their work")
controller.queue.ShutDown()
// We have to wait here in the deferred function instead of at the bottom of the function body
// because we have to shut down the queue in order for the workers to shut down gracefully, and
// we want to shut down the queue via defer and not at the end of the body.
wg.Wait()
glog.Infof("All workers have finished")
}()
glog.Info("Starting BackupController")
defer glog.Infof("Shutting down BackupController")
glog.Info("Waiting for caches to sync")
if !cache.WaitForCacheSync(ctx.Done(), controller.listerSynced) {
return errors.New("timed out waiting for caches to sync")
}
glog.Info("Caches are synced")
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go func() {
wait.Until(controller.runWorker, time.Second, ctx.Done())
wg.Done()
}()
}
<-ctx.Done()
return nil
}
func (controller *backupController) runWorker() {
// continually take items off the queue (waits if it's
// empty) until we get a shutdown signal from the queue
for controller.processNextWorkItem() {
}
}
func (controller *backupController) processNextWorkItem() bool {
key, quit := controller.queue.Get()
if quit {
return false
}
// always call done on this item, since if it fails we'll add
// it back with rate-limiting below
defer controller.queue.Done(key)
err := controller.syncHandler(key.(string))
if err == nil {
// If you had no error, tell the queue to stop tracking history for your key. This will reset
// things like failure counts for per-item rate limiting.
controller.queue.Forget(key)
return true
}
glog.Errorf("syncHandler error: %v", err)
// we had an error processing the item so add it back
// into the queue for re-processing with rate-limiting
controller.queue.AddRateLimited(key)
return true
}
func (controller *backupController) processBackup(key string) error {
glog.V(4).Infof("processBackup for key %q", key)
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.V(4).Infof("error splitting key %q: %v", key, err)
return err
}
glog.V(4).Infof("Getting backup %s", key)
backup, err := controller.lister.Backups(ns).Get(name)
if err != nil {
glog.V(4).Infof("error getting backup %s: %v", key, err)
return err
}
// TODO I think this is now unnecessary. We only initially place
// item with Phase = ("" | New) into the queue. Items will only get
// re-queued if syncHandler returns an error, which will only
// happen if there's an error updating Phase from its initial
// state to something else. So any time it's re-queued it will
// still have its initial state, which we've already confirmed
// is ("" | New)
switch backup.Status.Phase {
case "", api.BackupPhaseNew:
// only process new backups
default:
return nil
}
glog.V(4).Infof("Cloning backup %s", key)
// don't modify items in the cache
backup, err = cloneBackup(backup)
if err != nil {
glog.V(4).Infof("error cloning backup %s: %v", key, err)
return err
}
// set backup version
backup.Status.Version = backupVersion
// included resources defaulting
if len(backup.Spec.IncludedResources) == 0 {
backup.Spec.IncludedResources = []string{"*"}
}
// included namespace defaulting
if len(backup.Spec.IncludedNamespaces) == 0 {
backup.Spec.IncludedNamespaces = []string{"*"}
}
// calculate expiration
if backup.Spec.TTL.Duration > 0 {
backup.Status.Expiration = metav1.NewTime(controller.clock.Now().Add(backup.Spec.TTL.Duration))
}
// validation
if backup.Status.ValidationErrors = controller.getValidationErrors(backup); len(backup.Status.ValidationErrors) > 0 {
backup.Status.Phase = api.BackupPhaseFailedValidation
} else {
backup.Status.Phase = api.BackupPhaseInProgress
}
// update status
updatedBackup, err := controller.client.Backups(ns).Update(backup)
if err != nil {
glog.V(4).Infof("error updating status to %s: %v", backup.Status.Phase, err)
return err
}
backup = updatedBackup
if backup.Status.Phase == api.BackupPhaseFailedValidation {
return nil
}
glog.V(4).Infof("running backup for %s", key)
// execution & upload of backup
if err := controller.runBackup(backup, controller.bucket); err != nil {
glog.V(4).Infof("backup %s failed: %v", key, err)
backup.Status.Phase = api.BackupPhaseFailed
}
glog.V(4).Infof("updating backup %s final status", key)
if _, err = controller.client.Backups(ns).Update(backup); err != nil {
glog.V(4).Infof("error updating backup %s final status: %v", key, err)
}
return nil
}
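// Phase transitions implemented above, for reference (derived from this
// function, not stated elsewhere in the commit):
//
//	"" | New -> FailedValidation            (terminal; no backup is run)
//	"" | New -> InProgress -> Completed     (happy path)
//	"" | New -> InProgress -> Failed        (runBackup returned an error)
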
func cloneBackup(in interface{}) (*api.Backup, error) {
clone, err := scheme.Scheme.DeepCopy(in)
if err != nil {
return nil, err
}
out, ok := clone.(*api.Backup)
if !ok {
return nil, fmt.Errorf("unexpected type: %T", clone)
}
return out, nil
}
func (controller *backupController) getValidationErrors(itm *api.Backup) []string {
var validationErrors []string
for _, err := range collections.ValidateIncludesExcludes(itm.Spec.IncludedResources, itm.Spec.ExcludedResources) {
validationErrors = append(validationErrors, fmt.Sprintf("Invalid included/excluded resource lists: %v", err))
}
for _, err := range collections.ValidateIncludesExcludes(itm.Spec.IncludedNamespaces, itm.Spec.ExcludedNamespaces) {
validationErrors = append(validationErrors, fmt.Sprintf("Invalid included/excluded namespace lists: %v", err))
}
}
return validationErrors
}
func (controller *backupController) runBackup(backup *api.Backup, bucket string) error {
backupFile, err := ioutil.TempFile("", "")
if err != nil {
return err
}
defer func() {
var errs []error
errs = append(errs, err)
if closeErr := backupFile.Close(); closeErr != nil {
errs = append(errs, closeErr)
}
if removeErr := os.Remove(backupFile.Name()); removeErr != nil {
errs = append(errs, removeErr)
}
err = kuberrs.NewAggregate(errs)
}()
if err := controller.backupper.Backup(backup, backupFile); err != nil {
return err
}
// note: updating this here so the uploaded JSON shows "completed". If
// the upload fails, we'll alter the phase in the calling func.
glog.V(4).Infof("backup %s/%s completed", backup.Namespace, backup.Name)
backup.Status.Phase = api.BackupPhaseCompleted
buf := new(bytes.Buffer)
if err := encode.EncodeTo(backup, "json", buf); err != nil {
return err
}
// re-set the file offset to 0 for reading
_, err = backupFile.Seek(0, 0)
if err != nil {
return err
}
return controller.backupService.UploadBackup(bucket, backup.Name, bytes.NewReader(buf.Bytes()), backupFile)
}
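// Illustrative wiring (a sketch, not part of this file; clientset, backupper,
// and backupService are assumed values): the controller is built from a shared
// informer factory and started with a context, roughly as the tests do:
//
//	factory := informers.NewSharedInformerFactory(clientset, 0) // externalversions factory, as in the tests
//	ctrl := NewBackupController(factory.Ark().V1().Backups(), clientset.ArkV1(), backupper, backupService, "bucket")
//	factory.Start(ctx.Done())
//	go ctrl.Run(ctx, 1)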


@@ -0,0 +1,253 @@
/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"io"
"testing"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
core "k8s.io/client-go/testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/generated/clientset/fake"
"github.com/heptio/ark/pkg/generated/clientset/scheme"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
. "github.com/heptio/ark/pkg/util/test"
)
type fakeBackupper struct {
mock.Mock
}
func (b *fakeBackupper) Backup(backup *v1.Backup, data io.Writer) error {
args := b.Called(backup, data)
return args.Error(0)
}
func TestProcessBackup(t *testing.T) {
tests := []struct {
name string
key string
expectError bool
expectedIncludes []string
expectedExcludes []string
backup *TestBackup
expectBackup bool
}{
{
name: "bad key",
key: "bad/key/here",
expectError: true,
},
{
name: "lister failed",
key: "heptio-ark/backup1",
expectError: true,
},
{
name: "do not process phase FailedValidation",
key: "heptio-ark/backup1",
backup: NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseFailedValidation),
expectBackup: false,
},
{
name: "do not process phase InProgress",
key: "heptio-ark/backup1",
backup: NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseInProgress),
expectBackup: false,
},
{
name: "do not process phase Completed",
key: "heptio-ark/backup1",
backup: NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseCompleted),
expectBackup: false,
},
{
name: "do not process phase Failed",
key: "heptio-ark/backup1",
backup: NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseFailed),
expectBackup: false,
},
{
name: "do not process phase other",
key: "heptio-ark/backup1",
backup: NewTestBackup().WithName("backup1").WithPhase("arg"),
expectBackup: false,
},
{
name: "invalid included/excluded resources fails validation",
key: "heptio-ark/backup1",
backup: NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithIncludedResources("foo").WithExcludedResources("foo"),
expectBackup: false,
},
{
name: "invalid included/excluded namespaces fails validation",
key: "heptio-ark/backup1",
backup: NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithIncludedNamespaces("foo").WithExcludedNamespaces("foo"),
expectBackup: false,
},
{
name: "make sure specified included and excluded resources are honored",
key: "heptio-ark/backup1",
backup: NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithIncludedResources("i", "j").WithExcludedResources("k", "l"),
expectedIncludes: []string{"i", "j"},
expectedExcludes: []string{"k", "l"},
expectBackup: true,
},
{
name: "if includednamespaces are specified, don't default to *",
key: "heptio-ark/backup1",
backup: NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithIncludedNamespaces("ns-1"),
expectedIncludes: []string{"*"},
expectBackup: true,
},
{
name: "ttl",
key: "heptio-ark/backup1",
backup: NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithTTL(10 * time.Minute),
expectedIncludes: []string{"*"},
expectBackup: true,
},
}
// flag.Set("logtostderr", "true")
// flag.Set("v", "4")
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
backupper := &fakeBackupper{}
cloudBackups := &fakeBackupService{}
sharedInformers := informers.NewSharedInformerFactory(client, 0)
c := NewBackupController(
sharedInformers.Ark().V1().Backups(),
client.ArkV1(),
backupper,
cloudBackups,
"bucket",
).(*backupController)
c.clock = clock.NewFakeClock(time.Now())
var expiration time.Time
var expectedNSes []string
if test.backup != nil {
// add directly to the informer's store so the lister can function and so we don't have to
// start the shared informers.
sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup.Backup)
if test.backup.Spec.TTL.Duration > 0 {
expiration = c.clock.Now().Add(test.backup.Spec.TTL.Duration)
}
// set up a Backup object to represent what we expect to be passed to backupper.Backup()
copy, err := scheme.Scheme.Copy(test.backup.Backup)
assert.NoError(t, err, "copy error")
backup := copy.(*v1.Backup)
backup.Spec.IncludedResources = test.expectedIncludes
backup.Spec.ExcludedResources = test.expectedExcludes
if test.backup.Spec.IncludedNamespaces == nil {
expectedNSes = []string{"*"}
} else {
expectedNSes = test.backup.Spec.IncludedNamespaces
}
backup.Spec.IncludedNamespaces = expectedNSes
backup.Status.Phase = v1.BackupPhaseInProgress
backup.Status.Expiration.Time = expiration
backup.Status.Version = 1
backupper.On("Backup", backup, mock.Anything).Return(nil)
cloudBackups.On("UploadBackup", "bucket", backup.Name, mock.Anything, mock.Anything).Return(nil)
}
// this is necessary so the Update() call returns the appropriate object
client.PrependReactor("update", "backups", func(action core.Action) (bool, runtime.Object, error) {
obj := action.(core.UpdateAction).GetObject()
// need to deep copy so we can test the backup state for each call to update
copy, err := scheme.Scheme.DeepCopy(obj)
if err != nil {
return false, nil, err
}
ret := copy.(runtime.Object)
return true, ret, nil
})
// method under test
err := c.processBackup(test.key)
if test.expectError {
require.Error(t, err, "processBackup should error")
return
}
require.NoErrorf(t, err, "processBackup unexpected error: %v", err)
if !test.expectBackup {
assert.Empty(t, backupper.Calls)
assert.Empty(t, cloudBackups.Calls)
return
}
expectedActions := []core.Action{
core.NewUpdateAction(
v1.SchemeGroupVersion.WithResource("backups"),
v1.DefaultNamespace,
NewTestBackup().
WithName(test.backup.Name).
WithPhase(v1.BackupPhaseInProgress).
WithIncludedResources(test.expectedIncludes...).
WithExcludedResources(test.expectedExcludes...).
WithIncludedNamespaces(expectedNSes...).
WithTTL(test.backup.Spec.TTL.Duration).
WithExpiration(expiration).
WithVersion(1).
Backup,
),
core.NewUpdateAction(
v1.SchemeGroupVersion.WithResource("backups"),
v1.DefaultNamespace,
NewTestBackup().
WithName(test.backup.Name).
WithPhase(v1.BackupPhaseCompleted).
WithIncludedResources(test.expectedIncludes...).
WithExcludedResources(test.expectedExcludes...).
WithIncludedNamespaces(expectedNSes...).
WithTTL(test.backup.Spec.TTL.Duration).
WithExpiration(expiration).
WithVersion(1).
Backup,
),
}
assert.Equal(t, expectedActions, client.Actions())
})
}
}


@@ -0,0 +1,77 @@
/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/heptio/ark/pkg/cloudprovider"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1"
)
type backupSyncController struct {
client arkv1client.BackupsGetter
backupService cloudprovider.BackupService
bucket string
syncPeriod time.Duration
}
func NewBackupSyncController(client arkv1client.BackupsGetter, backupService cloudprovider.BackupService, bucket string, syncPeriod time.Duration) Interface {
if syncPeriod < time.Minute {
glog.Infof("Backup sync period %v is too short. Setting to 1 minute", syncPeriod)
syncPeriod = time.Minute
}
return &backupSyncController{
client: client,
backupService: backupService,
bucket: bucket,
syncPeriod: syncPeriod,
}
}
// Run is a blocking function that continually runs the object storage -> Ark API
// sync process according to the controller's syncPeriod. It will return when it
// receives on the ctx.Done() channel.
func (c *backupSyncController) Run(ctx context.Context, workers int) error {
glog.Info("Running backup sync controller")
wait.Until(c.run, c.syncPeriod, ctx.Done())
return nil
}
func (c *backupSyncController) run() {
glog.Info("Syncing backups from object storage")
backups, err := c.backupService.GetAllBackups(c.bucket)
if err != nil {
glog.Errorf("error listing backups: %v", err)
return
}
glog.Infof("Found %d backups", len(backups))
for _, cloudBackup := range backups {
glog.Infof("Syncing backup %s/%s", cloudBackup.Namespace, cloudBackup.Name)
cloudBackup.ResourceVersion = ""
if _, err := c.client.Backups(cloudBackup.Namespace).Create(cloudBackup); err != nil && !errors.IsAlreadyExists(err) {
glog.Errorf("error syncing backup %s/%s from object storage: %v", cloudBackup.Namespace, cloudBackup.Name, err)
}
}
}
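// Illustrative usage (a sketch, not part of this file; clientset and
// backupService are assumed values): the sync controller re-lists the bucket
// every syncPeriod and creates any Backup API objects that don't exist yet:
//
//	c := NewBackupSyncController(clientset.ArkV1(), backupService, "bucket", time.Minute)
//	go c.Run(ctx, 1) // the workers argument is unused by this controller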


@@ -0,0 +1,93 @@
/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
core "k8s.io/client-go/testing"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/generated/clientset/fake"
. "github.com/heptio/ark/pkg/util/test"
)
func TestRun(t *testing.T) {
tests := []struct {
name string
cloudBackups map[string][]*api.Backup
backupSvcErr error
}{
{
name: "no cloud backups",
},
{
name: "backup service returns error on GetAllBackups",
cloudBackups: map[string][]*api.Backup{
"nonexistent-bucket": []*api.Backup{
NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
},
},
},
{
name: "normal case",
cloudBackups: map[string][]*api.Backup{
"bucket": []*api.Backup{
NewTestBackup().WithNamespace("ns-1").WithName("backup-1").Backup,
NewTestBackup().WithNamespace("ns-1").WithName("backup-2").Backup,
NewTestBackup().WithNamespace("ns-2").WithName("backup-3").Backup,
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
bs = &fakeBackupService{backupsByBucket: test.cloudBackups}
client = fake.NewSimpleClientset()
)
c := NewBackupSyncController(
client.ArkV1(),
bs,
"bucket",
time.Duration(0),
).(*backupSyncController)
c.run()
expectedActions := make([]core.Action, 0)
// we only expect creates for items within the target bucket
for _, cloudBackup := range test.cloudBackups["bucket"] {
action := core.NewCreateAction(
api.SchemeGroupVersion.WithResource("backups"),
cloudBackup.Namespace,
cloudBackup,
)
expectedActions = append(expectedActions, action)
}
assert.Equal(t, expectedActions, client.Actions())
})
}
}


@@ -0,0 +1,149 @@
/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"errors"
"time"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"github.com/heptio/ark/pkg/cloudprovider"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
)
// gcController removes expired backup content from object storage.
type gcController struct {
backupService cloudprovider.BackupService
snapshotService cloudprovider.SnapshotService
bucket string
syncPeriod time.Duration
clock clock.Clock
lister listers.BackupLister
listerSynced cache.InformerSynced
client arkv1client.BackupsGetter
}
// NewGCController constructs a new gcController.
func NewGCController(
backupService cloudprovider.BackupService,
snapshotService cloudprovider.SnapshotService,
bucket string,
syncPeriod time.Duration,
backupInformer informers.BackupInformer,
client arkv1client.BackupsGetter,
) Interface {
if syncPeriod < time.Minute {
glog.Infof("GC sync period %v is too short. Setting to 1 minute", syncPeriod)
syncPeriod = time.Minute
}
return &gcController{
backupService: backupService,
snapshotService: snapshotService,
bucket: bucket,
syncPeriod: syncPeriod,
clock: clock.RealClock{},
lister: backupInformer.Lister(),
listerSynced: backupInformer.Informer().HasSynced,
client: client,
}
}
var _ Interface = &gcController{}
// Run is a blocking function that runs a single worker to garbage-collect backups
// from object/block storage and the Ark API. It will return when it receives on the
// ctx.Done() channel.
func (c *gcController) Run(ctx context.Context, workers int) error {
glog.Info("Waiting for caches to sync")
if !cache.WaitForCacheSync(ctx.Done(), c.listerSynced) {
return errors.New("timed out waiting for caches to sync")
}
glog.Info("Caches are synced")
wait.Until(c.run, c.syncPeriod, ctx.Done())
return nil
}
func (c *gcController) run() {
c.cleanBackups()
}
// cleanBackups deletes expired backups.
func (c *gcController) cleanBackups() {
backups, err := c.backupService.GetAllBackups(c.bucket)
if err != nil {
glog.Errorf("error getting all backups: %v", err)
return
}
now := c.clock.Now()
glog.Infof("garbage-collecting backups that have expired as of %v", now)
// GC backup files and associated snapshots/API objects. Note that deletion from object
// storage should happen first because otherwise there's a possibility the backup sync
// controller would re-create the API object after deletion.
for _, backup := range backups {
if backup.Status.Expiration.Time.Before(now) {
glog.Infof("Removing backup %s/%s", backup.Namespace, backup.Name)
if err := c.backupService.DeleteBackup(c.bucket, backup.Name); err != nil {
glog.Errorf("error deleting backup %s/%s: %v", backup.Namespace, backup.Name, err)
}
for _, volumeBackup := range backup.Status.VolumeBackups {
glog.Infof("Removing snapshot %s associated with backup %s/%s", volumeBackup.SnapshotID, backup.Namespace, backup.Name)
if err := c.snapshotService.DeleteSnapshot(volumeBackup.SnapshotID); err != nil {
glog.Errorf("error deleting snapshot %v: %v", volumeBackup.SnapshotID, err)
}
}
glog.Infof("Removing backup API object %s/%s", backup.Namespace, backup.Name)
if err := c.client.Backups(backup.Namespace).Delete(backup.Name, &metav1.DeleteOptions{}); err != nil {
glog.Errorf("error deleting backup API object %s/%s: %v", backup.Namespace, backup.Name, err)
}
} else {
glog.Infof("Backup %s/%s has not expired yet, skipping", backup.Namespace, backup.Name)
}
}
// also GC any Backup API objects without files in object storage
apiBackups, err := c.lister.List(labels.NewSelector())
if err != nil {
glog.Errorf("error getting all backup API objects: %v", err)
}
for _, backup := range apiBackups {
if backup.Status.Expiration.Time.Before(now) {
glog.Infof("Removing backup API object %s/%s", backup.Namespace, backup.Name)
if err := c.client.Backups(backup.Namespace).Delete(backup.Name, &metav1.DeleteOptions{}); err != nil {
glog.Errorf("error deleting backup API object %s/%s: %v", backup.Namespace, backup.Name, err)
}
} else {
glog.Infof("Backup %s/%s has not expired yet, skipping", backup.Namespace, backup.Name)
}
}
}
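// Worked example of the expiration check above: the backup controller sets
// Status.Expiration = now + Spec.TTL when it processes a backup, so a backup
// created at 10:00 with a 24h TTL expires at 10:00 the next day, and the
// first cleanBackups pass after that removes its object storage data, then
// its snapshots, then its API object, in that order.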


@@ -0,0 +1,318 @@
/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"bytes"
"errors"
"io"
"io/ioutil"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/generated/clientset/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
. "github.com/heptio/ark/pkg/util/test"
)
type gcTest struct {
name string
bucket string
backups map[string][]*api.Backup
snapshots sets.String
expectedBackupsRemaining map[string]sets.String
expectedSnapshotsRemaining sets.String
}
func TestGarbageCollect(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
tests := []gcTest{
gcTest{
name: "basic-expired",
bucket: "bucket-1",
backups: map[string][]*api.Backup{
"bucket-1": []*api.Backup{
NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(-1*time.Second)).
WithSnapshot("pv-1", "snapshot-1").
WithSnapshot("pv-2", "snapshot-2").
Backup,
},
},
snapshots: sets.NewString("snapshot-1", "snapshot-2"),
expectedBackupsRemaining: make(map[string]sets.String),
expectedSnapshotsRemaining: sets.NewString(),
},
gcTest{
name: "basic-unexpired",
bucket: "bucket-1",
backups: map[string][]*api.Backup{
"bucket-1": []*api.Backup{
NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(1*time.Minute)).
WithSnapshot("pv-1", "snapshot-1").
WithSnapshot("pv-2", "snapshot-2").
Backup,
},
},
snapshots: sets.NewString("snapshot-1", "snapshot-2"),
expectedBackupsRemaining: map[string]sets.String{
"bucket-1": sets.NewString("backup-1"),
},
expectedSnapshotsRemaining: sets.NewString("snapshot-1", "snapshot-2"),
},
gcTest{
name: "one expired, one unexpired",
bucket: "bucket-1",
backups: map[string][]*api.Backup{
"bucket-1": []*api.Backup{
NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(-1*time.Minute)).
WithSnapshot("pv-1", "snapshot-1").
WithSnapshot("pv-2", "snapshot-2").
Backup,
NewTestBackup().WithName("backup-2").
WithExpiration(fakeClock.Now().Add(1*time.Minute)).
WithSnapshot("pv-3", "snapshot-3").
WithSnapshot("pv-4", "snapshot-4").
Backup,
},
},
snapshots: sets.NewString("snapshot-1", "snapshot-2", "snapshot-3", "snapshot-4"),
expectedBackupsRemaining: map[string]sets.String{
"bucket-1": sets.NewString("backup-2"),
},
expectedSnapshotsRemaining: sets.NewString("snapshot-3", "snapshot-4"),
},
gcTest{
name: "none expired in target bucket",
bucket: "bucket-2",
backups: map[string][]*api.Backup{
"bucket-1": []*api.Backup{
NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(-1*time.Minute)).
WithSnapshot("pv-1", "snapshot-1").
WithSnapshot("pv-2", "snapshot-2").
Backup,
},
"bucket-2": []*api.Backup{
NewTestBackup().WithName("backup-2").
WithExpiration(fakeClock.Now().Add(1*time.Minute)).
WithSnapshot("pv-3", "snapshot-3").
WithSnapshot("pv-4", "snapshot-4").
Backup,
},
},
snapshots: sets.NewString("snapshot-1", "snapshot-2", "snapshot-3", "snapshot-4"),
expectedBackupsRemaining: map[string]sets.String{
"bucket-1": sets.NewString("backup-1"),
"bucket-2": sets.NewString("backup-2"),
},
expectedSnapshotsRemaining: sets.NewString("snapshot-1", "snapshot-2", "snapshot-3", "snapshot-4"),
},
gcTest{
name: "orphan snapshots",
bucket: "bucket-1",
backups: map[string][]*api.Backup{
"bucket-1": []*api.Backup{
NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(-1*time.Minute)).
WithSnapshot("pv-1", "snapshot-1").
WithSnapshot("pv-2", "snapshot-2").
Backup,
},
},
snapshots: sets.NewString("snapshot-1", "snapshot-2", "snapshot-3", "snapshot-4"),
expectedBackupsRemaining: make(map[string]sets.String),
expectedSnapshotsRemaining: sets.NewString("snapshot-3", "snapshot-4"),
},
}
for _, test := range tests {
backupService := &fakeBackupService{}
snapshotService := &FakeSnapshotService{}
t.Run(test.name, func(t *testing.T) {
backupService.backupsByBucket = make(map[string][]*api.Backup)
for bucket, backups := range test.backups {
data := make([]*api.Backup, 0, len(backups))
for _, backup := range backups {
data = append(data, backup)
}
backupService.backupsByBucket[bucket] = data
}
snapshotService.SnapshotsTaken = test.snapshots
var (
client = fake.NewSimpleClientset()
sharedInformers = informers.NewSharedInformerFactory(client, 0)
)
controller := NewGCController(
backupService,
snapshotService,
test.bucket,
1*time.Millisecond,
sharedInformers.Ark().V1().Backups(),
client.ArkV1(),
).(*gcController)
controller.clock = fakeClock
controller.cleanBackups()
// verify every bucket has the backups we expect
for bucket, backups := range backupService.backupsByBucket {
// if actual and expected are both empty, no further verification needed
if len(backups) == 0 && len(test.expectedBackupsRemaining[bucket]) == 0 {
continue
}
// get all the actual backups remaining in this bucket
backupNames := sets.NewString()
for _, backup := range backupService.backupsByBucket[bucket] {
backupNames.Insert(backup.Name)
}
assert.Equal(t, test.expectedBackupsRemaining[bucket], backupNames)
}
assert.Equal(t, test.expectedSnapshotsRemaining, snapshotService.SnapshotsTaken)
})
}
}
func TestGarbageCollectPicksUpBackupUponExpiration(t *testing.T) {
var (
backupService = &fakeBackupService{}
snapshotService = &FakeSnapshotService{}
fakeClock = clock.NewFakeClock(time.Now())
assert = assert.New(t)
)
scenario := gcTest{
name: "basic-expired",
bucket: "bucket-1",
backups: map[string][]*api.Backup{
"bucket-1": []*api.Backup{
NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(1*time.Second)).
WithSnapshot("pv-1", "snapshot-1").
WithSnapshot("pv-2", "snapshot-2").
Backup,
},
},
snapshots: sets.NewString("snapshot-1", "snapshot-2"),
}
backupService.backupsByBucket = make(map[string][]*api.Backup)
for bucket, backups := range scenario.backups {
data := make([]*api.Backup, 0, len(backups))
for _, backup := range backups {
data = append(data, backup)
}
backupService.backupsByBucket[bucket] = data
}
snapshotService.SnapshotsTaken = scenario.snapshots
var (
client = fake.NewSimpleClientset()
sharedInformers = informers.NewSharedInformerFactory(client, 0)
)
controller := NewGCController(
backupService,
snapshotService,
scenario.bucket,
1*time.Millisecond,
sharedInformers.Ark().V1().Backups(),
client.ArkV1(),
).(*gcController)
controller.clock = fakeClock
// PASS 1
controller.cleanBackups()
assert.Equal(scenario.backups, backupService.backupsByBucket, "backups should not be garbage-collected yet.")
assert.Equal(scenario.snapshots, snapshotService.SnapshotsTaken, "snapshots should not be garbage-collected yet.")
// PASS 2
fakeClock.Step(1 * time.Minute)
controller.cleanBackups()
assert.Equal(0, len(backupService.backupsByBucket[scenario.bucket]), "backups should have been garbage-collected.")
assert.Equal(0, len(snapshotService.SnapshotsTaken), "snapshots should have been garbage-collected.")
}
type fakeBackupService struct {
backupsByBucket map[string][]*api.Backup
mock.Mock
}
func (s *fakeBackupService) GetAllBackups(bucket string) ([]*api.Backup, error) {
backups, found := s.backupsByBucket[bucket]
if !found {
return nil, errors.New("bucket not found")
}
return backups, nil
}
func (s *fakeBackupService) UploadBackup(bucket, name string, metadata, backup io.ReadSeeker) error {
args := s.Called(bucket, name, metadata, backup)
return args.Error(0)
}
func (s *fakeBackupService) DownloadBackup(bucket, name string) (io.ReadCloser, error) {
return ioutil.NopCloser(bytes.NewReader([]byte("hello world"))), nil
}
func (s *fakeBackupService) DeleteBackup(bucket, backupName string) error {
backups, err := s.GetAllBackups(bucket)
if err != nil {
return err
}
deleteIdx := -1
for i, backup := range backups {
if backup.Name == backupName {
deleteIdx = i
break
}
}
if deleteIdx == -1 {
return errors.New("backup not found")
}
s.backupsByBucket[bucket] = append(s.backupsByBucket[bucket][0:deleteIdx], s.backupsByBucket[bucket][deleteIdx+1:]...)
return nil
}
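// fakeBackupService is the cloudprovider.BackupService test double shared by
// the backup, backup sync, and GC controller tests in this package: only
// UploadBackup is mock-driven; the other methods operate directly on
// backupsByBucket.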


@@ -0,0 +1,25 @@
/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import "context"
// Interface represents a runnable component.
type Interface interface {
// Run runs the component.
Run(ctx context.Context, workers int) error
}
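// Illustrative usage (a sketch, not part of this file): every controller in
// this package implements Interface, so a caller can start them uniformly and
// stop them all by cancelling one shared context:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	for _, c := range controllers {
//		go c.Run(ctx, 1)
//	}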


@@ -0,0 +1,333 @@
/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/generated/clientset/scheme"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
"github.com/heptio/ark/pkg/restore"
)
type restoreController struct {
restoreClient arkv1client.RestoresGetter
backupClient arkv1client.BackupsGetter
restorer restore.Restorer
backupService cloudprovider.BackupService
bucket string
backupLister listers.BackupLister
backupListerSynced cache.InformerSynced
restoreLister listers.RestoreLister
restoreListerSynced cache.InformerSynced
syncHandler func(restoreName string) error
queue workqueue.RateLimitingInterface
}
func NewRestoreController(
restoreInformer informers.RestoreInformer,
restoreClient arkv1client.RestoresGetter,
backupClient arkv1client.BackupsGetter,
restorer restore.Restorer,
backupService cloudprovider.BackupService,
bucket string,
backupInformer informers.BackupInformer,
) Interface {
c := &restoreController{
restoreClient: restoreClient,
backupClient: backupClient,
restorer: restorer,
backupService: backupService,
bucket: bucket,
backupLister: backupInformer.Lister(),
backupListerSynced: backupInformer.Informer().HasSynced,
restoreLister: restoreInformer.Lister(),
restoreListerSynced: restoreInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "restore"),
}
c.syncHandler = c.processRestore
restoreInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
restore := obj.(*api.Restore)
switch restore.Status.Phase {
case "", api.RestorePhaseNew:
// only process new restores
default:
glog.V(4).Infof("Restore %s/%s has phase %s - skipping", restore.Namespace, restore.Name, restore.Status.Phase)
return
}
key, err := cache.MetaNamespaceKeyFunc(restore)
if err != nil {
glog.Errorf("error creating queue key for %#v: %v", restore, err)
return
}
c.queue.Add(key)
},
},
)
return c
}
// Run is a blocking function that runs the specified number of worker goroutines
// to process items in the work queue. It will return when it receives on the
// ctx.Done() channel.
func (controller *restoreController) Run(ctx context.Context, numWorkers int) error {
var wg sync.WaitGroup
defer func() {
glog.Infof("Waiting for workers to finish their work")
controller.queue.ShutDown()
// We have to wait here in the deferred function instead of at the bottom of the function body
// because we have to shut down the queue in order for the workers to shut down gracefully, and
// we want to shut down the queue via defer and not at the end of the body.
wg.Wait()
glog.Infof("All workers have finished")
}()
glog.Info("Starting RestoreController")
defer glog.Info("Shutting down RestoreController")
glog.Info("Waiting for caches to sync")
if !cache.WaitForCacheSync(ctx.Done(), controller.backupListerSynced, controller.restoreListerSynced) {
return errors.New("timed out waiting for caches to sync")
}
glog.Info("Caches are synced")
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go func() {
wait.Until(controller.runWorker, time.Second, ctx.Done())
wg.Done()
}()
}
<-ctx.Done()
return nil
}
func (controller *restoreController) runWorker() {
// continually take items off the queue (waits if it's
// empty) until we get a shutdown signal from the queue
for controller.processNextWorkItem() {
}
}
func (controller *restoreController) processNextWorkItem() bool {
key, quit := controller.queue.Get()
if quit {
return false
}
// always call done on this item, since if it fails we'll add
// it back with rate-limiting below
defer controller.queue.Done(key)
err := controller.syncHandler(key.(string))
if err == nil {
// If you had no error, tell the queue to stop tracking history for your key. This will reset
// things like failure counts for per-item rate limiting.
controller.queue.Forget(key)
return true
}
glog.Errorf("syncHandler error: %v", err)
// we had an error processing the item so add it back
// into the queue for re-processing with rate-limiting
controller.queue.AddRateLimited(key)
return true
}
func (controller *restoreController) processRestore(key string) error {
glog.V(4).Infof("processRestore for key %q", key)
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.V(4).Infof("error splitting key %q: %v", key, err)
return err
}
glog.V(4).Infof("Getting restore %s", key)
restore, err := controller.restoreLister.Restores(ns).Get(name)
if err != nil {
glog.V(4).Infof("error getting restore %s: %v", key, err)
return err
}
// TODO I think this is now unnecessary. We only initially place
// item with Phase = ("" | New) into the queue. Items will only get
// re-queued if syncHandler returns an error, which will only
// happen if there's an error updating Phase from its initial
// state to something else. So any time it's re-queued it will
// still have its initial state, which we've already confirmed
// is ("" | New)
switch restore.Status.Phase {
case "", api.RestorePhaseNew:
// only process new restores
default:
return nil
}
glog.V(4).Infof("Cloning restore %s", key)
// don't modify items in the cache
restore, err = cloneRestore(restore)
if err != nil {
glog.V(4).Infof("error cloning restore %s: %v", key, err)
return err
}
// validation
if restore.Status.ValidationErrors = controller.getValidationErrors(restore); len(restore.Status.ValidationErrors) > 0 {
restore.Status.Phase = api.RestorePhaseFailedValidation
} else {
restore.Status.Phase = api.RestorePhaseInProgress
}
if len(restore.Spec.Namespaces) == 0 {
restore.Spec.Namespaces = []string{"*"}
}
// update status
updatedRestore, err := controller.restoreClient.Restores(ns).Update(restore)
if err != nil {
glog.V(4).Infof("error updating status to %s: %v", restore.Status.Phase, err)
return err
}
restore = updatedRestore
if restore.Status.Phase == api.RestorePhaseFailedValidation {
return nil
}
glog.V(4).Infof("running restore for %s", key)
// execution & upload of restore
restore.Status.Warnings, restore.Status.Errors = controller.runRestore(restore, controller.bucket)
glog.V(4).Infof("restore %s completed", key)
restore.Status.Phase = api.RestorePhaseCompleted
glog.V(4).Infof("updating restore %s final status", key)
if _, err = controller.restoreClient.Restores(ns).Update(restore); err != nil {
glog.V(4).Infof("error updating restore %s final status: %v", key, err)
}
return nil
}
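// Phase transitions implemented above, for reference (derived from this
// function): New -> FailedValidation is terminal; otherwise New -> InProgress
// -> Completed. Note the phase ends as Completed even when the restorer
// reports errors; those surface in Status.Errors instead (see the "restorer
// throwing an error" case in the tests).
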
func cloneRestore(in interface{}) (*api.Restore, error) {
clone, err := scheme.Scheme.DeepCopy(in)
if err != nil {
return nil, err
}
out, ok := clone.(*api.Restore)
if !ok {
return nil, fmt.Errorf("unexpected type: %T", clone)
}
return out, nil
}
func (controller *restoreController) getValidationErrors(itm *api.Restore) []string {
var validationErrors []string
if itm.Spec.BackupName == "" {
validationErrors = append(validationErrors, "BackupName must be non-empty and correspond to the name of a backup in object storage.")
}
return validationErrors
}
func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (warnings, errors api.RestoreResult) {
backup, err := controller.backupLister.Backups(api.DefaultNamespace).Get(restore.Spec.BackupName)
if err != nil {
glog.Errorf("error getting backup: %v", err)
errors.Cluster = append(errors.Cluster, err.Error())
return
}
tmpFile, err := downloadToTempFile(restore.Spec.BackupName, controller.backupService, bucket)
if err != nil {
glog.Errorf("error downloading backup: %v", err)
errors.Cluster = append(errors.Cluster, err.Error())
return
}
defer func() {
if err := tmpFile.Close(); err != nil {
errors.Cluster = append(errors.Cluster, err.Error())
}
if err := os.Remove(tmpFile.Name()); err != nil {
errors.Cluster = append(errors.Cluster, err.Error())
}
}()
return controller.restorer.Restore(restore, backup, tmpFile)
}
func downloadToTempFile(backupName string, backupService cloudprovider.BackupService, bucket string) (*os.File, error) {
readCloser, err := backupService.DownloadBackup(bucket, backupName)
if err != nil {
return nil, err
}
defer readCloser.Close()
file, err := ioutil.TempFile("", backupName)
if err != nil {
return nil, err
}
n, err := io.Copy(file, readCloser)
if err != nil {
return nil, err
}
glog.V(4).Infof("copied %d bytes", n)
if _, err := file.Seek(0, 0); err != nil {
glog.V(4).Infof("error seeking: %v", err)
return nil, err
}
return file, nil
}
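// Note: downloadToTempFile returns an open *os.File seeked back to offset 0;
// the caller owns cleanup, as runRestore does above by closing and removing
// the file in a deferred function.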


@@ -0,0 +1,247 @@
/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"errors"
"io"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/generated/clientset/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
. "github.com/heptio/ark/pkg/util/test"
)
func TestProcessRestore(t *testing.T) {
tests := []struct {
name string
restoreKey string
restore *api.Restore
backup *api.Backup
restorerError error
expectedErr bool
expectedRestoreUpdates []*api.Restore
expectedRestorerCall *api.Restore
}{
{
name: "invalid key returns error",
restoreKey: "invalid/key/value",
expectedErr: true,
},
{
name: "missing restore returns error",
restoreKey: "foo/bar",
expectedErr: true,
},
{
name: "restore with phase InProgress does not get processed",
restore: NewTestRestore("foo", "bar", api.RestorePhaseInProgress).Restore,
expectedErr: false,
},
{
name: "restore with phase Completed does not get processed",
restore: NewTestRestore("foo", "bar", api.RestorePhaseCompleted).Restore,
expectedErr: false,
},
{
name: "restore with phase FailedValidation does not get processed",
restore: NewTestRestore("foo", "bar", api.RestorePhaseFailedValidation).Restore,
expectedErr: false,
},
{
name: "new restore with empty backup name fails validation",
restore: NewTestRestore("foo", "bar", api.RestorePhaseNew).WithRestorableNamespace("ns-1").Restore,
expectedErr: false,
expectedRestoreUpdates: []*api.Restore{
NewTestRestore("foo", "bar", api.RestorePhaseFailedValidation).
WithRestorableNamespace("ns-1").
WithValidationError("BackupName must be non-empty and correspond to the name of a backup in object storage.").Restore,
},
},
{
name: "restore with non-existent backup name fails",
restore: NewTestRestore("foo", "bar", api.RestorePhaseNew).WithBackup("backup-1").WithRestorableNamespace("ns-1").Restore,
expectedErr: false,
expectedRestoreUpdates: []*api.Restore{
NewTestRestore("foo", "bar", api.RestorePhaseInProgress).WithBackup("backup-1").WithRestorableNamespace("ns-1").Restore,
NewTestRestore("foo", "bar", api.RestorePhaseCompleted).
WithBackup("backup-1").
WithRestorableNamespace("ns-1").
WithErrors(api.RestoreResult{
Cluster: []string{"backup.ark.heptio.com \"backup-1\" not found"},
}).
Restore,
},
},
{
name: "restorer throwing an error causes the restore to fail",
restore: NewTestRestore("foo", "bar", api.RestorePhaseNew).WithBackup("backup-1").WithRestorableNamespace("ns-1").Restore,
backup: NewTestBackup().WithName("backup-1").Backup,
restorerError: errors.New("blarg"),
expectedErr: false,
expectedRestoreUpdates: []*api.Restore{
NewTestRestore("foo", "bar", api.RestorePhaseInProgress).WithBackup("backup-1").WithRestorableNamespace("ns-1").Restore,
NewTestRestore("foo", "bar", api.RestorePhaseCompleted).
WithBackup("backup-1").
WithRestorableNamespace("ns-1").
WithErrors(api.RestoreResult{
Namespaces: map[string][]string{
"ns-1": {"blarg"},
},
}).Restore,
},
expectedRestorerCall: NewTestRestore("foo", "bar", api.RestorePhaseInProgress).WithBackup("backup-1").WithRestorableNamespace("ns-1").Restore,
},
{
name: "valid restore gets executed",
restore: NewTestRestore("foo", "bar", api.RestorePhaseNew).WithBackup("backup-1").WithRestorableNamespace("ns-1").Restore,
backup: NewTestBackup().WithName("backup-1").Backup,
expectedErr: false,
expectedRestoreUpdates: []*api.Restore{
NewTestRestore("foo", "bar", api.RestorePhaseInProgress).WithBackup("backup-1").WithRestorableNamespace("ns-1").Restore,
NewTestRestore("foo", "bar", api.RestorePhaseCompleted).WithBackup("backup-1").WithRestorableNamespace("ns-1").Restore,
},
expectedRestorerCall: NewTestRestore("foo", "bar", api.RestorePhaseInProgress).WithBackup("backup-1").WithRestorableNamespace("ns-1").Restore,
},
{
name: "restore with no restorable namespaces gets defaulted to *",
restore: NewTestRestore("foo", "bar", api.RestorePhaseNew).WithBackup("backup-1").Restore,
backup: NewTestBackup().WithName("backup-1").Backup,
expectedErr: false,
expectedRestoreUpdates: []*api.Restore{
NewTestRestore("foo", "bar", api.RestorePhaseInProgress).WithBackup("backup-1").WithRestorableNamespace("*").Restore,
NewTestRestore("foo", "bar", api.RestorePhaseCompleted).WithBackup("backup-1").WithRestorableNamespace("*").Restore,
},
expectedRestorerCall: NewTestRestore("foo", "bar", api.RestorePhaseInProgress).WithBackup("backup-1").WithRestorableNamespace("*").Restore,
},
}
// flag.Set("logtostderr", "true")
// flag.Set("v", "4")
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
client = fake.NewSimpleClientset()
restorer = &fakeRestorer{}
sharedInformers = informers.NewSharedInformerFactory(client, 0)
backupSvc = &fakeBackupService{}
)
c := NewRestoreController(
sharedInformers.Ark().V1().Restores(),
client.ArkV1(),
client.ArkV1(),
restorer,
backupSvc,
"bucket",
sharedInformers.Ark().V1().Backups(),
).(*restoreController)
if test.restore != nil {
sharedInformers.Ark().V1().Restores().Informer().GetStore().Add(test.restore)
// this is necessary so the Update() call returns the appropriate object
client.PrependReactor("update", "restores", func(action core.Action) (bool, runtime.Object, error) {
obj := action.(core.UpdateAction).GetObject()
// need to deep copy so we can test the backup state for each call to update
copy, err := scheme.Scheme.DeepCopy(obj)
if err != nil {
return false, nil, err
}
ret := copy.(runtime.Object)
return true, ret, nil
})
}
if test.backup != nil {
sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup)
}
var warnings, errors api.RestoreResult
if test.restorerError != nil {
errors.Namespaces = map[string][]string{"ns-1": {test.restorerError.Error()}}
}
restorer.On("Restore", mock.Anything, mock.Anything, mock.Anything).Return(warnings, errors)
var (
key = test.restoreKey
err error
)
if key == "" && test.restore != nil {
key, err = cache.MetaNamespaceKeyFunc(test.restore)
if err != nil {
panic(err)
}
}
err = c.processRestore(key)
assert.Equal(t, test.expectedErr, err != nil, "got error %v", err)
if test.expectedRestoreUpdates != nil {
var expectedActions []core.Action
for _, upd := range test.expectedRestoreUpdates {
action := core.NewUpdateAction(
api.SchemeGroupVersion.WithResource("restores"),
upd.Namespace,
upd)
expectedActions = append(expectedActions, action)
}
assert.Equal(t, expectedActions, client.Actions())
}
if test.expectedRestorerCall == nil {
assert.Empty(t, restorer.Calls)
assert.Zero(t, restorer.calledWithArg)
} else {
assert.Equal(t, 1, len(restorer.Calls))
// explicitly capturing the argument passed to Restore myself because
// I want to validate the called arg as of the time of calling, but
// the mock stores the pointer, which gets modified after
assert.Equal(t, *test.expectedRestorerCall, restorer.calledWithArg)
}
})
}
}
type fakeRestorer struct {
mock.Mock
calledWithArg api.Restore
}
func (r *fakeRestorer) Restore(restore *api.Restore, backup *api.Backup, backupReader io.Reader) (api.RestoreResult, api.RestoreResult) {
res := r.Called(restore, backup, backupReader)
r.calledWithArg = *restore
return res.Get(0).(api.RestoreResult), res.Get(1).(api.RestoreResult)
}


@@ -0,0 +1,381 @@
/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/golang/glog"
"github.com/robfig/cron"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/generated/clientset/scheme"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
)
type scheduleController struct {
schedulesClient arkv1client.SchedulesGetter
backupsClient arkv1client.BackupsGetter
schedulesLister listers.ScheduleLister
schedulesListerSynced cache.InformerSynced
syncHandler func(scheduleName string) error
queue workqueue.RateLimitingInterface
syncPeriod time.Duration
clock clock.Clock
}
func NewScheduleController(
schedulesClient arkv1client.SchedulesGetter,
backupsClient arkv1client.BackupsGetter,
schedulesInformer informers.ScheduleInformer,
syncPeriod time.Duration,
) *scheduleController {
if syncPeriod < time.Minute {
glog.Infof("Schedule sync period %v is too short. Setting to 1 minute", syncPeriod)
syncPeriod = time.Minute
}
c := &scheduleController{
schedulesClient: schedulesClient,
backupsClient: backupsClient,
schedulesLister: schedulesInformer.Lister(),
schedulesListerSynced: schedulesInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "schedule"),
syncPeriod: syncPeriod,
clock: clock.RealClock{},
}
c.syncHandler = c.processSchedule
schedulesInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
schedule := obj.(*api.Schedule)
switch schedule.Status.Phase {
case "", api.SchedulePhaseNew, api.SchedulePhaseEnabled:
// add to work queue
default:
glog.V(4).Infof("Schedule %s/%s has phase %s - skipping", schedule.Namespace, schedule.Name, schedule.Status.Phase)
return
}
key, err := cache.MetaNamespaceKeyFunc(schedule)
if err != nil {
glog.Errorf("error creating queue key for %#v: %v", schedule, err)
return
}
c.queue.Add(key)
},
},
)
return c
}
// Run is a blocking function that runs the specified number of worker goroutines
// to process items in the work queue. It will return when it receives on the
// ctx.Done() channel.
func (controller *scheduleController) Run(ctx context.Context, numWorkers int) error {
var wg sync.WaitGroup
defer func() {
glog.Infof("Waiting for workers to finish their work")
controller.queue.ShutDown()
// We have to wait here in the deferred function instead of at the bottom of the function body
// because we have to shut down the queue in order for the workers to shut down gracefully, and
// we want to shut down the queue via defer and not at the end of the body.
wg.Wait()
glog.Infof("All workers have finished")
}()
glog.Info("Starting ScheduleController")
defer glog.Info("Shutting down ScheduleController")
glog.Info("Waiting for caches to sync")
if !cache.WaitForCacheSync(ctx.Done(), controller.schedulesListerSynced) {
return errors.New("timed out waiting for caches to sync")
}
glog.Info("Caches are synced")
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go func() {
wait.Until(controller.runWorker, time.Second, ctx.Done())
wg.Done()
}()
}
go wait.Until(controller.enqueueAllEnabledSchedules, controller.syncPeriod, ctx.Done())
<-ctx.Done()
return nil
}
func (controller *scheduleController) enqueueAllEnabledSchedules() {
schedules, err := controller.schedulesLister.Schedules(api.DefaultNamespace).List(labels.NewSelector())
if err != nil {
glog.Errorf("error listing schedules: %v", err)
return
}
for _, schedule := range schedules {
if schedule.Status.Phase != api.SchedulePhaseEnabled {
continue
}
key, err := cache.MetaNamespaceKeyFunc(schedule)
if err != nil {
glog.Errorf("error creating queue key for %#v: %v", schedule, err)
continue
}
controller.queue.Add(key)
}
}
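// runWorker is a long-running loop that processes work queue items until the
// queue shuts down.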
func (controller *scheduleController) runWorker() {
// continually take items off the queue (waits if it's
// empty) until we get a shutdown signal from the queue
for controller.processNextWorkItem() {
}
}
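// processNextWorkItem handles a single key from the queue, re-queuing it with
// rate limiting on error. It returns false only once the queue has been shut down.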
func (controller *scheduleController) processNextWorkItem() bool {
key, quit := controller.queue.Get()
if quit {
return false
}
// always call done on this item, since if it fails we'll add
// it back with rate-limiting below
defer controller.queue.Done(key)
err := controller.syncHandler(key.(string))
if err == nil {
// If you had no error, tell the queue to stop tracking history for your key. This will reset
// things like failure counts for per-item rate limiting.
controller.queue.Forget(key)
return true
}
glog.Errorf("syncHandler error: %v", err)
// we had an error processing the item so add it back
// into the queue for re-processing with rate-limiting
controller.queue.AddRateLimited(key)
return true
}
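// processSchedule is the sync handler: it fetches the Schedule for the given
// key, re-validates its cron expression (updating the phase if the result
// changed), and submits a Backup if the schedule is due.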
func (controller *scheduleController) processSchedule(key string) error {
glog.V(4).Infof("processSchedule for key %q", key)
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.V(4).Infof("error splitting key %q: %v", key, err)
return err
}
glog.V(4).Infof("Getting schedule %s", key)
schedule, err := controller.schedulesLister.Schedules(ns).Get(name)
if err != nil {
// schedule no longer exists
if apierrors.IsNotFound(err) {
glog.V(4).Infof("schedule %s not found: %v", key, err)
return nil
}
glog.V(4).Infof("error getting schedule %s: %v", key, err)
return err
}
switch schedule.Status.Phase {
case "", api.SchedulePhaseNew, api.SchedulePhaseEnabled:
// valid phase for processing
default:
return nil
}
glog.V(4).Infof("Cloning schedule %s", key)
// don't modify items in the cache
schedule, err = cloneSchedule(schedule)
if err != nil {
glog.V(4).Infof("error cloning schedule %s: %v", key, err)
return err
}
// validation - even if the item is Enabled, we can't trust it
// so re-validate
currentPhase := schedule.Status.Phase
cronSchedule, errs := parseCronSchedule(schedule)
if len(errs) > 0 {
schedule.Status.Phase = api.SchedulePhaseFailedValidation
schedule.Status.ValidationErrors = errs
} else {
schedule.Status.Phase = api.SchedulePhaseEnabled
}
// update status if it's changed
if currentPhase != schedule.Status.Phase {
updatedSchedule, err := controller.schedulesClient.Schedules(ns).Update(schedule)
if err != nil {
glog.V(4).Infof("error updating status to %s: %v", schedule.Status.Phase, err)
return err
}
schedule = updatedSchedule
}
if schedule.Status.Phase != api.SchedulePhaseEnabled {
return nil
}
// check for the schedule being due to run, and submit a Backup if so
if err := controller.submitBackupIfDue(schedule, cronSchedule); err != nil {
glog.V(4).Infof("error processing Schedule %v/%v: err=%v", schedule.Namespace, schedule.Name, err)
return err
}
return nil
}
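// cloneSchedule deep-copies a Schedule so that items from the shared informer
// cache are never mutated directly.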
func cloneSchedule(in interface{}) (*api.Schedule, error) {
clone, err := scheme.Scheme.DeepCopy(in)
if err != nil {
return nil, err
}
out, ok := clone.(*api.Schedule)
if !ok {
return nil, fmt.Errorf("unexpected type: %T", clone)
}
return out, nil
}
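// parseCronSchedule returns the parsed cron schedule for the Schedule's spec,
// or a list of validation errors if the expression is empty or unparseable.
// github.com/robfig/cron accepts descriptors such as "@every 5m" (the form
// used in the tests) in addition to cron field expressions.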
func parseCronSchedule(itm *api.Schedule) (cron.Schedule, []string) {
var validationErrors []string
var schedule cron.Schedule
// cron.Parse panics if schedule is empty
if len(itm.Spec.Schedule) == 0 {
validationErrors = append(validationErrors, "Schedule must be a non-empty valid Cron expression")
return nil, validationErrors
}
// add a recover() around cron.Parse because it panics on an empty string, and it's
// possible that it panics under other scenarios as well.
func() {
defer func() {
if r := recover(); r != nil {
glog.V(4).Infof("panic parsing schedule %v/%v, cron schedule=%v: %v", itm.Namespace, itm.Name, itm.Spec.Schedule, r)
validationErrors = append(validationErrors, fmt.Sprintf("invalid schedule: %v", r))
}
}()
if res, err := cron.Parse(itm.Spec.Schedule); err != nil {
glog.V(4).Infof("error parsing schedule %v/%v, cron schedule=%v: %v", itm.Namespace, itm.Name, itm.Spec.Schedule, err)
validationErrors = append(validationErrors, fmt.Sprintf("invalid schedule: %v", err))
} else {
schedule = res
}
}()
if len(validationErrors) > 0 {
return nil, validationErrors
}
return schedule, nil
}
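// submitBackupIfDue creates a Backup from the Schedule's template if the
// schedule's next run time has passed, then stamps Status.LastBackup so the
// run isn't repeated.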
func (controller *scheduleController) submitBackupIfDue(item *api.Schedule, cronSchedule cron.Schedule) error {
now := controller.clock.Now()
isDue, nextRunTime := getNextRunTime(item, cronSchedule, now)
if !isDue {
glog.Infof("Next run time for %v/%v is %v, skipping...", item.Namespace, item.Name, nextRunTime)
return nil
}
// Don't attempt to "catch up" if there are any missed or failed runs - simply
// trigger a Backup if it's time.
//
// It might also make sense in the future to explicitly check for currently-running
// backups so that we don't overlap runs (for disk snapshots in particular, this can
// lead to performance issues).
glog.Infof("Next run time for %v/%v is %v, submitting Backup...", item.Namespace, item.Name, nextRunTime)
backup := getBackup(item, now)
if _, err := controller.backupsClient.Backups(backup.Namespace).Create(backup); err != nil {
glog.V(4).Infof("error creating Backup: %v", err)
return err
}
schedule, err := cloneSchedule(item)
if err != nil {
glog.V(4).Infof("error cloning Schedule %v/%v: %v", item.Namespace, item.Name, err)
return err
}
schedule.Status.LastBackup = metav1.NewTime(now)
if _, err := controller.schedulesClient.Schedules(schedule.Namespace).Update(schedule); err != nil {
glog.V(4).Infof("error updating LastBackup for Schedule %v/%v: %v", schedule.Namespace, schedule.Name, err)
return err
}
return nil
}
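// getNextRunTime reports whether the schedule is due as of asOf, along with
// the next run time computed from the last backup time.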
func getNextRunTime(schedule *api.Schedule, cronSchedule cron.Schedule, asOf time.Time) (bool, time.Time) {
// get the time of the last run (if the schedule hasn't run yet, this is the zero value,
// which triggers an immediate backup)
lastBackupTime := schedule.Status.LastBackup.Time
nextRunTime := cronSchedule.Next(lastBackupTime)
return asOf.After(nextRunTime), nextRunTime
}
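// getBackup builds a Backup from the Schedule's spec template, naming it
// <schedule-name>-<timestamp> and labeling it with ark-schedule=<schedule-name>
// so backups can be traced back to the schedule that created them.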
func getBackup(item *api.Schedule, timestamp time.Time) *api.Backup {
backup := &api.Backup{
Spec: item.Spec.Template,
ObjectMeta: metav1.ObjectMeta{
Namespace: item.Namespace,
Name: fmt.Sprintf("%s-%s", item.Name, timestamp.Format("20060102150405")),
Labels: map[string]string{
"ark-schedule": item.Name,
},
},
}
return backup
}

View File

@@ -0,0 +1,372 @@
/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"testing"
"time"
"github.com/robfig/cron"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/kubernetes/scheme"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/generated/clientset/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
. "github.com/heptio/ark/pkg/util/test"
)
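// TestProcessSchedule drives processSchedule through a table of cases covering
// bad keys, missing schedules, each phase transition, and due-backup submission,
// asserting on the exact client actions produced.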
func TestProcessSchedule(t *testing.T) {
tests := []struct {
name string
scheduleKey string
schedule *api.Schedule
fakeClockTime string
expectedErr bool
expectedSchedulePhaseUpdate *api.Schedule
expectedScheduleLastBackupUpdate *api.Schedule
expectedBackupCreate *api.Backup
}{
{
name: "invalid key returns error",
scheduleKey: "invalid/key/value",
expectedErr: true,
},
{
name: "missing schedule returns early without an error",
scheduleKey: "foo/bar",
expectedErr: false,
},
{
name: "schedule with phase FailedValidation does not get processed",
schedule: NewTestSchedule("ns", "name").WithPhase(api.SchedulePhaseFailedValidation).Schedule,
expectedErr: false,
},
{
name: "schedule with phase New gets validated and failed if invalid",
schedule: NewTestSchedule("ns", "name").WithPhase(api.SchedulePhaseNew).Schedule,
expectedErr: false,
expectedSchedulePhaseUpdate: NewTestSchedule("ns", "name").WithPhase(api.SchedulePhaseFailedValidation).
WithValidationError("Schedule must be a non-empty valid Cron expression").Schedule,
},
{
name: "schedule with phase <blank> gets validated and failed if invalid",
schedule: NewTestSchedule("ns", "name").Schedule,
expectedErr: false,
expectedSchedulePhaseUpdate: NewTestSchedule("ns", "name").WithPhase(api.SchedulePhaseFailedValidation).
WithValidationError("Schedule must be a non-empty valid Cron expression").Schedule,
},
{
name: "schedule with phase Enabled gets re-validated and failed if invalid",
schedule: NewTestSchedule("ns", "name").WithPhase(api.SchedulePhaseEnabled).Schedule,
expectedErr: false,
expectedSchedulePhaseUpdate: NewTestSchedule("ns", "name").WithPhase(api.SchedulePhaseFailedValidation).
WithValidationError("Schedule must be a non-empty valid Cron expression").Schedule,
},
{
name: "schedule with phase New gets validated and triggers a backup",
schedule: NewTestSchedule("ns", "name").WithPhase(api.SchedulePhaseNew).WithCronSchedule("@every 5m").Schedule,
fakeClockTime: "2017-01-01 12:00:00",
expectedErr: false,
expectedSchedulePhaseUpdate: NewTestSchedule("ns", "name").WithPhase(api.SchedulePhaseEnabled).WithCronSchedule("@every 5m").Schedule,
expectedBackupCreate: NewTestBackup().WithNamespace("ns").WithName("name-20170101120000").WithLabel("ark-schedule", "name").Backup,
expectedScheduleLastBackupUpdate: NewTestSchedule("ns", "name").WithPhase(api.SchedulePhaseEnabled).
WithCronSchedule("@every 5m").WithLastBackupTime("2017-01-01 12:00:00").Schedule,
},
{
name: "schedule with phase Enabled gets re-validated and triggers a backup if valid",
schedule: NewTestSchedule("ns", "name").WithPhase(api.SchedulePhaseEnabled).WithCronSchedule("@every 5m").Schedule,
fakeClockTime: "2017-01-01 12:00:00",
expectedErr: false,
expectedBackupCreate: NewTestBackup().WithNamespace("ns").WithName("name-20170101120000").WithLabel("ark-schedule", "name").Backup,
expectedScheduleLastBackupUpdate: NewTestSchedule("ns", "name").WithPhase(api.SchedulePhaseEnabled).
WithCronSchedule("@every 5m").WithLastBackupTime("2017-01-01 12:00:00").Schedule,
},
{
name: "schedule that's already run gets LastBackup updated",
schedule: NewTestSchedule("ns", "name").WithPhase(api.SchedulePhaseEnabled).
WithCronSchedule("@every 5m").WithLastBackupTime("2000-01-01 00:00:00").Schedule,
fakeClockTime: "2017-01-01 12:00:00",
expectedErr: false,
expectedBackupCreate: NewTestBackup().WithNamespace("ns").WithName("name-20170101120000").WithLabel("ark-schedule", "name").Backup,
expectedScheduleLastBackupUpdate: NewTestSchedule("ns", "name").WithPhase(api.SchedulePhaseEnabled).
WithCronSchedule("@every 5m").WithLastBackupTime("2017-01-01 12:00:00").Schedule,
},
}
// flag.Set("logtostderr", "true")
// flag.Set("v", "4")
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
client = fake.NewSimpleClientset()
sharedInformers = informers.NewSharedInformerFactory(client, 0)
)
c := NewScheduleController(
client.ArkV1(),
client.ArkV1(),
sharedInformers.Ark().V1().Schedules(),
time.Duration(0),
)
var (
testTime time.Time
err error
)
if test.fakeClockTime != "" {
testTime, err = time.Parse("2006-01-02 15:04:05", test.fakeClockTime)
require.NoErrorf(t, err, "unable to parse test.fakeClockTime: %v", err)
}
c.clock = clock.NewFakeClock(testTime)
if test.schedule != nil {
sharedInformers.Ark().V1().Schedules().Informer().GetStore().Add(test.schedule)
// this is necessary so the Update() call returns the appropriate object
client.PrependReactor("update", "schedules", func(action core.Action) (bool, runtime.Object, error) {
obj := action.(core.UpdateAction).GetObject()
// need to deep copy so we can test the schedule state for each call to update
copy, err := scheme.Scheme.DeepCopy(obj)
if err != nil {
return false, nil, err
}
ret := copy.(runtime.Object)
return true, ret, nil
})
}
key := test.scheduleKey
if key == "" && test.schedule != nil {
key, err = cache.MetaNamespaceKeyFunc(test.schedule)
require.NoErrorf(t, err, "error getting key from test.schedule: %v", err)
}
err = c.processSchedule(key)
assert.Equal(t, test.expectedErr, err != nil, "got error %v", err)
expectedActions := make([]core.Action, 0)
if upd := test.expectedSchedulePhaseUpdate; upd != nil {
action := core.NewUpdateAction(
api.SchemeGroupVersion.WithResource("schedules"),
upd.Namespace,
upd)
expectedActions = append(expectedActions, action)
}
if created := test.expectedBackupCreate; created != nil {
action := core.NewCreateAction(
api.SchemeGroupVersion.WithResource("backups"),
created.Namespace,
created)
expectedActions = append(expectedActions, action)
}
if upd := test.expectedScheduleLastBackupUpdate; upd != nil {
action := core.NewUpdateAction(
api.SchemeGroupVersion.WithResource("schedules"),
upd.Namespace,
upd)
expectedActions = append(expectedActions, action)
}
assert.Equal(t, expectedActions, client.Actions())
})
}
}
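// TestGetNextRunTime verifies the due/not-due decision for a variety of
// last-run offsets against an "@every 5m" schedule.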
func TestGetNextRunTime(t *testing.T) {
tests := []struct {
name string
schedule *api.Schedule
lastRanOffset string
expectedDue bool
expectedNextRunTimeOffset string
}{
{
name: "first run",
schedule: &api.Schedule{Spec: api.ScheduleSpec{Schedule: "@every 5m"}},
expectedDue: true,
expectedNextRunTimeOffset: "5m",
},
{
name: "just ran",
schedule: &api.Schedule{Spec: api.ScheduleSpec{Schedule: "@every 5m"}},
lastRanOffset: "0s",
expectedDue: false,
expectedNextRunTimeOffset: "5m",
},
{
name: "almost but not quite time to run",
schedule: &api.Schedule{Spec: api.ScheduleSpec{Schedule: "@every 5m"}},
lastRanOffset: "4m59s",
expectedDue: false,
expectedNextRunTimeOffset: "5m",
},
{
name: "time to run again",
schedule: &api.Schedule{Spec: api.ScheduleSpec{Schedule: "@every 5m"}},
lastRanOffset: "5m",
expectedDue: true,
expectedNextRunTimeOffset: "5m",
},
{
name: "several runs missed",
schedule: &api.Schedule{Spec: api.ScheduleSpec{Schedule: "@every 5m"}},
lastRanOffset: "5h",
expectedDue: true,
expectedNextRunTimeOffset: "5m",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cronSchedule, err := cron.Parse(test.schedule.Spec.Schedule)
require.NoErrorf(t, err, "unable to parse test.schedule.Spec.Schedule: %v", err)
testClock := clock.NewFakeClock(time.Now())
if test.lastRanOffset != "" {
offsetDuration, err := time.ParseDuration(test.lastRanOffset)
require.NoErrorf(t, err, "unable to parse test.lastRanOffset: %v", err)
test.schedule.Status.LastBackup = metav1.Time{Time: testClock.Now().Add(-offsetDuration)}
}
nextRunTimeOffset, err := time.ParseDuration(test.expectedNextRunTimeOffset)
require.NoErrorf(t, err, "unable to parse test.expectedNextRunTimeOffset: %v", err)
expectedNextRunTime := test.schedule.Status.LastBackup.Add(nextRunTimeOffset)
due, nextRunTime := getNextRunTime(test.schedule, cronSchedule, testClock.Now())
assert.Equal(t, test.expectedDue, due)
// ignore diffs of under a second; the cron library does some rounding.
assert.WithinDuration(t, expectedNextRunTime, nextRunTime, time.Second)
})
}
}
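// TestGetBackup verifies the generated Backup's timestamped name formatting
// and that the schedule's backup template is copied into the Backup spec.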
func TestGetBackup(t *testing.T) {
tests := []struct {
name string
schedule *api.Schedule
testClockTime string
expectedBackup *api.Backup
}{
{
name: "ensure name is formatted correctly (AM time)",
schedule: &api.Schedule{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
},
Spec: api.ScheduleSpec{
Template: api.BackupSpec{},
},
},
testClockTime: "2017-07-25 09:15:00",
expectedBackup: &api.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar-20170725091500",
},
Spec: api.BackupSpec{},
},
},
{
name: "ensure name is formatted correctly (PM time)",
schedule: &api.Schedule{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
},
Spec: api.ScheduleSpec{
Template: api.BackupSpec{},
},
},
testClockTime: "2017-07-25 14:15:00",
expectedBackup: &api.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar-20170725141500",
},
Spec: api.BackupSpec{},
},
},
{
name: "ensure schedule backup template is copied",
schedule: &api.Schedule{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
},
Spec: api.ScheduleSpec{
Template: api.BackupSpec{
IncludedNamespaces: []string{"ns-1", "ns-2"},
ExcludedNamespaces: []string{"ns-3"},
IncludedResources: []string{"foo", "bar"},
ExcludedResources: []string{"baz"},
LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"label": "value"}},
SnapshotVolumes: true,
TTL: metav1.Duration{Duration: time.Duration(300)},
},
},
},
testClockTime: "2017-07-25 09:15:00",
expectedBackup: &api.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar-20170725091500",
},
Spec: api.BackupSpec{
IncludedNamespaces: []string{"ns-1", "ns-2"},
ExcludedNamespaces: []string{"ns-3"},
IncludedResources: []string{"foo", "bar"},
ExcludedResources: []string{"baz"},
LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"label": "value"}},
SnapshotVolumes: true,
TTL: metav1.Duration{Duration: time.Duration(300)},
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testTime, err := time.Parse("2006-01-02 15:04:05", test.testClockTime)
require.NoErrorf(t, err, "unable to parse test.testClockTime: %v", err)
backup := getBackup(test.schedule, clock.NewFakeClock(testTime).Now())
assert.Equal(t, test.expectedBackup.Namespace, backup.Namespace)
assert.Equal(t, test.expectedBackup.Name, backup.Name)
assert.Equal(t, test.expectedBackup.Spec, backup.Spec)
})
}
}