when backing up PVCs, also back up claimed PVs

Signed-off-by: Steve Kriss <steve@heptio.com>
This commit is contained in:
Steve Kriss
2017-08-24 16:44:01 -07:00
parent 388be34a04
commit 9438a8670a
13 changed files with 446 additions and 230 deletions

View File

@@ -113,6 +113,7 @@ rules:
- apiGroups:
- "*"
verbs:
- get
- list
- watch
- create

View File

@@ -20,6 +20,7 @@ import (
"archive/tar"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"strings"
"time"
@@ -69,7 +70,17 @@ func (ac ActionContext) infof(msg string, args ...interface{}) {
type Action interface {
// Execute is invoked on an item being backed up. If an error is returned, the Backup is marked as
// failed.
Execute(ctx ActionContext, item map[string]interface{}, backup *api.Backup) error
Execute(ctx *backupContext, item map[string]interface{}, backupper itemBackupper) error
}
type itemKey struct {
resource string
namespace string
name string
}
func (i *itemKey) String() string {
return fmt.Sprintf("resource=%s,namespace=%s,name=%s", i.resource, i.namespace, i.name)
}
// NewKubernetesBackupper creates a new kubernetesBackupper.
@@ -97,33 +108,39 @@ func resolveActions(helper discovery.Helper, actions map[string]Action) (map[sch
ret := make(map[schema.GroupResource]Action)
for resource, action := range actions {
gr, err := helper.ResolveGroupResource(resource)
gvr, _, err := helper.ResourceFor(schema.ParseGroupResource(resource).WithVersion(""))
if err != nil {
return nil, err
}
ret[gr] = action
ret[gvr.GroupResource()] = action
}
return ret, nil
}
// getResourceIncludesExcludes takes the lists of resources to include and exclude from the
// backup, uses the discovery helper to resolve them to fully-qualified group-resource names, and returns
// an IncludesExcludes list.
// getResourceIncludesExcludes takes the lists of resources to include and exclude, uses the
// discovery helper to resolve them to fully-qualified group-resource names, and returns an
// IncludesExcludes list.
func (ctx *backupContext) getResourceIncludesExcludes(helper discovery.Helper, includes, excludes []string) *collections.IncludesExcludes {
return collections.GenerateIncludesExcludes(
resources := collections.GenerateIncludesExcludes(
includes,
excludes,
func(item string) string {
gr, err := helper.ResolveGroupResource(item)
gvr, _, err := helper.ResourceFor(schema.ParseGroupResource(item).WithVersion(""))
if err != nil {
ctx.infof("Unable to resolve resource %q: %v", item, err)
return ""
}
gr := gvr.GroupResource()
return gr.String()
},
)
ctx.infof("Including resources: %v", strings.Join(resources.GetIncludes(), ", "))
ctx.infof("Excluding resources: %v", strings.Join(resources.GetExcludes(), ", "))
return resources
}
// getNamespaceIncludesExcludes returns an IncludesExcludes list containing which namespaces to
@@ -146,6 +163,15 @@ type backupContext struct {
// resource, from either the networking.k8s.io or extensions api groups. We only want to back them
// up once, from whichever api group we see first.
networkPoliciesBackedUp bool
actions map[schema.GroupResource]Action
// backedUpItems keeps track of items that have been backed up already.
backedUpItems map[itemKey]struct{}
dynamicFactory client.DynamicFactory
discoveryHelper discovery.Helper
}
func (ctx *backupContext) infof(msg string, args ...interface{}) {
@@ -174,6 +200,10 @@ func (kb *kubernetesBackupper) Backup(backup *api.Backup, backupFile, logFile io
w: tw,
logger: log,
namespaceIncludesExcludes: getNamespaceIncludesExcludes(backup),
backedUpItems: make(map[itemKey]struct{}),
actions: kb.actions,
dynamicFactory: kb.dynamicFactory,
discoveryHelper: kb.discoveryHelper,
}
ctx.infof("Starting backup")
@@ -205,15 +235,34 @@ type tarWriter interface {
// backupGroup backs up a single API group.
func (kb *kubernetesBackupper) backupGroup(ctx *backupContext, group *metav1.APIResourceList) error {
var errs []error
var (
errs []error
pv *metav1.APIResource
)
for _, resource := range group.APIResources {
processResource := func(resource metav1.APIResource) {
ctx.infof("Processing resource %s/%s", group.GroupVersion, resource.Name)
if err := kb.backupResource(ctx, group, resource); err != nil {
errs = append(errs, err)
}
}
for _, resource := range group.APIResources {
// do PVs last because if we're also backing up PVCs, we want to backup
// PVs within the scope of the PVCs (within the PVC action) to allow
// for hooks to run
if strings.ToLower(resource.Name) == "persistentvolumes" && strings.ToLower(group.GroupVersion) == "v1" {
pvResource := resource
pv = &pvResource
continue
}
processResource(resource)
}
if pv != nil {
processResource(*pv)
}
return kuberrs.NewAggregate(errs)
}
@@ -245,34 +294,37 @@ func (kb *kubernetesBackupper) backupResource(
return nil
}
if grString == appsDeploymentsResource || grString == extensionsDeploymentsResource {
if ctx.deploymentsBackedUp {
var other string
if grString == appsDeploymentsResource {
other = extensionsDeploymentsResource
} else {
other = appsDeploymentsResource
}
ctx.infof("Skipping resource %q because it's a duplicate of %q", grString, other)
return nil
shouldBackup := func(gr, gr1, gr2 string, backedUp *bool) bool {
// if it's neither of the specified dupe group-resources, back it up
if gr != gr1 && gr != gr2 {
return true
}
ctx.deploymentsBackedUp = true
// if it hasn't been backed up yet, back it up
if !*backedUp {
*backedUp = true
return true
}
// else, don't back it up, and log why
var other string
switch gr {
case gr1:
other = gr2
case gr2:
other = gr1
}
ctx.infof("Skipping resource %q because it's a duplicate of %q", gr, other)
return false
}
if grString == networkingNetworkPoliciesResource || grString == extensionsNetworkPoliciesResource {
if ctx.networkPoliciesBackedUp {
var other string
if grString == networkingNetworkPoliciesResource {
other = extensionsNetworkPoliciesResource
} else {
other = networkingNetworkPoliciesResource
}
ctx.infof("Skipping resource %q because it's a duplicate of %q", grString, other)
return nil
}
if !shouldBackup(grString, appsDeploymentsResource, extensionsDeploymentsResource, &ctx.deploymentsBackedUp) {
return nil
}
ctx.networkPoliciesBackedUp = true
if !shouldBackup(grString, networkingNetworkPoliciesResource, extensionsNetworkPoliciesResource, &ctx.networkPoliciesBackedUp) {
return nil
}
var namespacesToList []string
@@ -302,8 +354,6 @@ func (kb *kubernetesBackupper) backupResource(
return errors.WithStack(err)
}
action := kb.actions[gr]
for _, item := range items {
unstructured, ok := item.(runtime.Unstructured)
if !ok {
@@ -313,7 +363,7 @@ func (kb *kubernetesBackupper) backupResource(
obj := unstructured.UnstructuredContent()
if err := kb.itemBackupper.backupItem(ctx, obj, grString, action); err != nil {
if err := kb.itemBackupper.backupItem(ctx, obj, gr); err != nil {
errs = append(errs, err)
}
}
@@ -346,28 +396,21 @@ func getNamespacesToList(ie *collections.IncludesExcludes) []string {
}
type itemBackupper interface {
backupItem(ctx *backupContext, item map[string]interface{}, groupResource string, action Action) error
backupItem(ctx *backupContext, item map[string]interface{}, groupResource schema.GroupResource) error
}
type realItemBackupper struct{}
// backupItem backs up an individual item to tarWriter. The item may be excluded based on the
// namespaces IncludesExcludes list.
func (*realItemBackupper) backupItem(ctx *backupContext, item map[string]interface{}, groupResource string, action Action) error {
// Never save status
delete(item, "status")
metadata, err := collections.GetMap(item, "metadata")
func (ib *realItemBackupper) backupItem(ctx *backupContext, item map[string]interface{}, groupResource schema.GroupResource) error {
name, err := collections.GetString(item, "metadata.name")
if err != nil {
return err
}
name, err := collections.GetString(metadata, "name")
if err != nil {
return err
}
namespace, err := collections.GetString(metadata, "namespace")
namespace, err := collections.GetString(item, "metadata.namespace")
// a non-nil error is assumed to be due to a cluster-scoped item
if err == nil {
if !ctx.namespaceIncludesExcludes.ShouldInclude(namespace) {
ctx.infof("Excluding item %s because namespace %s is excluded", name, namespace)
@@ -375,22 +418,41 @@ func (*realItemBackupper) backupItem(ctx *backupContext, item map[string]interfa
}
}
if action != nil {
ctx.infof("Executing action on %s, ns=%s, name=%s", groupResource, namespace, name)
if !ctx.resourceIncludesExcludes.ShouldInclude(groupResource.String()) {
ctx.infof("Excluding item %s because resource %s is excluded", name, groupResource.String())
return nil
}
actionCtx := ActionContext{logger: ctx.logger}
if err := action.Execute(actionCtx, item, ctx.backup); err != nil {
key := itemKey{
resource: groupResource.String(),
namespace: namespace,
name: name,
}
if _, exists := ctx.backedUpItems[key]; exists {
ctx.infof("Skipping item %s because it's already been backed up.", name)
return nil
}
ctx.backedUpItems[key] = struct{}{}
// Never save status
delete(item, "status")
if action, hasAction := ctx.actions[groupResource]; hasAction {
ctx.infof("Executing action on %s, ns=%s, name=%s", groupResource.String(), namespace, name)
if err := action.Execute(ctx, item, ib); err != nil {
return err
}
}
ctx.infof("Backing up resource=%s, ns=%s, name=%s", groupResource, namespace, name)
ctx.infof("Backing up resource=%s, ns=%s, name=%s", groupResource.String(), namespace, name)
var filePath string
if namespace != "" {
filePath = strings.Join([]string{api.NamespaceScopedDir, namespace, groupResource, name + ".json"}, "/")
filePath = strings.Join([]string{api.NamespaceScopedDir, namespace, groupResource.String(), name + ".json"}, "/")
} else {
filePath = strings.Join([]string{api.ClusterScopedDir, groupResource, name + ".json"}, "/")
filePath = strings.Join([]string{api.ClusterScopedDir, groupResource.String(), name + ".json"}, "/")
}
itemBytes, err := json.Marshal(item)

View File

@@ -0,0 +1,80 @@
/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package backup
import (
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/heptio/ark/pkg/util/collections"
)
// backupPVAction inspects a PersistentVolumeClaim for the PersistentVolume
// that it references and backs it up
type backupPVAction struct {
}
var _ Action = &backupPVAction{}
func NewBackupPVAction() Action {
return &backupPVAction{}
}
// Execute finds the PersistentVolume referenced by the provided
// PersistentVolumeClaim and backs it up
func (a *backupPVAction) Execute(ctx *backupContext, pvc map[string]interface{}, backupper itemBackupper) error {
pvcName, err := collections.GetString(pvc, "metadata.name")
if err != nil {
ctx.infof("unable to get metadata.name for PersistentVolumeClaim: %v", err)
return err
}
volumeName, err := collections.GetString(pvc, "spec.volumeName")
if err != nil {
ctx.infof("unable to get spec.volumeName for PersistentVolumeClaim %s: %v", pvcName, err)
return err
}
gvr, resource, err := ctx.discoveryHelper.ResourceFor(schema.GroupVersionResource{Resource: "persistentvolumes"})
if err != nil {
ctx.infof("error getting GroupVersionResource for PersistentVolumes: %v", err)
return err
}
gr := gvr.GroupResource()
client, err := ctx.dynamicFactory.ClientForGroupVersionResource(gvr, resource, "")
if err != nil {
ctx.infof("error getting client for GroupVersionResource=%s, Resource=%s: %v", gvr.String(), resource, err)
return err
}
pv, err := client.Get(volumeName, metav1.GetOptions{})
if err != nil {
ctx.infof("error getting PersistentVolume %s: %v", volumeName, err)
return errors.WithStack(err)
}
ctx.infof("backing up PersistentVolume %s for PersistentVolumeClaim %s", volumeName, pvcName)
if err := backupper.backupItem(ctx, pv.UnstructuredContent(), gr); err != nil {
ctx.infof("error backing up PersistentVolume %s: %v", volumeName, err)
return err
}
return nil
}

View File

@@ -53,7 +53,7 @@ type fakeAction struct {
var _ Action = &fakeAction{}
func (a *fakeAction) Execute(ctx ActionContext, item map[string]interface{}, backup *v1.Backup) error {
func (a *fakeAction) Execute(ctx *backupContext, item map[string]interface{}, backupper itemBackupper) error {
metadata, err := collections.GetMap(item, "metadata")
if err != nil {
return err
@@ -70,7 +70,7 @@ func (a *fakeAction) Execute(ctx ActionContext, item map[string]interface{}, bac
}
a.ids = append(a.ids, id)
a.backups = append(a.backups, backup)
a.backups = append(a.backups, ctx.backup)
return nil
}
@@ -106,18 +106,15 @@ func TestResolveActions(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
dh := &FakeDiscoveryHelper{
RESTMapper: &FakeMapper{
Resources: map[schema.GroupVersionResource]schema.GroupVersionResource{
schema.GroupVersionResource{Resource: "foo"}: schema.GroupVersionResource{Group: "somegroup", Resource: "foodies"},
schema.GroupVersionResource{Resource: "fie"}: schema.GroupVersionResource{Group: "somegroup", Resource: "fields"},
schema.GroupVersionResource{Resource: "bar"}: schema.GroupVersionResource{Group: "anothergroup", Resource: "barnacles"},
schema.GroupVersionResource{Resource: "baz"}: schema.GroupVersionResource{Group: "anothergroup", Resource: "bazaars"},
},
},
resources := map[schema.GroupVersionResource]schema.GroupVersionResource{
schema.GroupVersionResource{Resource: "foo"}: schema.GroupVersionResource{Group: "somegroup", Resource: "foodies"},
schema.GroupVersionResource{Resource: "fie"}: schema.GroupVersionResource{Group: "somegroup", Resource: "fields"},
schema.GroupVersionResource{Resource: "bar"}: schema.GroupVersionResource{Group: "anothergroup", Resource: "barnacles"},
schema.GroupVersionResource{Resource: "baz"}: schema.GroupVersionResource{Group: "anothergroup", Resource: "bazaars"},
}
discoveryHelper := NewFakeDiscoveryHelper(false, resources)
actual, err := resolveActions(dh, test.input)
actual, err := resolveActions(discoveryHelper, test.input)
gotError := err != nil
if e, a := test.expectError, gotError; e != a {
@@ -180,24 +177,20 @@ func TestGetResourceIncludesExcludes(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
dh := &FakeDiscoveryHelper{
RESTMapper: &FakeMapper{
Resources: map[schema.GroupVersionResource]schema.GroupVersionResource{
schema.GroupVersionResource{Resource: "foo"}: schema.GroupVersionResource{Group: "somegroup", Resource: "foodies"},
schema.GroupVersionResource{Resource: "fie"}: schema.GroupVersionResource{Group: "somegroup", Resource: "fields"},
schema.GroupVersionResource{Resource: "bar"}: schema.GroupVersionResource{Group: "anothergroup", Resource: "barnacles"},
schema.GroupVersionResource{Resource: "baz"}: schema.GroupVersionResource{Group: "anothergroup", Resource: "bazaars"},
},
},
resources := map[schema.GroupVersionResource]schema.GroupVersionResource{
schema.GroupVersionResource{Resource: "foo"}: schema.GroupVersionResource{Group: "somegroup", Resource: "foodies"},
schema.GroupVersionResource{Resource: "fie"}: schema.GroupVersionResource{Group: "somegroup", Resource: "fields"},
schema.GroupVersionResource{Resource: "bar"}: schema.GroupVersionResource{Group: "anothergroup", Resource: "barnacles"},
schema.GroupVersionResource{Resource: "baz"}: schema.GroupVersionResource{Group: "anothergroup", Resource: "bazaars"},
}
discoveryHelper := NewFakeDiscoveryHelper(false, resources)
log, _ := testlogger.NewNullLogger()
ctx := &backupContext{
logger: log,
}
actual := ctx.getResourceIncludesExcludes(dh, test.includes, test.excludes)
actual := ctx.getResourceIncludesExcludes(discoveryHelper, test.includes, test.excludes)
sort.Strings(test.expectedIncludes)
actualIncludes := actual.GetIncludes()
@@ -294,7 +287,7 @@ func TestBackupMethod(t *testing.T) {
}
discoveryHelper := &FakeDiscoveryHelper{
RESTMapper: &FakeMapper{
Mapper: &FakeMapper{
Resources: map[schema.GroupVersionResource]schema.GroupVersionResource{
schema.GroupVersionResource{Resource: "cm"}: schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"},
schema.GroupVersionResource{Resource: "csr"}: schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1beta1", Resource: "certificatesigningrequests"},
@@ -636,29 +629,29 @@ func TestBackupResource(t *testing.T) {
expectedListedNamespaces: []string{"a", "b"},
lists: []string{
`{
"apiVersion": "apps/v1beta1",
"kind": "DeploymentList",
"items": [
{
"metadata": {
"namespace": "a",
"name": "1"
}
}
]
}`,
"apiVersion": "apps/v1beta1",
"kind": "DeploymentList",
"items": [
{
"metadata": {
"namespace": "a",
"name": "1"
}
}
]
}`,
`{
"apiVersion": "apps/v1beta1v1",
"kind": "DeploymentList",
"items": [
{
"metadata": {
"namespace": "b",
"name": "2"
}
}
]
}`,
"apiVersion": "apps/v1beta1v1",
"kind": "DeploymentList",
"items": [
{
"metadata": {
"namespace": "b",
"name": "2"
}
}
]
}`,
},
expectedDeploymentsBackedUp: true,
},
@@ -674,17 +667,17 @@ func TestBackupResource(t *testing.T) {
expectedListedNamespaces: []string{""},
lists: []string{
`{
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicyList",
"items": [
{
"metadata": {
"namespace": "a",
"name": "1"
}
}
]
}`,
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicyList",
"items": [
{
"metadata": {
"namespace": "a",
"name": "1"
}
}
]
}`,
},
expectedNetworkPoliciesBackedUp: true,
},
@@ -701,19 +694,19 @@ func TestBackupResource(t *testing.T) {
labelSelector: "a=b",
lists: []string{
`{
"apiVersion": "certifiaces.k8s.io/v1beta1",
"kind": "CertificateSigningRequestList",
"items": [
{
"metadata": {
"name": "1",
"labels": {
"a": "b"
"apiVersion": "certifiaces.k8s.io/v1beta1",
"kind": "CertificateSigningRequestList",
"items": [
{
"metadata": {
"name": "1",
"labels": {
"a": "b"
}
}
}
}
}
]
}`,
]
}`,
},
},
{
@@ -729,7 +722,7 @@ func TestBackupResource(t *testing.T) {
labelSelector: "a=b",
lists: []string{
`{
"apiVersion": "certifiaces.k8s.io/v1beta1",
"apiVersion": "certificates.k8s.io/v1beta1",
"kind": "CertificateSigningRequestList",
"items": [
{
@@ -803,7 +796,7 @@ func TestBackupResource(t *testing.T) {
require.NoError(t, err)
for i := range list {
item := list[i].(*unstructured.Unstructured)
itemBackupper.On("backupItem", ctx, item.Object, gr.String(), action).Return(nil)
itemBackupper.On("backupItem", ctx, item.Object, gr).Return(nil)
if action != nil {
a, err := meta.Accessor(item)
require.NoError(t, err)
@@ -822,14 +815,11 @@ func TestBackupResource(t *testing.T) {
}
}
discoveryHelper := &FakeDiscoveryHelper{
RESTMapper: &FakeMapper{
Resources: map[schema.GroupVersionResource]schema.GroupVersionResource{
schema.GroupVersionResource{Resource: "certificatesigningrequests"}: schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1beta1", Resource: "certificatesigningrequests"},
schema.GroupVersionResource{Resource: "other"}: schema.GroupVersionResource{Group: "somegroup", Version: "someversion", Resource: "otherthings"},
},
},
resources := map[schema.GroupVersionResource]schema.GroupVersionResource{
schema.GroupVersionResource{Resource: "certificatesigningrequests"}: schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1beta1", Resource: "certificatesigningrequests"},
schema.GroupVersionResource{Resource: "other"}: schema.GroupVersionResource{Group: "somegroup", Version: "someversion", Resource: "otherthings"},
}
discoveryHelper := NewFakeDiscoveryHelper(false, resources)
kb, err := NewKubernetesBackupper(discoveryHelper, dynamicFactory, test.actions)
require.NoError(t, err)
@@ -849,8 +839,8 @@ type fakeItemBackupper struct {
mock.Mock
}
func (f *fakeItemBackupper) backupItem(ctx *backupContext, obj map[string]interface{}, groupResource string, action Action) error {
args := f.Called(ctx, obj, groupResource, action)
func (f *fakeItemBackupper) backupItem(ctx *backupContext, obj map[string]interface{}, groupResource schema.GroupResource) error {
args := f.Called(ctx, obj, groupResource)
return args.Error(0)
}
@@ -989,26 +979,31 @@ func TestBackupItem(t *testing.T) {
}
var (
actionParam Action
action *fakeAction
backup *v1.Backup
action *fakeAction
backup = &v1.Backup{}
groupResource = schema.ParseGroupResource("resource.group")
log, _ = testlogger.NewNullLogger()
)
if test.customAction {
action = &fakeAction{}
actionParam = action
backup = &v1.Backup{}
}
log, _ := testlogger.NewNullLogger()
ctx := &backupContext{
backup: backup,
namespaceIncludesExcludes: namespaces,
w: w,
logger: log,
w: w,
logger: log,
backedUpItems: make(map[itemKey]struct{}),
resourceIncludesExcludes: collections.NewIncludesExcludes().Includes("*"),
}
if test.customAction {
action = &fakeAction{}
ctx.actions = map[schema.GroupResource]Action{
groupResource: action,
}
backup = ctx.backup
}
b := &realItemBackupper{}
err = b.backupItem(ctx, item, "resource.group", actionParam)
err = b.backupItem(ctx, item, groupResource)
gotError := err != nil
if e, a := test.expectError, gotError; e != a {
t.Fatalf("error: expected %t, got %t", e, a)
@@ -1035,33 +1030,13 @@ func TestBackupItem(t *testing.T) {
t.Fatal(err)
}
if e, a := 1, len(w.headers); e != a {
t.Errorf("headers: expected %d, got %d", e, a)
}
if e, a := test.expectedTarHeaderName, w.headers[0].Name; e != a {
t.Errorf("header.name: expected %s, got %s", e, a)
}
if e, a := int64(len(itemWithoutStatus)), w.headers[0].Size; e != a {
t.Errorf("header.size: expected %d, got %d", e, a)
}
if e, a := byte(tar.TypeReg), w.headers[0].Typeflag; e != a {
t.Errorf("header.typeflag: expected %v, got %v", e, a)
}
if e, a := int64(0755), w.headers[0].Mode; e != a {
t.Errorf("header.mode: expected %d, got %d", e, a)
}
if w.headers[0].ModTime.IsZero() {
t.Errorf("header.modTime: expected it to be set")
}
if e, a := 1, len(w.data); e != a {
t.Errorf("# of data: expected %d, got %d", e, a)
}
require.Equal(t, 1, len(w.headers), "headers")
assert.Equal(t, test.expectedTarHeaderName, w.headers[0].Name, "header.name")
assert.Equal(t, int64(len(itemWithoutStatus)), w.headers[0].Size, "header.size")
assert.Equal(t, byte(tar.TypeReg), w.headers[0].Typeflag, "header.typeflag")
assert.Equal(t, int64(0755), w.headers[0].Mode, "header.mode")
assert.False(t, w.headers[0].ModTime.IsZero(), "header.modTime set")
assert.Equal(t, 1, len(w.data), "# of data")
actual, err := getAsMap(string(w.data[0]))
if err != nil {

View File

@@ -17,8 +17,6 @@ limitations under the License.
package backup
import (
"fmt"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/clock"
@@ -56,8 +54,12 @@ func NewVolumeSnapshotAction(snapshotService cloudprovider.SnapshotService) (Act
// Execute triggers a snapshot for the volume/disk underlying a PersistentVolume if the provided
// backup has volume snapshots enabled and the PV is of a compatible type. Also records cloud
// disk type and IOPS (if applicable) to be able to restore to current state later.
func (a *volumeSnapshotAction) Execute(ctx ActionContext, volume map[string]interface{}, backup *api.Backup) error {
backupName := fmt.Sprintf("%s/%s", backup.Namespace, backup.Name)
func (a *volumeSnapshotAction) Execute(ctx *backupContext, volume map[string]interface{}, backupper itemBackupper) error {
var (
backup = ctx.backup
backupName = kubeutil.NamespaceAndName(backup)
)
if backup.Spec.SnapshotVolumes != nil && !*backup.Spec.SnapshotVolumes {
ctx.infof("Backup %q has volume snapshots disabled; skipping volume snapshot action.", backupName)
return nil
@@ -88,10 +90,7 @@ func (a *volumeSnapshotAction) Execute(ctx ActionContext, volume map[string]inte
return nil
}
expiration := a.clock.Now().Add(backup.Spec.TTL.Duration)
ctx.infof("Backup %q: snapshotting PersistentVolume %q, volume-id %q, expiration %v", backupName, name, volumeID, expiration)
ctx.infof("Backup %q: snapshotting PersistentVolume %q, volume-id %q", backupName, name, volumeID)
snapshotID, err := a.snapshotService.CreateSnapshot(volumeID, pvFailureDomainZone)
if err != nil {
ctx.infof("error creating snapshot for backup %q, volume %q, volume-id %q: %v", backupName, name, volumeID, err)

View File

@@ -200,8 +200,14 @@ func TestVolumeSnapshotAction(t *testing.T) {
log, _ := testlogger.NewNullLogger()
actionCtx := ActionContext{logger: log}
err = action.Execute(actionCtx, pv, backup)
ctx := &backupContext{
backup: backup,
logger: log,
}
// method under test
err = action.Execute(ctx, pv, nil)
gotErr := err != nil
if e, a := test.expectError, gotErr; e != a {

View File

@@ -80,6 +80,8 @@ type Dynamic interface {
List(metav1.ListOptions) (runtime.Object, error)
// Watch watches for changes to objects of a given resource.
Watch(metav1.ListOptions) (watch.Interface, error)
// Get fetches an object by name.
Get(name string, opts metav1.GetOptions) (*unstructured.Unstructured, error)
}
// dynamicResourceClient implements Dynamic.
@@ -100,3 +102,7 @@ func (d *dynamicResourceClient) List(options metav1.ListOptions) (runtime.Object
func (d *dynamicResourceClient) Watch(options metav1.ListOptions) (watch.Interface, error) {
return d.resourceClient.Watch(options)
}
func (d *dynamicResourceClient) Get(name string, opts metav1.GetOptions) (*unstructured.Unstructured, error) {
return d.resourceClient.Get(name, opts)
}

View File

@@ -620,6 +620,7 @@ func newBackupper(
}
actions["persistentvolumes"] = action
actions["persistentvolumeclaims"] = backup.NewBackupPVAction()
}
return backup.NewKubernetesBackupper(

View File

@@ -35,31 +35,28 @@ import (
// Helper exposes functions for interacting with the Kubernetes discovery
// API.
type Helper interface {
// Mapper gets a RESTMapper for the current set of resources retrieved
// from discovery.
Mapper() meta.RESTMapper
// Resources gets the current set of resources retrieved from discovery
// that are backuppable by Ark.
Resources() []*metav1.APIResourceList
// ResourceFor gets a fully-resolved GroupVersionResource and an
// APIResource for the provided partially-specified GroupVersionResource.
ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, metav1.APIResource, error)
// Refresh pulls an updated set of Ark-backuppable resources from the
// discovery API.
Refresh() error
// ResolveGroupResource uses the RESTMapper to resolve resource to a fully-qualified
// schema.GroupResource. If the RESTMapper is unable to do so, an error is returned instead.
ResolveGroupResource(resource string) (schema.GroupResource, error)
}
type helper struct {
discoveryClient discovery.DiscoveryInterface
logger *logrus.Logger
// lock guards mapper and resources
lock sync.RWMutex
mapper meta.RESTMapper
resources []*metav1.APIResourceList
// lock guards mapper, resources and resourcesMap
lock sync.RWMutex
mapper meta.RESTMapper
resources []*metav1.APIResourceList
resourcesMap map[schema.GroupVersionResource]metav1.APIResource
}
var _ Helper = &helper{}
@@ -74,6 +71,23 @@ func NewHelper(discoveryClient discovery.DiscoveryInterface, logger *logrus.Logg
return h, nil
}
func (h *helper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, metav1.APIResource, error) {
h.lock.RLock()
defer h.lock.RUnlock()
gvr, err := h.mapper.ResourceFor(input)
if err != nil {
return schema.GroupVersionResource{}, metav1.APIResource{}, err
}
apiResource, found := h.resourcesMap[gvr]
if !found {
return schema.GroupVersionResource{}, metav1.APIResource{}, errors.Errorf("APIResource not found for GroupVersionResource %s", gvr)
}
return gvr, apiResource, nil
}
func (h *helper) Refresh() error {
h.lock.Lock()
defer h.lock.Unlock()
@@ -106,6 +120,19 @@ func (h *helper) Refresh() error {
sortResources(h.resources)
h.resourcesMap = make(map[schema.GroupVersionResource]metav1.APIResource)
for _, resourceGroup := range h.resources {
gv, err := schema.ParseGroupVersion(resourceGroup.GroupVersion)
if err != nil {
return errors.Wrapf(err, "unable to parse GroupVersion %s", resourceGroup.GroupVersion)
}
for _, resource := range resourceGroup.APIResources {
gvr := gv.WithResource(resource.Name)
h.resourcesMap[gvr] = resource
}
}
return nil
}
@@ -135,22 +162,8 @@ func sortResources(resources []*metav1.APIResourceList) {
})
}
func (h *helper) Mapper() meta.RESTMapper {
h.lock.RLock()
defer h.lock.RUnlock()
return h.mapper
}
func (h *helper) Resources() []*metav1.APIResourceList {
h.lock.RLock()
defer h.lock.RUnlock()
return h.resources
}
func (h *helper) ResolveGroupResource(resource string) (schema.GroupResource, error) {
gvr, err := h.mapper.ResourceFor(schema.ParseGroupResource(resource).WithVersion(""))
if err != nil {
return schema.GroupResource{}, err
}
return gvr.GroupResource(), nil
}

View File

@@ -72,9 +72,8 @@ type kubernetesRestorer struct {
logger *logrus.Logger
}
// prioritizeResources takes a list of pre-prioritized resources and a full list of resources to restore,
// and returns an ordered list of GroupResource-resolved resources in the order that they should be
// restored.
// prioritizeResources returns an ordered, fully-resolved list of resources to restore based on
// the provided discovery helper, resource priorities, and included/excluded resources.
func prioritizeResources(helper discovery.Helper, priorities []string, includedResources *collections.IncludesExcludes, logger *logrus.Logger) ([]schema.GroupResource, error) {
var ret []schema.GroupResource
@@ -83,16 +82,16 @@ func prioritizeResources(helper discovery.Helper, priorities []string, includedR
// start by resolving priorities into GroupResources and adding them to ret
for _, r := range priorities {
gr, err := helper.ResolveGroupResource(r)
gvr, _, err := helper.ResourceFor(schema.ParseGroupResource(r).WithVersion(""))
if err != nil {
return nil, err
}
gr := gvr.GroupResource()
if !includedResources.ShouldInclude(gr.String()) {
logger.WithField("groupResource", gr).Info("Not including resource")
continue
}
ret = append(ret, gr)
set.Insert(gr.String())
}
@@ -144,11 +143,11 @@ func NewKubernetesRestorer(
) (Restorer, error) {
r := make(map[schema.GroupResource]restorers.ResourceRestorer)
for gr, restorer := range customRestorers {
resolved, err := discoveryHelper.ResolveGroupResource(gr)
gvr, _, err := discoveryHelper.ResourceFor(schema.ParseGroupResource(gr).WithVersion(""))
if err != nil {
return nil, err
}
r[resolved] = restorer
r[gvr.GroupResource()] = restorer
}
return &kubernetesRestorer{
@@ -187,12 +186,13 @@ func (kr *kubernetesRestorer) Restore(restore *api.Restore, backup *api.Backup,
restore.Spec.IncludedResources,
restore.Spec.ExcludedResources,
func(item string) string {
gr, err := kr.discoveryHelper.ResolveGroupResource(item)
gvr, _, err := kr.discoveryHelper.ResourceFor(schema.ParseGroupResource(item).WithVersion(""))
if err != nil {
kr.logger.WithError(err).WithField("resource", item).Error("Unable to resolve resource")
return ""
}
gr := gvr.GroupResource()
return gr.String()
},
)

View File

@@ -85,16 +85,19 @@ func TestPrioritizeResources(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
helper := &FakeDiscoveryHelper{RESTMapper: &FakeMapper{AutoReturnResource: true}}
var helperResourceList []*metav1.APIResourceList
for gv, resources := range test.apiResources {
resourceList := &metav1.APIResourceList{GroupVersion: gv}
for _, resource := range resources {
resourceList.APIResources = append(resourceList.APIResources, metav1.APIResource{Name: resource})
}
helper.ResourceList = append(helper.ResourceList, resourceList)
helperResourceList = append(helperResourceList, resourceList)
}
helper := NewFakeDiscoveryHelper(true, nil)
helper.ResourceList = helperResourceList
includesExcludes := collections.NewIncludesExcludes().Includes(test.includes...).Excludes(test.excludes...)
result, err := prioritizeResources(helper, test.priorities, includesExcludes, logger)

View File

@@ -17,31 +17,96 @@ limitations under the License.
package test
import (
"errors"
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
type FakeDiscoveryHelper struct {
ResourceList []*metav1.APIResourceList
RESTMapper meta.RESTMapper
ResourceList []*metav1.APIResourceList
Mapper meta.RESTMapper
AutoReturnResource bool
}
func (dh *FakeDiscoveryHelper) Mapper() meta.RESTMapper {
return dh.RESTMapper
func NewFakeDiscoveryHelper(autoReturnResource bool, resources map[schema.GroupVersionResource]schema.GroupVersionResource) *FakeDiscoveryHelper {
helper := &FakeDiscoveryHelper{
AutoReturnResource: autoReturnResource,
Mapper: &FakeMapper{
Resources: resources,
},
}
if resources == nil {
return helper
}
apiResourceMap := make(map[string][]metav1.APIResource)
for _, gvr := range resources {
var gvString string
if gvr.Version != "" && gvr.Group != "" {
gvString = fmt.Sprintf("%s/%s", gvr.Group, gvr.Version)
} else {
gvString = fmt.Sprintf("%s%s", gvr.Group, gvr.Version)
}
apiResourceMap[gvString] = append(apiResourceMap[gvString], metav1.APIResource{Name: gvr.Resource})
}
for group, resources := range apiResourceMap {
helper.ResourceList = append(helper.ResourceList, &metav1.APIResourceList{GroupVersion: group, APIResources: resources})
}
return helper
}
func (dh *FakeDiscoveryHelper) Resources() []*metav1.APIResourceList {
return dh.ResourceList
}
func (dh *FakeDiscoveryHelper) Refresh() error {
return nil
}
func (dh *FakeDiscoveryHelper) ResolveGroupResource(resource string) (schema.GroupResource, error) {
gvr, err := dh.RESTMapper.ResourceFor(schema.ParseGroupResource(resource).WithVersion(""))
if err != nil {
return schema.GroupResource{}, err
func (dh *FakeDiscoveryHelper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, metav1.APIResource, error) {
if dh.AutoReturnResource {
return schema.GroupVersionResource{
Group: input.Group,
Version: input.Version,
Resource: input.Resource,
},
metav1.APIResource{
Name: input.Resource,
},
nil
}
return gvr.GroupResource(), nil
gvr, err := dh.Mapper.ResourceFor(input)
if err != nil {
return schema.GroupVersionResource{}, metav1.APIResource{}, err
}
var gvString string
if gvr.Version != "" && gvr.Group != "" {
gvString = fmt.Sprintf("%s/%s", gvr.Group, gvr.Version)
} else {
gvString = gvr.Version + gvr.Group
}
for _, gr := range dh.ResourceList {
if gr.GroupVersion != gvString {
continue
}
for _, resource := range gr.APIResources {
if resource.Name == gvr.Resource {
return gvr, resource, nil
}
}
}
return schema.GroupVersionResource{}, metav1.APIResource{}, errors.New("APIResource not found")
}

View File

@@ -64,3 +64,8 @@ func (c *FakeDynamicClient) Watch(options metav1.ListOptions) (watch.Interface,
args := c.Called(options)
return args.Get(0).(watch.Interface), args.Error(1)
}
func (c *FakeDynamicClient) Get(name string, opts metav1.GetOptions) (*unstructured.Unstructured, error) {
args := c.Called(name, opts)
return args.Get(0).(*unstructured.Unstructured), args.Error(1)
}