update backup code to work with volume snapshot locations

Signed-off-by: Steve Kriss <steve@heptio.com>
This commit is contained in:
Steve Kriss
2018-09-26 16:18:45 -06:00
parent 4af89fa863
commit df07b7dc9f
13 changed files with 763 additions and 986 deletions

View File

@@ -46,7 +46,7 @@ import (
type Backupper interface {
// Backup takes a backup using the specification in the api.Backup and writes backup and log data
// to the given writers.
Backup(logger logrus.FieldLogger, backup *api.Backup, backupFile io.Writer, actions []ItemAction) error
Backup(logger logrus.FieldLogger, backup *Request, backupFile io.Writer, actions []ItemAction, blockStoreGetter BlockStoreGetter) error
}
// kubernetesBackupper implements Backupper.
@@ -55,7 +55,6 @@ type kubernetesBackupper struct {
discoveryHelper discovery.Helper
podCommandExecutor podexec.PodCommandExecutor
groupBackupperFactory groupBackupperFactory
blockStore cloudprovider.BlockStore
resticBackupperFactory restic.BackupperFactory
resticTimeout time.Duration
}
@@ -93,7 +92,6 @@ func NewKubernetesBackupper(
discoveryHelper discovery.Helper,
dynamicFactory client.DynamicFactory,
podCommandExecutor podexec.PodCommandExecutor,
blockStore cloudprovider.BlockStore,
resticBackupperFactory restic.BackupperFactory,
resticTimeout time.Duration,
) (Backupper, error) {
@@ -102,7 +100,6 @@ func NewKubernetesBackupper(
dynamicFactory: dynamicFactory,
podCommandExecutor: podCommandExecutor,
groupBackupperFactory: &defaultGroupBackupperFactory{},
blockStore: blockStore,
resticBackupperFactory: resticBackupperFactory,
resticTimeout: resticTimeout,
}, nil
@@ -209,41 +206,43 @@ func getResourceHook(hookSpec api.BackupResourceHookSpec, discoveryHelper discov
return h, nil
}
type BlockStoreGetter interface {
GetBlockStore(name string) (cloudprovider.BlockStore, error)
}
// Backup backs up the items specified in the Backup, placing them in a gzip-compressed tar file
// written to backupFile. The finalized api.Backup is written to metadata.
func (kb *kubernetesBackupper) Backup(logger logrus.FieldLogger, backup *api.Backup, backupFile io.Writer, actions []ItemAction) error {
func (kb *kubernetesBackupper) Backup(logger logrus.FieldLogger, backupRequest *Request, backupFile io.Writer, actions []ItemAction, blockStoreGetter BlockStoreGetter) error {
gzippedData := gzip.NewWriter(backupFile)
defer gzippedData.Close()
tw := tar.NewWriter(gzippedData)
defer tw.Close()
log := logger.WithField("backup", kubeutil.NamespaceAndName(backup))
log := logger.WithField("backup", kubeutil.NamespaceAndName(backupRequest))
log.Info("Starting backup")
namespaceIncludesExcludes := getNamespaceIncludesExcludes(backup)
log.Infof("Including namespaces: %s", namespaceIncludesExcludes.IncludesString())
log.Infof("Excluding namespaces: %s", namespaceIncludesExcludes.ExcludesString())
backupRequest.NamespaceIncludesExcludes = getNamespaceIncludesExcludes(backupRequest.Backup)
log.Infof("Including namespaces: %s", backupRequest.NamespaceIncludesExcludes.IncludesString())
log.Infof("Excluding namespaces: %s", backupRequest.NamespaceIncludesExcludes.ExcludesString())
resourceIncludesExcludes := getResourceIncludesExcludes(kb.discoveryHelper, backup.Spec.IncludedResources, backup.Spec.ExcludedResources)
log.Infof("Including resources: %s", resourceIncludesExcludes.IncludesString())
log.Infof("Excluding resources: %s", resourceIncludesExcludes.ExcludesString())
backupRequest.ResourceIncludesExcludes = getResourceIncludesExcludes(kb.discoveryHelper, backupRequest.Spec.IncludedResources, backupRequest.Spec.ExcludedResources)
log.Infof("Including resources: %s", backupRequest.ResourceIncludesExcludes.IncludesString())
log.Infof("Excluding resources: %s", backupRequest.ResourceIncludesExcludes.ExcludesString())
resourceHooks, err := getResourceHooks(backup.Spec.Hooks.Resources, kb.discoveryHelper)
var err error
backupRequest.ResourceHooks, err = getResourceHooks(backupRequest.Spec.Hooks.Resources, kb.discoveryHelper)
if err != nil {
return err
}
backedUpItems := make(map[itemKey]struct{})
var errs []error
resolvedActions, err := resolveActions(actions, kb.discoveryHelper)
backupRequest.ResolvedActions, err = resolveActions(actions, kb.discoveryHelper)
if err != nil {
return err
}
podVolumeTimeout := kb.resticTimeout
if val := backup.Annotations[api.PodVolumeOperationTimeoutAnnotation]; val != "" {
if val := backupRequest.Annotations[api.PodVolumeOperationTimeoutAnnotation]; val != "" {
parsed, err := time.ParseDuration(val)
if err != nil {
log.WithError(errors.WithStack(err)).Errorf("Unable to parse pod volume timeout annotation %s, using server value.", val)
@@ -257,7 +256,7 @@ func (kb *kubernetesBackupper) Backup(logger logrus.FieldLogger, backup *api.Bac
var resticBackupper restic.Backupper
if kb.resticBackupperFactory != nil {
resticBackupper, err = kb.resticBackupperFactory.NewBackupper(ctx, backup)
resticBackupper, err = kb.resticBackupperFactory.NewBackupper(ctx, backupRequest.Backup)
if err != nil {
return errors.WithStack(err)
}
@@ -265,22 +264,19 @@ func (kb *kubernetesBackupper) Backup(logger logrus.FieldLogger, backup *api.Bac
gb := kb.groupBackupperFactory.newGroupBackupper(
log,
backup,
namespaceIncludesExcludes,
resourceIncludesExcludes,
backupRequest,
kb.dynamicFactory,
kb.discoveryHelper,
backedUpItems,
make(map[itemKey]struct{}),
cohabitatingResources(),
resolvedActions,
kb.podCommandExecutor,
tw,
resourceHooks,
kb.blockStore,
resticBackupper,
newPVCSnapshotTracker(),
blockStoreGetter,
)
var errs []error
for _, group := range kb.discoveryHelper.Resources() {
if err := gb.backupGroup(group); err != nil {
errs = append(errs, err)

View File

@@ -38,7 +38,6 @@ import (
"github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/client"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/discovery"
"github.com/heptio/ark/pkg/podexec"
"github.com/heptio/ark/pkg/restic"
@@ -372,17 +371,16 @@ func parseLabelSelectorOrDie(s string) labels.Selector {
func TestBackup(t *testing.T) {
tests := []struct {
name string
backup *v1.Backup
expectedNamespaces *collections.IncludesExcludes
expectedResources *collections.IncludesExcludes
expectedLabelSelector string
expectedHooks []resourceHook
backupGroupErrors map[*metav1.APIResourceList]error
expectedError error
name string
backup *v1.Backup
expectedNamespaces *collections.IncludesExcludes
expectedResources *collections.IncludesExcludes
expectedHooks []resourceHook
backupGroupErrors map[*metav1.APIResourceList]error
expectedError error
}{
{
name: "happy path, no actions, no label selector, no hooks, no errors",
name: "happy path, no actions, no hooks, no errors",
backup: &v1.Backup{
Spec: v1.BackupSpec{
// cm - shortcut in legacy api group
@@ -402,25 +400,6 @@ func TestBackup(t *testing.T) {
rbacGroup: nil,
},
},
{
name: "label selector",
backup: &v1.Backup{
Spec: v1.BackupSpec{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{"a": "b"},
},
},
},
expectedNamespaces: collections.NewIncludesExcludes(),
expectedResources: collections.NewIncludesExcludes(),
expectedHooks: []resourceHook{},
expectedLabelSelector: "a=b",
backupGroupErrors: map[*metav1.APIResourceList]error{
v1Group: nil,
certificatesGroup: nil,
rbacGroup: nil,
},
},
{
name: "backupGroup errors",
backup: &v1.Backup{},
@@ -488,6 +467,10 @@ func TestBackup(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
req := &Request{
Backup: test.backup,
}
discoveryHelper := &arktest.FakeDiscoveryHelper{
Mapper: &arktest.FakeMapper{
Resources: map[schema.GroupVersionResource]schema.GroupVersionResource{
@@ -503,77 +486,66 @@ func TestBackup(t *testing.T) {
},
}
dynamicFactory := &arktest.FakeDynamicFactory{}
dynamicFactory := new(arktest.FakeDynamicFactory)
podCommandExecutor := &arktest.MockPodCommandExecutor{}
defer podCommandExecutor.AssertExpectations(t)
b, err := NewKubernetesBackupper(
discoveryHelper,
dynamicFactory,
podCommandExecutor,
nil,
nil, // restic backupper factory
0, // restic timeout
)
require.NoError(t, err)
kb := b.(*kubernetesBackupper)
groupBackupperFactory := &mockGroupBackupperFactory{}
defer groupBackupperFactory.AssertExpectations(t)
kb.groupBackupperFactory = groupBackupperFactory
groupBackupper := &mockGroupBackupper{}
defer groupBackupper.AssertExpectations(t)
groupBackupperFactory.On("newGroupBackupper",
mock.Anything, // log
test.backup,
test.expectedNamespaces,
test.expectedResources,
req,
dynamicFactory,
discoveryHelper,
map[itemKey]struct{}{}, // backedUpItems
cohabitatingResources(),
mock.Anything,
kb.podCommandExecutor,
podCommandExecutor,
mock.Anything, // tarWriter
test.expectedHooks,
mock.Anything,
mock.Anything, // restic backupper
mock.Anything, // pvc snapshot tracker
mock.Anything, // block store getter
).Return(groupBackupper)
for group, err := range test.backupGroupErrors {
groupBackupper.On("backupGroup", group).Return(err)
}
var backupFile bytes.Buffer
kb := &kubernetesBackupper{
discoveryHelper: discoveryHelper,
dynamicFactory: dynamicFactory,
podCommandExecutor: podCommandExecutor,
groupBackupperFactory: groupBackupperFactory,
}
err = b.Backup(logging.DefaultLogger(logrus.DebugLevel), test.backup, &backupFile, nil)
err := kb.Backup(logging.DefaultLogger(logrus.DebugLevel), req, new(bytes.Buffer), nil, nil)
assert.Equal(t, test.expectedNamespaces, req.NamespaceIncludesExcludes)
assert.Equal(t, test.expectedResources, req.ResourceIncludesExcludes)
assert.Equal(t, test.expectedHooks, req.ResourceHooks)
if test.expectedError != nil {
assert.EqualError(t, err, test.expectedError.Error())
return
}
assert.NoError(t, err)
})
}
}
func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
discoveryHelper := &arktest.FakeDiscoveryHelper{
Mapper: &arktest.FakeMapper{
Resources: map[schema.GroupVersionResource]schema.GroupVersionResource{},
},
groupBackupperFactory := &mockGroupBackupperFactory{}
kb := &kubernetesBackupper{
discoveryHelper: new(arktest.FakeDiscoveryHelper),
groupBackupperFactory: groupBackupperFactory,
}
b, err := NewKubernetesBackupper(discoveryHelper, nil, nil, nil, nil, 0)
require.NoError(t, err)
kb := b.(*kubernetesBackupper)
groupBackupperFactory := &mockGroupBackupperFactory{}
kb.groupBackupperFactory = groupBackupperFactory
defer groupBackupperFactory.AssertExpectations(t)
// assert that newGroupBackupper() is called with the result of cohabitatingResources()
// passed as an argument.
@@ -582,9 +554,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
discoveryHelper,
kb.discoveryHelper,
mock.Anything,
firstCohabitatingResources,
mock.Anything,
@@ -592,12 +562,9 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(&mockGroupBackupper{})
assert.NoError(t, b.Backup(arktest.NewLogger(), &v1.Backup{}, &bytes.Buffer{}, nil))
groupBackupperFactory.AssertExpectations(t)
assert.NoError(t, kb.Backup(arktest.NewLogger(), &Request{Backup: &v1.Backup{}}, &bytes.Buffer{}, nil, nil))
// mutate the cohabitatingResources map that was used in the first backup to simulate
// the first backup process having done so.
@@ -614,9 +581,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
discoveryHelper,
kb.discoveryHelper,
mock.Anything,
secondCohabitatingResources,
mock.Anything,
@@ -624,16 +589,13 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(&mockGroupBackupper{})
assert.NoError(t, b.Backup(arktest.NewLogger(), &v1.Backup{}, &bytes.Buffer{}, nil))
assert.NoError(t, kb.Backup(arktest.NewLogger(), &Request{Backup: new(v1.Backup)}, new(bytes.Buffer), nil, nil))
assert.NotEqual(t, firstCohabitatingResources, secondCohabitatingResources)
for _, resource := range secondCohabitatingResources {
assert.False(t, resource.seen)
}
groupBackupperFactory.AssertExpectations(t)
}
type mockGroupBackupperFactory struct {
@@ -642,36 +604,29 @@ type mockGroupBackupperFactory struct {
func (f *mockGroupBackupperFactory) newGroupBackupper(
log logrus.FieldLogger,
backup *v1.Backup,
namespaces, resources *collections.IncludesExcludes,
backup *Request,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
backedUpItems map[itemKey]struct{},
cohabitatingResources map[string]*cohabitatingResource,
actions []resolvedAction,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
resourceHooks []resourceHook,
blockStore cloudprovider.BlockStore,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
blockStoreGetter BlockStoreGetter,
) groupBackupper {
args := f.Called(
log,
backup,
namespaces,
resources,
dynamicFactory,
discoveryHelper,
backedUpItems,
cohabitatingResources,
actions,
podCommandExecutor,
tarWriter,
resourceHooks,
blockStore,
resticBackupper,
resticSnapshotTracker,
blockStoreGetter,
)
return args.Get(0).(groupBackupper)
}

View File

@@ -27,31 +27,25 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
kuberrs "k8s.io/apimachinery/pkg/util/errors"
"github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/client"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/discovery"
"github.com/heptio/ark/pkg/podexec"
"github.com/heptio/ark/pkg/restic"
"github.com/heptio/ark/pkg/util/collections"
)
type groupBackupperFactory interface {
newGroupBackupper(
log logrus.FieldLogger,
backup *v1.Backup,
namespaces, resources *collections.IncludesExcludes,
backupRequest *Request,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
backedUpItems map[itemKey]struct{},
cohabitatingResources map[string]*cohabitatingResource,
actions []resolvedAction,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
resourceHooks []resourceHook,
blockStore cloudprovider.BlockStore,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
blockStoreGetter BlockStoreGetter,
) groupBackupper
}
@@ -59,36 +53,30 @@ type defaultGroupBackupperFactory struct{}
func (f *defaultGroupBackupperFactory) newGroupBackupper(
log logrus.FieldLogger,
backup *v1.Backup,
namespaces, resources *collections.IncludesExcludes,
backupRequest *Request,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
backedUpItems map[itemKey]struct{},
cohabitatingResources map[string]*cohabitatingResource,
actions []resolvedAction,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
resourceHooks []resourceHook,
blockStore cloudprovider.BlockStore,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
blockStoreGetter BlockStoreGetter,
) groupBackupper {
return &defaultGroupBackupper{
log: log,
backup: backup,
namespaces: namespaces,
resources: resources,
dynamicFactory: dynamicFactory,
discoveryHelper: discoveryHelper,
backedUpItems: backedUpItems,
cohabitatingResources: cohabitatingResources,
actions: actions,
podCommandExecutor: podCommandExecutor,
tarWriter: tarWriter,
resourceHooks: resourceHooks,
blockStore: blockStore,
resticBackupper: resticBackupper,
resticSnapshotTracker: resticSnapshotTracker,
log: log,
backupRequest: backupRequest,
dynamicFactory: dynamicFactory,
discoveryHelper: discoveryHelper,
backedUpItems: backedUpItems,
cohabitatingResources: cohabitatingResources,
podCommandExecutor: podCommandExecutor,
tarWriter: tarWriter,
resticBackupper: resticBackupper,
resticSnapshotTracker: resticSnapshotTracker,
blockStoreGetter: blockStoreGetter,
resourceBackupperFactory: &defaultResourceBackupperFactory{},
}
}
@@ -99,20 +87,17 @@ type groupBackupper interface {
type defaultGroupBackupper struct {
log logrus.FieldLogger
backup *v1.Backup
namespaces, resources *collections.IncludesExcludes
backupRequest *Request
dynamicFactory client.DynamicFactory
discoveryHelper discovery.Helper
backedUpItems map[itemKey]struct{}
cohabitatingResources map[string]*cohabitatingResource
actions []resolvedAction
podCommandExecutor podexec.PodCommandExecutor
tarWriter tarWriter
resourceHooks []resourceHook
blockStore cloudprovider.BlockStore
resticBackupper restic.Backupper
resticSnapshotTracker *pvcSnapshotTracker
resourceBackupperFactory resourceBackupperFactory
blockStoreGetter BlockStoreGetter
}
// backupGroup backs up a single API group.
@@ -122,20 +107,16 @@ func (gb *defaultGroupBackupper) backupGroup(group *metav1.APIResourceList) erro
log = gb.log.WithField("group", group.GroupVersion)
rb = gb.resourceBackupperFactory.newResourceBackupper(
log,
gb.backup,
gb.namespaces,
gb.resources,
gb.backupRequest,
gb.dynamicFactory,
gb.discoveryHelper,
gb.backedUpItems,
gb.cohabitatingResources,
gb.actions,
gb.podCommandExecutor,
gb.tarWriter,
gb.resourceHooks,
gb.blockStore,
gb.resticBackupper,
gb.resticSnapshotTracker,
gb.blockStoreGetter,
)
)

View File

@@ -19,104 +19,44 @@ package backup
import (
"testing"
"github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/client"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/discovery"
"github.com/heptio/ark/pkg/podexec"
"github.com/heptio/ark/pkg/restic"
"github.com/heptio/ark/pkg/util/collections"
arktest "github.com/heptio/ark/pkg/util/test"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
func TestBackupGroup(t *testing.T) {
backup := &v1.Backup{}
func TestBackupGroupBacksUpCorrectResourcesInCorrectOrder(t *testing.T) {
resourceBackupperFactory := new(mockResourceBackupperFactory)
resourceBackupper := new(mockResourceBackupper)
namespaces := collections.NewIncludesExcludes().Includes("a")
resources := collections.NewIncludesExcludes().Includes("b")
dynamicFactory := &arktest.FakeDynamicFactory{}
defer dynamicFactory.AssertExpectations(t)
discoveryHelper := arktest.NewFakeDiscoveryHelper(true, nil)
backedUpItems := map[itemKey]struct{}{
{resource: "a", namespace: "b", name: "c"}: {},
}
cohabitatingResources := map[string]*cohabitatingResource{
"a": {
resource: "a",
groupResource1: schema.GroupResource{Group: "g1", Resource: "a"},
groupResource2: schema.GroupResource{Group: "g2", Resource: "a"},
},
}
actions := []resolvedAction{
{
ItemAction: newFakeAction("pods"),
resourceIncludesExcludes: collections.NewIncludesExcludes().Includes("pods"),
},
}
podCommandExecutor := &arktest.MockPodCommandExecutor{}
defer podCommandExecutor.AssertExpectations(t)
tarWriter := &fakeTarWriter{}
resourceHooks := []resourceHook{
{name: "myhook"},
}
gb := (&defaultGroupBackupperFactory{}).newGroupBackupper(
arktest.NewLogger(),
backup,
namespaces,
resources,
dynamicFactory,
discoveryHelper,
backedUpItems,
cohabitatingResources,
actions,
podCommandExecutor,
tarWriter,
resourceHooks,
nil, // snapshot service
nil, // restic backupper
newPVCSnapshotTracker(),
).(*defaultGroupBackupper)
resourceBackupperFactory := &mockResourceBackupperFactory{}
defer resourceBackupperFactory.AssertExpectations(t)
gb.resourceBackupperFactory = resourceBackupperFactory
resourceBackupper := &mockResourceBackupper{}
defer resourceBackupper.AssertExpectations(t)
resourceBackupperFactory.On("newResourceBackupper",
mock.Anything,
backup,
namespaces,
resources,
dynamicFactory,
discoveryHelper,
backedUpItems,
cohabitatingResources,
actions,
podCommandExecutor,
tarWriter,
resourceHooks,
nil,
mock.Anything, // restic backupper
mock.Anything, // pvc snapshot tracker
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(resourceBackupper)
gb := &defaultGroupBackupper{
log: arktest.NewLogger(),
resourceBackupperFactory: resourceBackupperFactory,
}
group := &metav1.APIResourceList{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
@@ -126,9 +66,7 @@ func TestBackupGroup(t *testing.T) {
},
}
expectedOrder := []string{"pods", "persistentvolumeclaims", "persistentvolumes"}
var actualOrder []string
runFunc := func(args mock.Arguments) {
actualOrder = append(actualOrder, args.Get(1).(metav1.APIResource).Name)
}
@@ -137,11 +75,10 @@ func TestBackupGroup(t *testing.T) {
resourceBackupper.On("backupResource", group, metav1.APIResource{Name: "persistentvolumeclaims"}).Return(nil).Run(runFunc)
resourceBackupper.On("backupResource", group, metav1.APIResource{Name: "persistentvolumes"}).Return(nil).Run(runFunc)
err := gb.backupGroup(group)
require.NoError(t, err)
require.NoError(t, gb.backupGroup(group))
// make sure PVs were last
assert.Equal(t, expectedOrder, actualOrder)
assert.Equal(t, []string{"pods", "persistentvolumeclaims", "persistentvolumes"}, actualOrder)
}
type mockResourceBackupperFactory struct {
@@ -150,37 +87,29 @@ type mockResourceBackupperFactory struct {
func (rbf *mockResourceBackupperFactory) newResourceBackupper(
log logrus.FieldLogger,
backup *v1.Backup,
namespaces *collections.IncludesExcludes,
resources *collections.IncludesExcludes,
backup *Request,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
backedUpItems map[itemKey]struct{},
cohabitatingResources map[string]*cohabitatingResource,
actions []resolvedAction,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
resourceHooks []resourceHook,
blockStore cloudprovider.BlockStore,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
blockStoreGetter BlockStoreGetter,
) resourceBackupper {
args := rbf.Called(
log,
backup,
namespaces,
resources,
dynamicFactory,
discoveryHelper,
backedUpItems,
cohabitatingResources,
actions,
podCommandExecutor,
tarWriter,
resourceHooks,
blockStore,
resticBackupper,
resticSnapshotTracker,
blockStoreGetter,
)
return args.Get(0).(resourceBackupper)
}

View File

@@ -40,58 +40,48 @@ import (
"github.com/heptio/ark/pkg/kuberesource"
"github.com/heptio/ark/pkg/podexec"
"github.com/heptio/ark/pkg/restic"
"github.com/heptio/ark/pkg/util/collections"
)
type itemBackupperFactory interface {
newItemBackupper(
backup *api.Backup,
namespaces, resources *collections.IncludesExcludes,
backup *Request,
backedUpItems map[itemKey]struct{},
actions []resolvedAction,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
resourceHooks []resourceHook,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
blockStore cloudprovider.BlockStore,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
blockStoreGetter BlockStoreGetter,
) ItemBackupper
}
type defaultItemBackupperFactory struct{}
func (f *defaultItemBackupperFactory) newItemBackupper(
backup *api.Backup,
namespaces, resources *collections.IncludesExcludes,
backupRequest *Request,
backedUpItems map[itemKey]struct{},
actions []resolvedAction,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
resourceHooks []resourceHook,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
blockStore cloudprovider.BlockStore,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
blockStoreGetter BlockStoreGetter,
) ItemBackupper {
ib := &defaultItemBackupper{
backup: backup,
namespaces: namespaces,
resources: resources,
backedUpItems: backedUpItems,
actions: actions,
tarWriter: tarWriter,
resourceHooks: resourceHooks,
dynamicFactory: dynamicFactory,
discoveryHelper: discoveryHelper,
blockStore: blockStore,
backupRequest: backupRequest,
backedUpItems: backedUpItems,
tarWriter: tarWriter,
dynamicFactory: dynamicFactory,
discoveryHelper: discoveryHelper,
resticBackupper: resticBackupper,
resticSnapshotTracker: resticSnapshotTracker,
blockStoreGetter: blockStoreGetter,
itemHookHandler: &defaultItemHookHandler{
podCommandExecutor: podCommandExecutor,
},
resticBackupper: resticBackupper,
resticSnapshotTracker: resticSnapshotTracker,
}
// this is for testing purposes
@@ -105,21 +95,18 @@ type ItemBackupper interface {
}
type defaultItemBackupper struct {
backup *api.Backup
namespaces *collections.IncludesExcludes
resources *collections.IncludesExcludes
backupRequest *Request
backedUpItems map[itemKey]struct{}
actions []resolvedAction
tarWriter tarWriter
resourceHooks []resourceHook
dynamicFactory client.DynamicFactory
discoveryHelper discovery.Helper
blockStore cloudprovider.BlockStore
resticBackupper restic.Backupper
resticSnapshotTracker *pvcSnapshotTracker
blockStoreGetter BlockStoreGetter
itemHookHandler itemHookHandler
additionalItemBackupper ItemBackupper
itemHookHandler itemHookHandler
additionalItemBackupper ItemBackupper
snapshotLocationBlockStores map[string]cloudprovider.BlockStore
}
// backupItem backs up an individual item to tarWriter. The item may be excluded based on the
@@ -140,19 +127,19 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
// NOTE: we have to re-check namespace & resource includes/excludes because it's possible that
// backupItem can be invoked by a custom action.
if namespace != "" && !ib.namespaces.ShouldInclude(namespace) {
if namespace != "" && !ib.backupRequest.NamespaceIncludesExcludes.ShouldInclude(namespace) {
log.Info("Excluding item because namespace is excluded")
return nil
}
// NOTE: we specifically allow namespaces to be backed up even if IncludeClusterResources is
// false.
if namespace == "" && groupResource != kuberesource.Namespaces && ib.backup.Spec.IncludeClusterResources != nil && !*ib.backup.Spec.IncludeClusterResources {
if namespace == "" && groupResource != kuberesource.Namespaces && ib.backupRequest.Spec.IncludeClusterResources != nil && !*ib.backupRequest.Spec.IncludeClusterResources {
log.Info("Excluding item because resource is cluster-scoped and backup.spec.includeClusterResources is false")
return nil
}
if !ib.resources.ShouldInclude(groupResource.String()) {
if !ib.backupRequest.ResourceIncludesExcludes.ShouldInclude(groupResource.String()) {
log.Info("Excluding item because resource is excluded")
return nil
}
@@ -176,7 +163,7 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
log.Info("Backing up resource")
log.Debug("Executing pre hooks")
if err := ib.itemHookHandler.handleHooks(log, groupResource, obj, ib.resourceHooks, hookPhasePre); err != nil {
if err := ib.itemHookHandler.handleHooks(log, groupResource, obj, ib.backupRequest.ResourceHooks, hookPhasePre); err != nil {
return err
}
@@ -210,7 +197,7 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
// if there was an error running actions, execute post hooks and return
log.Debug("Executing post hooks")
if err := ib.itemHookHandler.handleHooks(log, groupResource, obj, ib.resourceHooks, hookPhasePost); err != nil {
if err := ib.itemHookHandler.handleHooks(log, groupResource, obj, ib.backupRequest.ResourceHooks, hookPhasePost); err != nil {
backupErrs = append(backupErrs, err)
}
@@ -222,9 +209,7 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
}
if groupResource == kuberesource.PersistentVolumes {
if ib.blockStore == nil {
log.Debug("Skipping Persistent Volume snapshot because they're not enabled.")
} else if err := ib.takePVSnapshot(obj, ib.backup, log); err != nil {
if err := ib.takePVSnapshot(obj, log); err != nil {
backupErrs = append(backupErrs, err)
}
}
@@ -243,7 +228,7 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
}
log.Debug("Executing post hooks")
if err := ib.itemHookHandler.handleHooks(log, groupResource, obj, ib.resourceHooks, hookPhasePost); err != nil {
if err := ib.itemHookHandler.handleHooks(log, groupResource, obj, ib.backupRequest.ResourceHooks, hookPhasePost); err != nil {
backupErrs = append(backupErrs, err)
}
@@ -294,7 +279,7 @@ func (ib *defaultItemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *co
return nil, nil
}
return ib.resticBackupper.BackupPodVolumes(ib.backup, pod, log)
return ib.resticBackupper.BackupPodVolumes(ib.backupRequest.Backup, pod, log)
}
func (ib *defaultItemBackupper) executeActions(
@@ -304,7 +289,7 @@ func (ib *defaultItemBackupper) executeActions(
name, namespace string,
metadata metav1.Object,
) (runtime.Unstructured, error) {
for _, action := range ib.actions {
for _, action := range ib.backupRequest.ResolvedActions {
if !action.resourceIncludesExcludes.ShouldInclude(groupResource.String()) {
log.Debug("Skipping action because it does not apply to this resource")
continue
@@ -322,7 +307,7 @@ func (ib *defaultItemBackupper) executeActions(
log.Info("Executing custom action")
updatedItem, additionalItemIdentifiers, err := action.Execute(obj, ib.backup)
updatedItem, additionalItemIdentifiers, err := action.Execute(obj, ib.backupRequest.Backup)
if err != nil {
// We want this to show up in the log file at the place where the error occurs. When we return
// the error, it get aggregated with all the other ones at the end of the backup, making it
@@ -358,6 +343,30 @@ func (ib *defaultItemBackupper) executeActions(
return obj, nil
}
// blockStore instantiates and initializes a BlockStore given a VolumeSnapshotLocation,
// or returns an existing one if one's already been initialized for the location.
func (ib *defaultItemBackupper) blockStore(snapshotLocation *api.VolumeSnapshotLocation) (cloudprovider.BlockStore, error) {
if bs, ok := ib.snapshotLocationBlockStores[snapshotLocation.Name]; ok {
return bs, nil
}
bs, err := ib.blockStoreGetter.GetBlockStore(snapshotLocation.Spec.Provider)
if err != nil {
return nil, err
}
if err := bs.Init(snapshotLocation.Spec.Config); err != nil {
return nil, err
}
if ib.snapshotLocationBlockStores == nil {
ib.snapshotLocationBlockStores = make(map[string]cloudprovider.BlockStore)
}
ib.snapshotLocationBlockStores[snapshotLocation.Name] = bs
return bs, nil
}
// zoneLabel is the label that stores availability-zone info
// on PVs
const zoneLabel = "failure-domain.beta.kubernetes.io/zone"
@@ -365,10 +374,10 @@ const zoneLabel = "failure-domain.beta.kubernetes.io/zone"
// takePVSnapshot triggers a snapshot for the volume/disk underlying a PersistentVolume if the provided
// backup has volume snapshots enabled and the PV is of a compatible type. Also records cloud
// disk type and IOPS (if applicable) to be able to restore to current state later.
func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, backup *api.Backup, log logrus.FieldLogger) error {
func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.FieldLogger) error {
log.Info("Executing takePVSnapshot")
if backup.Spec.SnapshotVolumes != nil && !*backup.Spec.SnapshotVolumes {
if ib.backupRequest.Spec.SnapshotVolumes != nil && !*ib.backupRequest.Spec.SnapshotVolumes {
log.Info("Backup has volume snapshots disabled; skipping volume snapshot action.")
return nil
}
@@ -402,11 +411,38 @@ func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, backup
log.Infof("label %q is not present on PersistentVolume", zoneLabel)
}
volumeID, err := ib.blockStore.GetVolumeID(obj)
if err != nil {
return errors.Wrapf(err, "error getting volume ID for PersistentVolume")
var (
volumeID string
blockStore cloudprovider.BlockStore
)
for _, snapshotLocation := range ib.backupRequest.SnapshotLocations {
bs, err := ib.blockStore(snapshotLocation)
if err != nil {
log.WithError(err).WithField("volumeSnapshotLocation", snapshotLocation).Error("Error getting block store for volume snapshot location")
continue
}
log := log.WithFields(map[string]interface{}{
"volumeSnapshotLocation": snapshotLocation.Name,
"persistentVolume": metadata.GetName(),
})
if volumeID, err = bs.GetVolumeID(obj); err != nil {
log.WithError(err).Errorf("Error attempting to get volume ID for persistent volume")
continue
}
if volumeID == "" {
log.Infof("No volume ID returned by block store for persistent volume")
continue
}
log.Infof("Got volume ID for persistent volume")
blockStore = bs
break
}
if volumeID == "" {
if blockStore == nil {
log.Info("PersistentVolume is not a supported volume type for snapshots, skipping.")
return nil
}
@@ -414,29 +450,29 @@ func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, backup
log = log.WithField("volumeID", volumeID)
tags := map[string]string{
"ark.heptio.com/backup": backup.Name,
"ark.heptio.com/backup": ib.backupRequest.Name,
"ark.heptio.com/pv": metadata.GetName(),
}
log.Info("Snapshotting PersistentVolume")
snapshotID, err := ib.blockStore.CreateSnapshot(volumeID, pvFailureDomainZone, tags)
snapshotID, err := blockStore.CreateSnapshot(volumeID, pvFailureDomainZone, tags)
if err != nil {
// log+error on purpose - log goes to the per-backup log file, error goes to the backup
log.WithError(err).Error("error creating snapshot")
return errors.WithMessage(err, "error creating snapshot")
}
volumeType, iops, err := ib.blockStore.GetVolumeInfo(volumeID, pvFailureDomainZone)
volumeType, iops, err := blockStore.GetVolumeInfo(volumeID, pvFailureDomainZone)
if err != nil {
log.WithError(err).Error("error getting volume info")
return errors.WithMessage(err, "error getting volume info")
}
if backup.Status.VolumeBackups == nil {
backup.Status.VolumeBackups = make(map[string]*api.VolumeBackupInfo)
if ib.backupRequest.Status.VolumeBackups == nil {
ib.backupRequest.Status.VolumeBackups = make(map[string]*api.VolumeBackupInfo)
}
backup.Status.VolumeBackups[name] = &api.VolumeBackupInfo{
ib.backupRequest.Status.VolumeBackups[name] = &api.VolumeBackupInfo{
SnapshotID: snapshotID,
Type: volumeType,
Iops: iops,

View File

@@ -26,6 +26,7 @@ import (
"github.com/heptio/ark/pkg/apis/ark/v1"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/cloudprovider"
resticmocks "github.com/heptio/ark/pkg/restic/mocks"
"github.com/heptio/ark/pkg/util/collections"
arktest "github.com/heptio/ark/pkg/util/test"
@@ -107,10 +108,13 @@ func TestBackupItemSkips(t *testing.T) {
for _, test := range tests {
t.Run(test.testName, func(t *testing.T) {
req := &Request{
NamespaceIncludesExcludes: test.namespaces,
ResourceIncludesExcludes: test.resources,
}
ib := &defaultItemBackupper{
namespaces: test.namespaces,
resources: test.resources,
backupRequest: req,
backedUpItems: test.backedUpItems,
}
@@ -134,13 +138,15 @@ func TestBackupItemSkips(t *testing.T) {
func TestBackupItemSkipsClusterScopedResourceWhenIncludeClusterResourcesFalse(t *testing.T) {
f := false
ib := &defaultItemBackupper{
backup: &v1.Backup{
Spec: v1.BackupSpec{
IncludeClusterResources: &f,
backupRequest: &Request{
Backup: &v1.Backup{
Spec: v1.BackupSpec{
IncludeClusterResources: &f,
},
},
NamespaceIncludesExcludes: collections.NewIncludesExcludes(),
ResourceIncludesExcludes: collections.NewIncludesExcludes(),
},
namespaces: collections.NewIncludesExcludes(),
resources: collections.NewIncludesExcludes(),
}
u := arktest.UnstructuredOrDie(`{"apiVersion":"v1","kind":"Foo","metadata":{"name":"bar"}}`)
@@ -350,15 +356,20 @@ func TestBackupItemNoSkips(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
actions []resolvedAction
action *fakeAction
backup = &v1.Backup{}
backup = new(Request)
groupResource = schema.ParseGroupResource("resource.group")
backedUpItems = make(map[itemKey]struct{})
resources = collections.NewIncludesExcludes()
w = &fakeTarWriter{}
)
backup.Backup = new(v1.Backup)
backup.NamespaceIncludesExcludes = collections.NewIncludesExcludes()
backup.ResourceIncludesExcludes = collections.NewIncludesExcludes()
backup.SnapshotLocations = []*v1.VolumeSnapshotLocation{
new(v1.VolumeSnapshotLocation),
}
if test.groupResource != "" {
groupResource = schema.ParseGroupResource(test.groupResource)
}
@@ -384,7 +395,7 @@ func TestBackupItemNoSkips(t *testing.T) {
action = &fakeAction{
additionalItems: test.customActionAdditionalItemIdentifiers,
}
actions = []resolvedAction{
backup.ResolvedActions = []resolvedAction{
{
ItemAction: action,
namespaceIncludesExcludes: collections.NewIncludesExcludes(),
@@ -394,8 +405,6 @@ func TestBackupItemNoSkips(t *testing.T) {
}
}
resourceHooks := []resourceHook{}
podCommandExecutor := &arktest.MockPodCommandExecutor{}
defer podCommandExecutor.AssertExpectations(t)
@@ -404,20 +413,18 @@ func TestBackupItemNoSkips(t *testing.T) {
discoveryHelper := arktest.NewFakeDiscoveryHelper(true, nil)
blockStoreGetter := &blockStoreGetter{}
b := (&defaultItemBackupperFactory{}).newItemBackupper(
backup,
namespaces,
resources,
backedUpItems,
actions,
podCommandExecutor,
w,
resourceHooks,
dynamicFactory,
discoveryHelper,
nil, // snapshot service
nil, // restic backupper
newPVCSnapshotTracker(),
blockStoreGetter,
).(*defaultItemBackupper)
var blockStore *arktest.FakeBlockStore
@@ -427,7 +434,8 @@ func TestBackupItemNoSkips(t *testing.T) {
VolumeID: "vol-abc123",
Error: test.snapshotError,
}
b.blockStore = blockStore
blockStoreGetter.blockStore = blockStore
}
if test.trackedPVCs != nil {
@@ -446,8 +454,8 @@ func TestBackupItemNoSkips(t *testing.T) {
b.additionalItemBackupper = additionalItemBackupper
obj := &unstructured.Unstructured{Object: item}
itemHookHandler.On("handleHooks", mock.Anything, groupResource, obj, resourceHooks, hookPhasePre).Return(nil)
itemHookHandler.On("handleHooks", mock.Anything, groupResource, obj, resourceHooks, hookPhasePost).Return(nil)
itemHookHandler.On("handleHooks", mock.Anything, groupResource, obj, backup.ResourceHooks, hookPhasePre).Return(nil)
itemHookHandler.On("handleHooks", mock.Anything, groupResource, obj, backup.ResourceHooks, hookPhasePost).Return(nil)
for i, item := range test.customActionAdditionalItemIdentifiers {
if test.additionalItemError != nil && i > 0 {
@@ -511,7 +519,7 @@ func TestBackupItemNoSkips(t *testing.T) {
}
require.Equal(t, 1, len(action.backups), "unexpected custom action backups: %#v", action.backups)
assert.Equal(t, backup, &(action.backups[0]), "backup")
assert.Equal(t, backup.Backup, &(action.backups[0]), "backup")
}
if test.snapshottableVolumes != nil {
@@ -541,6 +549,17 @@ func TestBackupItemNoSkips(t *testing.T) {
}
}
type blockStoreGetter struct {
blockStore cloudprovider.BlockStore
}
func (b *blockStoreGetter) GetBlockStore(name string) (cloudprovider.BlockStore, error) {
if b.blockStore != nil {
return b.blockStore, nil
}
return nil, errors.New("plugin not found")
}
type addAnnotationAction struct{}
func (a *addAnnotationAction) Execute(item runtime.Unstructured, backup *v1.Backup) (runtime.Unstructured, []ResourceIdentifier, error) {
@@ -578,28 +597,29 @@ func TestItemActionModificationsToItemPersist(t *testing.T) {
},
},
}
actions = []resolvedAction{
{
ItemAction: &addAnnotationAction{},
namespaceIncludesExcludes: collections.NewIncludesExcludes(),
resourceIncludesExcludes: collections.NewIncludesExcludes(),
selector: labels.Everything(),
req = &Request{
NamespaceIncludesExcludes: collections.NewIncludesExcludes(),
ResourceIncludesExcludes: collections.NewIncludesExcludes(),
ResolvedActions: []resolvedAction{
{
ItemAction: &addAnnotationAction{},
namespaceIncludesExcludes: collections.NewIncludesExcludes(),
resourceIncludesExcludes: collections.NewIncludesExcludes(),
selector: labels.Everything(),
},
},
}
b = (&defaultItemBackupperFactory{}).newItemBackupper(
&v1.Backup{},
collections.NewIncludesExcludes(),
collections.NewIncludesExcludes(),
req,
make(map[itemKey]struct{}),
actions,
nil,
w,
nil,
&arktest.FakeDynamicFactory{},
arktest.NewFakeDiscoveryHelper(true, nil),
nil,
nil,
newPVCSnapshotTracker(),
nil,
).(*defaultItemBackupper)
)
@@ -633,29 +653,29 @@ func TestResticAnnotationsPersist(t *testing.T) {
},
},
}
actions = []resolvedAction{
{
ItemAction: &addAnnotationAction{},
namespaceIncludesExcludes: collections.NewIncludesExcludes(),
resourceIncludesExcludes: collections.NewIncludesExcludes(),
selector: labels.Everything(),
req = &Request{
NamespaceIncludesExcludes: collections.NewIncludesExcludes(),
ResourceIncludesExcludes: collections.NewIncludesExcludes(),
ResolvedActions: []resolvedAction{
{
ItemAction: &addAnnotationAction{},
namespaceIncludesExcludes: collections.NewIncludesExcludes(),
resourceIncludesExcludes: collections.NewIncludesExcludes(),
selector: labels.Everything(),
},
},
}
resticBackupper = &resticmocks.Backupper{}
b = (&defaultItemBackupperFactory{}).newItemBackupper(
&v1.Backup{},
collections.NewIncludesExcludes(),
collections.NewIncludesExcludes(),
req,
make(map[itemKey]struct{}),
actions,
nil,
w,
nil,
&arktest.FakeDynamicFactory{},
arktest.NewFakeDiscoveryHelper(true, nil),
nil,
resticBackupper,
newPVCSnapshotTracker(),
nil,
).(*defaultItemBackupper)
)
@@ -793,7 +813,13 @@ func TestTakePVSnapshot(t *testing.T) {
VolumeID: test.expectedVolumeID,
}
ib := &defaultItemBackupper{blockStore: blockStore}
ib := &defaultItemBackupper{
backupRequest: &Request{
Backup: backup,
SnapshotLocations: []*v1.VolumeSnapshotLocation{new(v1.VolumeSnapshotLocation)},
},
blockStoreGetter: &blockStoreGetter{blockStore: blockStore},
}
pv, err := arktest.GetAsMap(test.pv)
if err != nil {
@@ -801,7 +827,7 @@ func TestTakePVSnapshot(t *testing.T) {
}
// method under test
err = ib.takePVSnapshot(&unstructured.Unstructured{Object: pv}, backup, arktest.NewLogger())
err = ib.takePVSnapshot(&unstructured.Unstructured{Object: pv}, arktest.NewLogger())
gotErr := err != nil

19
pkg/backup/request.go Normal file
View File

@@ -0,0 +1,19 @@
package backup
import (
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/util/collections"
)
// Request is a request for a backup, with all references to other objects
// materialized (e.g. backup/snapshot locations, includes/excludes, etc.)
type Request struct {
*arkv1api.Backup
StorageLocation *arkv1api.BackupStorageLocation
SnapshotLocations []*arkv1api.VolumeSnapshotLocation
NamespaceIncludesExcludes *collections.IncludesExcludes
ResourceIncludesExcludes *collections.IncludesExcludes
ResourceHooks []resourceHook
ResolvedActions []resolvedAction
}

View File

@@ -27,9 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
kuberrs "k8s.io/apimachinery/pkg/util/errors"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/client"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/discovery"
"github.com/heptio/ark/pkg/kuberesource"
"github.com/heptio/ark/pkg/podexec"
@@ -40,20 +38,16 @@ import (
type resourceBackupperFactory interface {
newResourceBackupper(
log logrus.FieldLogger,
backup *api.Backup,
namespaces *collections.IncludesExcludes,
resources *collections.IncludesExcludes,
backupRequest *Request,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
backedUpItems map[itemKey]struct{},
cohabitatingResources map[string]*cohabitatingResource,
actions []resolvedAction,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
resourceHooks []resourceHook,
blockStore cloudprovider.BlockStore,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
blockStoreGetter BlockStoreGetter,
) resourceBackupper
}
@@ -61,38 +55,31 @@ type defaultResourceBackupperFactory struct{}
func (f *defaultResourceBackupperFactory) newResourceBackupper(
log logrus.FieldLogger,
backup *api.Backup,
namespaces *collections.IncludesExcludes,
resources *collections.IncludesExcludes,
backupRequest *Request,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
backedUpItems map[itemKey]struct{},
cohabitatingResources map[string]*cohabitatingResource,
actions []resolvedAction,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
resourceHooks []resourceHook,
blockStore cloudprovider.BlockStore,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
blockStoreGetter BlockStoreGetter,
) resourceBackupper {
return &defaultResourceBackupper{
log: log,
backup: backup,
namespaces: namespaces,
resources: resources,
backupRequest: backupRequest,
dynamicFactory: dynamicFactory,
discoveryHelper: discoveryHelper,
backedUpItems: backedUpItems,
actions: actions,
cohabitatingResources: cohabitatingResources,
podCommandExecutor: podCommandExecutor,
tarWriter: tarWriter,
resourceHooks: resourceHooks,
blockStore: blockStore,
resticBackupper: resticBackupper,
resticSnapshotTracker: resticSnapshotTracker,
itemBackupperFactory: &defaultItemBackupperFactory{},
blockStoreGetter: blockStoreGetter,
itemBackupperFactory: &defaultItemBackupperFactory{},
}
}
@@ -102,21 +89,17 @@ type resourceBackupper interface {
type defaultResourceBackupper struct {
log logrus.FieldLogger
backup *api.Backup
namespaces *collections.IncludesExcludes
resources *collections.IncludesExcludes
backupRequest *Request
dynamicFactory client.DynamicFactory
discoveryHelper discovery.Helper
backedUpItems map[itemKey]struct{}
cohabitatingResources map[string]*cohabitatingResource
actions []resolvedAction
podCommandExecutor podexec.PodCommandExecutor
tarWriter tarWriter
resourceHooks []resourceHook
blockStore cloudprovider.BlockStore
resticBackupper restic.Backupper
resticSnapshotTracker *pvcSnapshotTracker
itemBackupperFactory itemBackupperFactory
blockStoreGetter BlockStoreGetter
}
// backupResource backs up all the objects for a given group-version-resource.
@@ -142,8 +125,8 @@ func (rb *defaultResourceBackupper) backupResource(
// If the resource we are backing up is NOT namespaces, and it is cluster-scoped, check to see if
// we should include it based on the IncludeClusterResources setting.
if gr != kuberesource.Namespaces && clusterScoped {
if rb.backup.Spec.IncludeClusterResources == nil {
if !rb.namespaces.IncludeEverything() {
if rb.backupRequest.Spec.IncludeClusterResources == nil {
if !rb.backupRequest.NamespaceIncludesExcludes.IncludeEverything() {
// when IncludeClusterResources == nil (auto), only directly
// back up cluster-scoped resources if we're doing a full-cluster
// (all namespaces) backup. Note that in the case of a subset of
@@ -154,13 +137,13 @@ func (rb *defaultResourceBackupper) backupResource(
log.Info("Skipping resource because it's cluster-scoped and only specific namespaces are included in the backup")
return nil
}
} else if !*rb.backup.Spec.IncludeClusterResources {
} else if !*rb.backupRequest.Spec.IncludeClusterResources {
log.Info("Skipping resource because it's cluster-scoped")
return nil
}
}
if !rb.resources.ShouldInclude(grString) {
if !rb.backupRequest.ResourceIncludesExcludes.ShouldInclude(grString) {
log.Infof("Resource is excluded")
return nil
}
@@ -179,22 +162,18 @@ func (rb *defaultResourceBackupper) backupResource(
}
itemBackupper := rb.itemBackupperFactory.newItemBackupper(
rb.backup,
rb.namespaces,
rb.resources,
rb.backupRequest,
rb.backedUpItems,
rb.actions,
rb.podCommandExecutor,
rb.tarWriter,
rb.resourceHooks,
rb.dynamicFactory,
rb.discoveryHelper,
rb.blockStore,
rb.resticBackupper,
rb.resticSnapshotTracker,
rb.blockStoreGetter,
)
namespacesToList := getNamespacesToList(rb.namespaces)
namespacesToList := getNamespacesToList(rb.backupRequest.NamespaceIncludesExcludes)
// Check if we're backing up namespaces, and only certain ones
if gr == kuberesource.Namespaces && namespacesToList[0] != "" {
@@ -204,8 +183,8 @@ func (rb *defaultResourceBackupper) backupResource(
}
var labelSelector labels.Selector
if rb.backup.Spec.LabelSelector != nil {
labelSelector, err = metav1.LabelSelectorAsSelector(rb.backup.Spec.LabelSelector)
if rb.backupRequest.Spec.LabelSelector != nil {
labelSelector, err = metav1.LabelSelectorAsSelector(rb.backupRequest.Spec.LabelSelector)
if err != nil {
// This should never happen...
return errors.Wrap(err, "invalid label selector")
@@ -246,7 +225,7 @@ func (rb *defaultResourceBackupper) backupResource(
}
var labelSelector string
if selector := rb.backup.Spec.LabelSelector; selector != nil {
if selector := rb.backupRequest.Spec.LabelSelector; selector != nil {
labelSelector = metav1.FormatLabelSelector(selector)
}
@@ -276,7 +255,7 @@ func (rb *defaultResourceBackupper) backupResource(
continue
}
if gr == kuberesource.Namespaces && !rb.namespaces.ShouldInclude(metadata.GetName()) {
if gr == kuberesource.Namespaces && !rb.backupRequest.NamespaceIncludesExcludes.ShouldInclude(metadata.GetName()) {
log.WithField("name", metadata.GetName()).Info("skipping namespace because it is excluded")
continue
}

View File

@@ -21,7 +21,6 @@ import (
"github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/client"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/discovery"
"github.com/heptio/ark/pkg/kuberesource"
"github.com/heptio/ark/pkg/podexec"
@@ -220,10 +219,23 @@ func TestBackupResource(t *testing.T) {
}
for _, test := range tests {
backup := &v1.Backup{
Spec: v1.BackupSpec{
IncludeClusterResources: test.includeClusterResources,
req := &Request{
Backup: &v1.Backup{
Spec: v1.BackupSpec{
IncludeClusterResources: test.includeClusterResources,
},
},
ResolvedActions: []resolvedAction{
{
ItemAction: newFakeAction("pods"),
resourceIncludesExcludes: collections.NewIncludesExcludes().Includes("pods"),
},
},
ResourceHooks: []resourceHook{
{name: "myhook"},
},
ResourceIncludesExcludes: test.resources,
NamespaceIncludesExcludes: test.namespaces,
}
dynamicFactory := &arktest.FakeDynamicFactory{}
@@ -240,17 +252,6 @@ func TestBackupResource(t *testing.T) {
"networkpolicies": newCohabitatingResource("networkpolicies", "extensions", "networking.k8s.io"),
}
actions := []resolvedAction{
{
ItemAction: newFakeAction("pods"),
resourceIncludesExcludes: collections.NewIncludesExcludes().Includes("pods"),
},
}
resourceHooks := []resourceHook{
{name: "myhook"},
}
podCommandExecutor := &arktest.MockPodCommandExecutor{}
defer podCommandExecutor.AssertExpectations(t)
@@ -259,20 +260,16 @@ func TestBackupResource(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
rb := (&defaultResourceBackupperFactory{}).newResourceBackupper(
arktest.NewLogger(),
backup,
test.namespaces,
test.resources,
req,
dynamicFactory,
discoveryHelper,
backedUpItems,
cohabitatingResources,
actions,
podCommandExecutor,
tarWriter,
resourceHooks,
nil, // snapshot service
nil, // restic backupper
newPVCSnapshotTracker(),
nil,
).(*defaultResourceBackupper)
itemBackupperFactory := &mockItemBackupperFactory{}
@@ -284,14 +281,10 @@ func TestBackupResource(t *testing.T) {
defer itemBackupper.AssertExpectations(t)
itemBackupperFactory.On("newItemBackupper",
backup,
test.namespaces,
test.resources,
req,
backedUpItems,
actions,
podCommandExecutor,
tarWriter,
resourceHooks,
dynamicFactory,
discoveryHelper,
mock.Anything,
@@ -382,19 +375,29 @@ func TestBackupResourceCohabitation(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
backup := &v1.Backup{
Spec: v1.BackupSpec{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"foo": "bar",
req := &Request{
Backup: &v1.Backup{
Spec: v1.BackupSpec{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"foo": "bar",
},
},
},
},
NamespaceIncludesExcludes: collections.NewIncludesExcludes().Includes("*"),
ResourceIncludesExcludes: collections.NewIncludesExcludes().Includes("*"),
ResolvedActions: []resolvedAction{
{
ItemAction: newFakeAction("pods"),
resourceIncludesExcludes: collections.NewIncludesExcludes().Includes("pods"),
},
},
ResourceHooks: []resourceHook{
{name: "myhook"},
},
}
namespaces := collections.NewIncludesExcludes().Includes("*")
resources := collections.NewIncludesExcludes().Includes("*")
dynamicFactory := &arktest.FakeDynamicFactory{}
defer dynamicFactory.AssertExpectations(t)
@@ -409,17 +412,6 @@ func TestBackupResourceCohabitation(t *testing.T) {
"networkpolicies": newCohabitatingResource("networkpolicies", "extensions", "networking.k8s.io"),
}
actions := []resolvedAction{
{
ItemAction: newFakeAction("pods"),
resourceIncludesExcludes: collections.NewIncludesExcludes().Includes("pods"),
},
}
resourceHooks := []resourceHook{
{name: "myhook"},
}
podCommandExecutor := &arktest.MockPodCommandExecutor{}
defer podCommandExecutor.AssertExpectations(t)
@@ -427,20 +419,16 @@ func TestBackupResourceCohabitation(t *testing.T) {
rb := (&defaultResourceBackupperFactory{}).newResourceBackupper(
arktest.NewLogger(),
backup,
namespaces,
resources,
req,
dynamicFactory,
discoveryHelper,
backedUpItems,
cohabitatingResources,
actions,
podCommandExecutor,
tarWriter,
resourceHooks,
nil, // snapshot service
nil, // restic backupper
newPVCSnapshotTracker(),
nil,
).(*defaultResourceBackupper)
itemBackupperFactory := &mockItemBackupperFactory{}
@@ -451,19 +439,15 @@ func TestBackupResourceCohabitation(t *testing.T) {
defer itemBackupper.AssertExpectations(t)
itemBackupperFactory.On("newItemBackupper",
backup,
namespaces,
resources,
req,
backedUpItems,
actions,
podCommandExecutor,
tarWriter,
resourceHooks,
dynamicFactory,
discoveryHelper,
mock.Anything, // snapshot service
mock.Anything, // restic backupper
mock.Anything, // pvc snapshot tracker
nil,
).Return(itemBackupper)
client := &arktest.FakeDynamicClient{}
@@ -471,7 +455,7 @@ func TestBackupResourceCohabitation(t *testing.T) {
// STEP 1: make sure the initial backup goes through
dynamicFactory.On("ClientForGroupVersionResource", test.groupVersion1, test.apiResource, "").Return(client, nil)
client.On("List", metav1.ListOptions{LabelSelector: metav1.FormatLabelSelector(backup.Spec.LabelSelector)}).Return(&unstructured.UnstructuredList{}, nil)
client.On("List", metav1.ListOptions{LabelSelector: metav1.FormatLabelSelector(req.Backup.Spec.LabelSelector)}).Return(&unstructured.UnstructuredList{}, nil)
// STEP 2: do the backup
err := rb.backupResource(test.apiGroup1, test.apiResource)
@@ -485,10 +469,11 @@ func TestBackupResourceCohabitation(t *testing.T) {
}
func TestBackupResourceOnlyIncludesSpecifiedNamespaces(t *testing.T) {
backup := &v1.Backup{}
namespaces := collections.NewIncludesExcludes().Includes("ns-1")
resources := collections.NewIncludesExcludes().Includes("*")
req := &Request{
Backup: &v1.Backup{},
NamespaceIncludesExcludes: collections.NewIncludesExcludes().Includes("ns-1"),
ResourceIncludesExcludes: collections.NewIncludesExcludes().Includes("*"),
}
backedUpItems := map[itemKey]struct{}{}
@@ -499,10 +484,6 @@ func TestBackupResourceOnlyIncludesSpecifiedNamespaces(t *testing.T) {
cohabitatingResources := map[string]*cohabitatingResource{}
actions := []resolvedAction{}
resourceHooks := []resourceHook{}
podCommandExecutor := &arktest.MockPodCommandExecutor{}
defer podCommandExecutor.AssertExpectations(t)
@@ -510,20 +491,16 @@ func TestBackupResourceOnlyIncludesSpecifiedNamespaces(t *testing.T) {
rb := (&defaultResourceBackupperFactory{}).newResourceBackupper(
arktest.NewLogger(),
backup,
namespaces,
resources,
req,
dynamicFactory,
discoveryHelper,
backedUpItems,
cohabitatingResources,
actions,
podCommandExecutor,
tarWriter,
resourceHooks,
nil, // snapshot service
nil, // restic backupper
newPVCSnapshotTracker(),
nil,
).(*defaultResourceBackupper)
itemBackupperFactory := &mockItemBackupperFactory{}
@@ -534,27 +511,19 @@ func TestBackupResourceOnlyIncludesSpecifiedNamespaces(t *testing.T) {
defer itemHookHandler.AssertExpectations(t)
itemBackupper := &defaultItemBackupper{
backup: backup,
namespaces: namespaces,
resources: resources,
backupRequest: req,
backedUpItems: backedUpItems,
actions: actions,
tarWriter: tarWriter,
resourceHooks: resourceHooks,
dynamicFactory: dynamicFactory,
discoveryHelper: discoveryHelper,
itemHookHandler: itemHookHandler,
}
itemBackupperFactory.On("newItemBackupper",
backup,
namespaces,
resources,
req,
backedUpItems,
actions,
podCommandExecutor,
tarWriter,
resourceHooks,
dynamicFactory,
discoveryHelper,
mock.Anything,
@@ -570,8 +539,8 @@ func TestBackupResourceOnlyIncludesSpecifiedNamespaces(t *testing.T) {
ns1 := arktest.UnstructuredOrDie(`{"apiVersion":"v1","kind":"Namespace","metadata":{"name":"ns-1"}}`)
client.On("Get", "ns-1", metav1.GetOptions{}).Return(ns1, nil)
itemHookHandler.On("handleHooks", mock.Anything, schema.GroupResource{Group: "", Resource: "namespaces"}, ns1, resourceHooks, hookPhasePre).Return(nil)
itemHookHandler.On("handleHooks", mock.Anything, schema.GroupResource{Group: "", Resource: "namespaces"}, ns1, resourceHooks, hookPhasePost).Return(nil)
itemHookHandler.On("handleHooks", mock.Anything, schema.GroupResource{Group: "", Resource: "namespaces"}, ns1, req.ResourceHooks, hookPhasePre).Return(nil)
itemHookHandler.On("handleHooks", mock.Anything, schema.GroupResource{Group: "", Resource: "namespaces"}, ns1, req.ResourceHooks, hookPhasePost).Return(nil)
err := rb.backupResource(v1Group, namespacesResource)
require.NoError(t, err)
@@ -581,19 +550,20 @@ func TestBackupResourceOnlyIncludesSpecifiedNamespaces(t *testing.T) {
}
func TestBackupResourceListAllNamespacesExcludesCorrectly(t *testing.T) {
backup := &v1.Backup{
Spec: v1.BackupSpec{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"foo": "bar",
req := &Request{
Backup: &v1.Backup{
Spec: v1.BackupSpec{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"foo": "bar",
},
},
},
},
NamespaceIncludesExcludes: collections.NewIncludesExcludes().Excludes("ns-1"),
ResourceIncludesExcludes: collections.NewIncludesExcludes().Includes("*"),
}
namespaces := collections.NewIncludesExcludes().Excludes("ns-1")
resources := collections.NewIncludesExcludes().Includes("*")
backedUpItems := map[itemKey]struct{}{}
dynamicFactory := &arktest.FakeDynamicFactory{}
@@ -603,10 +573,6 @@ func TestBackupResourceListAllNamespacesExcludesCorrectly(t *testing.T) {
cohabitatingResources := map[string]*cohabitatingResource{}
actions := []resolvedAction{}
resourceHooks := []resourceHook{}
podCommandExecutor := &arktest.MockPodCommandExecutor{}
defer podCommandExecutor.AssertExpectations(t)
@@ -614,20 +580,16 @@ func TestBackupResourceListAllNamespacesExcludesCorrectly(t *testing.T) {
rb := (&defaultResourceBackupperFactory{}).newResourceBackupper(
arktest.NewLogger(),
backup,
namespaces,
resources,
req,
dynamicFactory,
discoveryHelper,
backedUpItems,
cohabitatingResources,
actions,
podCommandExecutor,
tarWriter,
resourceHooks,
nil, // snapshot service
nil, // restic backupper
newPVCSnapshotTracker(),
nil,
).(*defaultResourceBackupper)
itemBackupperFactory := &mockItemBackupperFactory{}
@@ -641,14 +603,10 @@ func TestBackupResourceListAllNamespacesExcludesCorrectly(t *testing.T) {
defer itemBackupper.AssertExpectations(t)
itemBackupperFactory.On("newItemBackupper",
backup,
namespaces,
resources,
req,
backedUpItems,
actions,
podCommandExecutor,
tarWriter,
resourceHooks,
dynamicFactory,
discoveryHelper,
mock.Anything,
@@ -667,7 +625,7 @@ func TestBackupResourceListAllNamespacesExcludesCorrectly(t *testing.T) {
list := &unstructured.UnstructuredList{
Items: []unstructured.Unstructured{*ns1, *ns2},
}
client.On("List", metav1.ListOptions{LabelSelector: metav1.FormatLabelSelector(backup.Spec.LabelSelector)}).Return(list, nil)
client.On("List", metav1.ListOptions{LabelSelector: metav1.FormatLabelSelector(req.Backup.Spec.LabelSelector)}).Return(list, nil)
itemBackupper.On("backupItem", mock.AnythingOfType("*logrus.Entry"), ns2, kuberesource.Namespaces).Return(nil)
@@ -680,33 +638,26 @@ type mockItemBackupperFactory struct {
}
func (ibf *mockItemBackupperFactory) newItemBackupper(
backup *v1.Backup,
namespaces, resources *collections.IncludesExcludes,
backup *Request,
backedUpItems map[itemKey]struct{},
actions []resolvedAction,
podCommandExecutor podexec.PodCommandExecutor,
tarWriter tarWriter,
resourceHooks []resourceHook,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
blockStore cloudprovider.BlockStore,
resticBackupper restic.Backupper,
resticSnapshotTracker *pvcSnapshotTracker,
blockStoreGetter BlockStoreGetter,
) ItemBackupper {
args := ibf.Called(
backup,
namespaces,
resources,
backedUpItems,
actions,
podCommandExecutor,
tarWriter,
resourceHooks,
dynamicFactory,
discoveryHelper,
blockStore,
resticBackupper,
resticSnapshotTracker,
blockStoreGetter,
)
return args.Get(0).(ItemBackupper)
}

View File

@@ -53,7 +53,6 @@ import (
"github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/buildinfo"
"github.com/heptio/ark/pkg/client"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/cmd"
"github.com/heptio/ark/pkg/cmd/util/flag"
"github.com/heptio/ark/pkg/cmd/util/signals"
@@ -176,7 +175,6 @@ type server struct {
kubeClientConfig *rest.Config
kubeClient kubernetes.Interface
arkClient clientset.Interface
blockStore cloudprovider.BlockStore
discoveryClient discovery.DiscoveryInterface
discoveryHelper arkdiscovery.Helper
dynamicClient dynamic.Interface
@@ -291,17 +289,6 @@ func (s *server) run() error {
return err
}
if config.PersistentVolumeProvider == nil {
s.logger.Info("PersistentVolumeProvider config not provided, volume snapshots and restores are disabled")
} else {
s.logger.Info("Configuring cloud provider for snapshot service")
blockStore, err := getBlockStore(*config.PersistentVolumeProvider, s.pluginManager)
if err != nil {
return err
}
s.blockStore = blockStore
}
if err := s.initRestic(); err != nil {
return err
}
@@ -559,23 +546,6 @@ func (s *server) watchConfig(config *api.Config) {
})
}
func getBlockStore(cloudConfig api.CloudProviderConfig, manager plugin.Manager) (cloudprovider.BlockStore, error) {
if cloudConfig.Name == "" {
return nil, errors.New("block storage provider name must not be empty")
}
blockStore, err := manager.GetBlockStore(cloudConfig.Name)
if err != nil {
return nil, err
}
if err := blockStore.Init(cloudConfig.Config); err != nil {
return nil, err
}
return blockStore, nil
}
func (s *server) initRestic() error {
// warn if restic daemonset does not exist
if _, err := s.kubeClient.AppsV1().DaemonSets(s.namespace).Get(restic.DaemonSet, metav1.GetOptions{}); apierrors.IsNotFound(err) {
@@ -671,7 +641,6 @@ func (s *server) runControllers(config *api.Config, defaultVolumeSnapshotLocatio
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
s.blockStore,
s.resticManager,
s.config.podVolumeOperationTimeout,
)
@@ -681,7 +650,6 @@ func (s *server) runControllers(config *api.Config, defaultVolumeSnapshotLocatio
s.sharedInformerFactory.Ark().V1().Backups(),
s.arkClient.ArkV1(),
backupper,
s.blockStore != nil,
s.logger,
s.logLevel,
newPluginManager,
@@ -689,7 +657,7 @@ func (s *server) runControllers(config *api.Config, defaultVolumeSnapshotLocatio
s.sharedInformerFactory.Ark().V1().BackupStorageLocations(),
s.config.defaultBackupLocation,
s.sharedInformerFactory.Ark().V1().VolumeSnapshotLocations(),
s.config.defaultVolumeSnapshotLocations,
defaultVolumeSnapshotLocations,
s.metrics,
)
wg.Add(1)
@@ -729,7 +697,7 @@ func (s *server) runControllers(config *api.Config, defaultVolumeSnapshotLocatio
s.sharedInformerFactory.Ark().V1().DeleteBackupRequests(),
s.arkClient.ArkV1(), // deleteBackupRequestClient
s.arkClient.ArkV1(), // backupClient
s.blockStore,
nil,
s.sharedInformerFactory.Ark().V1().Restores(),
s.arkClient.ArkV1(), // restoreClient
backupTracker,
@@ -749,7 +717,7 @@ func (s *server) runControllers(config *api.Config, defaultVolumeSnapshotLocatio
restorer, err := restore.NewKubernetesRestorer(
s.discoveryHelper,
client.NewDynamicFactory(s.dynamicClient),
s.blockStore,
nil,
s.config.restoreResourcePriorities,
s.arkClient.ArkV1(),
s.kubeClient.CoreV1().Namespaces(),
@@ -767,7 +735,7 @@ func (s *server) runControllers(config *api.Config, defaultVolumeSnapshotLocatio
restorer,
s.sharedInformerFactory.Ark().V1().Backups(),
s.sharedInformerFactory.Ark().V1().BackupStorageLocations(),
s.blockStore != nil,
false,
s.logger,
s.logLevel,
newPluginManager,

View File

@@ -37,7 +37,7 @@ import (
"k8s.io/client-go/tools/cache"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/backup"
pkgbackup "github.com/heptio/ark/pkg/backup"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
@@ -55,8 +55,7 @@ const backupVersion = 1
type backupController struct {
*genericController
backupper backup.Backupper
pvProviderExists bool
backupper pkgbackup.Backupper
lister listers.BackupLister
client arkv1client.BackupsGetter
clock clock.Clock
@@ -66,7 +65,7 @@ type backupController struct {
backupLocationLister listers.BackupStorageLocationLister
defaultBackupLocation string
snapshotLocationLister listers.VolumeSnapshotLocationLister
defaultSnapshotLocations map[string]string
defaultSnapshotLocations map[string]*api.VolumeSnapshotLocation
metrics *metrics.ServerMetrics
newBackupStore func(*api.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error)
}
@@ -74,8 +73,7 @@ type backupController struct {
func NewBackupController(
backupInformer informers.BackupInformer,
client arkv1client.BackupsGetter,
backupper backup.Backupper,
pvProviderExists bool,
backupper pkgbackup.Backupper,
logger logrus.FieldLogger,
backupLogLevel logrus.Level,
newPluginManager func(logrus.FieldLogger) plugin.Manager,
@@ -83,13 +81,12 @@ func NewBackupController(
backupLocationInformer informers.BackupStorageLocationInformer,
defaultBackupLocation string,
volumeSnapshotLocationInformer informers.VolumeSnapshotLocationInformer,
defaultSnapshotLocations map[string]string,
defaultSnapshotLocations map[string]*api.VolumeSnapshotLocation,
metrics *metrics.ServerMetrics,
) Interface {
c := &backupController{
genericController: newGenericController("backup", logger),
backupper: backupper,
pvProviderExists: pvProviderExists,
lister: backupInformer.Lister(),
client: client,
clock: &clock.RealClock{},
@@ -151,7 +148,7 @@ func (c *backupController) processBackup(key string) error {
}
log.Debug("Getting backup")
backup, err := c.lister.Backups(ns).Get(name)
original, err := c.lister.Backups(ns).Get(name)
if err != nil {
return errors.Wrap(err, "error getting backup")
}
@@ -164,68 +161,53 @@ func (c *backupController) processBackup(key string) error {
// informer sees the update. In the latter case, after the informer has seen the update to
// InProgress, we still need this check so we can return nil to indicate we've finished processing
// this key (even though it was a no-op).
switch backup.Status.Phase {
switch original.Status.Phase {
case "", api.BackupPhaseNew:
// only process new backups
default:
return nil
}
log.Debug("Cloning backup")
// store ref to original for creating patch
original := backup
// don't modify items in the cache
backup = backup.DeepCopy()
log.Debug("Preparing backup request")
request := c.prepareBackupRequest(original)
// set backup version
backup.Status.Version = backupVersion
// calculate expiration
if backup.Spec.TTL.Duration > 0 {
backup.Status.Expiration = metav1.NewTime(c.clock.Now().Add(backup.Spec.TTL.Duration))
}
backupLocation, errs := c.getLocationAndValidate(backup, c.defaultBackupLocation)
errs = append(errs, c.defaultAndValidateSnapshotLocations(backup, c.defaultSnapshotLocations)...)
backup.Status.ValidationErrors = append(backup.Status.ValidationErrors, errs...)
if len(backup.Status.ValidationErrors) > 0 {
backup.Status.Phase = api.BackupPhaseFailedValidation
if len(request.Status.ValidationErrors) > 0 {
request.Status.Phase = api.BackupPhaseFailedValidation
} else {
backup.Status.Phase = api.BackupPhaseInProgress
request.Status.Phase = api.BackupPhaseInProgress
}
// update status
updatedBackup, err := patchBackup(original, backup, c.client)
updatedBackup, err := patchBackup(original, request.Backup, c.client)
if err != nil {
return errors.Wrapf(err, "error updating Backup status to %s", backup.Status.Phase)
return errors.Wrapf(err, "error updating Backup status to %s", request.Status.Phase)
}
// store ref to just-updated item for creating patch
original = updatedBackup
backup = updatedBackup.DeepCopy()
request.Backup = updatedBackup.DeepCopy()
if backup.Status.Phase == api.BackupPhaseFailedValidation {
if request.Status.Phase == api.BackupPhaseFailedValidation {
return nil
}
c.backupTracker.Add(backup.Namespace, backup.Name)
defer c.backupTracker.Delete(backup.Namespace, backup.Name)
c.backupTracker.Add(request.Namespace, request.Name)
defer c.backupTracker.Delete(request.Namespace, request.Name)
log.Debug("Running backup")
// execution & upload of backup
backupScheduleName := backup.GetLabels()["ark-schedule"]
backupScheduleName := request.GetLabels()["ark-schedule"]
c.metrics.RegisterBackupAttempt(backupScheduleName)
if err := c.runBackup(backup, backupLocation); err != nil {
if err := c.runBackup(request); err != nil {
log.WithError(err).Error("backup failed")
backup.Status.Phase = api.BackupPhaseFailed
request.Status.Phase = api.BackupPhaseFailed
c.metrics.RegisterBackupFailed(backupScheduleName)
} else {
c.metrics.RegisterBackupSuccess(backupScheduleName)
}
log.Debug("Updating backup's final status")
if _, err := patchBackup(original, backup, c.client); err != nil {
if _, err := patchBackup(original, request.Backup, c.client); err != nil {
log.WithError(err).Error("error updating backup's final status")
}
@@ -256,84 +238,107 @@ func patchBackup(original, updated *api.Backup, client arkv1client.BackupsGetter
return res, nil
}
func (c *backupController) getLocationAndValidate(itm *api.Backup, defaultBackupLocation string) (*api.BackupStorageLocation, []string) {
var validationErrors []string
for _, err := range collections.ValidateIncludesExcludes(itm.Spec.IncludedResources, itm.Spec.ExcludedResources) {
validationErrors = append(validationErrors, fmt.Sprintf("Invalid included/excluded resource lists: %v", err))
func (c *backupController) prepareBackupRequest(backup *api.Backup) *pkgbackup.Request {
request := &pkgbackup.Request{
Backup: backup.DeepCopy(), // don't modify items in the cache
}
for _, err := range collections.ValidateIncludesExcludes(itm.Spec.IncludedNamespaces, itm.Spec.ExcludedNamespaces) {
validationErrors = append(validationErrors, fmt.Sprintf("Invalid included/excluded namespace lists: %v", err))
// set backup version
request.Status.Version = backupVersion
// calculate expiration
if request.Spec.TTL.Duration > 0 {
request.Status.Expiration = metav1.NewTime(c.clock.Now().Add(request.Spec.TTL.Duration))
}
if itm.Spec.StorageLocation == "" {
itm.Spec.StorageLocation = defaultBackupLocation
// default storage location if not specified
if request.Spec.StorageLocation == "" {
request.Spec.StorageLocation = c.defaultBackupLocation
}
// add the storage location as a label for easy filtering later.
if itm.Labels == nil {
itm.Labels = make(map[string]string)
if request.Labels == nil {
request.Labels = make(map[string]string)
}
itm.Labels[api.StorageLocationLabel] = itm.Spec.StorageLocation
request.Labels[api.StorageLocationLabel] = request.Spec.StorageLocation
var backupLocation *api.BackupStorageLocation
backupLocation, err := c.backupLocationLister.BackupStorageLocations(itm.Namespace).Get(itm.Spec.StorageLocation)
if err != nil {
validationErrors = append(validationErrors, fmt.Sprintf("Error getting backup storage location: %v", err))
// validate the included/excluded resources and namespaces
for _, err := range collections.ValidateIncludesExcludes(request.Spec.IncludedResources, request.Spec.ExcludedResources) {
request.Status.ValidationErrors = append(request.Status.ValidationErrors, fmt.Sprintf("Invalid included/excluded resource lists: %v", err))
}
return backupLocation, validationErrors
for _, err := range collections.ValidateIncludesExcludes(request.Spec.IncludedNamespaces, request.Spec.ExcludedNamespaces) {
request.Status.ValidationErrors = append(request.Status.ValidationErrors, fmt.Sprintf("Invalid included/excluded namespace lists: %v", err))
}
// validate the storage location, and store the BackupStorageLocation API obj on the request
if storageLocation, err := c.backupLocationLister.BackupStorageLocations(request.Namespace).Get(request.Spec.StorageLocation); err != nil {
request.Status.ValidationErrors = append(request.Status.ValidationErrors, fmt.Sprintf("Error getting backup storage location: %v", err))
} else {
request.StorageLocation = storageLocation
}
// validate and get the backup's VolumeSnapshotLocations, and store the
// VolumeSnapshotLocation API objs on the request
if locs, errs := c.validateAndGetSnapshotLocations(request.Backup); len(errs) > 0 {
request.Status.ValidationErrors = append(request.Status.ValidationErrors, errs...)
} else {
request.Spec.VolumeSnapshotLocations = nil
for _, loc := range locs {
request.Spec.VolumeSnapshotLocations = append(request.Spec.VolumeSnapshotLocations, loc.Name)
request.SnapshotLocations = append(request.SnapshotLocations, loc)
}
}
return request
}
// defaultAndValidateSnapshotLocations ensures:
// - each location name in Spec VolumeSnapshotLocation exists as a location
// - exactly 1 location per existing or default provider
// - a given default provider's location name is added to the Spec VolumeSnapshotLocation if it does not exist as a VSL
func (c *backupController) defaultAndValidateSnapshotLocations(itm *api.Backup, defaultLocations map[string]string) []string {
var errors []string
perProviderLocationName := make(map[string]string)
var finalLocationNameList []string
for _, locationName := range itm.Spec.VolumeSnapshotLocations {
// validateAndGetSnapshotLocations gets a collection of VolumeSnapshotLocation objects that
// this backup will use (returned as a map of provider name -> VSL), and ensures:
// - each location name in .spec.volumeSnapshotLocations exists as a location
// - exactly 1 location per provider
// - a given provider's default location name is added to .spec.volumeSnapshotLocations if one
// is not explicitly specified for the provider
func (c *backupController) validateAndGetSnapshotLocations(backup *api.Backup) (map[string]*api.VolumeSnapshotLocation, []string) {
errors := []string{}
providerLocations := make(map[string]*api.VolumeSnapshotLocation)
for _, locationName := range backup.Spec.VolumeSnapshotLocations {
// validate each locationName exists as a VolumeSnapshotLocation
location, err := c.snapshotLocationLister.VolumeSnapshotLocations(itm.Namespace).Get(locationName)
location, err := c.snapshotLocationLister.VolumeSnapshotLocations(backup.Namespace).Get(locationName)
if err != nil {
errors = append(errors, fmt.Sprintf("error getting volume snapshot location named %s: %v", locationName, err))
continue
}
// ensure we end up with exactly 1 locationName *per provider*
providerLocationName := perProviderLocationName[location.Spec.Provider]
if providerLocationName != "" {
// ensure we end up with exactly 1 location *per provider*
if providerLocation, ok := providerLocations[location.Spec.Provider]; ok {
// if > 1 location name per provider as in ["aws-us-east-1" | "aws-us-west-1"] (same provider, multiple names)
if providerLocationName != locationName {
errors = append(errors, fmt.Sprintf("more than one VolumeSnapshotLocation name specified for provider %s: %s; unexpected name was %s", location.Spec.Provider, locationName, providerLocationName))
if providerLocation.Name != locationName {
errors = append(errors, fmt.Sprintf("more than one VolumeSnapshotLocation name specified for provider %s: %s; unexpected name was %s", location.Spec.Provider, locationName, providerLocation.Name))
continue
}
} else {
// no dup exists: add locationName to the final list
finalLocationNameList = append(finalLocationNameList, locationName)
// keep track of all valid existing locations, per provider
perProviderLocationName[location.Spec.Provider] = locationName
providerLocations[location.Spec.Provider] = location
}
}
if len(errors) > 0 {
return errors
return nil, errors
}
for provider, defaultLocationName := range defaultLocations {
for provider, defaultLocation := range c.defaultSnapshotLocations {
// if a location name for a given provider does not already exist, add the provider's default
if _, ok := perProviderLocationName[provider]; !ok {
finalLocationNameList = append(finalLocationNameList, defaultLocationName)
if _, ok := providerLocations[provider]; !ok {
providerLocations[provider] = defaultLocation
}
}
itm.Spec.VolumeSnapshotLocations = finalLocationNameList
return nil
return providerLocations, nil
}
func (c *backupController) runBackup(backup *api.Backup, backupLocation *api.BackupStorageLocation) error {
func (c *backupController) runBackup(backup *pkgbackup.Request) error {
log := c.logger.WithField("backup", kubeutil.NamespaceAndName(backup))
log.Info("Starting backup")
backup.Status.StartTimestamp.Time = c.clock.Now()
@@ -370,17 +375,15 @@ func (c *backupController) runBackup(backup *api.Backup, backupLocation *api.Bac
return err
}
backupStore, err := c.newBackupStore(backupLocation, pluginManager, log)
backupStore, err := c.newBackupStore(backup.StorageLocation, pluginManager, log)
if err != nil {
return err
}
var errs []error
var backupJSONToUpload, backupFileToUpload io.Reader
// Do the actual backup
if err := c.backupper.Backup(log, backup, backupFile, actions); err != nil {
if err := c.backupper.Backup(log, backup, backupFile, actions, pluginManager); err != nil {
errs = append(errs, err)
backup.Status.Phase = api.BackupPhaseFailed
@@ -392,8 +395,9 @@ func (c *backupController) runBackup(backup *api.Backup, backupLocation *api.Bac
// Otherwise, the JSON file in object storage has a CompletionTimestamp of 'null'.
backup.Status.CompletionTimestamp.Time = c.clock.Now()
var backupJSONToUpload, backupFileToUpload io.Reader
backupJSON := new(bytes.Buffer)
if err := encode.EncodeTo(backup, "json", backupJSON); err != nil {
if err := encode.EncodeTo(backup.Backup, "json", backupJSON); err != nil {
errs = append(errs, errors.Wrap(err, "error encoding backup"))
} else {
// Only upload the json and backup tarball if encoding to json succeeded.

View File

@@ -18,24 +18,24 @@ package controller
import (
"bytes"
"encoding/json"
"fmt"
"io"
"sort"
"strings"
"testing"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
core "k8s.io/client-go/testing"
"github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/backup"
pkgbackup "github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/metrics"
@@ -43,7 +43,6 @@ import (
persistencemocks "github.com/heptio/ark/pkg/persistence/mocks"
"github.com/heptio/ark/pkg/plugin"
pluginmocks "github.com/heptio/ark/pkg/plugin/mocks"
"github.com/heptio/ark/pkg/util/collections"
"github.com/heptio/ark/pkg/util/logging"
arktest "github.com/heptio/ark/pkg/util/test"
)
@@ -52,384 +51,313 @@ type fakeBackupper struct {
mock.Mock
}
func (b *fakeBackupper) Backup(logger logrus.FieldLogger, backup *v1.Backup, backupFile io.Writer, actions []backup.ItemAction) error {
args := b.Called(logger, backup, backupFile, actions)
func (b *fakeBackupper) Backup(logger logrus.FieldLogger, backup *pkgbackup.Request, backupFile io.Writer, actions []pkgbackup.ItemAction, blockStoreGetter pkgbackup.BlockStoreGetter) error {
args := b.Called(logger, backup, backupFile, actions, blockStoreGetter)
return args.Error(0)
}
func TestProcessBackup(t *testing.T) {
func TestProcessBackupNonProcessedItems(t *testing.T) {
tests := []struct {
name string
key string
expectError bool
expectedIncludes []string
expectedExcludes []string
backup *arktest.TestBackup
expectBackup bool
allowSnapshots bool
defaultLocations map[string]string
name string
key string
backup *v1.Backup
expectedErr string
}{
{
name: "bad key",
name: "bad key returns error",
key: "bad/key/here",
expectError: true,
expectedErr: "error splitting queue key: unexpected key format: \"bad/key/here\"",
},
{
name: "lister failed",
key: "heptio-ark/backup1",
expectError: true,
name: "backup not found in lister returns error",
key: "nonexistent/backup",
expectedErr: "error getting backup: backup.ark.heptio.com \"backup\" not found",
},
{
name: "do not process phase FailedValidation",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseFailedValidation),
expectBackup: false,
name: "FailedValidation backup is not processed",
key: "heptio-ark/backup-1",
backup: arktest.NewTestBackup().WithName("backup-1").WithPhase(v1.BackupPhaseFailedValidation).Backup,
},
{
name: "do not process phase InProgress",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseInProgress),
expectBackup: false,
name: "InProgress backup is not processed",
key: "heptio-ark/backup-1",
backup: arktest.NewTestBackup().WithName("backup-1").WithPhase(v1.BackupPhaseInProgress).Backup,
},
{
name: "do not process phase Completed",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseCompleted),
expectBackup: false,
name: "Completed backup is not processed",
key: "heptio-ark/backup-1",
backup: arktest.NewTestBackup().WithName("backup-1").WithPhase(v1.BackupPhaseCompleted).Backup,
},
{
name: "do not process phase Failed",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseFailed),
expectBackup: false,
},
{
name: "do not process phase other",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase("arg"),
expectBackup: false,
},
{
name: "invalid included/excluded resources fails validation",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithIncludedResources("foo").WithExcludedResources("foo"),
expectBackup: false,
},
{
name: "invalid included/excluded namespaces fails validation",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithIncludedNamespaces("foo").WithExcludedNamespaces("foo"),
expectBackup: false,
},
{
name: "make sure specified included and excluded resources are honored",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithIncludedResources("i", "j").WithExcludedResources("k", "l"),
expectedIncludes: []string{"i", "j"},
expectedExcludes: []string{"k", "l"},
expectBackup: true,
},
{
name: "if includednamespaces are specified, don't default to *",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithIncludedNamespaces("ns-1"),
expectBackup: true,
},
{
name: "ttl",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithTTL(10 * time.Minute),
expectBackup: true,
},
{
name: "backup with SnapshotVolumes when allowSnapshots=false fails validation",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithSnapshotVolumes(true),
expectBackup: false,
},
{
name: "backup with SnapshotVolumes when allowSnapshots=true gets executed",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithSnapshotVolumes(true),
allowSnapshots: true,
expectBackup: true,
},
{
name: "Backup without a location will have it set to the default",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew),
expectBackup: true,
},
{
name: "Backup with a location completes",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithStorageLocation("loc1"),
expectBackup: true,
},
{
name: "Backup with non-existent location will fail validation",
key: "heptio-ark/backup1",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithStorageLocation("loc2"),
expectBackup: false,
name: "Failed backup is not processed",
key: "heptio-ark/backup-1",
backup: arktest.NewTestBackup().WithName("backup-1").WithPhase(v1.BackupPhaseFailed).Backup,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
client = fake.NewSimpleClientset()
backupper = &fakeBackupper{}
sharedInformers = informers.NewSharedInformerFactory(client, 0)
sharedInformers = informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
logger = logging.DefaultLogger(logrus.DebugLevel)
clockTime, _ = time.Parse("Mon Jan 2 15:04:05 2006", "Mon Jan 2 15:04:05 2006")
pluginManager = &pluginmocks.Manager{}
backupStore = &persistencemocks.BackupStore{}
)
defer backupper.AssertExpectations(t)
defer pluginManager.AssertExpectations(t)
defer backupStore.AssertExpectations(t)
c := NewBackupController(
sharedInformers.Ark().V1().Backups(),
client.ArkV1(),
backupper,
test.allowSnapshots,
logger,
logrus.InfoLevel,
func(logrus.FieldLogger) plugin.Manager { return pluginManager },
NewBackupTracker(),
sharedInformers.Ark().V1().BackupStorageLocations(),
"default",
sharedInformers.Ark().V1().VolumeSnapshotLocations(),
test.defaultLocations,
metrics.NewServerMetrics(),
).(*backupController)
c.clock = clock.NewFakeClock(clockTime)
c.newBackupStore = func(*v1.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) {
return backupStore, nil
c := &backupController{
genericController: newGenericController("backup-test", logger),
lister: sharedInformers.Ark().V1().Backups().Lister(),
}
var expiration, startTime time.Time
if test.backup != nil {
// add directly to the informer's store so the lister can function and so we don't have to
// start the shared informers.
sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup.Backup)
startTime = c.clock.Now()
if test.backup.Spec.TTL.Duration > 0 {
expiration = c.clock.Now().Add(test.backup.Spec.TTL.Duration)
}
require.NoError(t, sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup))
}
if test.expectBackup {
// set up a Backup object to represent what we expect to be passed to backupper.Backup()
backup := test.backup.DeepCopy()
backup.Spec.IncludedResources = test.expectedIncludes
backup.Spec.ExcludedResources = test.expectedExcludes
backup.Spec.IncludedNamespaces = test.backup.Spec.IncludedNamespaces
backup.Spec.SnapshotVolumes = test.backup.Spec.SnapshotVolumes
backup.Status.Phase = v1.BackupPhaseInProgress
backup.Status.Expiration.Time = expiration
backup.Status.StartTimestamp.Time = startTime
backup.Status.Version = 1
backupper.On("Backup",
mock.Anything, // logger
backup,
mock.Anything, // backup file
mock.Anything, // actions
).Return(nil)
defaultLocation := &v1.BackupStorageLocation{
ObjectMeta: metav1.ObjectMeta{
Namespace: backup.Namespace,
Name: "default",
},
Spec: v1.BackupStorageLocationSpec{
Provider: "myCloud",
StorageType: v1.StorageType{
ObjectStorage: &v1.ObjectStorageLocation{
Bucket: "bucket",
},
},
},
}
loc1 := &v1.BackupStorageLocation{
ObjectMeta: metav1.ObjectMeta{
Namespace: backup.Namespace,
Name: "loc1",
},
Spec: v1.BackupStorageLocationSpec{
Provider: "myCloud",
StorageType: v1.StorageType{
ObjectStorage: &v1.ObjectStorageLocation{
Bucket: "bucket",
},
},
},
}
require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(defaultLocation))
require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(loc1))
pluginManager.On("GetBackupItemActions").Return(nil, nil)
// Ensure we have a CompletionTimestamp when uploading.
// Failures will display the bytes in buf.
completionTimestampIsPresent := func(buf *bytes.Buffer) bool {
json := buf.String()
timeString := `"completionTimestamp": "2006-01-02T15:04:05Z"`
return strings.Contains(json, timeString)
}
backupStore.On("PutBackup", test.backup.Name, mock.MatchedBy(completionTimestampIsPresent), mock.Anything, mock.Anything).Return(nil)
pluginManager.On("CleanupClients").Return()
}
// this is necessary so the Patch() call returns the appropriate object
client.PrependReactor("patch", "backups", func(action core.Action) (bool, runtime.Object, error) {
if test.backup == nil {
return true, nil, nil
}
patch := action.(core.PatchAction).GetPatch()
patchMap := make(map[string]interface{})
if err := json.Unmarshal(patch, &patchMap); err != nil {
t.Logf("error unmarshalling patch: %s\n", err)
return false, nil, err
}
phase, err := collections.GetString(patchMap, "status.phase")
if err != nil {
t.Logf("error getting status.phase: %s\n", err)
return false, nil, err
}
res := test.backup.DeepCopy()
// these are the fields that we expect to be set by
// the controller
res.Status.Version = 1
res.Status.Expiration.Time = expiration
res.Status.Phase = v1.BackupPhase(phase)
// If there's an error, it's mostly likely that the key wasn't found
// which is fine since not all patches will have them.
completionString, err := collections.GetString(patchMap, "status.completionTimestamp")
if err == nil {
completionTime, err := time.Parse(time.RFC3339Nano, completionString)
require.NoError(t, err, "unexpected completionTimestamp parsing error %v", err)
res.Status.CompletionTimestamp.Time = completionTime
}
startString, err := collections.GetString(patchMap, "status.startTimestamp")
if err == nil {
startTime, err := time.Parse(time.RFC3339Nano, startString)
require.NoError(t, err, "unexpected startTimestamp parsing error %v", err)
res.Status.StartTimestamp.Time = startTime
}
return true, res, nil
})
// method under test
err := c.processBackup(test.key)
if test.expectError {
require.Error(t, err, "processBackup should error")
return
}
require.NoError(t, err, "processBackup unexpected error: %v", err)
if !test.expectBackup {
// the AssertExpectations calls above make sure we aren't running anything we shouldn't be
return
}
actions := client.Actions()
require.Equal(t, 2, len(actions))
// structs and func for decoding patch content
type StatusPatch struct {
Expiration time.Time `json:"expiration"`
Version int `json:"version"`
Phase v1.BackupPhase `json:"phase"`
StartTimestamp metav1.Time `json:"startTimestamp"`
CompletionTimestamp metav1.Time `json:"completionTimestamp"`
}
type SpecPatch struct {
StorageLocation string `json:"storageLocation"`
}
type ObjectMetaPatch struct {
Labels map[string]string `json:"labels"`
}
type Patch struct {
Status StatusPatch `json:"status"`
Spec SpecPatch `json:"spec,omitempty"`
ObjectMeta ObjectMetaPatch `json:"metadata,omitempty"`
}
decode := func(decoder *json.Decoder) (interface{}, error) {
actual := new(Patch)
err := decoder.Decode(actual)
return *actual, err
}
// validate Patch call 1 (setting version, expiration, phase, and storage location)
var expected Patch
if test.backup.Spec.StorageLocation == "" {
expected = Patch{
Status: StatusPatch{
Version: 1,
Phase: v1.BackupPhaseInProgress,
Expiration: expiration,
},
Spec: SpecPatch{
StorageLocation: "default",
},
ObjectMeta: ObjectMetaPatch{
Labels: map[string]string{
v1.StorageLocationLabel: "default",
},
},
}
if test.expectedErr != "" {
require.Error(t, err)
assert.Equal(t, test.expectedErr, err.Error())
} else {
expected = Patch{
Status: StatusPatch{
Version: 1,
Phase: v1.BackupPhaseInProgress,
Expiration: expiration,
},
ObjectMeta: ObjectMetaPatch{
Labels: map[string]string{
v1.StorageLocationLabel: test.backup.Spec.StorageLocation,
},
},
}
assert.Nil(t, err)
}
arktest.ValidatePatch(t, actions[0], expected, decode)
// validate Patch call 2 (setting phase, startTimestamp, completionTimestamp)
expected = Patch{
Status: StatusPatch{
Phase: v1.BackupPhaseCompleted,
StartTimestamp: metav1.Time{Time: c.clock.Now()},
CompletionTimestamp: metav1.Time{Time: c.clock.Now()},
},
}
arktest.ValidatePatch(t, actions[1], expected, decode)
// Any backup that would actually proceed to validation will cause a segfault because this
// test hasn't set up the necessary controller dependencies for validation/etc. So the lack
// of segfaults during test execution here imply that backups are not being processed, which
// is what we expect.
})
}
}
func TestDefaultAndValidateSnapshotLocations(t *testing.T) {
defaultLocationsAWS := map[string]string{"aws": "aws-us-east-2"}
defaultLocationsFake := map[string]string{"fake-provider": "some-name"}
func TestProcessBackupValidationFailures(t *testing.T) {
defaultBackupLocation := arktest.NewTestBackupStorageLocation().WithName("loc-1").BackupStorageLocation
tests := []struct {
name string
backup *v1.Backup
backupLocation *v1.BackupStorageLocation
expectedErrs []string
}{
{
name: "invalid included/excluded resources fails validation",
backup: arktest.NewTestBackup().WithName("backup-1").WithIncludedResources("foo").WithExcludedResources("foo").Backup,
backupLocation: defaultBackupLocation,
expectedErrs: []string{"Invalid included/excluded resource lists: excludes list cannot contain an item in the includes list: foo"},
},
{
name: "invalid included/excluded namespaces fails validation",
backup: arktest.NewTestBackup().WithName("backup-1").WithIncludedNamespaces("foo").WithExcludedNamespaces("foo").Backup,
backupLocation: defaultBackupLocation,
expectedErrs: []string{"Invalid included/excluded namespace lists: excludes list cannot contain an item in the includes list: foo"},
},
{
name: "non-existent backup location fails validation",
backup: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("nonexistent").Backup,
expectedErrs: []string{"Error getting backup storage location: backupstoragelocation.ark.heptio.com \"nonexistent\" not found"},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
clientset = fake.NewSimpleClientset(test.backup)
sharedInformers = informers.NewSharedInformerFactory(clientset, 0)
logger = logging.DefaultLogger(logrus.DebugLevel)
)
c := &backupController{
genericController: newGenericController("backup-test", logger),
client: clientset.ArkV1(),
lister: sharedInformers.Ark().V1().Backups().Lister(),
backupLocationLister: sharedInformers.Ark().V1().BackupStorageLocations().Lister(),
defaultBackupLocation: defaultBackupLocation.Name,
}
require.NotNil(t, test.backup)
require.NoError(t, sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup))
if test.backupLocation != nil {
_, err := clientset.ArkV1().BackupStorageLocations(test.backupLocation.Namespace).Create(test.backupLocation)
require.NoError(t, err)
require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(test.backupLocation))
}
require.NoError(t, c.processBackup(fmt.Sprintf("%s/%s", test.backup.Namespace, test.backup.Name)))
res, err := clientset.ArkV1().Backups(test.backup.Namespace).Get(test.backup.Name, metav1.GetOptions{})
require.NoError(t, err)
assert.Equal(t, v1.BackupPhaseFailedValidation, res.Status.Phase)
assert.Equal(t, test.expectedErrs, res.Status.ValidationErrors)
// Any backup that would actually proceed to processing will cause a segfault because this
// test hasn't set up the necessary controller dependencies for running backups. So the lack
// of segfaults during test execution here imply that backups are not being processed, which
// is what we expect.
})
}
}
func TestProcessBackupCompletions(t *testing.T) {
defaultBackupLocation := arktest.NewTestBackupStorageLocation().WithName("loc-1").BackupStorageLocation
now, err := time.Parse(time.RFC1123Z, time.RFC1123Z)
require.NoError(t, err)
now = now.Local()
tests := []struct {
name string
backup *v1.Backup
backupLocation *v1.BackupStorageLocation
expectedResult *v1.Backup
}{
{
name: "backup with no backup location gets the default",
backup: arktest.NewTestBackup().WithName("backup-1").Backup,
backupLocation: defaultBackupLocation,
expectedResult: &v1.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: v1.DefaultNamespace,
Name: "backup-1",
Labels: map[string]string{
"ark.heptio.com/storage-location": "loc-1",
},
},
Spec: v1.BackupSpec{
StorageLocation: defaultBackupLocation.Name,
},
Status: v1.BackupStatus{
Phase: v1.BackupPhaseCompleted,
Version: 1,
StartTimestamp: metav1.NewTime(now),
CompletionTimestamp: metav1.NewTime(now),
},
},
},
{
name: "backup with a specific backup location keeps it",
backup: arktest.NewTestBackup().WithName("backup-1").WithStorageLocation("alt-loc").Backup,
backupLocation: arktest.NewTestBackupStorageLocation().WithName("alt-loc").BackupStorageLocation,
expectedResult: &v1.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: v1.DefaultNamespace,
Name: "backup-1",
Labels: map[string]string{
"ark.heptio.com/storage-location": "alt-loc",
},
},
Spec: v1.BackupSpec{
StorageLocation: "alt-loc",
},
Status: v1.BackupStatus{
Phase: v1.BackupPhaseCompleted,
Version: 1,
StartTimestamp: metav1.NewTime(now),
CompletionTimestamp: metav1.NewTime(now),
},
},
},
{
name: "backup with a TTL has expiration set",
backup: arktest.NewTestBackup().WithName("backup-1").WithTTL(10 * time.Minute).Backup,
backupLocation: defaultBackupLocation,
expectedResult: &v1.Backup{
ObjectMeta: metav1.ObjectMeta{
Namespace: v1.DefaultNamespace,
Name: "backup-1",
Labels: map[string]string{
"ark.heptio.com/storage-location": "loc-1",
},
},
Spec: v1.BackupSpec{
TTL: metav1.Duration{Duration: 10 * time.Minute},
StorageLocation: defaultBackupLocation.Name,
},
Status: v1.BackupStatus{
Phase: v1.BackupPhaseCompleted,
Version: 1,
Expiration: metav1.NewTime(now.Add(10 * time.Minute)),
StartTimestamp: metav1.NewTime(now),
CompletionTimestamp: metav1.NewTime(now),
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var (
clientset = fake.NewSimpleClientset(test.backup)
sharedInformers = informers.NewSharedInformerFactory(clientset, 0)
logger = logging.DefaultLogger(logrus.DebugLevel)
pluginManager = new(pluginmocks.Manager)
backupStore = new(persistencemocks.BackupStore)
backupper = new(fakeBackupper)
)
c := &backupController{
genericController: newGenericController("backup-test", logger),
client: clientset.ArkV1(),
lister: sharedInformers.Ark().V1().Backups().Lister(),
backupLocationLister: sharedInformers.Ark().V1().BackupStorageLocations().Lister(),
defaultBackupLocation: defaultBackupLocation.Name,
backupTracker: NewBackupTracker(),
metrics: metrics.NewServerMetrics(),
clock: clock.NewFakeClock(now),
newPluginManager: func(logrus.FieldLogger) plugin.Manager { return pluginManager },
newBackupStore: func(*v1.BackupStorageLocation, persistence.ObjectStoreGetter, logrus.FieldLogger) (persistence.BackupStore, error) {
return backupStore, nil
},
backupper: backupper,
}
pluginManager.On("GetBackupItemActions").Return(nil, nil)
pluginManager.On("CleanupClients").Return(nil)
backupper.On("Backup", mock.Anything, mock.Anything, mock.Anything, []pkgbackup.ItemAction(nil), pluginManager).Return(nil)
// Ensure we have a CompletionTimestamp when uploading.
// Failures will display the bytes in buf.
completionTimestampIsPresent := func(buf *bytes.Buffer) bool {
return strings.Contains(buf.String(), `"completionTimestamp": "2006-01-02T22:04:05Z"`)
}
backupStore.On("PutBackup", test.backup.Name, mock.MatchedBy(completionTimestampIsPresent), mock.Anything, mock.Anything).Return(nil)
// add the test's backup to the informer/lister store
require.NotNil(t, test.backup)
require.NoError(t, sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup))
// add the default backup storage location to the clientset and the informer/lister store
_, err := clientset.ArkV1().BackupStorageLocations(defaultBackupLocation.Namespace).Create(defaultBackupLocation)
require.NoError(t, err)
require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(defaultBackupLocation))
// add the test's backup storage location to the clientset and the informer/lister store
// if it's different than the default
if test.backupLocation != nil && test.backupLocation != defaultBackupLocation {
_, err := clientset.ArkV1().BackupStorageLocations(test.backupLocation.Namespace).Create(test.backupLocation)
require.NoError(t, err)
require.NoError(t, sharedInformers.Ark().V1().BackupStorageLocations().Informer().GetStore().Add(test.backupLocation))
}
require.NoError(t, c.processBackup(fmt.Sprintf("%s/%s", test.backup.Namespace, test.backup.Name)))
res, err := clientset.ArkV1().Backups(test.backup.Namespace).Get(test.backup.Name, metav1.GetOptions{})
require.NoError(t, err)
assert.Equal(t, test.expectedResult, res)
})
}
}
func TestValidateAndGetSnapshotLocations(t *testing.T) {
defaultLocationsAWS := map[string]*v1.VolumeSnapshotLocation{
"aws": arktest.NewTestVolumeSnapshotLocation().WithName("aws-us-east-2").VolumeSnapshotLocation,
}
defaultLocationsFake := map[string]*v1.VolumeSnapshotLocation{
"fake-provider": arktest.NewTestVolumeSnapshotLocation().WithName("some-name").VolumeSnapshotLocation,
}
multipleLocationNames := []string{"aws-us-west-1", "aws-us-east-1"}
@@ -463,7 +391,7 @@ func TestDefaultAndValidateSnapshotLocations(t *testing.T) {
name string
backup *arktest.TestBackup
locations []*arktest.TestVolumeSnapshotLocation
defaultLocations map[string]string
defaultLocations map[string]*v1.VolumeSnapshotLocation
expectedVolumeSnapshotLocationNames []string // adding these in the expected order will allow to test with better msgs in case of a test failure
expectedErrors string
expectedSuccess bool
@@ -493,7 +421,7 @@ func TestDefaultAndValidateSnapshotLocations(t *testing.T) {
name: "no location name for the provider exists: the provider's default should be added",
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew),
defaultLocations: defaultLocationsAWS,
expectedVolumeSnapshotLocationNames: []string{defaultLocationsAWS["aws"]},
expectedVolumeSnapshotLocationNames: []string{defaultLocationsAWS["aws"].Name},
expectedSuccess: true,
},
{
@@ -506,7 +434,7 @@ func TestDefaultAndValidateSnapshotLocations(t *testing.T) {
backup: arktest.NewTestBackup().WithName("backup1").WithPhase(v1.BackupPhaseNew).WithVolumeSnapshotLocations(dupLocationNames),
locations: arktest.NewTestVolumeSnapshotLocation().WithName(dupLocationNames[0]).WithProviderConfig(dupLocationList),
defaultLocations: defaultLocationsFake,
expectedVolumeSnapshotLocationNames: []string{dupLocationNames[0], defaultLocationsFake["fake-provider"]},
expectedVolumeSnapshotLocationNames: []string{dupLocationNames[0], defaultLocationsFake["fake-provider"].Name},
expectedSuccess: true,
},
}
@@ -519,7 +447,8 @@ func TestDefaultAndValidateSnapshotLocations(t *testing.T) {
)
c := &backupController{
snapshotLocationLister: sharedInformers.Ark().V1().VolumeSnapshotLocations().Lister(),
snapshotLocationLister: sharedInformers.Ark().V1().VolumeSnapshotLocations().Lister(),
defaultSnapshotLocations: test.defaultLocations,
}
// set up a Backup object to represent what we expect to be passed to backupper.Backup()
@@ -529,15 +458,23 @@ func TestDefaultAndValidateSnapshotLocations(t *testing.T) {
require.NoError(t, sharedInformers.Ark().V1().VolumeSnapshotLocations().Informer().GetStore().Add(location.VolumeSnapshotLocation))
}
errs := c.defaultAndValidateSnapshotLocations(backup, test.defaultLocations)
providerLocations, errs := c.validateAndGetSnapshotLocations(backup)
if test.expectedSuccess {
for _, err := range errs {
require.NoError(t, errors.New(err), "defaultAndValidateSnapshotLocations unexpected error: %v", err)
require.NoError(t, errors.New(err), "validateAndGetSnapshotLocations unexpected error: %v", err)
}
require.Equal(t, test.expectedVolumeSnapshotLocationNames, backup.Spec.VolumeSnapshotLocations)
var locations []string
for _, loc := range providerLocations {
locations = append(locations, loc.Name)
}
sort.Strings(test.expectedVolumeSnapshotLocationNames)
sort.Strings(locations)
require.Equal(t, test.expectedVolumeSnapshotLocationNames, locations)
} else {
if len(errs) == 0 {
require.Error(t, nil, "defaultAndValidateSnapshotLocations expected error")
require.Error(t, nil, "validateAndGetSnapshotLocations expected error")
}
require.Contains(t, errs, test.expectedErrors)
}

View File

@@ -104,10 +104,6 @@ func (bs *FakeBlockStore) GetVolumeInfo(volumeID, volumeAZ string) (string, *int
}
func (bs *FakeBlockStore) GetVolumeID(pv runtime.Unstructured) (string, error) {
if bs.Error != nil {
return "", bs.Error
}
return bs.VolumeID, nil
}