diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go index 623e829fe..87dd6a65b 100644 --- a/pkg/backup/backup.go +++ b/pkg/backup/backup.go @@ -26,12 +26,13 @@ import ( "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" kuberrs "k8s.io/apimachinery/pkg/util/errors" api "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/client" + "github.com/heptio/ark/pkg/cloudprovider" "github.com/heptio/ark/pkg/discovery" "github.com/heptio/ark/pkg/util/collections" kubeutil "github.com/heptio/ark/pkg/util/kube" @@ -42,32 +43,16 @@ import ( type Backupper interface { // Backup takes a backup using the specification in the api.Backup and writes backup and log data // to the given writers. - Backup(backup *api.Backup, backupFile, logFile io.Writer) error + Backup(backup *api.Backup, backupFile, logFile io.Writer, actions []ItemAction) error } // kubernetesBackupper implements Backupper. type kubernetesBackupper struct { - dynamicFactory client.DynamicFactory - discoveryHelper discovery.Helper - actions map[schema.GroupResource]Action - podCommandExecutor podCommandExecutor - + dynamicFactory client.DynamicFactory + discoveryHelper discovery.Helper + podCommandExecutor podCommandExecutor groupBackupperFactory groupBackupperFactory -} - -// ResourceIdentifier describes a single item by its group, resource, namespace, and name. -type ResourceIdentifier struct { - schema.GroupResource - Namespace string - Name string -} - -// Action is an actor that performs an operation on an individual item being backed up. -type Action interface { - // Execute allows the Action to perform arbitrary logic with the item being backed up and the - // backup itself. Implementations may return additional ResourceIdentifiers that indicate specific - // items that also need to be backed up. - Execute(log *logrus.Entry, item runtime.Unstructured, backup *api.Backup) ([]ResourceIdentifier, error) + snapshotService cloudprovider.SnapshotService } type itemKey struct { @@ -76,6 +61,20 @@ type itemKey struct { name string } +type resolvedAction struct { + ItemAction + + resourceIncludesExcludes *collections.IncludesExcludes + namespaceIncludesExcludes *collections.IncludesExcludes + selector labels.Selector +} + +// LogSetter is an interface for a type that allows a FieldLogger +// to be set on it. +type LogSetter interface { + SetLog(logrus.FieldLogger) +} + func (i *itemKey) String() string { return fmt.Sprintf("resource=%s,namespace=%s,name=%s", i.resource, i.namespace, i.name) } @@ -84,38 +83,48 @@ func (i *itemKey) String() string { func NewKubernetesBackupper( discoveryHelper discovery.Helper, dynamicFactory client.DynamicFactory, - actions map[string]Action, podCommandExecutor podCommandExecutor, + snapshotService cloudprovider.SnapshotService, ) (Backupper, error) { - resolvedActions, err := resolveActions(discoveryHelper, actions) - if err != nil { - return nil, err - } - return &kubernetesBackupper{ - discoveryHelper: discoveryHelper, - dynamicFactory: dynamicFactory, - actions: resolvedActions, - podCommandExecutor: podCommandExecutor, - + discoveryHelper: discoveryHelper, + dynamicFactory: dynamicFactory, + podCommandExecutor: podCommandExecutor, groupBackupperFactory: &defaultGroupBackupperFactory{}, + snapshotService: snapshotService, }, nil } -// resolveActions resolves the string-based map of group-resources to actions and returns a map of -// schema.GroupResources to actions. -func resolveActions(helper discovery.Helper, actions map[string]Action) (map[schema.GroupResource]Action, error) { - ret := make(map[schema.GroupResource]Action) +func resolveActions(actions []ItemAction, helper discovery.Helper) ([]resolvedAction, error) { + var resolved []resolvedAction - for resource, action := range actions { - gvr, _, err := helper.ResourceFor(schema.ParseGroupResource(resource).WithVersion("")) + for _, action := range actions { + resourceSelector, err := action.AppliesTo() if err != nil { return nil, err } - ret[gvr.GroupResource()] = action + + resources := getResourceIncludesExcludes(helper, resourceSelector.IncludedResources, resourceSelector.ExcludedResources) + namespaces := collections.NewIncludesExcludes().Includes(resourceSelector.IncludedNamespaces...).Excludes(resourceSelector.ExcludedNamespaces...) + + selector := labels.Everything() + if resourceSelector.LabelSelector != "" { + if selector, err = labels.Parse(resourceSelector.LabelSelector); err != nil { + return nil, err + } + } + + res := resolvedAction{ + ItemAction: action, + resourceIncludesExcludes: resources, + namespaceIncludesExcludes: namespaces, + selector: selector, + } + + resolved = append(resolved, res) } - return ret, nil + return resolved, nil } // getResourceIncludesExcludes takes the lists of resources to include and exclude, uses the @@ -172,7 +181,7 @@ func getResourceHooks(hookSpecs []api.BackupResourceHookSpec, discoveryHelper di // Backup backs up the items specified in the Backup, placing them in a gzip-compressed tar file // written to backupFile. The finalized api.Backup is written to metadata. -func (kb *kubernetesBackupper) Backup(backup *api.Backup, backupFile, logFile io.Writer) error { +func (kb *kubernetesBackupper) Backup(backup *api.Backup, backupFile, logFile io.Writer, actions []ItemAction) error { gzippedData := gzip.NewWriter(backupFile) defer gzippedData.Close() @@ -215,6 +224,11 @@ func (kb *kubernetesBackupper) Backup(backup *api.Backup, backupFile, logFile io "networkpolicies": newCohabitatingResource("networkpolicies", "extensions", "networking.k8s.io"), } + resolvedActions, err := resolveActions(actions, kb.discoveryHelper) + if err != nil { + return err + } + gb := kb.groupBackupperFactory.newGroupBackupper( log, backup, @@ -225,10 +239,11 @@ func (kb *kubernetesBackupper) Backup(backup *api.Backup, backupFile, logFile io kb.discoveryHelper, backedUpItems, cohabitatingResources, - kb.actions, + resolvedActions, kb.podCommandExecutor, tw, resourceHooks, + kb.snapshotService, ) for _, group := range kb.discoveryHelper.Resources() { diff --git a/pkg/backup/backup_pv_action.go b/pkg/backup/backup_pv_action.go index 41930925b..bd6b7d9be 100644 --- a/pkg/backup/backup_pv_action.go +++ b/pkg/backup/backup_pv_action.go @@ -30,25 +30,33 @@ import ( // backupPVAction inspects a PersistentVolumeClaim for the PersistentVolume // that it references and backs it up type backupPVAction struct { + log logrus.FieldLogger } -func NewBackupPVAction() Action { - return &backupPVAction{} +func NewBackupPVAction(log logrus.FieldLogger) ItemAction { + return &backupPVAction{log: log} } var pvGroupResource = schema.GroupResource{Group: "", Resource: "persistentvolumes"} +func (a *backupPVAction) AppliesTo() (ResourceSelector, error) { + return ResourceSelector{ + IncludedResources: []string{"persistentvolumeclaims"}, + }, nil +} + // Execute finds the PersistentVolume referenced by the provided // PersistentVolumeClaim and backs it up -func (a *backupPVAction) Execute(log *logrus.Entry, item runtime.Unstructured, backup *v1.Backup) ([]ResourceIdentifier, error) { - log.Info("Executing backupPVAction") +func (a *backupPVAction) Execute(item runtime.Unstructured, backup *v1.Backup) (runtime.Unstructured, []ResourceIdentifier, error) { + a.log.Info("Executing backupPVAction") + var additionalItems []ResourceIdentifier pvc := item.UnstructuredContent() volumeName, err := collections.GetString(pvc, "spec.volumeName") if err != nil { - return additionalItems, errors.WithMessage(err, "unable to get spec.volumeName") + return nil, nil, errors.WithMessage(err, "unable to get spec.volumeName") } additionalItems = append(additionalItems, ResourceIdentifier{ @@ -56,5 +64,5 @@ func (a *backupPVAction) Execute(log *logrus.Entry, item runtime.Unstructured, b Name: volumeName, }) - return additionalItems, nil + return item, additionalItems, nil } diff --git a/pkg/backup/backup_pv_action_test.go b/pkg/backup/backup_pv_action_test.go index bfca95c1a..82eae87fc 100644 --- a/pkg/backup/backup_pv_action_test.go +++ b/pkg/backup/backup_pv_action_test.go @@ -35,13 +35,13 @@ func TestBackupPVAction(t *testing.T) { backup := &v1.Backup{} - a := NewBackupPVAction() + a := NewBackupPVAction(arktest.NewLogger()) - additional, err := a.Execute(arktest.NewLogger(), pvc, backup) + _, additional, err := a.Execute(pvc, backup) assert.EqualError(t, err, "unable to get spec.volumeName: key volumeName not found") pvc.Object["spec"].(map[string]interface{})["volumeName"] = "myVolume" - additional, err = a.Execute(arktest.NewLogger(), pvc, backup) + _, additional, err = a.Execute(pvc, backup) require.NoError(t, err) require.Len(t, additional, 1) assert.Equal(t, ResourceIdentifier{GroupResource: pvGroupResource, Name: "myVolume"}, additional[0]) diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go index c0a3dbaa6..672e47d0d 100644 --- a/pkg/backup/backup_test.go +++ b/pkg/backup/backup_test.go @@ -41,6 +41,7 @@ import ( "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/client" + "github.com/heptio/ark/pkg/cloudprovider" "github.com/heptio/ark/pkg/discovery" "github.com/heptio/ark/pkg/util/collections" kubeutil "github.com/heptio/ark/pkg/util/kube" @@ -55,49 +56,73 @@ var ( ) type fakeAction struct { + selector ResourceSelector ids []string - backups []*v1.Backup + backups []v1.Backup additionalItems []ResourceIdentifier } -var _ Action = &fakeAction{} +var _ ItemAction = &fakeAction{} -func (a *fakeAction) Execute(log *logrus.Entry, item runtime.Unstructured, backup *v1.Backup) ([]ResourceIdentifier, error) { +func newFakeAction(resource string) *fakeAction { + return (&fakeAction{}).ForResource(resource) +} + +func (a *fakeAction) Execute(item runtime.Unstructured, backup *v1.Backup) (runtime.Unstructured, []ResourceIdentifier, error) { metadata, err := meta.Accessor(item) if err != nil { - return a.additionalItems, err + return item, a.additionalItems, err } a.ids = append(a.ids, kubeutil.NamespaceAndName(metadata)) - a.backups = append(a.backups, backup) + a.backups = append(a.backups, *backup) - return a.additionalItems, nil + return item, a.additionalItems, nil +} + +func (a *fakeAction) AppliesTo() (ResourceSelector, error) { + return a.selector, nil +} + +func (a *fakeAction) ForResource(resource string) *fakeAction { + a.selector.IncludedResources = []string{resource} + return a } func TestResolveActions(t *testing.T) { tests := []struct { name string - input map[string]Action - expected map[schema.GroupResource]Action + input []ItemAction + expected []resolvedAction resourcesWithErrors []string expectError bool }{ { name: "empty input", - input: map[string]Action{}, - expected: map[schema.GroupResource]Action{}, + input: []ItemAction{}, + expected: nil, }, { - name: "mapper error", - input: map[string]Action{"badresource": &fakeAction{}}, - expected: map[schema.GroupResource]Action{}, + name: "resolve error", + input: []ItemAction{&fakeAction{selector: ResourceSelector{LabelSelector: "=invalid-selector"}}}, + expected: nil, expectError: true, }, { name: "resolved", - input: map[string]Action{"foo": &fakeAction{}, "bar": &fakeAction{}}, - expected: map[schema.GroupResource]Action{ - {Group: "somegroup", Resource: "foodies"}: &fakeAction{}, - {Group: "anothergroup", Resource: "barnacles"}: &fakeAction{}, + input: []ItemAction{newFakeAction("foo"), newFakeAction("bar")}, + expected: []resolvedAction{ + { + ItemAction: newFakeAction("foo"), + resourceIncludesExcludes: collections.NewIncludesExcludes().Includes("foodies.somegroup"), + namespaceIncludesExcludes: collections.NewIncludesExcludes(), + selector: labels.Everything(), + }, + { + ItemAction: newFakeAction("bar"), + resourceIncludesExcludes: collections.NewIncludesExcludes().Includes("barnacles.anothergroup"), + namespaceIncludesExcludes: collections.NewIncludesExcludes(), + selector: labels.Everything(), + }, }, }, } @@ -112,7 +137,7 @@ func TestResolveActions(t *testing.T) { } discoveryHelper := arktest.NewFakeDiscoveryHelper(false, resources) - actual, err := resolveActions(discoveryHelper, test.input) + actual, err := resolveActions(test.input, discoveryHelper) gotError := err != nil if e, a := test.expectError, gotError; e != a { @@ -349,7 +374,6 @@ func TestBackup(t *testing.T) { tests := []struct { name string backup *v1.Backup - actions map[string]Action expectedNamespaces *collections.IncludesExcludes expectedResources *collections.IncludesExcludes expectedLabelSelector string @@ -369,7 +393,6 @@ func TestBackup(t *testing.T) { ExcludedNamespaces: []string{"c", "d"}, }, }, - actions: map[string]Action{}, expectedNamespaces: collections.NewIncludesExcludes().Includes("a", "b").Excludes("c", "d"), expectedResources: collections.NewIncludesExcludes().Includes("configmaps", "certificatesigningrequests.certificates.k8s.io", "roles.rbac.authorization.k8s.io"), expectedHooks: []resourceHook{}, @@ -388,7 +411,6 @@ func TestBackup(t *testing.T) { }, }, }, - actions: map[string]Action{}, expectedNamespaces: collections.NewIncludesExcludes(), expectedResources: collections.NewIncludesExcludes(), expectedHooks: []resourceHook{}, @@ -402,7 +424,6 @@ func TestBackup(t *testing.T) { { name: "backupGroup errors", backup: &v1.Backup{}, - actions: map[string]Action{}, expectedNamespaces: collections.NewIncludesExcludes(), expectedResources: collections.NewIncludesExcludes(), expectedHooks: []resourceHook{}, @@ -440,7 +461,6 @@ func TestBackup(t *testing.T) { }, }, }, - actions: map[string]Action{}, expectedNamespaces: collections.NewIncludesExcludes(), expectedResources: collections.NewIncludesExcludes(), expectedHooks: []resourceHook{ @@ -491,8 +511,8 @@ func TestBackup(t *testing.T) { b, err := NewKubernetesBackupper( discoveryHelper, dynamicFactory, - test.actions, podCommandExecutor, + nil, ) require.NoError(t, err) kb := b.(*kubernetesBackupper) @@ -519,10 +539,11 @@ func TestBackup(t *testing.T) { discoveryHelper, map[itemKey]struct{}{}, // backedUpItems cohabitatingResources, - kb.actions, + mock.Anything, kb.podCommandExecutor, mock.Anything, // tarWriter test.expectedHooks, + mock.Anything, ).Return(groupBackupper) for group, err := range test.backupGroupErrors { @@ -531,7 +552,7 @@ func TestBackup(t *testing.T) { var backupFile, logFile bytes.Buffer - err = b.Backup(test.backup, &backupFile, &logFile) + err = b.Backup(test.backup, &backupFile, &logFile, nil) defer func() { // print log if anything failed if t.Failed() { @@ -560,7 +581,7 @@ type mockGroupBackupperFactory struct { } func (f *mockGroupBackupperFactory) newGroupBackupper( - log *logrus.Entry, + log logrus.FieldLogger, backup *v1.Backup, namespaces, resources *collections.IncludesExcludes, labelSelector string, @@ -568,10 +589,11 @@ func (f *mockGroupBackupperFactory) newGroupBackupper( discoveryHelper discovery.Helper, backedUpItems map[itemKey]struct{}, cohabitatingResources map[string]*cohabitatingResource, - actions map[schema.GroupResource]Action, + actions []resolvedAction, podCommandExecutor podCommandExecutor, tarWriter tarWriter, resourceHooks []resourceHook, + snapshotService cloudprovider.SnapshotService, ) groupBackupper { args := f.Called( log, @@ -587,6 +609,7 @@ func (f *mockGroupBackupperFactory) newGroupBackupper( podCommandExecutor, tarWriter, resourceHooks, + snapshotService, ) return args.Get(0).(groupBackupper) } diff --git a/pkg/backup/group_backupper.go b/pkg/backup/group_backupper.go index 877dd38b6..f148a8f98 100644 --- a/pkg/backup/group_backupper.go +++ b/pkg/backup/group_backupper.go @@ -19,19 +19,21 @@ package backup import ( "strings" + "github.com/sirupsen/logrus" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kuberrs "k8s.io/apimachinery/pkg/util/errors" + "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/client" + "github.com/heptio/ark/pkg/cloudprovider" "github.com/heptio/ark/pkg/discovery" "github.com/heptio/ark/pkg/util/collections" - "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - kuberrs "k8s.io/apimachinery/pkg/util/errors" ) type groupBackupperFactory interface { newGroupBackupper( - log *logrus.Entry, + log logrus.FieldLogger, backup *v1.Backup, namespaces, resources *collections.IncludesExcludes, labelSelector string, @@ -39,17 +41,18 @@ type groupBackupperFactory interface { discoveryHelper discovery.Helper, backedUpItems map[itemKey]struct{}, cohabitatingResources map[string]*cohabitatingResource, - actions map[schema.GroupResource]Action, + actions []resolvedAction, podCommandExecutor podCommandExecutor, tarWriter tarWriter, resourceHooks []resourceHook, + snapshotService cloudprovider.SnapshotService, ) groupBackupper } type defaultGroupBackupperFactory struct{} func (f *defaultGroupBackupperFactory) newGroupBackupper( - log *logrus.Entry, + log logrus.FieldLogger, backup *v1.Backup, namespaces, resources *collections.IncludesExcludes, labelSelector string, @@ -57,26 +60,27 @@ func (f *defaultGroupBackupperFactory) newGroupBackupper( discoveryHelper discovery.Helper, backedUpItems map[itemKey]struct{}, cohabitatingResources map[string]*cohabitatingResource, - actions map[schema.GroupResource]Action, + actions []resolvedAction, podCommandExecutor podCommandExecutor, tarWriter tarWriter, resourceHooks []resourceHook, + snapshotService cloudprovider.SnapshotService, ) groupBackupper { return &defaultGroupBackupper{ - log: log, - backup: backup, - namespaces: namespaces, - resources: resources, - labelSelector: labelSelector, - dynamicFactory: dynamicFactory, - discoveryHelper: discoveryHelper, - backedUpItems: backedUpItems, - cohabitatingResources: cohabitatingResources, - actions: actions, - podCommandExecutor: podCommandExecutor, - tarWriter: tarWriter, - resourceHooks: resourceHooks, - + log: log, + backup: backup, + namespaces: namespaces, + resources: resources, + labelSelector: labelSelector, + dynamicFactory: dynamicFactory, + discoveryHelper: discoveryHelper, + backedUpItems: backedUpItems, + cohabitatingResources: cohabitatingResources, + actions: actions, + podCommandExecutor: podCommandExecutor, + tarWriter: tarWriter, + resourceHooks: resourceHooks, + snapshotService: snapshotService, resourceBackupperFactory: &defaultResourceBackupperFactory{}, } } @@ -86,7 +90,7 @@ type groupBackupper interface { } type defaultGroupBackupper struct { - log *logrus.Entry + log logrus.FieldLogger backup *v1.Backup namespaces, resources *collections.IncludesExcludes labelSelector string @@ -94,10 +98,11 @@ type defaultGroupBackupper struct { discoveryHelper discovery.Helper backedUpItems map[itemKey]struct{} cohabitatingResources map[string]*cohabitatingResource - actions map[schema.GroupResource]Action + actions []resolvedAction podCommandExecutor podCommandExecutor tarWriter tarWriter resourceHooks []resourceHook + snapshotService cloudprovider.SnapshotService resourceBackupperFactory resourceBackupperFactory } @@ -121,6 +126,7 @@ func (gb *defaultGroupBackupper) backupGroup(group *metav1.APIResourceList) erro gb.podCommandExecutor, gb.tarWriter, gb.resourceHooks, + gb.snapshotService, ) ) diff --git a/pkg/backup/group_backupper_test.go b/pkg/backup/group_backupper_test.go index e345750dd..f150b442a 100644 --- a/pkg/backup/group_backupper_test.go +++ b/pkg/backup/group_backupper_test.go @@ -21,6 +21,7 @@ import ( "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/client" + "github.com/heptio/ark/pkg/cloudprovider" "github.com/heptio/ark/pkg/discovery" "github.com/heptio/ark/pkg/util/collections" arktest "github.com/heptio/ark/pkg/util/test" @@ -56,8 +57,11 @@ func TestBackupGroup(t *testing.T) { }, } - actions := map[schema.GroupResource]Action{ - {Group: "", Resource: "pods"}: &fakeAction{}, + actions := []resolvedAction{ + { + ItemAction: newFakeAction("pods"), + resourceIncludesExcludes: collections.NewIncludesExcludes().Includes("pods"), + }, } podCommandExecutor := &mockPodCommandExecutor{} @@ -83,6 +87,7 @@ func TestBackupGroup(t *testing.T) { podCommandExecutor, tarWriter, resourceHooks, + nil, ).(*defaultGroupBackupper) resourceBackupperFactory := &mockResourceBackupperFactory{} @@ -106,6 +111,7 @@ func TestBackupGroup(t *testing.T) { podCommandExecutor, tarWriter, resourceHooks, + nil, ).Return(resourceBackupper) group := &metav1.APIResourceList{ @@ -140,7 +146,7 @@ type mockResourceBackupperFactory struct { } func (rbf *mockResourceBackupperFactory) newResourceBackupper( - log *logrus.Entry, + log logrus.FieldLogger, backup *v1.Backup, namespaces *collections.IncludesExcludes, resources *collections.IncludesExcludes, @@ -149,10 +155,11 @@ func (rbf *mockResourceBackupperFactory) newResourceBackupper( discoveryHelper discovery.Helper, backedUpItems map[itemKey]struct{}, cohabitatingResources map[string]*cohabitatingResource, - actions map[schema.GroupResource]Action, + actions []resolvedAction, podCommandExecutor podCommandExecutor, tarWriter tarWriter, resourceHooks []resourceHook, + snapshotService cloudprovider.SnapshotService, ) resourceBackupper { args := rbf.Called( log, @@ -168,6 +175,7 @@ func (rbf *mockResourceBackupperFactory) newResourceBackupper( podCommandExecutor, tarWriter, resourceHooks, + snapshotService, ) return args.Get(0).(resourceBackupper) } diff --git a/pkg/backup/item_action.go b/pkg/backup/item_action.go new file mode 100644 index 000000000..fab745a1e --- /dev/null +++ b/pkg/backup/item_action.go @@ -0,0 +1,37 @@ +package backup + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + + api "github.com/heptio/ark/pkg/apis/ark/v1" +) + +// ItemAction is an actor that performs an operation on an individual item being backed up. +type ItemAction interface { + // AppliesTo returns information about which resources this action should be invoked for. + AppliesTo() (ResourceSelector, error) + + // Execute allows the ItemAction to perform arbitrary logic with the item being backed up and the + // backup itself. Implementations may return additional ResourceIdentifiers that indicate specific + // items that also need to be backed up. + Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []ResourceIdentifier, error) +} + +// ResourceIdentifier describes a single item by its group, resource, namespace, and name. +type ResourceIdentifier struct { + schema.GroupResource + Namespace string + Name string +} + +// ResourceSelector is a collection of included/excluded namespaces, +// included/excluded resources, and a label-selector that can be used +// to match a set of items from a cluster. +type ResourceSelector struct { + IncludedNamespaces []string + ExcludedNamespaces []string + IncludedResources []string + ExcludedResources []string + LabelSelector string +} diff --git a/pkg/backup/item_backupper.go b/pkg/backup/item_backupper.go index fbca75ae2..6de912e32 100644 --- a/pkg/backup/item_backupper.go +++ b/pkg/backup/item_backupper.go @@ -22,16 +22,21 @@ import ( "path/filepath" "time" - api "github.com/heptio/ark/pkg/apis/ark/v1" - "github.com/heptio/ark/pkg/client" - "github.com/heptio/ark/pkg/discovery" - "github.com/heptio/ark/pkg/util/collections" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + + api "github.com/heptio/ark/pkg/apis/ark/v1" + "github.com/heptio/ark/pkg/client" + "github.com/heptio/ark/pkg/cloudprovider" + "github.com/heptio/ark/pkg/discovery" + "github.com/heptio/ark/pkg/util/collections" + kubeutil "github.com/heptio/ark/pkg/util/kube" ) type itemBackupperFactory interface { @@ -39,12 +44,13 @@ type itemBackupperFactory interface { backup *api.Backup, namespaces, resources *collections.IncludesExcludes, backedUpItems map[itemKey]struct{}, - actions map[schema.GroupResource]Action, + actions []resolvedAction, podCommandExecutor podCommandExecutor, tarWriter tarWriter, resourceHooks []resourceHook, dynamicFactory client.DynamicFactory, discoveryHelper discovery.Helper, + snapshotService cloudprovider.SnapshotService, ) ItemBackupper } @@ -54,12 +60,13 @@ func (f *defaultItemBackupperFactory) newItemBackupper( backup *api.Backup, namespaces, resources *collections.IncludesExcludes, backedUpItems map[itemKey]struct{}, - actions map[schema.GroupResource]Action, + actions []resolvedAction, podCommandExecutor podCommandExecutor, tarWriter tarWriter, resourceHooks []resourceHook, dynamicFactory client.DynamicFactory, discoveryHelper discovery.Helper, + snapshotService cloudprovider.SnapshotService, ) ItemBackupper { ib := &defaultItemBackupper{ backup: backup, @@ -71,7 +78,7 @@ func (f *defaultItemBackupperFactory) newItemBackupper( resourceHooks: resourceHooks, dynamicFactory: dynamicFactory, discoveryHelper: discoveryHelper, - + snapshotService: snapshotService, itemHookHandler: &defaultItemHookHandler{ podCommandExecutor: podCommandExecutor, }, @@ -84,7 +91,7 @@ func (f *defaultItemBackupperFactory) newItemBackupper( } type ItemBackupper interface { - backupItem(logger *logrus.Entry, obj runtime.Unstructured, groupResource schema.GroupResource) error + backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource) error } type defaultItemBackupper struct { @@ -92,11 +99,12 @@ type defaultItemBackupper struct { namespaces *collections.IncludesExcludes resources *collections.IncludesExcludes backedUpItems map[itemKey]struct{} - actions map[schema.GroupResource]Action + actions []resolvedAction tarWriter tarWriter resourceHooks []resourceHook dynamicFactory client.DynamicFactory discoveryHelper discovery.Helper + snapshotService cloudprovider.SnapshotService itemHookHandler itemHookHandler additionalItemBackupper ItemBackupper @@ -107,7 +115,7 @@ var namespacesGroupResource = schema.GroupResource{Group: "", Resource: "namespa // backupItem backs up an individual item to tarWriter. The item may be excluded based on the // namespaces IncludesExcludes list. -func (ib *defaultItemBackupper) backupItem(logger *logrus.Entry, obj runtime.Unstructured, groupResource schema.GroupResource) error { +func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource) error { metadata, err := meta.Accessor(obj) if err != nil { return err @@ -154,18 +162,38 @@ func (ib *defaultItemBackupper) backupItem(logger *logrus.Entry, obj runtime.Uns log.Info("Backing up resource") - item := obj.UnstructuredContent() // Never save status - delete(item, "status") + delete(obj.UnstructuredContent(), "status") if err := ib.itemHookHandler.handleHooks(log, groupResource, obj, ib.resourceHooks); err != nil { return err } - if action, found := ib.actions[groupResource]; found { + for _, action := range ib.actions { + if !action.resourceIncludesExcludes.ShouldInclude(groupResource.String()) { + log.Debug("Skipping action because it does not apply to this resource") + continue + } + + if namespace != "" && !action.namespaceIncludesExcludes.ShouldInclude(namespace) { + log.Debug("Skipping action because it does not apply to this namespace") + continue + } + + if !action.selector.Matches(labels.Set(metadata.GetLabels())) { + log.Debug("Skipping action because label selector does not match") + continue + } + log.Info("Executing custom action") - if additionalItemIdentifiers, err := action.Execute(log, obj, ib.backup); err == nil { + if logSetter, ok := action.ItemAction.(LogSetter); ok { + logSetter.SetLog(log) + } + + if updatedItem, additionalItemIdentifiers, err := action.Execute(obj, ib.backup); err == nil { + obj = updatedItem + for _, additionalItem := range additionalItemIdentifiers { gvr, resource, err := ib.discoveryHelper.ResourceFor(additionalItem.GroupResource.WithVersion("")) if err != nil { @@ -189,6 +217,16 @@ func (ib *defaultItemBackupper) backupItem(logger *logrus.Entry, obj runtime.Uns } } + if groupResource == pvGroupResource { + if ib.snapshotService == nil { + log.Debug("Skipping Persistent Volume snapshot because they're not enabled.") + } else { + if err := ib.takePVSnapshot(obj, ib.backup, log); err != nil { + return err + } + } + } + var filePath string if namespace != "" { filePath = filepath.Join(api.ResourcesDir, groupResource.String(), api.NamespaceScopedDir, namespace, name+".json") @@ -196,7 +234,7 @@ func (ib *defaultItemBackupper) backupItem(logger *logrus.Entry, obj runtime.Uns filePath = filepath.Join(api.ResourcesDir, groupResource.String(), api.ClusterScopedDir, name+".json") } - itemBytes, err := json.Marshal(item) + itemBytes, err := json.Marshal(obj.UnstructuredContent()) if err != nil { return errors.WithStack(err) } @@ -219,3 +257,74 @@ func (ib *defaultItemBackupper) backupItem(logger *logrus.Entry, obj runtime.Uns return nil } + +// zoneLabel is the label that stores availability-zone info +// on PVs +const zoneLabel = "failure-domain.beta.kubernetes.io/zone" + +// takePVSnapshot triggers a snapshot for the volume/disk underlying a PersistentVolume if the provided +// backup has volume snapshots enabled and the PV is of a compatible type. Also records cloud +// disk type and IOPS (if applicable) to be able to restore to current state later. +func (ib *defaultItemBackupper) takePVSnapshot(pv runtime.Unstructured, backup *api.Backup, log logrus.FieldLogger) error { + log.Info("Executing takePVSnapshot") + + if backup.Spec.SnapshotVolumes != nil && !*backup.Spec.SnapshotVolumes { + log.Info("Backup has volume snapshots disabled; skipping volume snapshot action.") + return nil + } + + metadata, err := meta.Accessor(pv) + if err != nil { + return errors.WithStack(err) + } + + name := metadata.GetName() + var pvFailureDomainZone string + labels := metadata.GetLabels() + + if labels[zoneLabel] != "" { + pvFailureDomainZone = labels[zoneLabel] + } else { + log.Infof("label %q is not present on PersistentVolume", zoneLabel) + } + + volumeID, err := kubeutil.GetVolumeID(pv.UnstructuredContent()) + // non-nil error means it's a supported PV source but volume ID can't be found + if err != nil { + return errors.Wrapf(err, "error getting volume ID for PersistentVolume") + } + // no volumeID / nil error means unsupported PV source + if volumeID == "" { + log.Info("PersistentVolume is not a supported volume type for snapshots, skipping.") + return nil + } + + log = log.WithField("volumeID", volumeID) + + log.Info("Snapshotting PersistentVolume") + snapshotID, err := ib.snapshotService.CreateSnapshot(volumeID, pvFailureDomainZone) + if err != nil { + // log+error on purpose - log goes to the per-backup log file, error goes to the backup + log.WithError(err).Error("error creating snapshot") + return errors.WithMessage(err, "error creating snapshot") + } + + volumeType, iops, err := ib.snapshotService.GetVolumeInfo(volumeID, pvFailureDomainZone) + if err != nil { + log.WithError(err).Error("error getting volume info") + return errors.WithMessage(err, "error getting volume info") + } + + if backup.Status.VolumeBackups == nil { + backup.Status.VolumeBackups = make(map[string]*api.VolumeBackupInfo) + } + + backup.Status.VolumeBackups[name] = &api.VolumeBackupInfo{ + SnapshotID: snapshotID, + Type: volumeType, + Iops: iops, + AvailabilityZone: pvFailureDomainZone, + } + + return nil +} diff --git a/pkg/backup/item_backupper_test.go b/pkg/backup/item_backupper_test.go index 94bd6f42a..633448dea 100644 --- a/pkg/backup/item_backupper_test.go +++ b/pkg/backup/item_backupper_test.go @@ -22,8 +22,10 @@ import ( "fmt" "reflect" "testing" + "time" "github.com/heptio/ark/pkg/apis/ark/v1" + api "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/util/collections" arktest "github.com/heptio/ark/pkg/util/test" "github.com/pkg/errors" @@ -33,6 +35,7 @@ import ( "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -134,6 +137,8 @@ func TestBackupItemNoSkips(t *testing.T) { expectedActionID string customActionAdditionalItemIdentifiers []ResourceIdentifier customActionAdditionalItems []runtime.Unstructured + groupResource string + snapshottableVolumes map[string]api.VolumeBackupInfo }{ { name: "explicit namespace include", @@ -223,12 +228,33 @@ func TestBackupItemNoSkips(t *testing.T) { unstructuredOrDie(`{"apiVersion":"g2/v1","kind":"r1","metadata":{"namespace":"ns2","name":"n2"}}`), }, }, + { + name: "takePVSnapshot is not invoked for PVs when snapshotService == nil", + namespaceIncludesExcludes: collections.NewIncludesExcludes().Includes("*"), + item: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/zone": "us-east-1c"}}, "spec": {"awsElasticBlockStore": {"volumeID": "aws://us-east-1c/vol-abc123"}}}`, + expectError: false, + expectExcluded: false, + expectedTarHeaderName: "resources/persistentvolumes/cluster/mypv.json", + groupResource: "persistentvolumes", + }, + { + name: "takePVSnapshot is invoked for PVs when snapshotService != nil", + namespaceIncludesExcludes: collections.NewIncludesExcludes().Includes("*"), + item: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/zone": "us-east-1c"}}, "spec": {"awsElasticBlockStore": {"volumeID": "aws://us-east-1c/vol-abc123"}}}`, + expectError: false, + expectExcluded: false, + expectedTarHeaderName: "resources/persistentvolumes/cluster/mypv.json", + groupResource: "persistentvolumes", + snapshottableVolumes: map[string]api.VolumeBackupInfo{ + "vol-abc123": {SnapshotID: "snapshot-1", AvailabilityZone: "us-east-1c"}, + }, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { var ( - actions map[schema.GroupResource]Action + actions []resolvedAction action *fakeAction backup = &v1.Backup{} groupResource = schema.ParseGroupResource("resource.group") @@ -237,6 +263,10 @@ func TestBackupItemNoSkips(t *testing.T) { w = &fakeTarWriter{} ) + if test.groupResource != "" { + groupResource = schema.ParseGroupResource(test.groupResource) + } + item, err := getAsMap(test.item) if err != nil { t.Fatal(err) @@ -258,8 +288,13 @@ func TestBackupItemNoSkips(t *testing.T) { action = &fakeAction{ additionalItems: test.customActionAdditionalItemIdentifiers, } - actions = map[schema.GroupResource]Action{ - groupResource: action, + actions = []resolvedAction{ + { + ItemAction: action, + namespaceIncludesExcludes: collections.NewIncludesExcludes(), + resourceIncludesExcludes: collections.NewIncludesExcludes().Includes(groupResource.String()), + selector: labels.Everything(), + }, } } @@ -284,8 +319,15 @@ func TestBackupItemNoSkips(t *testing.T) { resourceHooks, dynamicFactory, discoveryHelper, + nil, ).(*defaultItemBackupper) + var snapshotService *arktest.FakeSnapshotService + if test.snapshottableVolumes != nil { + snapshotService = &arktest.FakeSnapshotService{SnapshottableVolumes: test.snapshottableVolumes} + b.snapshotService = snapshotService + } + // make sure the podCommandExecutor was set correctly in the real hook handler assert.Equal(t, podCommandExecutor, b.itemHookHandler.(*defaultItemHookHandler).podCommandExecutor) @@ -361,10 +403,231 @@ func TestBackupItemNoSkips(t *testing.T) { t.Errorf("action.ids[0]: expected %s, got %s", e, a) } - if len(action.backups) != 1 { - t.Errorf("unexpected custom action backups: %#v", action.backups) - } else if e, a := backup, action.backups[0]; e != a { - t.Errorf("action.backups[0]: expected %#v, got %#v", e, a) + require.Equal(t, 1, len(action.backups), "unexpected custom action backups: %#v", action.backups) + assert.Equal(t, backup, &(action.backups[0]), "backup") + } + + if test.snapshottableVolumes != nil { + require.Equal(t, 1, len(snapshotService.SnapshotsTaken)) + + var expectedBackups []api.VolumeBackupInfo + for _, vbi := range test.snapshottableVolumes { + expectedBackups = append(expectedBackups, vbi) + } + + var actualBackups []api.VolumeBackupInfo + for _, vbi := range backup.Status.VolumeBackups { + actualBackups = append(actualBackups, *vbi) + } + + assert.Equal(t, expectedBackups, actualBackups) + } + }) + } +} + +func TestTakePVSnapshot(t *testing.T) { + iops := int64(1000) + + tests := []struct { + name string + snapshotEnabled bool + pv string + ttl time.Duration + expectError bool + expectedVolumeID string + expectedSnapshotsTaken int + existingVolumeBackups map[string]*v1.VolumeBackupInfo + volumeInfo map[string]v1.VolumeBackupInfo + }{ + { + name: "snapshot disabled", + pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}}`, + snapshotEnabled: false, + }, + { + name: "can't find volume id - missing spec", + snapshotEnabled: true, + pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}}`, + expectError: true, + }, + { + name: "unsupported PV source type", + snapshotEnabled: true, + pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}, "spec": {"unsupportedPVSource": {}}}`, + expectError: false, + }, + { + name: "can't find volume id - aws but no volume id", + snapshotEnabled: true, + pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}, "spec": {"awsElasticBlockStore": {}}}`, + expectError: true, + }, + { + name: "can't find volume id - gce but no volume id", + snapshotEnabled: true, + pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}, "spec": {"gcePersistentDisk": {}}}`, + expectError: true, + }, + { + name: "aws - simple volume id", + snapshotEnabled: true, + pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/zone": "us-east-1c"}}, "spec": {"awsElasticBlockStore": {"volumeID": "aws://us-east-1c/vol-abc123"}}}`, + expectError: false, + expectedSnapshotsTaken: 1, + expectedVolumeID: "vol-abc123", + ttl: 5 * time.Minute, + volumeInfo: map[string]v1.VolumeBackupInfo{ + "vol-abc123": {Type: "gp", SnapshotID: "snap-1", AvailabilityZone: "us-east-1c"}, + }, + }, + { + name: "aws - simple volume id with provisioned IOPS", + snapshotEnabled: true, + pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/zone": "us-east-1c"}}, "spec": {"awsElasticBlockStore": {"volumeID": "aws://us-east-1c/vol-abc123"}}}`, + expectError: false, + expectedSnapshotsTaken: 1, + expectedVolumeID: "vol-abc123", + ttl: 5 * time.Minute, + volumeInfo: map[string]v1.VolumeBackupInfo{ + "vol-abc123": {Type: "io1", Iops: &iops, SnapshotID: "snap-1", AvailabilityZone: "us-east-1c"}, + }, + }, + { + name: "aws - dynamically provisioned volume id", + snapshotEnabled: true, + pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/zone": "us-west-2a"}}, "spec": {"awsElasticBlockStore": {"volumeID": "aws://us-west-2a/vol-abc123"}}}`, + expectError: false, + expectedSnapshotsTaken: 1, + expectedVolumeID: "vol-abc123", + ttl: 5 * time.Minute, + volumeInfo: map[string]v1.VolumeBackupInfo{ + "vol-abc123": {Type: "gp", SnapshotID: "snap-1", AvailabilityZone: "us-west-2a"}, + }, + }, + { + name: "gce", + snapshotEnabled: true, + pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/zone": "gcp-zone2"}}, "spec": {"gcePersistentDisk": {"pdName": "pd-abc123"}}}`, + expectError: false, + expectedSnapshotsTaken: 1, + expectedVolumeID: "pd-abc123", + ttl: 5 * time.Minute, + volumeInfo: map[string]v1.VolumeBackupInfo{ + "pd-abc123": {Type: "gp", SnapshotID: "snap-1", AvailabilityZone: "gcp-zone2"}, + }, + }, + { + name: "azure", + snapshotEnabled: true, + pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}, "spec": {"azureDisk": {"diskName": "foo-disk"}}}`, + expectError: false, + expectedSnapshotsTaken: 1, + expectedVolumeID: "foo-disk", + ttl: 5 * time.Minute, + volumeInfo: map[string]v1.VolumeBackupInfo{ + "foo-disk": {Type: "gp", SnapshotID: "snap-1"}, + }, + }, + { + name: "preexisting volume backup info in backup status", + snapshotEnabled: true, + pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}, "spec": {"gcePersistentDisk": {"pdName": "pd-abc123"}}}`, + expectError: false, + expectedSnapshotsTaken: 1, + expectedVolumeID: "pd-abc123", + ttl: 5 * time.Minute, + existingVolumeBackups: map[string]*v1.VolumeBackupInfo{ + "anotherpv": {SnapshotID: "anothersnap"}, + }, + volumeInfo: map[string]v1.VolumeBackupInfo{ + "pd-abc123": {Type: "gp", SnapshotID: "snap-1"}, + }, + }, + { + name: "create snapshot error", + snapshotEnabled: true, + pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}, "spec": {"gcePersistentDisk": {"pdName": "pd-abc123"}}}`, + expectError: true, + }, + { + name: "PV with label metadata but no failureDomainZone", + snapshotEnabled: true, + pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/region": "us-east-1"}}, "spec": {"awsElasticBlockStore": {"volumeID": "aws://us-east-1c/vol-abc123"}}}`, + expectError: false, + expectedSnapshotsTaken: 1, + expectedVolumeID: "vol-abc123", + ttl: 5 * time.Minute, + volumeInfo: map[string]v1.VolumeBackupInfo{ + "vol-abc123": {Type: "gp", SnapshotID: "snap-1"}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + backup := &v1.Backup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: v1.DefaultNamespace, + Name: "mybackup", + }, + Spec: v1.BackupSpec{ + SnapshotVolumes: &test.snapshotEnabled, + TTL: metav1.Duration{Duration: test.ttl}, + }, + Status: v1.BackupStatus{ + VolumeBackups: test.existingVolumeBackups, + }, + } + + snapshotService := &arktest.FakeSnapshotService{SnapshottableVolumes: test.volumeInfo} + + ib := &defaultItemBackupper{snapshotService: snapshotService} + + pv, err := getAsMap(test.pv) + if err != nil { + t.Fatal(err) + } + + // method under test + err = ib.takePVSnapshot(&unstructured.Unstructured{Object: pv}, backup, arktest.NewLogger()) + + gotErr := err != nil + + if e, a := test.expectError, gotErr; e != a { + t.Errorf("error: expected %v, got %v", e, a) + } + if test.expectError { + return + } + + if !test.snapshotEnabled { + // don't need to check anything else if snapshots are disabled + return + } + + expectedVolumeBackups := test.existingVolumeBackups + if expectedVolumeBackups == nil { + expectedVolumeBackups = make(map[string]*v1.VolumeBackupInfo) + } + + // we should have one snapshot taken exactly + require.Equal(t, test.expectedSnapshotsTaken, snapshotService.SnapshotsTaken.Len()) + + if test.expectedSnapshotsTaken > 0 { + // the snapshotID should be the one in the entry in snapshotService.SnapshottableVolumes + // for the volume we ran the test for + snapshotID, _ := snapshotService.SnapshotsTaken.PopAny() + + expectedVolumeBackups["mypv"] = &v1.VolumeBackupInfo{ + SnapshotID: snapshotID, + Type: test.volumeInfo[test.expectedVolumeID].Type, + Iops: test.volumeInfo[test.expectedVolumeID].Iops, + AvailabilityZone: test.volumeInfo[test.expectedVolumeID].AvailabilityZone, + } + + if e, a := expectedVolumeBackups, backup.Status.VolumeBackups; !reflect.DeepEqual(e, a) { + t.Errorf("backup.status.VolumeBackups: expected %v, got %v", e, a) } } }) @@ -395,7 +658,7 @@ type mockItemBackupper struct { mock.Mock } -func (ib *mockItemBackupper) backupItem(logger *logrus.Entry, obj runtime.Unstructured, groupResource schema.GroupResource) error { +func (ib *mockItemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource) error { args := ib.Called(logger, obj, groupResource) return args.Error(0) } diff --git a/pkg/backup/resource_backupper.go b/pkg/backup/resource_backupper.go index 7d8f23126..a13641d7e 100644 --- a/pkg/backup/resource_backupper.go +++ b/pkg/backup/resource_backupper.go @@ -19,6 +19,7 @@ package backup import ( api "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/client" + "github.com/heptio/ark/pkg/cloudprovider" "github.com/heptio/ark/pkg/discovery" "github.com/heptio/ark/pkg/util/collections" "github.com/pkg/errors" @@ -33,7 +34,7 @@ import ( type resourceBackupperFactory interface { newResourceBackupper( - log *logrus.Entry, + log logrus.FieldLogger, backup *api.Backup, namespaces *collections.IncludesExcludes, resources *collections.IncludesExcludes, @@ -42,17 +43,18 @@ type resourceBackupperFactory interface { discoveryHelper discovery.Helper, backedUpItems map[itemKey]struct{}, cohabitatingResources map[string]*cohabitatingResource, - actions map[schema.GroupResource]Action, + actions []resolvedAction, podCommandExecutor podCommandExecutor, tarWriter tarWriter, resourceHooks []resourceHook, + snapshotService cloudprovider.SnapshotService, ) resourceBackupper } type defaultResourceBackupperFactory struct{} func (f *defaultResourceBackupperFactory) newResourceBackupper( - log *logrus.Entry, + log logrus.FieldLogger, backup *api.Backup, namespaces *collections.IncludesExcludes, resources *collections.IncludesExcludes, @@ -61,10 +63,11 @@ func (f *defaultResourceBackupperFactory) newResourceBackupper( discoveryHelper discovery.Helper, backedUpItems map[itemKey]struct{}, cohabitatingResources map[string]*cohabitatingResource, - actions map[schema.GroupResource]Action, + actions []resolvedAction, podCommandExecutor podCommandExecutor, tarWriter tarWriter, resourceHooks []resourceHook, + snapshotService cloudprovider.SnapshotService, ) resourceBackupper { return &defaultResourceBackupper{ log: log, @@ -80,8 +83,8 @@ func (f *defaultResourceBackupperFactory) newResourceBackupper( podCommandExecutor: podCommandExecutor, tarWriter: tarWriter, resourceHooks: resourceHooks, - - itemBackupperFactory: &defaultItemBackupperFactory{}, + snapshotService: snapshotService, + itemBackupperFactory: &defaultItemBackupperFactory{}, } } @@ -90,7 +93,7 @@ type resourceBackupper interface { } type defaultResourceBackupper struct { - log *logrus.Entry + log logrus.FieldLogger backup *api.Backup namespaces *collections.IncludesExcludes resources *collections.IncludesExcludes @@ -99,12 +102,12 @@ type defaultResourceBackupper struct { discoveryHelper discovery.Helper backedUpItems map[itemKey]struct{} cohabitatingResources map[string]*cohabitatingResource - actions map[schema.GroupResource]Action + actions []resolvedAction podCommandExecutor podCommandExecutor tarWriter tarWriter resourceHooks []resourceHook - - itemBackupperFactory itemBackupperFactory + snapshotService cloudprovider.SnapshotService + itemBackupperFactory itemBackupperFactory } // backupResource backs up all the objects for a given group-version-resource. @@ -177,6 +180,7 @@ func (rb *defaultResourceBackupper) backupResource( rb.resourceHooks, rb.dynamicFactory, rb.discoveryHelper, + rb.snapshotService, ) namespacesToList := getNamespacesToList(rb.namespaces) diff --git a/pkg/backup/resource_backupper_test.go b/pkg/backup/resource_backupper_test.go index 812e9e82b..501d650eb 100644 --- a/pkg/backup/resource_backupper_test.go +++ b/pkg/backup/resource_backupper_test.go @@ -21,6 +21,7 @@ import ( "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/client" + "github.com/heptio/ark/pkg/cloudprovider" "github.com/heptio/ark/pkg/discovery" "github.com/heptio/ark/pkg/util/collections" arktest "github.com/heptio/ark/pkg/util/test" @@ -238,8 +239,11 @@ func TestBackupResource(t *testing.T) { "networkpolicies": newCohabitatingResource("networkpolicies", "extensions", "networking.k8s.io"), } - actions := map[schema.GroupResource]Action{ - {Group: "", Resource: "pods"}: &fakeAction{}, + actions := []resolvedAction{ + { + ItemAction: newFakeAction("pods"), + resourceIncludesExcludes: collections.NewIncludesExcludes().Includes("pods"), + }, } resourceHooks := []resourceHook{ @@ -266,6 +270,7 @@ func TestBackupResource(t *testing.T) { podCommandExecutor, tarWriter, resourceHooks, + nil, ).(*defaultResourceBackupper) itemBackupperFactory := &mockItemBackupperFactory{} @@ -287,6 +292,7 @@ func TestBackupResource(t *testing.T) { resourceHooks, dynamicFactory, discoveryHelper, + mock.Anything, ).Return(itemBackupper) if len(test.listResponses) > 0 { @@ -393,8 +399,11 @@ func TestBackupResourceCohabitation(t *testing.T) { "networkpolicies": newCohabitatingResource("networkpolicies", "extensions", "networking.k8s.io"), } - actions := map[schema.GroupResource]Action{ - {Group: "", Resource: "pods"}: &fakeAction{}, + actions := []resolvedAction{ + { + ItemAction: newFakeAction("pods"), + resourceIncludesExcludes: collections.NewIncludesExcludes().Includes("pods"), + }, } resourceHooks := []resourceHook{ @@ -420,6 +429,7 @@ func TestBackupResourceCohabitation(t *testing.T) { podCommandExecutor, tarWriter, resourceHooks, + nil, ).(*defaultResourceBackupper) itemBackupperFactory := &mockItemBackupperFactory{} @@ -440,6 +450,7 @@ func TestBackupResourceCohabitation(t *testing.T) { resourceHooks, dynamicFactory, discoveryHelper, + mock.Anything, ).Return(itemBackupper) client := &arktest.FakeDynamicClient{} @@ -476,7 +487,7 @@ func TestBackupResourceOnlyIncludesSpecifiedNamespaces(t *testing.T) { cohabitatingResources := map[string]*cohabitatingResource{} - actions := map[schema.GroupResource]Action{} + actions := []resolvedAction{} resourceHooks := []resourceHook{} @@ -499,6 +510,7 @@ func TestBackupResourceOnlyIncludesSpecifiedNamespaces(t *testing.T) { podCommandExecutor, tarWriter, resourceHooks, + nil, ).(*defaultResourceBackupper) itemBackupperFactory := &mockItemBackupperFactory{} @@ -519,6 +531,7 @@ func TestBackupResourceOnlyIncludesSpecifiedNamespaces(t *testing.T) { dynamicFactory: dynamicFactory, discoveryHelper: discoveryHelper, itemHookHandler: itemHookHandler, + snapshotService: nil, } itemBackupperFactory.On("newItemBackupper", @@ -532,6 +545,7 @@ func TestBackupResourceOnlyIncludesSpecifiedNamespaces(t *testing.T) { resourceHooks, dynamicFactory, discoveryHelper, + mock.Anything, ).Return(itemBackupper) client := &arktest.FakeDynamicClient{} @@ -567,7 +581,7 @@ func TestBackupResourceListAllNamespacesExcludesCorrectly(t *testing.T) { cohabitatingResources := map[string]*cohabitatingResource{} - actions := map[schema.GroupResource]Action{} + actions := []resolvedAction{} resourceHooks := []resourceHook{} @@ -590,6 +604,7 @@ func TestBackupResourceListAllNamespacesExcludesCorrectly(t *testing.T) { podCommandExecutor, tarWriter, resourceHooks, + nil, ).(*defaultResourceBackupper) itemBackupperFactory := &mockItemBackupperFactory{} @@ -613,6 +628,7 @@ func TestBackupResourceListAllNamespacesExcludesCorrectly(t *testing.T) { resourceHooks, dynamicFactory, discoveryHelper, + mock.Anything, ).Return(itemBackupper) client := &arktest.FakeDynamicClient{} @@ -642,12 +658,13 @@ func (ibf *mockItemBackupperFactory) newItemBackupper( backup *v1.Backup, namespaces, resources *collections.IncludesExcludes, backedUpItems map[itemKey]struct{}, - actions map[schema.GroupResource]Action, + actions []resolvedAction, podCommandExecutor podCommandExecutor, tarWriter tarWriter, resourceHooks []resourceHook, dynamicFactory client.DynamicFactory, discoveryHelper discovery.Helper, + snapshotService cloudprovider.SnapshotService, ) ItemBackupper { args := ibf.Called( backup, @@ -660,6 +677,7 @@ func (ibf *mockItemBackupperFactory) newItemBackupper( resourceHooks, dynamicFactory, discoveryHelper, + snapshotService, ) return args.Get(0).(ItemBackupper) } diff --git a/pkg/backup/volume_snapshot_action.go b/pkg/backup/volume_snapshot_action.go deleted file mode 100644 index e3e41f538..000000000 --- a/pkg/backup/volume_snapshot_action.go +++ /dev/null @@ -1,118 +0,0 @@ -/* -Copyright 2017 Heptio Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package backup - -import ( - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/runtime" - - api "github.com/heptio/ark/pkg/apis/ark/v1" - "github.com/heptio/ark/pkg/cloudprovider" - kubeutil "github.com/heptio/ark/pkg/util/kube" -) - -// zoneLabel is the label that stores availability-zone info -// on PVs -const zoneLabel = "failure-domain.beta.kubernetes.io/zone" - -// volumeSnapshotAction is a struct that knows how to take snapshots of PersistentVolumes -// that are backed by compatible cloud volumes. -type volumeSnapshotAction struct { - snapshotService cloudprovider.SnapshotService -} - -func NewVolumeSnapshotAction(snapshotService cloudprovider.SnapshotService) (Action, error) { - if snapshotService == nil { - return nil, errors.New("snapshotService cannot be nil") - } - - return &volumeSnapshotAction{ - snapshotService: snapshotService, - }, nil -} - -// Execute triggers a snapshot for the volume/disk underlying a PersistentVolume if the provided -// backup has volume snapshots enabled and the PV is of a compatible type. Also records cloud -// disk type and IOPS (if applicable) to be able to restore to current state later. -func (a *volumeSnapshotAction) Execute(log *logrus.Entry, item runtime.Unstructured, backup *api.Backup) ([]ResourceIdentifier, error) { - var noAdditionalItems []ResourceIdentifier - - log.Info("Executing volumeSnapshotAction") - - if backup.Spec.SnapshotVolumes != nil && !*backup.Spec.SnapshotVolumes { - log.Info("Backup has volume snapshots disabled; skipping volume snapshot action.") - return noAdditionalItems, nil - } - - metadata, err := meta.Accessor(item) - if err != nil { - return noAdditionalItems, errors.WithStack(err) - } - - name := metadata.GetName() - var pvFailureDomainZone string - labels := metadata.GetLabels() - - if labels[zoneLabel] != "" { - pvFailureDomainZone = labels[zoneLabel] - } else { - log.Infof("label %q is not present on PersistentVolume", zoneLabel) - } - - volumeID, err := kubeutil.GetVolumeID(item.UnstructuredContent()) - // non-nil error means it's a supported PV source but volume ID can't be found - if err != nil { - return noAdditionalItems, errors.Wrapf(err, "error getting volume ID for PersistentVolume") - } - // no volumeID / nil error means unsupported PV source - if volumeID == "" { - log.Info("PersistentVolume is not a supported volume type for snapshots, skipping.") - return noAdditionalItems, nil - } - - log = log.WithField("volumeID", volumeID) - - log.Info("Snapshotting PersistentVolume") - snapshotID, err := a.snapshotService.CreateSnapshot(volumeID, pvFailureDomainZone) - if err != nil { - // log+error on purpose - log goes to the per-backup log file, error goes to the backup - log.WithError(err).Error("error creating snapshot") - return noAdditionalItems, errors.WithMessage(err, "error creating snapshot") - } - - volumeType, iops, err := a.snapshotService.GetVolumeInfo(volumeID, pvFailureDomainZone) - if err != nil { - log.WithError(err).Error("error getting volume info") - return noAdditionalItems, errors.WithMessage(err, "error getting volume info") - } - - if backup.Status.VolumeBackups == nil { - backup.Status.VolumeBackups = make(map[string]*api.VolumeBackupInfo) - } - - backup.Status.VolumeBackups[name] = &api.VolumeBackupInfo{ - SnapshotID: snapshotID, - Type: volumeType, - Iops: iops, - AvailabilityZone: pvFailureDomainZone, - } - - return noAdditionalItems, nil -} diff --git a/pkg/backup/volume_snapshot_action_test.go b/pkg/backup/volume_snapshot_action_test.go deleted file mode 100644 index b8dd9e5eb..000000000 --- a/pkg/backup/volume_snapshot_action_test.go +++ /dev/null @@ -1,242 +0,0 @@ -/* -Copyright 2017 Heptio Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package backup - -import ( - "reflect" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - "github.com/heptio/ark/pkg/apis/ark/v1" - arktest "github.com/heptio/ark/pkg/util/test" -) - -func TestVolumeSnapshotAction(t *testing.T) { - iops := int64(1000) - - tests := []struct { - name string - snapshotEnabled bool - pv string - ttl time.Duration - expectError bool - expectedVolumeID string - expectedSnapshotsTaken int - existingVolumeBackups map[string]*v1.VolumeBackupInfo - volumeInfo map[string]v1.VolumeBackupInfo - }{ - { - name: "snapshot disabled", - pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}}`, - snapshotEnabled: false, - }, - { - name: "can't find volume id - missing spec", - snapshotEnabled: true, - pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}}`, - expectError: true, - }, - { - name: "unsupported PV source type", - snapshotEnabled: true, - pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}, "spec": {"unsupportedPVSource": {}}}`, - expectError: false, - }, - { - name: "can't find volume id - aws but no volume id", - snapshotEnabled: true, - pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}, "spec": {"awsElasticBlockStore": {}}}`, - expectError: true, - }, - { - name: "can't find volume id - gce but no volume id", - snapshotEnabled: true, - pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}, "spec": {"gcePersistentDisk": {}}}`, - expectError: true, - }, - { - name: "aws - simple volume id", - snapshotEnabled: true, - pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/zone": "us-east-1c"}}, "spec": {"awsElasticBlockStore": {"volumeID": "aws://us-east-1c/vol-abc123"}}}`, - expectError: false, - expectedSnapshotsTaken: 1, - expectedVolumeID: "vol-abc123", - ttl: 5 * time.Minute, - volumeInfo: map[string]v1.VolumeBackupInfo{ - "vol-abc123": {Type: "gp", SnapshotID: "snap-1", AvailabilityZone: "us-east-1c"}, - }, - }, - { - name: "aws - simple volume id with provisioned IOPS", - snapshotEnabled: true, - pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/zone": "us-east-1c"}}, "spec": {"awsElasticBlockStore": {"volumeID": "aws://us-east-1c/vol-abc123"}}}`, - expectError: false, - expectedSnapshotsTaken: 1, - expectedVolumeID: "vol-abc123", - ttl: 5 * time.Minute, - volumeInfo: map[string]v1.VolumeBackupInfo{ - "vol-abc123": {Type: "io1", Iops: &iops, SnapshotID: "snap-1", AvailabilityZone: "us-east-1c"}, - }, - }, - { - name: "aws - dynamically provisioned volume id", - snapshotEnabled: true, - pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/zone": "us-west-2a"}}, "spec": {"awsElasticBlockStore": {"volumeID": "aws://us-west-2a/vol-abc123"}}}`, - expectError: false, - expectedSnapshotsTaken: 1, - expectedVolumeID: "vol-abc123", - ttl: 5 * time.Minute, - volumeInfo: map[string]v1.VolumeBackupInfo{ - "vol-abc123": {Type: "gp", SnapshotID: "snap-1", AvailabilityZone: "us-west-2a"}, - }, - }, - { - name: "gce", - snapshotEnabled: true, - pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/zone": "gcp-zone2"}}, "spec": {"gcePersistentDisk": {"pdName": "pd-abc123"}}}`, - expectError: false, - expectedSnapshotsTaken: 1, - expectedVolumeID: "pd-abc123", - ttl: 5 * time.Minute, - volumeInfo: map[string]v1.VolumeBackupInfo{ - "pd-abc123": {Type: "gp", SnapshotID: "snap-1", AvailabilityZone: "gcp-zone2"}, - }, - }, - { - name: "azure", - snapshotEnabled: true, - pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}, "spec": {"azureDisk": {"diskName": "foo-disk"}}}`, - expectError: false, - expectedSnapshotsTaken: 1, - expectedVolumeID: "foo-disk", - ttl: 5 * time.Minute, - volumeInfo: map[string]v1.VolumeBackupInfo{ - "foo-disk": {Type: "gp", SnapshotID: "snap-1"}, - }, - }, - { - name: "preexisting volume backup info in backup status", - snapshotEnabled: true, - pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}, "spec": {"gcePersistentDisk": {"pdName": "pd-abc123"}}}`, - expectError: false, - expectedSnapshotsTaken: 1, - expectedVolumeID: "pd-abc123", - ttl: 5 * time.Minute, - existingVolumeBackups: map[string]*v1.VolumeBackupInfo{ - "anotherpv": {SnapshotID: "anothersnap"}, - }, - volumeInfo: map[string]v1.VolumeBackupInfo{ - "pd-abc123": {Type: "gp", SnapshotID: "snap-1"}, - }, - }, - { - name: "create snapshot error", - snapshotEnabled: true, - pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv"}, "spec": {"gcePersistentDisk": {"pdName": "pd-abc123"}}}`, - expectError: true, - }, - { - name: "PV with label metadata but no failureDomainZone", - snapshotEnabled: true, - pv: `{"apiVersion": "v1", "kind": "PersistentVolume", "metadata": {"name": "mypv", "labels": {"failure-domain.beta.kubernetes.io/region": "us-east-1"}}, "spec": {"awsElasticBlockStore": {"volumeID": "aws://us-east-1c/vol-abc123"}}}`, - expectError: false, - expectedSnapshotsTaken: 1, - expectedVolumeID: "vol-abc123", - ttl: 5 * time.Minute, - volumeInfo: map[string]v1.VolumeBackupInfo{ - "vol-abc123": {Type: "gp", SnapshotID: "snap-1"}, - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - backup := &v1.Backup{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: v1.DefaultNamespace, - Name: "mybackup", - }, - Spec: v1.BackupSpec{ - SnapshotVolumes: &test.snapshotEnabled, - TTL: metav1.Duration{Duration: test.ttl}, - }, - Status: v1.BackupStatus{ - VolumeBackups: test.existingVolumeBackups, - }, - } - - snapshotService := &arktest.FakeSnapshotService{SnapshottableVolumes: test.volumeInfo} - - vsa, _ := NewVolumeSnapshotAction(snapshotService) - action := vsa.(*volumeSnapshotAction) - - pv, err := getAsMap(test.pv) - if err != nil { - t.Fatal(err) - } - - // method under test - additionalItems, err := action.Execute(arktest.NewLogger(), &unstructured.Unstructured{Object: pv}, backup) - assert.Len(t, additionalItems, 0) - - gotErr := err != nil - - if e, a := test.expectError, gotErr; e != a { - t.Errorf("error: expected %v, got %v", e, a) - } - if test.expectError { - return - } - - if !test.snapshotEnabled { - // don't need to check anything else if snapshots are disabled - return - } - - expectedVolumeBackups := test.existingVolumeBackups - if expectedVolumeBackups == nil { - expectedVolumeBackups = make(map[string]*v1.VolumeBackupInfo) - } - - // we should have one snapshot taken exactly - require.Equal(t, test.expectedSnapshotsTaken, snapshotService.SnapshotsTaken.Len()) - - if test.expectedSnapshotsTaken > 0 { - // the snapshotID should be the one in the entry in snapshotService.SnapshottableVolumes - // for the volume we ran the test for - snapshotID, _ := snapshotService.SnapshotsTaken.PopAny() - - expectedVolumeBackups["mypv"] = &v1.VolumeBackupInfo{ - SnapshotID: snapshotID, - Type: test.volumeInfo[test.expectedVolumeID].Type, - Iops: test.volumeInfo[test.expectedVolumeID].Iops, - AvailabilityZone: test.volumeInfo[test.expectedVolumeID].AvailabilityZone, - } - - if e, a := expectedVolumeBackups, backup.Status.VolumeBackups; !reflect.DeepEqual(e, a) { - t.Errorf("backup.status.VolumeBackups: expected %v, got %v", e, a) - } - } - }) - } -} diff --git a/pkg/cmd/server/plugin/plugin.go b/pkg/cmd/server/plugin/plugin.go index ee7862c21..d8518e20c 100644 --- a/pkg/cmd/server/plugin/plugin.go +++ b/pkg/cmd/server/plugin/plugin.go @@ -18,8 +18,10 @@ package plugin import ( plugin "github.com/hashicorp/go-plugin" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/heptio/ark/pkg/backup" "github.com/heptio/ark/pkg/cloudprovider" "github.com/heptio/ark/pkg/cloudprovider/aws" "github.com/heptio/ark/pkg/cloudprovider/azure" @@ -42,6 +44,10 @@ func NewCommand() *cobra.Command { "azure": azure.NewBlockStore(), } + backupActions := map[string]backup.ItemAction{ + "backup_pv": backup.NewBackupPVAction(logger), + } + c := &cobra.Command{ Use: "plugin [KIND] [NAME]", Hidden: true, @@ -54,31 +60,45 @@ func NewCommand() *cobra.Command { kind := args[0] name := args[1] - logger.Debugf("Running plugin command for kind=%s, name=%s", kind, name) + logger = logger.WithFields(logrus.Fields{"kind": kind, "name": name}) + + serveConfig := &plugin.ServeConfig{ + HandshakeConfig: arkplugin.Handshake, + GRPCServer: plugin.DefaultGRPCServer, + } + + logger.Debugf("Running plugin command") switch kind { case "cloudprovider": objectStore, found := objectStores[name] if !found { - logger.Fatalf("Unrecognized plugin name %q", name) + logger.Fatalf("Unrecognized plugin name") } blockStore, found := blockStores[name] if !found { - logger.Fatalf("Unrecognized plugin name %q", name) + logger.Fatalf("Unrecognized plugin name") } - plugin.Serve(&plugin.ServeConfig{ - HandshakeConfig: arkplugin.Handshake, - Plugins: map[string]plugin.Plugin{ - string(arkplugin.PluginKindObjectStore): arkplugin.NewObjectStorePlugin(objectStore), - string(arkplugin.PluginKindBlockStore): arkplugin.NewBlockStorePlugin(blockStore), - }, - GRPCServer: plugin.DefaultGRPCServer, - }) + serveConfig.Plugins = map[string]plugin.Plugin{ + string(arkplugin.PluginKindObjectStore): arkplugin.NewObjectStorePlugin(objectStore), + string(arkplugin.PluginKindBlockStore): arkplugin.NewBlockStorePlugin(blockStore), + } + case arkplugin.PluginKindBackupItemAction.String(): + action, found := backupActions[name] + if !found { + logger.Fatalf("Unrecognized plugin name") + } + + serveConfig.Plugins = map[string]plugin.Plugin{ + arkplugin.PluginKindBackupItemAction.String(): arkplugin.NewBackupItemActionPlugin(action), + } default: - logger.Fatalf("Unsupported plugin kind %q", kind) + logger.Fatalf("Unsupported plugin kind") } + + plugin.Serve(serveConfig) }, } diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 26d5b3745..8a43ddb6a 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -162,6 +162,11 @@ func newServer(kubeconfig, baseName string, logger *logrus.Logger) (*server, err return nil, errors.WithStack(err) } + pluginManager, err := plugin.NewManager(logger, logger.Level) + if err != nil { + return nil, err + } + ctx, cancelFunc := context.WithCancel(context.Background()) s := &server{ @@ -174,7 +179,7 @@ func newServer(kubeconfig, baseName string, logger *logrus.Logger) (*server, err ctx: ctx, cancelFunc: cancelFunc, logger: logger, - pluginManager: plugin.NewManager(logger, logger.Level), + pluginManager: pluginManager, } return s, nil @@ -444,6 +449,7 @@ func (s *server) runControllers(config *api.Config) error { config.BackupStorageProvider.Bucket, s.snapshotService != nil, s.logger, + s.pluginManager, ) wg.Add(1) go func() { @@ -545,24 +551,11 @@ func newBackupper( kubeClientConfig *rest.Config, kubeCoreV1Client kcorev1client.CoreV1Interface, ) (backup.Backupper, error) { - actions := map[string]backup.Action{} - dynamicFactory := client.NewDynamicFactory(clientPool) - - if snapshotService != nil { - action, err := backup.NewVolumeSnapshotAction(snapshotService) - if err != nil { - return nil, err - } - actions["persistentvolumes"] = action - - actions["persistentvolumeclaims"] = backup.NewBackupPVAction() - } - return backup.NewKubernetesBackupper( discoveryHelper, - dynamicFactory, - actions, + client.NewDynamicFactory(clientPool), backup.NewPodCommandExecutor(kubeClientConfig, kubeCoreV1Client.RESTClient()), + snapshotService, ) } diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index 88b47a524..8b0d8d641 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -41,6 +41,7 @@ import ( arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1" informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1" listers "github.com/heptio/ark/pkg/generated/listers/ark/v1" + "github.com/heptio/ark/pkg/plugin" "github.com/heptio/ark/pkg/util/collections" "github.com/heptio/ark/pkg/util/encode" kubeutil "github.com/heptio/ark/pkg/util/kube" @@ -60,6 +61,7 @@ type backupController struct { queue workqueue.RateLimitingInterface clock clock.Clock logger *logrus.Logger + pluginManager plugin.Manager } func NewBackupController( @@ -70,6 +72,7 @@ func NewBackupController( bucket string, pvProviderExists bool, logger *logrus.Logger, + pluginManager plugin.Manager, ) Interface { c := &backupController{ backupper: backupper, @@ -82,6 +85,7 @@ func NewBackupController( queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"), clock: &clock.RealClock{}, logger: logger, + pluginManager: pluginManager, } c.syncHandler = c.processBackup @@ -316,10 +320,18 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string) err = kuberrs.NewAggregate(errs) }() - controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("starting backup") - if err := controller.backupper.Backup(backup, backupFile, logFile); err != nil { + actions, err := controller.pluginManager.GetBackupItemActions(backup.Name, controller.logger, controller.logger.Level) + if err != nil { return err } + defer controller.pluginManager.CloseBackupItemActions(backup.Name) + + controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("starting backup") + + if err := controller.backupper.Backup(backup, backupFile, logFile, actions); err != nil { + return err + } + controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("backup completed") // note: updating this here so the uploaded JSON shows "completed". If diff --git a/pkg/controller/backup_controller_test.go b/pkg/controller/backup_controller_test.go index 286f65b7e..5459eebe9 100644 --- a/pkg/controller/backup_controller_test.go +++ b/pkg/controller/backup_controller_test.go @@ -25,12 +25,15 @@ import ( "k8s.io/apimachinery/pkg/util/clock" core "k8s.io/client-go/testing" + "github.com/sirupsen/logrus" testlogger "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/heptio/ark/pkg/apis/ark/v1" + "github.com/heptio/ark/pkg/backup" + "github.com/heptio/ark/pkg/cloudprovider" "github.com/heptio/ark/pkg/generated/clientset/versioned/fake" "github.com/heptio/ark/pkg/generated/clientset/versioned/scheme" informers "github.com/heptio/ark/pkg/generated/informers/externalversions" @@ -41,11 +44,99 @@ type fakeBackupper struct { mock.Mock } -func (b *fakeBackupper) Backup(backup *v1.Backup, data, log io.Writer) error { - args := b.Called(backup, data, log) +func (b *fakeBackupper) Backup(backup *v1.Backup, data, log io.Writer, actions []backup.ItemAction) error { + args := b.Called(backup, data, log, actions) return args.Error(0) } +// Manager is an autogenerated mock type for the Manager type +type Manager struct { + mock.Mock +} + +// CloseBackupItemActions provides a mock function with given fields: backupName +func (_m *Manager) CloseBackupItemActions(backupName string) error { + ret := _m.Called(backupName) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(backupName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetBackupItemActions provides a mock function with given fields: backupName, logger, level +func (_m *Manager) GetBackupItemActions(backupName string, logger logrus.FieldLogger, level logrus.Level) ([]backup.ItemAction, error) { + ret := _m.Called(backupName, logger, level) + + var r0 []backup.ItemAction + if rf, ok := ret.Get(0).(func(string, logrus.FieldLogger, logrus.Level) []backup.ItemAction); ok { + r0 = rf(backupName, logger, level) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]backup.ItemAction) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, logrus.FieldLogger, logrus.Level) error); ok { + r1 = rf(backupName, logger, level) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetBlockStore provides a mock function with given fields: name +func (_m *Manager) GetBlockStore(name string) (cloudprovider.BlockStore, error) { + ret := _m.Called(name) + + var r0 cloudprovider.BlockStore + if rf, ok := ret.Get(0).(func(string) cloudprovider.BlockStore); ok { + r0 = rf(name) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(cloudprovider.BlockStore) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(name) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetObjectStore provides a mock function with given fields: name +func (_m *Manager) GetObjectStore(name string) (cloudprovider.ObjectStore, error) { + ret := _m.Called(name) + + var r0 cloudprovider.ObjectStore + if rf, ok := ret.Get(0).(func(string) cloudprovider.ObjectStore); ok { + r0 = rf(name) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(cloudprovider.ObjectStore) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(name) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + func TestProcessBackup(t *testing.T) { tests := []struct { name string @@ -152,6 +243,7 @@ func TestProcessBackup(t *testing.T) { cloudBackups = &BackupService{} sharedInformers = informers.NewSharedInformerFactory(client, 0) logger, _ = testlogger.NewNullLogger() + pluginManager = &Manager{} ) c := NewBackupController( @@ -162,6 +254,7 @@ func TestProcessBackup(t *testing.T) { "bucket", test.allowSnapshots, logger, + pluginManager, ).(*backupController) c.clock = clock.NewFakeClock(time.Now()) @@ -187,9 +280,12 @@ func TestProcessBackup(t *testing.T) { backup.Status.Phase = v1.BackupPhaseInProgress backup.Status.Expiration.Time = expiration backup.Status.Version = 1 - backupper.On("Backup", backup, mock.Anything, mock.Anything).Return(nil) + backupper.On("Backup", backup, mock.Anything, mock.Anything, mock.Anything).Return(nil) cloudBackups.On("UploadBackup", "bucket", backup.Name, mock.Anything, mock.Anything, mock.Anything).Return(nil) + + pluginManager.On("GetBackupItemActions", backup.Name, logger, logger.Level).Return(nil, nil) + pluginManager.On("CloseBackupItemActions", backup.Name).Return(nil) } // this is necessary so the Update() call returns the appropriate object diff --git a/pkg/plugin/backup_item_action.go b/pkg/plugin/backup_item_action.go new file mode 100644 index 000000000..28dd561e2 --- /dev/null +++ b/pkg/plugin/backup_item_action.go @@ -0,0 +1,190 @@ +/* +Copyright 2017 the Heptio Ark contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "encoding/json" + + "github.com/hashicorp/go-plugin" + "github.com/sirupsen/logrus" + "golang.org/x/net/context" + "google.golang.org/grpc" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + + api "github.com/heptio/ark/pkg/apis/ark/v1" + arkbackup "github.com/heptio/ark/pkg/backup" + proto "github.com/heptio/ark/pkg/plugin/generated" +) + +// BackupItemActionPlugin is an implementation of go-plugin's Plugin +// interface with support for gRPC for the backup/ItemAction +// interface. +type BackupItemActionPlugin struct { + plugin.NetRPCUnsupportedPlugin + impl arkbackup.ItemAction + log *logrusAdapter +} + +// NewBackupItemActionPlugin constructs a BackupItemActionPlugin. +func NewBackupItemActionPlugin(itemAction arkbackup.ItemAction) *BackupItemActionPlugin { + return &BackupItemActionPlugin{ + impl: itemAction, + } +} + +// GRPCServer registers a BackupItemAction gRPC server. +func (p *BackupItemActionPlugin) GRPCServer(s *grpc.Server) error { + proto.RegisterBackupItemActionServer(s, &BackupItemActionGRPCServer{impl: p.impl}) + return nil +} + +// GRPCClient returns a BackupItemAction gRPC client. +func (p *BackupItemActionPlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { + return &BackupItemActionGRPCClient{grpcClient: proto.NewBackupItemActionClient(c), log: p.log}, nil +} + +// BackupItemActionGRPCClient implements the backup/ItemAction interface and uses a +// gRPC client to make calls to the plugin server. +type BackupItemActionGRPCClient struct { + grpcClient proto.BackupItemActionClient + log *logrusAdapter +} + +func (c *BackupItemActionGRPCClient) AppliesTo() (arkbackup.ResourceSelector, error) { + res, err := c.grpcClient.AppliesTo(context.Background(), &proto.Empty{}) + if err != nil { + return arkbackup.ResourceSelector{}, err + } + + return arkbackup.ResourceSelector{ + IncludedNamespaces: res.IncludedNamespaces, + ExcludedNamespaces: res.ExcludedNamespaces, + IncludedResources: res.IncludedResources, + ExcludedResources: res.ExcludedResources, + LabelSelector: res.Selector, + }, nil +} + +func (c *BackupItemActionGRPCClient) Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []arkbackup.ResourceIdentifier, error) { + itemJSON, err := json.Marshal(item.UnstructuredContent()) + if err != nil { + return nil, nil, err + } + + backupJSON, err := json.Marshal(backup) + if err != nil { + return nil, nil, err + } + + req := &proto.ExecuteRequest{ + Item: itemJSON, + Backup: backupJSON, + } + + res, err := c.grpcClient.Execute(context.Background(), req) + if err != nil { + return nil, nil, err + } + + var updatedItem unstructured.Unstructured + if err := json.Unmarshal(res.Item, &updatedItem); err != nil { + return nil, nil, err + } + + var additionalItems []arkbackup.ResourceIdentifier + + for _, itm := range res.AdditionalItems { + newItem := arkbackup.ResourceIdentifier{ + GroupResource: schema.GroupResource{ + Group: itm.Group, + Resource: itm.Resource, + }, + Namespace: itm.Namespace, + Name: itm.Name, + } + + additionalItems = append(additionalItems, newItem) + } + + return &updatedItem, additionalItems, nil +} + +func (c *BackupItemActionGRPCClient) SetLog(log logrus.FieldLogger) { + c.log.impl = log +} + +// BackupItemActionGRPCServer implements the proto-generated BackupItemActionServer interface, and accepts +// gRPC calls and forwards them to an implementation of the pluggable interface. +type BackupItemActionGRPCServer struct { + impl arkbackup.ItemAction +} + +func (s *BackupItemActionGRPCServer) AppliesTo(ctx context.Context, req *proto.Empty) (*proto.AppliesToResponse, error) { + resourceSelector, err := s.impl.AppliesTo() + if err != nil { + return nil, err + } + + return &proto.AppliesToResponse{ + IncludedNamespaces: resourceSelector.IncludedNamespaces, + ExcludedNamespaces: resourceSelector.ExcludedNamespaces, + IncludedResources: resourceSelector.IncludedResources, + ExcludedResources: resourceSelector.ExcludedResources, + Selector: resourceSelector.LabelSelector, + }, nil +} + +func (s *BackupItemActionGRPCServer) Execute(ctx context.Context, req *proto.ExecuteRequest) (*proto.ExecuteResponse, error) { + var item unstructured.Unstructured + var backup api.Backup + + if err := json.Unmarshal(req.Item, &item); err != nil { + return nil, err + } + if err := json.Unmarshal(req.Backup, &backup); err != nil { + return nil, err + } + + updatedItem, additionalItems, err := s.impl.Execute(&item, &backup) + if err != nil { + return nil, err + } + + updatedItemJSON, err := json.Marshal(updatedItem.UnstructuredContent()) + if err != nil { + return nil, err + } + + res := &proto.ExecuteResponse{ + Item: updatedItemJSON, + } + + for _, itm := range additionalItems { + val := proto.ResourceIdentifier{ + Group: itm.Group, + Resource: itm.Resource, + Namespace: itm.Namespace, + Name: itm.Name, + } + res.AdditionalItems = append(res.AdditionalItems, &val) + } + + return res, nil +} diff --git a/pkg/plugin/client_builder.go b/pkg/plugin/client_builder.go new file mode 100644 index 000000000..cd98c4bd6 --- /dev/null +++ b/pkg/plugin/client_builder.go @@ -0,0 +1,43 @@ +package plugin + +import ( + "os/exec" + + "github.com/hashicorp/go-hclog" + hcplugin "github.com/hashicorp/go-plugin" +) + +type clientBuilder struct { + config *hcplugin.ClientConfig +} + +func newClientBuilder(baseConfig *hcplugin.ClientConfig) *clientBuilder { + return &clientBuilder{ + config: baseConfig, + } +} + +func (b *clientBuilder) withPlugin(kind PluginKind, plugin hcplugin.Plugin) *clientBuilder { + if b.config.Plugins == nil { + b.config.Plugins = make(map[string]hcplugin.Plugin) + } + b.config.Plugins[string(kind)] = plugin + + return b +} + +func (b *clientBuilder) withLogger(logger hclog.Logger) *clientBuilder { + b.config.Logger = logger + + return b +} + +func (b *clientBuilder) withCommand(name string, args ...string) *clientBuilder { + b.config.Cmd = exec.Command(name, args...) + + return b +} + +func (b *clientBuilder) client() *hcplugin.Client { + return hcplugin.NewClient(b.config) +} diff --git a/pkg/plugin/client_store.go b/pkg/plugin/client_store.go new file mode 100644 index 000000000..8656d8ac1 --- /dev/null +++ b/pkg/plugin/client_store.go @@ -0,0 +1,105 @@ +package plugin + +import ( + "sync" + + plugin "github.com/hashicorp/go-plugin" + "github.com/pkg/errors" +) + +// clientKey is a unique ID for a plugin client. +type clientKey struct { + kind PluginKind + + // scope is an additional identifier that allows multiple clients + // for the same kind/name to be differentiated. It will typically + // be the name of the applicable backup/restore for ItemAction + // clients, and blank for Object/BlockStore clients. + scope string +} + +func newClientStore() *clientStore { + return &clientStore{ + clients: make(map[clientKey]map[string]*plugin.Client), + lock: &sync.RWMutex{}, + } +} + +// clientStore is a repository of active plugin clients. +type clientStore struct { + // clients is a nested map, keyed first by clientKey (a + // combo of kind and "scope"), and second by plugin name. + // This enables easy listing of all clients for a given + // kind and scope (e.g. all BackupItemActions for a given + // backup), and efficient lookup by kind+name+scope (e.g. + // the AWS ObjectStore.) + clients map[clientKey]map[string]*plugin.Client + lock *sync.RWMutex +} + +// get returns a plugin client for the given kind/name/scope, or an error if none +// is found. +func (s *clientStore) get(kind PluginKind, name, scope string) (*plugin.Client, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + if forScope, found := s.clients[clientKey{kind, scope}]; found { + if client, found := forScope[name]; found { + return client, nil + } + } + + return nil, errors.New("client not found") +} + +// list returns all plugin clients for the given kind/scope, or an +// error if none are found. +func (s *clientStore) list(kind PluginKind, scope string) ([]*plugin.Client, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + if forScope, found := s.clients[clientKey{kind, scope}]; found { + var clients []*plugin.Client + + for _, client := range forScope { + clients = append(clients, client) + } + + return clients, nil + } + + return nil, errors.New("clients not found") +} + +// add stores a plugin client for the given kind/name/scope. +func (s *clientStore) add(client *plugin.Client, kind PluginKind, name, scope string) { + s.lock.Lock() + defer s.lock.Unlock() + + key := clientKey{kind, scope} + + if _, found := s.clients[key]; !found { + s.clients[key] = make(map[string]*plugin.Client) + } + + s.clients[key][name] = client +} + +// delete removes the client with the given kind/name/scope from the store. +func (s *clientStore) delete(kind PluginKind, name, scope string) { + s.lock.Lock() + defer s.lock.Unlock() + + if forScope, found := s.clients[clientKey{kind, scope}]; found { + delete(forScope, name) + } +} + +// deleteAll removes all clients with the given kind/scope from +// the store. +func (s *clientStore) deleteAll(kind PluginKind, scope string) { + s.lock.Lock() + defer s.lock.Unlock() + + delete(s.clients, clientKey{kind, scope}) +} diff --git a/pkg/plugin/generated/BackupItemAction.pb.go b/pkg/plugin/generated/BackupItemAction.pb.go new file mode 100644 index 000000000..72374ca8a --- /dev/null +++ b/pkg/plugin/generated/BackupItemAction.pb.go @@ -0,0 +1,339 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: BackupItemAction.proto + +/* +Package generated is a generated protocol buffer package. + +It is generated from these files: + BackupItemAction.proto + BlockStore.proto + ObjectStore.proto + Shared.proto + +It has these top-level messages: + AppliesToResponse + ExecuteRequest + ExecuteResponse + ResourceIdentifier + CreateVolumeRequest + CreateVolumeResponse + GetVolumeInfoRequest + GetVolumeInfoResponse + IsVolumeReadyRequest + IsVolumeReadyResponse + ListSnapshotsRequest + ListSnapshotsResponse + CreateSnapshotRequest + CreateSnapshotResponse + DeleteSnapshotRequest + PutObjectRequest + GetObjectRequest + Bytes + ListCommonPrefixesRequest + ListCommonPrefixesResponse + ListObjectsRequest + ListObjectsResponse + DeleteObjectRequest + CreateSignedURLRequest + CreateSignedURLResponse + Empty + InitRequest +*/ +package generated + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type AppliesToResponse struct { + IncludedNamespaces []string `protobuf:"bytes,1,rep,name=includedNamespaces" json:"includedNamespaces,omitempty"` + ExcludedNamespaces []string `protobuf:"bytes,2,rep,name=excludedNamespaces" json:"excludedNamespaces,omitempty"` + IncludedResources []string `protobuf:"bytes,3,rep,name=includedResources" json:"includedResources,omitempty"` + ExcludedResources []string `protobuf:"bytes,4,rep,name=excludedResources" json:"excludedResources,omitempty"` + Selector string `protobuf:"bytes,5,opt,name=selector" json:"selector,omitempty"` +} + +func (m *AppliesToResponse) Reset() { *m = AppliesToResponse{} } +func (m *AppliesToResponse) String() string { return proto.CompactTextString(m) } +func (*AppliesToResponse) ProtoMessage() {} +func (*AppliesToResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *AppliesToResponse) GetIncludedNamespaces() []string { + if m != nil { + return m.IncludedNamespaces + } + return nil +} + +func (m *AppliesToResponse) GetExcludedNamespaces() []string { + if m != nil { + return m.ExcludedNamespaces + } + return nil +} + +func (m *AppliesToResponse) GetIncludedResources() []string { + if m != nil { + return m.IncludedResources + } + return nil +} + +func (m *AppliesToResponse) GetExcludedResources() []string { + if m != nil { + return m.ExcludedResources + } + return nil +} + +func (m *AppliesToResponse) GetSelector() string { + if m != nil { + return m.Selector + } + return "" +} + +type ExecuteRequest struct { + Item []byte `protobuf:"bytes,1,opt,name=item,proto3" json:"item,omitempty"` + Backup []byte `protobuf:"bytes,2,opt,name=backup,proto3" json:"backup,omitempty"` +} + +func (m *ExecuteRequest) Reset() { *m = ExecuteRequest{} } +func (m *ExecuteRequest) String() string { return proto.CompactTextString(m) } +func (*ExecuteRequest) ProtoMessage() {} +func (*ExecuteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *ExecuteRequest) GetItem() []byte { + if m != nil { + return m.Item + } + return nil +} + +func (m *ExecuteRequest) GetBackup() []byte { + if m != nil { + return m.Backup + } + return nil +} + +type ExecuteResponse struct { + Item []byte `protobuf:"bytes,1,opt,name=item,proto3" json:"item,omitempty"` + AdditionalItems []*ResourceIdentifier `protobuf:"bytes,2,rep,name=additionalItems" json:"additionalItems,omitempty"` +} + +func (m *ExecuteResponse) Reset() { *m = ExecuteResponse{} } +func (m *ExecuteResponse) String() string { return proto.CompactTextString(m) } +func (*ExecuteResponse) ProtoMessage() {} +func (*ExecuteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *ExecuteResponse) GetItem() []byte { + if m != nil { + return m.Item + } + return nil +} + +func (m *ExecuteResponse) GetAdditionalItems() []*ResourceIdentifier { + if m != nil { + return m.AdditionalItems + } + return nil +} + +type ResourceIdentifier struct { + Group string `protobuf:"bytes,1,opt,name=group" json:"group,omitempty"` + Resource string `protobuf:"bytes,2,opt,name=resource" json:"resource,omitempty"` + Namespace string `protobuf:"bytes,3,opt,name=namespace" json:"namespace,omitempty"` + Name string `protobuf:"bytes,4,opt,name=name" json:"name,omitempty"` +} + +func (m *ResourceIdentifier) Reset() { *m = ResourceIdentifier{} } +func (m *ResourceIdentifier) String() string { return proto.CompactTextString(m) } +func (*ResourceIdentifier) ProtoMessage() {} +func (*ResourceIdentifier) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *ResourceIdentifier) GetGroup() string { + if m != nil { + return m.Group + } + return "" +} + +func (m *ResourceIdentifier) GetResource() string { + if m != nil { + return m.Resource + } + return "" +} + +func (m *ResourceIdentifier) GetNamespace() string { + if m != nil { + return m.Namespace + } + return "" +} + +func (m *ResourceIdentifier) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func init() { + proto.RegisterType((*AppliesToResponse)(nil), "generated.AppliesToResponse") + proto.RegisterType((*ExecuteRequest)(nil), "generated.ExecuteRequest") + proto.RegisterType((*ExecuteResponse)(nil), "generated.ExecuteResponse") + proto.RegisterType((*ResourceIdentifier)(nil), "generated.ResourceIdentifier") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for BackupItemAction service + +type BackupItemActionClient interface { + AppliesTo(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*AppliesToResponse, error) + Execute(ctx context.Context, in *ExecuteRequest, opts ...grpc.CallOption) (*ExecuteResponse, error) +} + +type backupItemActionClient struct { + cc *grpc.ClientConn +} + +func NewBackupItemActionClient(cc *grpc.ClientConn) BackupItemActionClient { + return &backupItemActionClient{cc} +} + +func (c *backupItemActionClient) AppliesTo(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*AppliesToResponse, error) { + out := new(AppliesToResponse) + err := grpc.Invoke(ctx, "/generated.BackupItemAction/AppliesTo", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *backupItemActionClient) Execute(ctx context.Context, in *ExecuteRequest, opts ...grpc.CallOption) (*ExecuteResponse, error) { + out := new(ExecuteResponse) + err := grpc.Invoke(ctx, "/generated.BackupItemAction/Execute", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for BackupItemAction service + +type BackupItemActionServer interface { + AppliesTo(context.Context, *Empty) (*AppliesToResponse, error) + Execute(context.Context, *ExecuteRequest) (*ExecuteResponse, error) +} + +func RegisterBackupItemActionServer(s *grpc.Server, srv BackupItemActionServer) { + s.RegisterService(&_BackupItemAction_serviceDesc, srv) +} + +func _BackupItemAction_AppliesTo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BackupItemActionServer).AppliesTo(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/generated.BackupItemAction/AppliesTo", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BackupItemActionServer).AppliesTo(ctx, req.(*Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _BackupItemAction_Execute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ExecuteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BackupItemActionServer).Execute(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/generated.BackupItemAction/Execute", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BackupItemActionServer).Execute(ctx, req.(*ExecuteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _BackupItemAction_serviceDesc = grpc.ServiceDesc{ + ServiceName: "generated.BackupItemAction", + HandlerType: (*BackupItemActionServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "AppliesTo", + Handler: _BackupItemAction_AppliesTo_Handler, + }, + { + MethodName: "Execute", + Handler: _BackupItemAction_Execute_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "BackupItemAction.proto", +} + +func init() { proto.RegisterFile("BackupItemAction.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 366 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0xcb, 0x4e, 0xeb, 0x30, + 0x10, 0x86, 0x95, 0xde, 0xce, 0xc9, 0x9c, 0xea, 0xb4, 0xb5, 0x50, 0x15, 0xa2, 0x22, 0x55, 0x59, + 0x75, 0x81, 0xb2, 0x28, 0x4b, 0x58, 0x50, 0xa4, 0x0a, 0x75, 0xc3, 0xc2, 0xf0, 0x02, 0x69, 0x32, + 0x94, 0x88, 0xc4, 0x36, 0xb6, 0x23, 0x95, 0xc7, 0xe0, 0x39, 0x79, 0x09, 0x64, 0xe7, 0xd2, 0xd2, + 0x74, 0x97, 0x99, 0xff, 0x9b, 0x89, 0xe7, 0x9f, 0x81, 0xe9, 0x43, 0x14, 0xbf, 0x17, 0x62, 0xa3, + 0x31, 0x5f, 0xc5, 0x3a, 0xe5, 0x2c, 0x14, 0x92, 0x6b, 0x4e, 0xdc, 0x1d, 0x32, 0x94, 0x91, 0xc6, + 0xc4, 0x1f, 0x3e, 0xbf, 0x45, 0x12, 0x93, 0x52, 0x08, 0xbe, 0x1d, 0x98, 0xac, 0x84, 0xc8, 0x52, + 0x54, 0x2f, 0x9c, 0xa2, 0x12, 0x9c, 0x29, 0x24, 0x21, 0x90, 0x94, 0xc5, 0x59, 0x91, 0x60, 0xf2, + 0x14, 0xe5, 0xa8, 0x44, 0x14, 0xa3, 0xf2, 0x9c, 0x79, 0x77, 0xe1, 0xd2, 0x33, 0x8a, 0xe1, 0x71, + 0xdf, 0xe2, 0x3b, 0x25, 0xdf, 0x56, 0xc8, 0x35, 0x4c, 0xea, 0x2e, 0x14, 0x15, 0x2f, 0xa4, 0xc1, + 0xbb, 0x16, 0x6f, 0x0b, 0x86, 0xae, 0x7b, 0x1c, 0xe8, 0x5e, 0x49, 0xb7, 0x04, 0xe2, 0xc3, 0x5f, + 0x85, 0x19, 0xc6, 0x9a, 0x4b, 0xaf, 0x3f, 0x77, 0x16, 0x2e, 0x6d, 0xe2, 0xe0, 0x0e, 0xfe, 0xaf, + 0xf7, 0x18, 0x17, 0x1a, 0x29, 0x7e, 0x14, 0xa8, 0x34, 0x21, 0xd0, 0x4b, 0x35, 0xe6, 0x9e, 0x33, + 0x77, 0x16, 0x43, 0x6a, 0xbf, 0xc9, 0x14, 0x06, 0x5b, 0x6b, 0xa3, 0xd7, 0xb1, 0xd9, 0x2a, 0x0a, + 0x18, 0x8c, 0x9a, 0xea, 0xca, 0xa8, 0x73, 0xe5, 0x8f, 0x30, 0x8a, 0x92, 0x24, 0x35, 0xee, 0x47, + 0x99, 0xd9, 0x44, 0xe9, 0xc4, 0xbf, 0xe5, 0x55, 0xd8, 0x6c, 0x21, 0xac, 0xdf, 0xbb, 0x49, 0x90, + 0xe9, 0xf4, 0x35, 0x45, 0x49, 0x4f, 0xab, 0x82, 0x3d, 0x90, 0x36, 0x46, 0x2e, 0xa0, 0xbf, 0x93, + 0xbc, 0x10, 0xf6, 0x9f, 0x2e, 0x2d, 0x03, 0x33, 0xb5, 0xac, 0x58, 0xfb, 0x6a, 0x97, 0x36, 0x31, + 0x99, 0x81, 0xcb, 0x6a, 0xef, 0xbd, 0xae, 0x15, 0x0f, 0x09, 0x33, 0x82, 0x09, 0xbc, 0x9e, 0x15, + 0xec, 0xf7, 0xf2, 0xcb, 0x81, 0xf1, 0xe9, 0x25, 0x91, 0x5b, 0x70, 0x9b, 0x4b, 0x21, 0xe3, 0xa3, + 0x59, 0xd6, 0xb9, 0xd0, 0x9f, 0xfe, 0xec, 0x28, 0xd3, 0xbe, 0xa8, 0x7b, 0xf8, 0x53, 0x79, 0x47, + 0x2e, 0x8f, 0x4b, 0x7f, 0x6d, 0xc3, 0xf7, 0xcf, 0x49, 0x65, 0x87, 0xed, 0xc0, 0x1e, 0xec, 0xcd, + 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x20, 0xf8, 0x53, 0xe3, 0x02, 0x00, 0x00, +} diff --git a/pkg/plugin/generated/BlockStore.pb.go b/pkg/plugin/generated/BlockStore.pb.go index aad719fab..c4c35ca4e 100644 --- a/pkg/plugin/generated/BlockStore.pb.go +++ b/pkg/plugin/generated/BlockStore.pb.go @@ -1,39 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // source: BlockStore.proto -/* -Package generated is a generated protocol buffer package. - -It is generated from these files: - BlockStore.proto - ObjectStore.proto - Shared.proto - -It has these top-level messages: - CreateVolumeRequest - CreateVolumeResponse - GetVolumeInfoRequest - GetVolumeInfoResponse - IsVolumeReadyRequest - IsVolumeReadyResponse - ListSnapshotsRequest - ListSnapshotsResponse - CreateSnapshotRequest - CreateSnapshotResponse - DeleteSnapshotRequest - PutObjectRequest - GetObjectRequest - Bytes - ListCommonPrefixesRequest - ListCommonPrefixesResponse - ListObjectsRequest - ListObjectsResponse - DeleteObjectRequest - CreateSignedURLRequest - CreateSignedURLResponse - Empty - InitRequest -*/ package generated import proto "github.com/golang/protobuf/proto" @@ -50,12 +17,6 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package - type CreateVolumeRequest struct { SnapshotID string `protobuf:"bytes,1,opt,name=snapshotID" json:"snapshotID,omitempty"` VolumeType string `protobuf:"bytes,2,opt,name=volumeType" json:"volumeType,omitempty"` @@ -66,7 +27,7 @@ type CreateVolumeRequest struct { func (m *CreateVolumeRequest) Reset() { *m = CreateVolumeRequest{} } func (m *CreateVolumeRequest) String() string { return proto.CompactTextString(m) } func (*CreateVolumeRequest) ProtoMessage() {} -func (*CreateVolumeRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (*CreateVolumeRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} } func (m *CreateVolumeRequest) GetSnapshotID() string { if m != nil { @@ -103,7 +64,7 @@ type CreateVolumeResponse struct { func (m *CreateVolumeResponse) Reset() { *m = CreateVolumeResponse{} } func (m *CreateVolumeResponse) String() string { return proto.CompactTextString(m) } func (*CreateVolumeResponse) ProtoMessage() {} -func (*CreateVolumeResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (*CreateVolumeResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{1} } func (m *CreateVolumeResponse) GetVolumeID() string { if m != nil { @@ -120,7 +81,7 @@ type GetVolumeInfoRequest struct { func (m *GetVolumeInfoRequest) Reset() { *m = GetVolumeInfoRequest{} } func (m *GetVolumeInfoRequest) String() string { return proto.CompactTextString(m) } func (*GetVolumeInfoRequest) ProtoMessage() {} -func (*GetVolumeInfoRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } +func (*GetVolumeInfoRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{2} } func (m *GetVolumeInfoRequest) GetVolumeID() string { if m != nil { @@ -144,7 +105,7 @@ type GetVolumeInfoResponse struct { func (m *GetVolumeInfoResponse) Reset() { *m = GetVolumeInfoResponse{} } func (m *GetVolumeInfoResponse) String() string { return proto.CompactTextString(m) } func (*GetVolumeInfoResponse) ProtoMessage() {} -func (*GetVolumeInfoResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } +func (*GetVolumeInfoResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{3} } func (m *GetVolumeInfoResponse) GetVolumeType() string { if m != nil { @@ -168,7 +129,7 @@ type IsVolumeReadyRequest struct { func (m *IsVolumeReadyRequest) Reset() { *m = IsVolumeReadyRequest{} } func (m *IsVolumeReadyRequest) String() string { return proto.CompactTextString(m) } func (*IsVolumeReadyRequest) ProtoMessage() {} -func (*IsVolumeReadyRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } +func (*IsVolumeReadyRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{4} } func (m *IsVolumeReadyRequest) GetVolumeID() string { if m != nil { @@ -191,7 +152,7 @@ type IsVolumeReadyResponse struct { func (m *IsVolumeReadyResponse) Reset() { *m = IsVolumeReadyResponse{} } func (m *IsVolumeReadyResponse) String() string { return proto.CompactTextString(m) } func (*IsVolumeReadyResponse) ProtoMessage() {} -func (*IsVolumeReadyResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } +func (*IsVolumeReadyResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{5} } func (m *IsVolumeReadyResponse) GetReady() bool { if m != nil { @@ -207,7 +168,7 @@ type ListSnapshotsRequest struct { func (m *ListSnapshotsRequest) Reset() { *m = ListSnapshotsRequest{} } func (m *ListSnapshotsRequest) String() string { return proto.CompactTextString(m) } func (*ListSnapshotsRequest) ProtoMessage() {} -func (*ListSnapshotsRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } +func (*ListSnapshotsRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{6} } func (m *ListSnapshotsRequest) GetTagFilters() map[string]string { if m != nil { @@ -223,7 +184,7 @@ type ListSnapshotsResponse struct { func (m *ListSnapshotsResponse) Reset() { *m = ListSnapshotsResponse{} } func (m *ListSnapshotsResponse) String() string { return proto.CompactTextString(m) } func (*ListSnapshotsResponse) ProtoMessage() {} -func (*ListSnapshotsResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } +func (*ListSnapshotsResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{7} } func (m *ListSnapshotsResponse) GetSnapshotIDs() []string { if m != nil { @@ -241,7 +202,7 @@ type CreateSnapshotRequest struct { func (m *CreateSnapshotRequest) Reset() { *m = CreateSnapshotRequest{} } func (m *CreateSnapshotRequest) String() string { return proto.CompactTextString(m) } func (*CreateSnapshotRequest) ProtoMessage() {} -func (*CreateSnapshotRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } +func (*CreateSnapshotRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{8} } func (m *CreateSnapshotRequest) GetVolumeID() string { if m != nil { @@ -271,7 +232,7 @@ type CreateSnapshotResponse struct { func (m *CreateSnapshotResponse) Reset() { *m = CreateSnapshotResponse{} } func (m *CreateSnapshotResponse) String() string { return proto.CompactTextString(m) } func (*CreateSnapshotResponse) ProtoMessage() {} -func (*CreateSnapshotResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } +func (*CreateSnapshotResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{9} } func (m *CreateSnapshotResponse) GetSnapshotID() string { if m != nil { @@ -287,7 +248,7 @@ type DeleteSnapshotRequest struct { func (m *DeleteSnapshotRequest) Reset() { *m = DeleteSnapshotRequest{} } func (m *DeleteSnapshotRequest) String() string { return proto.CompactTextString(m) } func (*DeleteSnapshotRequest) ProtoMessage() {} -func (*DeleteSnapshotRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } +func (*DeleteSnapshotRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{10} } func (m *DeleteSnapshotRequest) GetSnapshotID() string { if m != nil { @@ -580,9 +541,9 @@ var _BlockStore_serviceDesc = grpc.ServiceDesc{ Metadata: "BlockStore.proto", } -func init() { proto.RegisterFile("BlockStore.proto", fileDescriptor0) } +func init() { proto.RegisterFile("BlockStore.proto", fileDescriptor1) } -var fileDescriptor0 = []byte{ +var fileDescriptor1 = []byte{ // 539 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xc1, 0x6e, 0xd3, 0x40, 0x10, 0xd5, 0xc6, 0x06, 0x35, 0x53, 0x5a, 0xa2, 0xc5, 0xae, 0x2c, 0x1f, 0x8a, 0xf1, 0x29, 0x42, diff --git a/pkg/plugin/generated/ObjectStore.pb.go b/pkg/plugin/generated/ObjectStore.pb.go index 4e755d257..10f0a4f8d 100644 --- a/pkg/plugin/generated/ObjectStore.pb.go +++ b/pkg/plugin/generated/ObjectStore.pb.go @@ -26,7 +26,7 @@ type PutObjectRequest struct { func (m *PutObjectRequest) Reset() { *m = PutObjectRequest{} } func (m *PutObjectRequest) String() string { return proto.CompactTextString(m) } func (*PutObjectRequest) ProtoMessage() {} -func (*PutObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} } +func (*PutObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{0} } func (m *PutObjectRequest) GetBucket() string { if m != nil { @@ -57,7 +57,7 @@ type GetObjectRequest struct { func (m *GetObjectRequest) Reset() { *m = GetObjectRequest{} } func (m *GetObjectRequest) String() string { return proto.CompactTextString(m) } func (*GetObjectRequest) ProtoMessage() {} -func (*GetObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{1} } +func (*GetObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{1} } func (m *GetObjectRequest) GetBucket() string { if m != nil { @@ -80,7 +80,7 @@ type Bytes struct { func (m *Bytes) Reset() { *m = Bytes{} } func (m *Bytes) String() string { return proto.CompactTextString(m) } func (*Bytes) ProtoMessage() {} -func (*Bytes) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{2} } +func (*Bytes) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{2} } func (m *Bytes) GetData() []byte { if m != nil { @@ -97,7 +97,7 @@ type ListCommonPrefixesRequest struct { func (m *ListCommonPrefixesRequest) Reset() { *m = ListCommonPrefixesRequest{} } func (m *ListCommonPrefixesRequest) String() string { return proto.CompactTextString(m) } func (*ListCommonPrefixesRequest) ProtoMessage() {} -func (*ListCommonPrefixesRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{3} } +func (*ListCommonPrefixesRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{3} } func (m *ListCommonPrefixesRequest) GetBucket() string { if m != nil { @@ -120,7 +120,7 @@ type ListCommonPrefixesResponse struct { func (m *ListCommonPrefixesResponse) Reset() { *m = ListCommonPrefixesResponse{} } func (m *ListCommonPrefixesResponse) String() string { return proto.CompactTextString(m) } func (*ListCommonPrefixesResponse) ProtoMessage() {} -func (*ListCommonPrefixesResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{4} } +func (*ListCommonPrefixesResponse) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{4} } func (m *ListCommonPrefixesResponse) GetPrefixes() []string { if m != nil { @@ -137,7 +137,7 @@ type ListObjectsRequest struct { func (m *ListObjectsRequest) Reset() { *m = ListObjectsRequest{} } func (m *ListObjectsRequest) String() string { return proto.CompactTextString(m) } func (*ListObjectsRequest) ProtoMessage() {} -func (*ListObjectsRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{5} } +func (*ListObjectsRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{5} } func (m *ListObjectsRequest) GetBucket() string { if m != nil { @@ -160,7 +160,7 @@ type ListObjectsResponse struct { func (m *ListObjectsResponse) Reset() { *m = ListObjectsResponse{} } func (m *ListObjectsResponse) String() string { return proto.CompactTextString(m) } func (*ListObjectsResponse) ProtoMessage() {} -func (*ListObjectsResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{6} } +func (*ListObjectsResponse) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{6} } func (m *ListObjectsResponse) GetKeys() []string { if m != nil { @@ -177,7 +177,7 @@ type DeleteObjectRequest struct { func (m *DeleteObjectRequest) Reset() { *m = DeleteObjectRequest{} } func (m *DeleteObjectRequest) String() string { return proto.CompactTextString(m) } func (*DeleteObjectRequest) ProtoMessage() {} -func (*DeleteObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{7} } +func (*DeleteObjectRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{7} } func (m *DeleteObjectRequest) GetBucket() string { if m != nil { @@ -202,7 +202,7 @@ type CreateSignedURLRequest struct { func (m *CreateSignedURLRequest) Reset() { *m = CreateSignedURLRequest{} } func (m *CreateSignedURLRequest) String() string { return proto.CompactTextString(m) } func (*CreateSignedURLRequest) ProtoMessage() {} -func (*CreateSignedURLRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{8} } +func (*CreateSignedURLRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{8} } func (m *CreateSignedURLRequest) GetBucket() string { if m != nil { @@ -232,7 +232,7 @@ type CreateSignedURLResponse struct { func (m *CreateSignedURLResponse) Reset() { *m = CreateSignedURLResponse{} } func (m *CreateSignedURLResponse) String() string { return proto.CompactTextString(m) } func (*CreateSignedURLResponse) ProtoMessage() {} -func (*CreateSignedURLResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{9} } +func (*CreateSignedURLResponse) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{9} } func (m *CreateSignedURLResponse) GetUrl() string { if m != nil { @@ -586,9 +586,9 @@ var _ObjectStore_serviceDesc = grpc.ServiceDesc{ Metadata: "ObjectStore.proto", } -func init() { proto.RegisterFile("ObjectStore.proto", fileDescriptor1) } +func init() { proto.RegisterFile("ObjectStore.proto", fileDescriptor2) } -var fileDescriptor1 = []byte{ +var fileDescriptor2 = []byte{ // 444 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0xdf, 0x8b, 0xd3, 0x40, 0x10, 0xc7, 0x89, 0xa9, 0xc5, 0xcc, 0x15, 0x8c, 0x73, 0x50, 0x6b, 0x4e, 0xa5, 0x2e, 0x0a, 0x15, diff --git a/pkg/plugin/generated/Shared.pb.go b/pkg/plugin/generated/Shared.pb.go index 81ea2a973..965b77237 100644 --- a/pkg/plugin/generated/Shared.pb.go +++ b/pkg/plugin/generated/Shared.pb.go @@ -18,7 +18,7 @@ type Empty struct { func (m *Empty) Reset() { *m = Empty{} } func (m *Empty) String() string { return proto.CompactTextString(m) } func (*Empty) ProtoMessage() {} -func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{0} } +func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor3, []int{0} } type InitRequest struct { Config map[string]string `protobuf:"bytes,1,rep,name=config" json:"config,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` @@ -27,7 +27,7 @@ type InitRequest struct { func (m *InitRequest) Reset() { *m = InitRequest{} } func (m *InitRequest) String() string { return proto.CompactTextString(m) } func (*InitRequest) ProtoMessage() {} -func (*InitRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{1} } +func (*InitRequest) Descriptor() ([]byte, []int) { return fileDescriptor3, []int{1} } func (m *InitRequest) GetConfig() map[string]string { if m != nil { @@ -41,9 +41,9 @@ func init() { proto.RegisterType((*InitRequest)(nil), "generated.InitRequest") } -func init() { proto.RegisterFile("Shared.proto", fileDescriptor2) } +func init() { proto.RegisterFile("Shared.proto", fileDescriptor3) } -var fileDescriptor2 = []byte{ +var fileDescriptor3 = []byte{ // 156 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x09, 0xce, 0x48, 0x2c, 0x4a, 0x4d, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0x4f, 0xcd, 0x4b, 0x2d, 0x4a, diff --git a/pkg/plugin/manager.go b/pkg/plugin/manager.go index cff434467..b8db2b806 100644 --- a/pkg/plugin/manager.go +++ b/pkg/plugin/manager.go @@ -18,15 +18,17 @@ package plugin import ( "fmt" + "io/ioutil" "os" - "os/exec" "path/filepath" + "strings" "github.com/hashicorp/go-hclog" plugin "github.com/hashicorp/go-plugin" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/heptio/ark/pkg/backup" "github.com/heptio/ark/pkg/cloudprovider" ) @@ -38,6 +40,13 @@ func (k PluginKind) String() string { return string(k) } +func baseConfig() *plugin.ClientConfig { + return &plugin.ClientConfig{ + HandshakeConfig: Handshake, + AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, + } +} + const ( // PluginKindObjectStore is the Kind string for // an Object Store plugin. @@ -47,12 +56,36 @@ const ( // a Block Store plugin. PluginKindBlockStore PluginKind = "blockstore" + // PluginKindCloudProvider is the Kind string for + // a CloudProvider plugin (i.e. an Object & Block + // store). + // + // NOTE that it is highly likely that in subsequent + // versions of Ark this kind of plugin will be replaced + // with a different mechanism for providing multiple + // plugin impls within a single binary. This should + // probably not be used. + PluginKindCloudProvider PluginKind = "cloudprovider" + + // PluginKindBackupItemAction is the Kind string for + // a Backup ItemAction plugin. + PluginKindBackupItemAction PluginKind = "backupitemaction" + pluginDir = "/plugins" ) +var AllPluginKinds = []PluginKind{ + PluginKindObjectStore, + PluginKindBlockStore, + PluginKindCloudProvider, + PluginKindBackupItemAction, +} + type pluginInfo struct { - kind PluginKind - name string + kinds []PluginKind + name string + commandName string + commandArgs []string } // Manager exposes functions for getting implementations of the pluggable @@ -65,86 +98,58 @@ type Manager interface { // GetBlockStore returns the plugin implementation of the // cloudprovider.BlockStore interface with the specified name. GetBlockStore(name string) (cloudprovider.BlockStore, error) + + // GetBackupItemActions returns all backup.ItemAction plugins. + // These plugin instances should ONLY be used for a single backup + // (mainly because each one outputs to a per-backup log), + // and should be terminated upon completion of the backup with + // CloseBackupItemActions(). + GetBackupItemActions(backupName string, logger logrus.FieldLogger, level logrus.Level) ([]backup.ItemAction, error) + + // CloseBackupItemActions terminates the plugin sub-processes that + // are hosting BackupItemAction plugins for the given backup name. + CloseBackupItemActions(backupName string) error } type manager struct { - logger hclog.Logger - clients map[pluginInfo]*plugin.Client - internalPlugins map[pluginInfo]interface{} + logger hclog.Logger + pluginRegistry *registry + clientStore *clientStore } // NewManager constructs a manager for getting plugin implementations. -func NewManager(logger logrus.FieldLogger, level logrus.Level) Manager { - return &manager{ - logger: (&logrusAdapter{impl: logger, level: level}), - clients: make(map[pluginInfo]*plugin.Client), - internalPlugins: map[pluginInfo]interface{}{ - {kind: PluginKindObjectStore, name: "aws"}: struct{}{}, - {kind: PluginKindBlockStore, name: "aws"}: struct{}{}, +func NewManager(logger logrus.FieldLogger, level logrus.Level) (Manager, error) { + m := &manager{ + logger: &logrusAdapter{impl: logger, level: level}, + pluginRegistry: newRegistry(), + clientStore: newClientStore(), + } - {kind: PluginKindObjectStore, name: "gcp"}: struct{}{}, - {kind: PluginKindBlockStore, name: "gcp"}: struct{}{}, + if err := m.registerPlugins(); err != nil { + return nil, err + } - {kind: PluginKindObjectStore, name: "azure"}: struct{}{}, - {kind: PluginKindBlockStore, name: "azure"}: struct{}{}, - }, + return m, nil +} + +func pluginForKind(kind PluginKind) plugin.Plugin { + switch kind { + case PluginKindObjectStore: + return &ObjectStorePlugin{} + case PluginKindBlockStore: + return &BlockStorePlugin{} + default: + return nil } } -func addPlugins(config *plugin.ClientConfig, kinds ...PluginKind) { - for _, kind := range kinds { - if kind == PluginKindObjectStore { - config.Plugins[kind.String()] = &ObjectStorePlugin{} - } else if kind == PluginKindBlockStore { - config.Plugins[kind.String()] = &BlockStorePlugin{} - } - } -} - -func (m *manager) getPlugin(descriptor pluginInfo, logger hclog.Logger) (interface{}, error) { - client, found := m.clients[descriptor] - if !found { - var ( - externalPath = filepath.Join(pluginDir, fmt.Sprintf("ark-%s-%s", descriptor.kind, descriptor.name)) - config = &plugin.ClientConfig{ - HandshakeConfig: Handshake, - AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, - Plugins: make(map[string]plugin.Plugin), - Logger: logger, - } - ) - - // First check to see if there's an external plugin for this kind and name. this - // is so users can override the built-in plugins if they want. If it doesn't exist, - // see if there's an internal one. - if _, err := os.Stat(externalPath); err == nil { - addPlugins(config, descriptor.kind) - config.Cmd = exec.Command(externalPath) - - client = plugin.NewClient(config) - - m.clients[descriptor] = client - } else if _, found := m.internalPlugins[descriptor]; found { - addPlugins(config, PluginKindObjectStore, PluginKindBlockStore) - config.Cmd = exec.Command("/ark", "plugin", "cloudprovider", descriptor.name) - - client = plugin.NewClient(config) - - // since a single sub-process will serve both an object and block store - // for a given cloud-provider, record this client as being valid for both - m.clients[pluginInfo{PluginKindObjectStore, descriptor.name}] = client - m.clients[pluginInfo{PluginKindBlockStore, descriptor.name}] = client - } else { - return nil, errors.Errorf("plugin not found for kind=%s, name=%s", descriptor.kind, descriptor.name) - } - } - +func getPluginInstance(client *plugin.Client, kind PluginKind) (interface{}, error) { protocolClient, err := client.Client() if err != nil { return nil, errors.WithStack(err) } - plugin, err := protocolClient.Dispense(descriptor.kind.String()) + plugin, err := protocolClient.Dispense(string(kind)) if err != nil { return nil, errors.WithStack(err) } @@ -152,10 +157,56 @@ func (m *manager) getPlugin(descriptor pluginInfo, logger hclog.Logger) (interfa return plugin, nil } +func (m *manager) registerPlugins() error { + // first, register internal plugins + for _, provider := range []string{"aws", "gcp", "azure"} { + m.pluginRegistry.register(provider, "/ark", []string{"plugin", "cloudprovider", provider}, PluginKindObjectStore, PluginKindBlockStore) + } + m.pluginRegistry.register("backup_pv", "/ark", []string{"plugin", string(PluginKindBackupItemAction), "backup_pv"}, PluginKindBackupItemAction) + + // second, register external plugins (these will override internal plugins, if applicable) + if _, err := os.Stat(pluginDir); err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + + files, err := ioutil.ReadDir(pluginDir) + if err != nil { + return err + } + + for _, file := range files { + name, kind, err := parse(file.Name()) + if err != nil { + continue + } + + if kind == PluginKindCloudProvider { + m.pluginRegistry.register(name, filepath.Join(pluginDir, file.Name()), nil, PluginKindObjectStore, PluginKindBlockStore) + } else { + m.pluginRegistry.register(name, filepath.Join(pluginDir, file.Name()), nil, kind) + } + } + + return nil +} + +func parse(filename string) (string, PluginKind, error) { + for _, kind := range AllPluginKinds { + if prefix := fmt.Sprintf("ark-%s-", kind); strings.Index(filename, prefix) == 0 { + return strings.Replace(filename, prefix, "", -1), kind, nil + } + } + + return "", "", errors.New("invalid file name") +} + // GetObjectStore returns the plugin implementation of the cloudprovider.ObjectStore // interface with the specified name. func (m *manager) GetObjectStore(name string) (cloudprovider.ObjectStore, error) { - pluginObj, err := m.getPlugin(pluginInfo{PluginKindObjectStore, name}, m.logger) + pluginObj, err := m.getCloudProviderPlugin(name, PluginKindObjectStore) if err != nil { return nil, err } @@ -171,7 +222,7 @@ func (m *manager) GetObjectStore(name string) (cloudprovider.ObjectStore, error) // GetBlockStore returns the plugin implementation of the cloudprovider.BlockStore // interface with the specified name. func (m *manager) GetBlockStore(name string) (cloudprovider.BlockStore, error) { - pluginObj, err := m.getPlugin(pluginInfo{PluginKindBlockStore, name}, m.logger) + pluginObj, err := m.getCloudProviderPlugin(name, PluginKindBlockStore) if err != nil { return nil, err } @@ -183,3 +234,99 @@ func (m *manager) GetBlockStore(name string) (cloudprovider.BlockStore, error) { return blockStore, nil } + +func (m *manager) getCloudProviderPlugin(name string, kind PluginKind) (interface{}, error) { + client, err := m.clientStore.get(kind, name, "") + if err != nil { + pluginInfo, err := m.pluginRegistry.get(kind, name) + if err != nil { + return nil, err + } + + // build a plugin client that can dispense all of the PluginKinds it's registered for + clientBuilder := newClientBuilder(baseConfig()). + withCommand(pluginInfo.commandName, pluginInfo.commandArgs...) + + for _, kind := range pluginInfo.kinds { + clientBuilder.withPlugin(kind, pluginForKind(kind)) + } + + client = clientBuilder.client() + + // register the plugin client for the appropriate kinds + for _, kind := range pluginInfo.kinds { + m.clientStore.add(client, kind, name, "") + } + } + + pluginObj, err := getPluginInstance(client, kind) + if err != nil { + return nil, err + } + + return pluginObj, nil +} + +// GetBackupActions returns all backup.BackupAction plugins. +// These plugin instances should ONLY be used for a single backup +// (mainly because each one outputs to a per-backup log), +// and should be terminated upon completion of the backup with +// CloseBackupActions(). +func (m *manager) GetBackupItemActions(backupName string, logger logrus.FieldLogger, level logrus.Level) ([]backup.ItemAction, error) { + clients, err := m.clientStore.list(PluginKindBackupItemAction, backupName) + if err != nil { + pluginInfo, err := m.pluginRegistry.list(PluginKindBackupItemAction) + if err != nil { + return nil, err + } + + // create clients for each, using the provided logger + log := &logrusAdapter{impl: logger, level: level} + + for _, plugin := range pluginInfo { + client := newClientBuilder(baseConfig()). + withCommand(plugin.commandName, plugin.commandArgs...). + withPlugin(PluginKindBackupItemAction, &BackupItemActionPlugin{log: log}). + withLogger(log). + client() + + m.clientStore.add(client, PluginKindBackupItemAction, plugin.name, backupName) + + clients = append(clients, client) + } + } + + var backupActions []backup.ItemAction + for _, client := range clients { + plugin, err := getPluginInstance(client, PluginKindBackupItemAction) + if err != nil { + return nil, err + } + + backupAction, ok := plugin.(backup.ItemAction) + if !ok { + return nil, errors.New("could not convert gRPC client to backup.BackupAction") + } + + backupActions = append(backupActions, backupAction) + } + + return backupActions, nil +} + +// CloseBackupItemActions terminates the plugin sub-processes that +// are hosting BackupItemAction plugins for the given backup name. +func (m *manager) CloseBackupItemActions(backupName string) error { + clients, err := m.clientStore.list(PluginKindBackupItemAction, backupName) + if err != nil { + return err + } + + for _, client := range clients { + client.Kill() + } + + m.clientStore.deleteAll(PluginKindBackupItemAction, backupName) + + return nil +} diff --git a/pkg/plugin/proto/BackupItemAction.proto b/pkg/plugin/proto/BackupItemAction.proto new file mode 100644 index 000000000..cc6950be6 --- /dev/null +++ b/pkg/plugin/proto/BackupItemAction.proto @@ -0,0 +1,34 @@ +syntax = "proto3"; +package generated; + +import "Shared.proto"; + +message AppliesToResponse { + repeated string includedNamespaces = 1; + repeated string excludedNamespaces = 2; + repeated string includedResources = 3; + repeated string excludedResources = 4; + string selector = 5; +} + +message ExecuteRequest { + bytes item = 1; + bytes backup = 2; +} + +message ExecuteResponse { + bytes item = 1; + repeated ResourceIdentifier additionalItems = 2; +} + +message ResourceIdentifier { + string group = 1; + string resource = 2; + string namespace = 3; + string name = 4; +} + +service BackupItemAction { + rpc AppliesTo(Empty) returns (AppliesToResponse); + rpc Execute(ExecuteRequest) returns (ExecuteResponse); +} diff --git a/pkg/plugin/registry.go b/pkg/plugin/registry.go new file mode 100644 index 000000000..fc5e11694 --- /dev/null +++ b/pkg/plugin/registry.go @@ -0,0 +1,68 @@ +package plugin + +import ( + "github.com/pkg/errors" +) + +// registry is a simple store of plugin binary information. If a binary +// is registered as supporting multiple PluginKinds, it will be +// gettable/listable for all of those kinds. +type registry struct { + // plugins is a nested map, keyed first by PluginKind, + // and second by name. this is to allow easy listing + // of plugins for a kind, as well as efficient lookup + // of a plugin by kind+name. + plugins map[PluginKind]map[string]pluginInfo +} + +func newRegistry() *registry { + return ®istry{ + plugins: make(map[PluginKind]map[string]pluginInfo), + } +} + +// register adds a binary to the registry. If the binary supports multiple +// PluginKinds, it will be stored for each of those kinds so subsequent gets/lists +// for any supported kind will return it. +func (r *registry) register(name, commandName string, commandArgs []string, kinds ...PluginKind) { + for _, kind := range kinds { + if r.plugins[kind] == nil { + r.plugins[kind] = make(map[string]pluginInfo) + } + + r.plugins[kind][name] = pluginInfo{ + kinds: kinds, + name: name, + commandName: commandName, + commandArgs: commandArgs, + } + } +} + +// list returns info about all plugin binaries that implement the given +// PluginKind. +func (r *registry) list(kind PluginKind) ([]pluginInfo, error) { + var res []pluginInfo + + if plugins, found := r.plugins[kind]; found { + for _, itm := range plugins { + res = append(res, itm) + } + + return res, nil + } + + return nil, errors.New("plugins not found") +} + +// get returns info about a plugin with the given name and kind, or an +// error if one cannot be found. +func (r *registry) get(kind PluginKind, name string) (pluginInfo, error) { + if forKind := r.plugins[kind]; forKind != nil { + if plugin, found := r.plugins[kind][name]; found { + return plugin, nil + } + } + + return pluginInfo{}, errors.New("plugin not found") +}