Compare commits

...

25 Commits

Author SHA1 Message Date
Nolan Brubaker
908900e9dd Merge pull request #850 from carlisia/release-0.9
Cherry picks for release 0.9.5
2018-09-17 14:31:09 -04:00
Steve Kriss
73e0d3a62f v0.9.5 changelog entry
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-09-17 11:25:05 -07:00
Steve Kriss
bf59d76a3f get a new metadata accessor after calling backup item actions
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-09-17 11:24:54 -07:00
Steve Kriss
d2ebc6ca88 Merge pull request #813 from nrb/v0.9.4-changelog
Update CHANGELOG for v0.9.4
2018-09-05 09:12:00 -07:00
Nolan Brubaker
79504e7fee Update CHANGELOG for v0.9.4
Signed-off-by: Nolan Brubaker <nolan@heptio.com>
2018-09-05 11:21:36 -04:00
Steve Kriss
8369745070 Merge pull request #812 from nrb/0.9.4-cherry-picks
Fix map merging logic
2018-08-30 14:00:37 -07:00
Nolan Brubaker
1cb098760e Fix map merging logic
Fixes #777

Signed-off-by: Nolan Brubaker <nolan@heptio.com>
2018-08-30 15:48:08 -04:00
Nolan Brubaker
ca428c7301 Merge pull request #797 from skriss/plugins-fix-0.9
terminate plugin clients explicitly instead of using managed ones
2018-08-30 15:47:12 -04:00
Steve Kriss
1e59553d90 plugins: add unit tests for close client methods
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-08-27 11:29:25 -07:00
Steve Kriss
721b629f0e terminate plugin clients explicitly instead of using managed ones
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-08-27 10:44:40 -07:00
Carlisia
659a852c8c Merge pull request #760 from skriss/v0.9.3-cherrypicks
V0.9.3 cherrypicks
2018-08-10 11:16:49 -07:00
Nolan Brubaker
ca8ae18020 Add v0.9.3 changelog entry
Signed-off-by: Nolan Brubaker <nolan@heptio.com>
2018-08-10 10:12:02 -07:00
Alex Lemaresquier
9f80f01c2a Initialize schedule Prometheus metrics to have them created beforehand (see https://prometheus.io/docs/practices/instrumentation/#avoid-missing-metrics)
Signed-off-by: Alex Lemaresquier <alex+github@lemaresquier.org>
2018-08-10 10:11:42 -07:00
Carlisia
0acd368291 Merge pull request #707 from skriss/release-v0.9.2
Release v0.9.2
2018-07-26 14:19:04 -07:00
Steve Kriss
0640cdab06 update changelog for v0.9.2
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-26 14:06:57 -07:00
Steve Kriss
d21ce48db1 fix bug preventing backup item action item updates from saving
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-26 14:02:01 -07:00
Carlisia
10a1fe2bfa Merge pull request #695 from skriss/release-0.9
cherry-pick commits for v0.9.1
2018-07-23 13:37:00 -07:00
Steve Kriss
07ce4988e3 update CHANGELOG.md for v0.9.1
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-23 12:13:27 -07:00
Steve Kriss
89e4611d1b cleanup service account action log statement
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-23 12:13:20 -07:00
Nolan Brubaker
7d6bebadc4 Add RBAC support for 1.7 clusters
Signed-off-by: Nolan Brubaker <nolan@heptio.com>
2018-07-23 12:01:40 -07:00
Steve Kriss
84f872e4d5 delete old deletion requests for backup when processing a new one
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-23 10:43:39 -07:00
Steve Kriss
b566a7c101 return nil error if 404 encountered when deleting snapshots
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-23 10:43:32 -07:00
Steve Kriss
b4f8d7cb5f fix tagging latest by using make's ifeq
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-20 12:06:29 -07:00
Steve Kriss
c23d9dd7c5 exit server if not all Ark CRDs exist at startup
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-20 11:57:16 -07:00
Steve Kriss
400e8a165b require namespace for backups/etc. to exist at server startup
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-20 11:53:25 -07:00
34 changed files with 1577 additions and 273 deletions

View File

@@ -1,5 +1,35 @@
# Changelog
#### [v0.9.5](https://github.com/heptio/ark/releases/tag/v0.9.5) - 2018-09-17
#### Bug Fixes
* Fix issue causing restic restores not to work (#834, @skriss)
#### [v0.9.4](https://github.com/heptio/ark/releases/tag/v0.9.4) - 2018-09-05
#### Bug Fixes
* Terminate plugin clients to resolve memory leaks (#797, @skriss)
* Fix nil map errors when merging annotations (#812, @nrb)
#### [v0.9.3](https://github.com/heptio/ark/releases/tag/v0.9.3) - 2018-08-10
#### Bug Fixes
* Initialize Prometheus metrics when creating a new schedule (#689, @lemaral)
#### [v0.9.2](https://github.com/heptio/ark/releases/tag/v0.9.2) - 2018-07-26
##### Bug Fixes:
* Fix issue where modifications made by backup item actions were not being saved to backup tarball (#704, @skriss)
#### [v0.9.1](https://github.com/heptio/ark/releases/tag/v0.9.1) - 2018-07-23
##### Bug Fixes:
* Require namespace for Ark's CRDs to already exist at server startup (#676, @skriss)
* Require all Ark CRDs to exist at server startup (#683, @skriss)
* Fix `latest` tagging in Makefile (#690, @skriss)
* Make Ark compatible with clusters that don't have the `rbac.authorization.k8s.io/v1` API group (#682, @nrb)
* Don't consider missing snapshots an error during backup deletion, limit backup deletion requests per backup to 1 (#687, @skriss)
#### [v0.9.0](https://github.com/heptio/ark/releases/tag/v0.9.0) - 2018-07-06
##### Highlights:

View File

@@ -138,10 +138,10 @@ all-push:
push: .push-$(DOTFILE_IMAGE) push-name
.push-$(DOTFILE_IMAGE): .container-$(DOTFILE_IMAGE)
@docker push $(IMAGE):$(VERSION)
@if [[ "$(TAG_LATEST)" == "true" ]]; then \
docker tag $(IMAGE):$(VERSION) $(IMAGE):latest; \
docker push $(IMAGE):latest; \
fi
ifeq ($(TAG_LATEST), true)
docker tag $(IMAGE):$(VERSION) $(IMAGE):latest
docker push $(IMAGE):latest
endif
@docker images -q $(IMAGE):$(VERSION) > $@
push-name:

View File

@@ -41,27 +41,41 @@ func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
// typeInfo bundles the plural resource name for an Ark custom resource with
// zero-value prototypes of its item and list types, for use in scheme
// registration and CRD handling.
type typeInfo struct {
	PluralName   string         // lowercase plural resource name, e.g. "backups"
	ItemType     runtime.Object // zero-value instance of the resource type
	ItemListType runtime.Object // zero-value instance of the resource's list type
}
// newTypeInfo constructs a typeInfo from a plural resource name and the
// item/item-list prototypes for that resource.
func newTypeInfo(pluralName string, itemType, itemListType runtime.Object) typeInfo {
	info := typeInfo{}
	info.PluralName = pluralName
	info.ItemType = itemType
	info.ItemListType = itemListType
	return info
}
// CustomResources returns a map of all custom resources within the Ark
// API group, keyed on Kind.
func CustomResources() map[string]typeInfo {
	resources := make(map[string]typeInfo, 9)
	resources["Backup"] = newTypeInfo("backups", &Backup{}, &BackupList{})
	resources["Restore"] = newTypeInfo("restores", &Restore{}, &RestoreList{})
	resources["Schedule"] = newTypeInfo("schedules", &Schedule{}, &ScheduleList{})
	resources["Config"] = newTypeInfo("configs", &Config{}, &ConfigList{})
	resources["DownloadRequest"] = newTypeInfo("downloadrequests", &DownloadRequest{}, &DownloadRequestList{})
	resources["DeleteBackupRequest"] = newTypeInfo("deletebackuprequests", &DeleteBackupRequest{}, &DeleteBackupRequestList{})
	resources["PodVolumeBackup"] = newTypeInfo("podvolumebackups", &PodVolumeBackup{}, &PodVolumeBackupList{})
	resources["PodVolumeRestore"] = newTypeInfo("podvolumerestores", &PodVolumeRestore{}, &PodVolumeRestoreList{})
	resources["ResticRepository"] = newTypeInfo("resticrepositories", &ResticRepository{}, &ResticRepositoryList{})
	return resources
}
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&Backup{},
&BackupList{},
&Schedule{},
&ScheduleList{},
&Restore{},
&RestoreList{},
&Config{},
&ConfigList{},
&DownloadRequest{},
&DownloadRequestList{},
&DeleteBackupRequest{},
&DeleteBackupRequestList{},
&PodVolumeBackup{},
&PodVolumeBackupList{},
&PodVolumeRestore{},
&PodVolumeRestoreList{},
&ResticRepository{},
&ResticRepositoryList{},
)
for _, typeInfo := range CustomResources() {
scheme.AddKnownTypes(SchemeGroupVersion, typeInfo.ItemType, typeInfo.ItemListType)
}
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}

View File

@@ -204,9 +204,22 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
}
}
if err := ib.executeActions(log, obj, groupResource, name, namespace, metadata); err != nil {
updatedObj, err := ib.executeActions(log, obj, groupResource, name, namespace, metadata)
if err != nil {
log.WithError(err).Error("Error executing item actions")
backupErrs = append(backupErrs, err)
// if there was an error running actions, execute post hooks and return
log.Debug("Executing post hooks")
if err := ib.itemHookHandler.handleHooks(log, groupResource, obj, ib.resourceHooks, hookPhasePost); err != nil {
backupErrs = append(backupErrs, err)
}
return kubeerrs.NewAggregate(backupErrs)
}
obj = updatedObj
if metadata, err = meta.Accessor(obj); err != nil {
return errors.WithStack(err)
}
if groupResource == kuberesource.PersistentVolumes {
@@ -285,7 +298,13 @@ func (ib *defaultItemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *co
return ib.resticBackupper.BackupPodVolumes(ib.backup, pod, log)
}
func (ib *defaultItemBackupper) executeActions(log logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, name, namespace string, metadata metav1.Object) error {
func (ib *defaultItemBackupper) executeActions(
log logrus.FieldLogger,
obj runtime.Unstructured,
groupResource schema.GroupResource,
name, namespace string,
metadata metav1.Object,
) (runtime.Unstructured, error) {
for _, action := range ib.actions {
if !action.resourceIncludesExcludes.ShouldInclude(groupResource.String()) {
log.Debug("Skipping action because it does not apply to this resource")
@@ -308,40 +327,40 @@ func (ib *defaultItemBackupper) executeActions(log logrus.FieldLogger, obj runti
logSetter.SetLog(log)
}
if updatedItem, additionalItemIdentifiers, err := action.Execute(obj, ib.backup); err == nil {
obj = updatedItem
for _, additionalItem := range additionalItemIdentifiers {
gvr, resource, err := ib.discoveryHelper.ResourceFor(additionalItem.GroupResource.WithVersion(""))
if err != nil {
return err
}
client, err := ib.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), resource, additionalItem.Namespace)
if err != nil {
return err
}
additionalItem, err := client.Get(additionalItem.Name, metav1.GetOptions{})
if err != nil {
return err
}
if err = ib.additionalItemBackupper.backupItem(log, additionalItem, gvr.GroupResource()); err != nil {
return err
}
}
} else {
updatedItem, additionalItemIdentifiers, err := action.Execute(obj, ib.backup)
if err != nil {
// We want this to show up in the log file at the place where the error occurs. When we return
// the error, it get aggregated with all the other ones at the end of the backup, making it
// harder to tell when it happened.
log.WithError(err).Error("error executing custom action")
return errors.Wrapf(err, "error executing custom action (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name)
return nil, errors.Wrapf(err, "error executing custom action (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name)
}
obj = updatedItem
for _, additionalItem := range additionalItemIdentifiers {
gvr, resource, err := ib.discoveryHelper.ResourceFor(additionalItem.GroupResource.WithVersion(""))
if err != nil {
return nil, err
}
client, err := ib.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), resource, additionalItem.Namespace)
if err != nil {
return nil, err
}
additionalItem, err := client.Get(additionalItem.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
if err = ib.additionalItemBackupper.backupItem(log, additionalItem, gvr.GroupResource()); err != nil {
return nil, err
}
}
}
return nil
return obj, nil
}
// zoneLabel is the label that stores availability-zone info

View File

@@ -26,6 +26,7 @@ import (
"github.com/heptio/ark/pkg/apis/ark/v1"
api "github.com/heptio/ark/pkg/apis/ark/v1"
resticmocks "github.com/heptio/ark/pkg/restic/mocks"
"github.com/heptio/ark/pkg/util/collections"
arktest "github.com/heptio/ark/pkg/util/test"
"github.com/pkg/errors"
@@ -34,6 +35,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
corev1api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
@@ -539,6 +541,152 @@ func TestBackupItemNoSkips(t *testing.T) {
}
}
// addAnnotationAction is a test ItemAction that adds the annotation
// "foo": "bar" to every item it executes against.
type addAnnotationAction struct{}

// Execute deep-copies the item (to simulate the process boundary that real
// item actions run across) and adds the "foo": "bar" annotation to the copy.
// It returns the modified copy with no additional items.
func (a *addAnnotationAction) Execute(item runtime.Unstructured, backup *v1.Backup) (runtime.Unstructured, []ResourceIdentifier, error) {
	// since item actions run out-of-proc, do a deep-copy here to simulate passing data
	// across a process boundary.
	copy := item.(*unstructured.Unstructured).DeepCopy()

	metadata, err := meta.Accessor(copy)
	if err != nil {
		// Surface the error instead of silently returning an unmodified copy,
		// so a test failure points at the real cause.
		return nil, nil, err
	}

	annotations := metadata.GetAnnotations()
	if annotations == nil {
		annotations = make(map[string]string)
	}

	annotations["foo"] = "bar"
	metadata.SetAnnotations(annotations)

	return copy, nil, nil
}

// AppliesTo is not needed by these tests and intentionally panics if called.
func (a *addAnnotationAction) AppliesTo() (ResourceSelector, error) {
	panic("not implemented")
}
// TestItemActionModificationsToItemPersist verifies that modifications a
// backup item action makes to an item (here, adding an annotation via
// addAnnotationAction) are persisted in the data written to the backup
// tarball, rather than being discarded.
func TestItemActionModificationsToItemPersist(t *testing.T) {
	var (
		// w captures the bytes the backupper writes to the tarball.
		w   = &fakeTarWriter{}
		obj = &unstructured.Unstructured{
			Object: map[string]interface{}{
				"metadata": map[string]interface{}{
					"namespace": "myns",
					"name":      "bar",
				},
			},
		}
		// a single action that matches everything and adds the "foo": "bar" annotation
		actions = []resolvedAction{
			{
				ItemAction:                &addAnnotationAction{},
				namespaceIncludesExcludes: collections.NewIncludesExcludes(),
				resourceIncludesExcludes:  collections.NewIncludesExcludes(),
				selector:                  labels.Everything(),
			},
		}
		b = (&defaultItemBackupperFactory{}).newItemBackupper(
			&v1.Backup{},
			collections.NewIncludesExcludes(),
			collections.NewIncludesExcludes(),
			make(map[itemKey]struct{}),
			actions,
			nil,
			w,
			nil,
			&arktest.FakeDynamicFactory{},
			arktest.NewFakeDiscoveryHelper(true, nil),
			nil,
			nil,
			newPVCSnapshotTracker(),
		).(*defaultItemBackupper)
	)

	// our expected backed-up object is the passed-in object plus the annotation
	// that the backup item action adds.
	expected := obj.DeepCopy()
	expected.SetAnnotations(map[string]string{"foo": "bar"})

	// method under test
	require.NoError(t, b.backupItem(arktest.NewLogger(), obj, schema.ParseGroupResource("resource.group")))

	// get the actual backed-up item
	require.Len(t, w.data, 1)
	actual, err := arktest.GetAsMap(string(w.data[0]))
	require.NoError(t, err)
	assert.EqualValues(t, expected.Object, actual)
}
// TestResticAnnotationsPersist verifies that snapshot annotations added by
// the restic backupper are persisted to the backup tarball even when a
// backup item action has also modified the item — i.e. the backupper must
// re-read the item's metadata after actions run.
func TestResticAnnotationsPersist(t *testing.T) {
	var (
		// w captures the bytes the backupper writes to the tarball.
		w   = &fakeTarWriter{}
		// obj opts two volumes into restic backup via its annotation.
		obj = &unstructured.Unstructured{
			Object: map[string]interface{}{
				"metadata": map[string]interface{}{
					"namespace": "myns",
					"name":      "bar",
					"annotations": map[string]interface{}{
						"backup.ark.heptio.com/backup-volumes": "volume-1,volume-2",
					},
				},
			},
		}
		actions = []resolvedAction{
			{
				ItemAction:                &addAnnotationAction{},
				namespaceIncludesExcludes: collections.NewIncludesExcludes(),
				resourceIncludesExcludes:  collections.NewIncludesExcludes(),
				selector:                  labels.Everything(),
			},
		}
		resticBackupper = &resticmocks.Backupper{}
		b               = (&defaultItemBackupperFactory{}).newItemBackupper(
			&v1.Backup{},
			collections.NewIncludesExcludes(),
			collections.NewIncludesExcludes(),
			make(map[itemKey]struct{}),
			actions,
			nil,
			w,
			nil,
			&arktest.FakeDynamicFactory{},
			arktest.NewFakeDiscoveryHelper(true, nil),
			nil,
			resticBackupper,
			newPVCSnapshotTracker(),
		).(*defaultItemBackupper)
	)

	// stub the restic backupper to report one snapshot per volume
	resticBackupper.
		On("BackupPodVolumes", mock.Anything, mock.Anything, mock.Anything).
		Return(map[string]string{"volume-1": "snapshot-1", "volume-2": "snapshot-2"}, nil)

	// our expected backed-up object is the passed-in object, plus the annotation
	// that the backup item action adds, plus the annotations that the restic
	// backupper adds
	expected := obj.DeepCopy()
	annotations := expected.GetAnnotations()
	if annotations == nil {
		annotations = make(map[string]string)
	}
	annotations["foo"] = "bar"
	annotations["snapshot.ark.heptio.com/volume-1"] = "snapshot-1"
	annotations["snapshot.ark.heptio.com/volume-2"] = "snapshot-2"
	expected.SetAnnotations(annotations)

	// method under test
	require.NoError(t, b.backupItem(arktest.NewLogger(), obj, schema.ParseGroupResource("pods")))

	// get the actual backed-up item
	require.Len(t, w.data, 1)
	actual, err := arktest.GetAsMap(string(w.data[0]))
	require.NoError(t, err)
	assert.EqualValues(t, expected.Object, actual)
}
func TestTakePVSnapshot(t *testing.T) {
iops := int64(1000)

140
pkg/backup/rbac.go Normal file
View File

@@ -0,0 +1,140 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package backup
import (
"github.com/pkg/errors"
rbac "k8s.io/api/rbac/v1"
rbacbeta "k8s.io/api/rbac/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
rbacclient "k8s.io/client-go/kubernetes/typed/rbac/v1"
rbacbetaclient "k8s.io/client-go/kubernetes/typed/rbac/v1beta1"
)
// ClusterRoleBindingLister allows for listing ClusterRoleBindings in a version-independent way.
// Implementations exist for rbac/v1, rbac/v1beta1, and a no-op for clusters without RBAC.
type ClusterRoleBindingLister interface {
	// List returns a slice of ClusterRoleBindings which can represent either v1 or v1beta1 ClusterRoleBindings.
	List() ([]ClusterRoleBinding, error)
}
// noopClusterRoleBindingLister exists to handle clusters where RBAC is disabled.
type noopClusterRoleBindingLister struct {
}

// List always returns an empty (non-nil) slice and no error.
func (noop noopClusterRoleBindingLister) List() ([]ClusterRoleBinding, error) {
	empty := []ClusterRoleBinding{}
	return empty, nil
}
// v1ClusterRoleBindingLister lists rbac/v1 ClusterRoleBindings from the API server.
type v1ClusterRoleBindingLister struct {
	client rbacclient.ClusterRoleBindingInterface
}

// List fetches all rbac/v1 ClusterRoleBindings and wraps each one in the
// version-independent ClusterRoleBinding interface.
func (l v1ClusterRoleBindingLister) List() ([]ClusterRoleBinding, error) {
	crbList, err := l.client.List(metav1.ListOptions{})
	if err != nil {
		return nil, errors.WithStack(err)
	}

	var wrapped []ClusterRoleBinding
	for i := range crbList.Items {
		wrapped = append(wrapped, v1ClusterRoleBinding{crb: crbList.Items[i]})
	}
	return wrapped, nil
}
// v1beta1ClusterRoleBindingLister lists rbac/v1beta1 ClusterRoleBindings from the API server.
type v1beta1ClusterRoleBindingLister struct {
	client rbacbetaclient.ClusterRoleBindingInterface
}

// List fetches all rbac/v1beta1 ClusterRoleBindings and wraps each one in the
// version-independent ClusterRoleBinding interface.
func (l v1beta1ClusterRoleBindingLister) List() ([]ClusterRoleBinding, error) {
	crbList, err := l.client.List(metav1.ListOptions{})
	if err != nil {
		return nil, errors.WithStack(err)
	}

	var wrapped []ClusterRoleBinding
	for i := range crbList.Items {
		wrapped = append(wrapped, v1beta1ClusterRoleBinding{crb: crbList.Items[i]})
	}
	return wrapped, nil
}
// NewClusterRoleBindingListerMap creates a map of RBAC version strings to their associated
// ClusterRoleBindingLister structs.
// Necessary so that callers to the ClusterRoleBindingLister interfaces don't need the kubernetes.Interface.
func NewClusterRoleBindingListerMap(clientset kubernetes.Interface) map[string]ClusterRoleBindingLister {
	listers := make(map[string]ClusterRoleBindingLister, 3)
	listers[rbac.SchemeGroupVersion.Version] = v1ClusterRoleBindingLister{client: clientset.RbacV1().ClusterRoleBindings()}
	listers[rbacbeta.SchemeGroupVersion.Version] = v1beta1ClusterRoleBindingLister{client: clientset.RbacV1beta1().ClusterRoleBindings()}
	// The empty version string handles clusters where RBAC is disabled.
	listers[""] = noopClusterRoleBindingLister{}
	return listers
}
// ClusterRoleBinding abstracts access to ClusterRoleBindings whether they're v1 or v1beta1.
// It exposes only the fields the service-account backup action needs.
type ClusterRoleBinding interface {
	// Name returns the name of a ClusterRoleBinding.
	Name() string
	// ServiceAccountSubjects returns the names of subjects that are service accounts in the given namespace.
	ServiceAccountSubjects(namespace string) []string
	// RoleRefName returns the name of a ClusterRoleBinding's RoleRef.
	RoleRefName() string
}
// v1ClusterRoleBinding adapts an rbac/v1 ClusterRoleBinding to the
// version-independent ClusterRoleBinding interface.
type v1ClusterRoleBinding struct {
	crb rbac.ClusterRoleBinding
}

// Name returns the name of the wrapped ClusterRoleBinding.
func (b v1ClusterRoleBinding) Name() string {
	return b.crb.Name
}

// RoleRefName returns the name of the wrapped ClusterRoleBinding's RoleRef.
func (b v1ClusterRoleBinding) RoleRefName() string {
	return b.crb.RoleRef.Name
}

// ServiceAccountSubjects returns the names of subjects that are service
// accounts in the given namespace.
func (b v1ClusterRoleBinding) ServiceAccountSubjects(namespace string) []string {
	var saSubjects []string
	for i := range b.crb.Subjects {
		subject := b.crb.Subjects[i]
		if subject.Kind != rbac.ServiceAccountKind || subject.Namespace != namespace {
			continue
		}
		saSubjects = append(saSubjects, subject.Name)
	}
	return saSubjects
}
// v1beta1ClusterRoleBinding adapts an rbac/v1beta1 ClusterRoleBinding to the
// version-independent ClusterRoleBinding interface.
type v1beta1ClusterRoleBinding struct {
	crb rbacbeta.ClusterRoleBinding
}

// Name returns the name of the wrapped ClusterRoleBinding.
func (c v1beta1ClusterRoleBinding) Name() string {
	return c.crb.Name
}

// RoleRefName returns the name of the wrapped ClusterRoleBinding's RoleRef.
func (c v1beta1ClusterRoleBinding) RoleRefName() string {
	return c.crb.RoleRef.Name
}

// ServiceAccountSubjects returns the names of subjects that are service
// accounts in the given namespace.
func (c v1beta1ClusterRoleBinding) ServiceAccountSubjects(namespace string) []string {
	var saSubjects []string
	for _, s := range c.crb.Subjects {
		// Use the v1beta1 constant for consistency with the wrapped type; the
		// value is identical to rbac/v1's ServiceAccountKind ("ServiceAccount"),
		// so behavior is unchanged.
		if s.Kind == rbacbeta.ServiceAccountKind && s.Namespace == namespace {
			saSubjects = append(saSubjects, s.Name)
		}
	}
	return saSubjects
}

View File

@@ -25,28 +25,41 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
rbacclient "k8s.io/client-go/kubernetes/typed/rbac/v1"
"github.com/heptio/ark/pkg/apis/ark/v1"
arkdiscovery "github.com/heptio/ark/pkg/discovery"
"github.com/heptio/ark/pkg/kuberesource"
)
// serviceAccountAction implements ItemAction.
type serviceAccountAction struct {
log logrus.FieldLogger
clusterRoleBindings []rbac.ClusterRoleBinding
clusterRoleBindings []ClusterRoleBinding
}
// NewServiceAccountAction creates a new ItemAction for service accounts.
func NewServiceAccountAction(log logrus.FieldLogger, client rbacclient.ClusterRoleBindingInterface) (ItemAction, error) {
clusterRoleBindings, err := client.List(metav1.ListOptions{})
func NewServiceAccountAction(log logrus.FieldLogger, clusterRoleBindingListers map[string]ClusterRoleBindingLister, discoveryHelper arkdiscovery.Helper) (ItemAction, error) {
// Look up the supported RBAC version
var supportedAPI metav1.GroupVersionForDiscovery
for _, ag := range discoveryHelper.APIGroups() {
if ag.Name == rbac.GroupName {
supportedAPI = ag.PreferredVersion
break
}
}
crbLister := clusterRoleBindingListers[supportedAPI.Version]
// This should be safe because the List call will return a 0-item slice
// if there's no matching API version.
crbs, err := crbLister.List()
if err != nil {
return nil, errors.WithStack(err)
return nil, err
}
return &serviceAccountAction{
log: log,
clusterRoleBindings: clusterRoleBindings.Items,
clusterRoleBindings: crbs,
}, nil
}
@@ -76,14 +89,14 @@ func (a *serviceAccountAction) Execute(item runtime.Unstructured, backup *v1.Bac
roles = sets.NewString()
)
for _, clusterRoleBinding := range a.clusterRoleBindings {
for _, subj := range clusterRoleBinding.Subjects {
if subj.Kind == rbac.ServiceAccountKind && subj.Namespace == namespace && subj.Name == name {
for _, crb := range a.clusterRoleBindings {
for _, s := range crb.ServiceAccountSubjects(namespace) {
if s == name {
a.log.Infof("Adding clusterrole %s and clusterrolebinding %s to additionalItems since serviceaccount %s/%s is a subject",
clusterRoleBinding.RoleRef.Name, clusterRoleBinding.Name, namespace, name)
crb.RoleRefName(), crb.Name(), namespace, name)
bindings.Insert(clusterRoleBinding.Name)
roles.Insert(clusterRoleBinding.RoleRef.Name)
bindings.Insert(crb.Name())
roles.Insert(crb.RoleRefName())
break
}
}

View File

@@ -24,29 +24,61 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/api/rbac/v1"
rbac "k8s.io/api/rbac/v1"
rbacbeta "k8s.io/api/rbac/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
rbacclient "k8s.io/client-go/kubernetes/typed/rbac/v1"
"github.com/heptio/ark/pkg/kuberesource"
arktest "github.com/heptio/ark/pkg/util/test"
)
type fakeClusterRoleBindingClient struct {
clusterRoleBindings []v1.ClusterRoleBinding
func newV1ClusterRoleBindingList(rbacCRBList []rbac.ClusterRoleBinding) []ClusterRoleBinding {
var crbs []ClusterRoleBinding
for _, c := range rbacCRBList {
crbs = append(crbs, v1ClusterRoleBinding{crb: c})
}
rbacclient.ClusterRoleBindingInterface
return crbs
}
func (c *fakeClusterRoleBindingClient) List(opts metav1.ListOptions) (*v1.ClusterRoleBindingList, error) {
return &v1.ClusterRoleBindingList{
Items: c.clusterRoleBindings,
}, nil
func newV1beta1ClusterRoleBindingList(rbacCRBList []rbacbeta.ClusterRoleBinding) []ClusterRoleBinding {
var crbs []ClusterRoleBinding
for _, c := range rbacCRBList {
crbs = append(crbs, v1beta1ClusterRoleBinding{crb: c})
}
return crbs
}
// FakeV1ClusterRoleBindingLister is a test ClusterRoleBindingLister that
// serves a canned set of rbac/v1 ClusterRoleBindings.
type FakeV1ClusterRoleBindingLister struct {
	v1crbs []rbac.ClusterRoleBinding
}

// List wraps each canned v1 ClusterRoleBinding in the version-independent
// interface and never returns an error.
func (f FakeV1ClusterRoleBindingLister) List() ([]ClusterRoleBinding, error) {
	var wrapped []ClusterRoleBinding
	for i := range f.v1crbs {
		wrapped = append(wrapped, v1ClusterRoleBinding{crb: f.v1crbs[i]})
	}
	return wrapped, nil
}

// FakeV1beta1ClusterRoleBindingLister is a test ClusterRoleBindingLister that
// serves a canned set of rbac/v1beta1 ClusterRoleBindings.
type FakeV1beta1ClusterRoleBindingLister struct {
	v1beta1crbs []rbacbeta.ClusterRoleBinding
}

// List wraps each canned v1beta1 ClusterRoleBinding in the version-independent
// interface and never returns an error.
func (f FakeV1beta1ClusterRoleBindingLister) List() ([]ClusterRoleBinding, error) {
	var wrapped []ClusterRoleBinding
	for i := range f.v1beta1crbs {
		wrapped = append(wrapped, v1beta1ClusterRoleBinding{crb: f.v1beta1crbs[i]})
	}
	return wrapped, nil
}
func TestServiceAccountActionAppliesTo(t *testing.T) {
a, _ := NewServiceAccountAction(arktest.NewLogger(), &fakeClusterRoleBindingClient{})
// Instantiating the struct directly since using
// NewServiceAccountAction requires a full kubernetes clientset
a := &serviceAccountAction{}
actual, err := a.AppliesTo()
require.NoError(t, err)
@@ -57,11 +89,119 @@ func TestServiceAccountActionAppliesTo(t *testing.T) {
assert.Equal(t, expected, actual)
}
// TestNewServiceAccountAction verifies that NewServiceAccountAction selects
// the ClusterRoleBindingLister matching the cluster's preferred RBAC API
// version (v1, v1beta1, or none) and stores the listed bindings on the action.
func TestNewServiceAccountAction(t *testing.T) {
	tests := []struct {
		name         string
		version      string
		expectedCRBs []ClusterRoleBinding
	}{
		{
			name:    "rbac v1 API instantiates an saAction",
			version: rbac.SchemeGroupVersion.Version,
			expectedCRBs: []ClusterRoleBinding{
				v1ClusterRoleBinding{
					crb: rbac.ClusterRoleBinding{
						ObjectMeta: metav1.ObjectMeta{
							Name: "v1crb-1",
						},
					},
				},
				v1ClusterRoleBinding{
					crb: rbac.ClusterRoleBinding{
						ObjectMeta: metav1.ObjectMeta{
							Name: "v1crb-2",
						},
					},
				},
			},
		},
		{
			name:    "rbac v1beta1 API instantiates an saAction",
			version: rbacbeta.SchemeGroupVersion.Version,
			expectedCRBs: []ClusterRoleBinding{
				v1beta1ClusterRoleBinding{
					crb: rbacbeta.ClusterRoleBinding{
						ObjectMeta: metav1.ObjectMeta{
							Name: "v1beta1crb-1",
						},
					},
				},
				v1beta1ClusterRoleBinding{
					crb: rbacbeta.ClusterRoleBinding{
						ObjectMeta: metav1.ObjectMeta{
							Name: "v1beta1crb-2",
						},
					},
				},
			},
		},
		{
			// no preferred RBAC version: the noop lister yields an empty slice
			name:         "no RBAC API instantiates an saAction with empty slice",
			version:      "",
			expectedCRBs: []ClusterRoleBinding{},
		},
	}

	// Set up all of our fakes outside the test loop
	discoveryHelper := arktest.FakeDiscoveryHelper{}
	logger := arktest.NewLogger()

	v1crbs := []rbac.ClusterRoleBinding{
		{
			ObjectMeta: metav1.ObjectMeta{
				Name: "v1crb-1",
			},
		},
		{
			ObjectMeta: metav1.ObjectMeta{
				Name: "v1crb-2",
			},
		},
	}

	v1beta1crbs := []rbacbeta.ClusterRoleBinding{
		{
			ObjectMeta: metav1.ObjectMeta{
				Name: "v1beta1crb-1",
			},
		},
		{
			ObjectMeta: metav1.ObjectMeta{
				Name: "v1beta1crb-2",
			},
		},
	}

	// one fake lister per RBAC version, mirroring NewClusterRoleBindingListerMap
	clusterRoleBindingListers := map[string]ClusterRoleBindingLister{
		rbac.SchemeGroupVersion.Version:     FakeV1ClusterRoleBindingLister{v1crbs: v1crbs},
		rbacbeta.SchemeGroupVersion.Version: FakeV1beta1ClusterRoleBindingLister{v1beta1crbs: v1beta1crbs},
		"":                                  noopClusterRoleBindingLister{},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			// We only care about the preferred version, nothing else in the list
			discoveryHelper.APIGroupsList = []metav1.APIGroup{
				{
					Name: rbac.GroupName,
					PreferredVersion: metav1.GroupVersionForDiscovery{
						Version: test.version,
					},
				},
			}
			action, err := NewServiceAccountAction(logger, clusterRoleBindingListers, &discoveryHelper)

			require.NoError(t, err)

			saAction, ok := action.(*serviceAccountAction)
			require.True(t, ok)
			assert.Equal(t, test.expectedCRBs, saAction.clusterRoleBindings)
		})
	}
}
func TestServiceAccountActionExecute(t *testing.T) {
tests := []struct {
name string
serviceAccount runtime.Unstructured
crbs []v1.ClusterRoleBinding
crbs []rbac.ClusterRoleBinding
expectedAdditionalItems []ResourceIdentifier
}{
{
@@ -91,9 +231,9 @@ func TestServiceAccountActionExecute(t *testing.T) {
}
}
`),
crbs: []v1.ClusterRoleBinding{
crbs: []rbac.ClusterRoleBinding{
{
Subjects: []v1.Subject{
Subjects: []rbac.Subject{
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
@@ -105,17 +245,17 @@ func TestServiceAccountActionExecute(t *testing.T) {
Name: "ark",
},
{
Kind: v1.ServiceAccountKind,
Kind: rbac.ServiceAccountKind,
Namespace: "non-matching-ns",
Name: "ark",
},
{
Kind: v1.ServiceAccountKind,
Kind: rbac.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "non-matching-name",
},
},
RoleRef: v1.RoleRef{
RoleRef: rbac.RoleRef{
Name: "role",
},
},
@@ -134,19 +274,19 @@ func TestServiceAccountActionExecute(t *testing.T) {
}
}
`),
crbs: []v1.ClusterRoleBinding{
crbs: []rbac.ClusterRoleBinding{
{
ObjectMeta: metav1.ObjectMeta{
Name: "crb-1",
},
Subjects: []v1.Subject{
Subjects: []rbac.Subject{
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
Name: "non-matching-name",
},
},
RoleRef: v1.RoleRef{
RoleRef: rbac.RoleRef{
Name: "role-1",
},
},
@@ -154,19 +294,19 @@ func TestServiceAccountActionExecute(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "crb-2",
},
Subjects: []v1.Subject{
Subjects: []rbac.Subject{
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
Name: "non-matching-name",
},
{
Kind: v1.ServiceAccountKind,
Kind: rbac.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "ark",
},
},
RoleRef: v1.RoleRef{
RoleRef: rbac.RoleRef{
Name: "role-2",
},
},
@@ -174,14 +314,14 @@ func TestServiceAccountActionExecute(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "crb-3",
},
Subjects: []v1.Subject{
Subjects: []rbac.Subject{
{
Kind: v1.ServiceAccountKind,
Kind: rbac.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "ark",
},
},
RoleRef: v1.RoleRef{
RoleRef: rbac.RoleRef{
Name: "role-3",
},
},
@@ -189,9 +329,9 @@ func TestServiceAccountActionExecute(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "crb-4",
},
Subjects: []v1.Subject{
Subjects: []rbac.Subject{
{
Kind: v1.ServiceAccountKind,
Kind: rbac.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "ark",
},
@@ -201,7 +341,7 @@ func TestServiceAccountActionExecute(t *testing.T) {
Name: "non-matching-name",
},
},
RoleRef: v1.RoleRef{
RoleRef: rbac.RoleRef{
Name: "role-4",
},
},
@@ -235,14 +375,221 @@ func TestServiceAccountActionExecute(t *testing.T) {
},
}
crbClient := &fakeClusterRoleBindingClient{}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
crbClient.clusterRoleBindings = test.crbs
action, err := NewServiceAccountAction(arktest.NewLogger(), crbClient)
require.Nil(t, err)
// Create the action struct directly so we don't need to mock a clientset
action := &serviceAccountAction{
log: arktest.NewLogger(),
clusterRoleBindings: newV1ClusterRoleBindingList(test.crbs),
}
res, additional, err := action.Execute(test.serviceAccount, nil)
assert.Equal(t, test.serviceAccount, res)
assert.Nil(t, err)
// ensure slices are ordered for valid comparison
sort.Slice(test.expectedAdditionalItems, func(i, j int) bool {
return fmt.Sprintf("%s.%s", test.expectedAdditionalItems[i].GroupResource.String(), test.expectedAdditionalItems[i].Name) <
fmt.Sprintf("%s.%s", test.expectedAdditionalItems[j].GroupResource.String(), test.expectedAdditionalItems[j].Name)
})
sort.Slice(additional, func(i, j int) bool {
return fmt.Sprintf("%s.%s", additional[i].GroupResource.String(), additional[i].Name) <
fmt.Sprintf("%s.%s", additional[j].GroupResource.String(), additional[j].Name)
})
assert.Equal(t, test.expectedAdditionalItems, additional)
})
}
}
func TestServiceAccountActionExecuteOnBeta1(t *testing.T) {
tests := []struct {
name string
serviceAccount runtime.Unstructured
crbs []rbacbeta.ClusterRoleBinding
expectedAdditionalItems []ResourceIdentifier
}{
{
name: "no crbs",
serviceAccount: arktest.UnstructuredOrDie(`
{
"apiVersion": "v1",
"kind": "ServiceAccount",
"metadata": {
"namespace": "heptio-ark",
"name": "ark"
}
}
`),
crbs: nil,
expectedAdditionalItems: nil,
},
{
name: "no matching crbs",
serviceAccount: arktest.UnstructuredOrDie(`
{
"apiVersion": "v1",
"kind": "ServiceAccount",
"metadata": {
"namespace": "heptio-ark",
"name": "ark"
}
}
`),
crbs: []rbacbeta.ClusterRoleBinding{
{
Subjects: []rbacbeta.Subject{
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
Name: "non-matching-name",
},
{
Kind: "non-matching-kind",
Namespace: "heptio-ark",
Name: "ark",
},
{
Kind: rbacbeta.ServiceAccountKind,
Namespace: "non-matching-ns",
Name: "ark",
},
{
Kind: rbacbeta.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "non-matching-name",
},
},
RoleRef: rbacbeta.RoleRef{
Name: "role",
},
},
},
expectedAdditionalItems: nil,
},
{
name: "some matching crbs",
serviceAccount: arktest.UnstructuredOrDie(`
{
"apiVersion": "v1",
"kind": "ServiceAccount",
"metadata": {
"namespace": "heptio-ark",
"name": "ark"
}
}
`),
crbs: []rbacbeta.ClusterRoleBinding{
{
ObjectMeta: metav1.ObjectMeta{
Name: "crb-1",
},
Subjects: []rbacbeta.Subject{
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
Name: "non-matching-name",
},
},
RoleRef: rbacbeta.RoleRef{
Name: "role-1",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "crb-2",
},
Subjects: []rbacbeta.Subject{
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
Name: "non-matching-name",
},
{
Kind: rbacbeta.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "ark",
},
},
RoleRef: rbacbeta.RoleRef{
Name: "role-2",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "crb-3",
},
Subjects: []rbacbeta.Subject{
{
Kind: rbacbeta.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "ark",
},
},
RoleRef: rbacbeta.RoleRef{
Name: "role-3",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "crb-4",
},
Subjects: []rbacbeta.Subject{
{
Kind: rbacbeta.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "ark",
},
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
Name: "non-matching-name",
},
},
RoleRef: rbacbeta.RoleRef{
Name: "role-4",
},
},
},
expectedAdditionalItems: []ResourceIdentifier{
{
GroupResource: kuberesource.ClusterRoleBindings,
Name: "crb-2",
},
{
GroupResource: kuberesource.ClusterRoleBindings,
Name: "crb-3",
},
{
GroupResource: kuberesource.ClusterRoleBindings,
Name: "crb-4",
},
{
GroupResource: kuberesource.ClusterRoles,
Name: "role-2",
},
{
GroupResource: kuberesource.ClusterRoles,
Name: "role-3",
},
{
GroupResource: kuberesource.ClusterRoles,
Name: "role-4",
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Create the action struct directly so we don't need to mock a clientset
action := &serviceAccountAction{
log: arktest.NewLogger(),
clusterRoleBindings: newV1beta1ClusterRoleBindingList(test.crbs),
}
res, additional, err := action.Execute(test.serviceAccount, nil)

View File

@@ -20,6 +20,7 @@ import (
"regexp"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/pkg/errors"
@@ -219,7 +220,17 @@ func (b *blockStore) DeleteSnapshot(snapshotID string) error {
_, err := b.ec2.DeleteSnapshot(req)
return errors.WithStack(err)
// if it's a NotFound error, we don't need to return an error
// since the snapshot is not there.
// see https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "InvalidSnapshot.NotFound" {
return nil
}
if err != nil {
return errors.WithStack(err)
}
return nil
}
var ebsVolumeIDRegex = regexp.MustCompile("vol-.*")

View File

@@ -19,6 +19,7 @@ package azure
import (
"context"
"fmt"
"net/http"
"os"
"regexp"
"strings"
@@ -280,7 +281,16 @@ func (b *blockStore) DeleteSnapshot(snapshotID string) error {
err = <-errChan
return errors.WithStack(err)
// if it's a 404 (not found) error, we don't need to return an error
// since the snapshot is not there.
if azureErr, ok := err.(autorest.DetailedError); ok && azureErr.StatusCode == http.StatusNotFound {
return nil
}
if err != nil {
return errors.WithStack(err)
}
return nil
}
func getComputeResourceName(subscription, resourceGroup, resource, name string) string {

View File

@@ -19,6 +19,7 @@ package gcp
import (
"encoding/json"
"io/ioutil"
"net/http"
"os"
"github.com/pkg/errors"
@@ -27,6 +28,7 @@ import (
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
"k8s.io/apimachinery/pkg/runtime"
@@ -212,7 +214,16 @@ func getSnapshotTags(arkTags map[string]string, diskDescription string, log logr
func (b *blockStore) DeleteSnapshot(snapshotID string) error {
_, err := b.gce.Snapshots.Delete(b.project, snapshotID).Do()
return errors.WithStack(err)
// if it's a 404 (not found) error, we don't need to return an error
// since the snapshot is not there.
if gcpErr, ok := err.(*googleapi.Error); ok && gcpErr.Code == http.StatusNotFound {
return nil
}
if err != nil {
return errors.WithStack(err)
}
return nil
}
func (b *blockStore) GetVolumeID(pv runtime.Unstructured) (string, error) {

View File

@@ -28,6 +28,7 @@ import (
"github.com/heptio/ark/pkg/cloudprovider/azure"
"github.com/heptio/ark/pkg/cloudprovider/gcp"
"github.com/heptio/ark/pkg/cmd"
arkdiscovery "github.com/heptio/ark/pkg/discovery"
arkplugin "github.com/heptio/ark/pkg/plugin"
"github.com/heptio/ark/pkg/restore"
)
@@ -87,7 +88,13 @@ func NewCommand(f client.Factory) *cobra.Command {
clientset, err := f.KubeClient()
cmd.CheckError(err)
action, err = backup.NewServiceAccountAction(logger, clientset.RbacV1().ClusterRoleBindings())
discoveryHelper, err := arkdiscovery.NewHelper(clientset.Discovery(), logger)
cmd.CheckError(err)
action, err = backup.NewServiceAccountAction(
logger,
backup.NewClusterRoleBindingListerMap(clientset),
discoveryHelper)
cmd.CheckError(err)
default:
logger.Fatal("Unrecognized plugin name")

View File

@@ -34,11 +34,12 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
@@ -152,6 +153,7 @@ type server struct {
snapshotService cloudprovider.SnapshotService
discoveryClient discovery.DiscoveryInterface
clientPool dynamic.ClientPool
discoveryHelper arkdiscovery.Helper
sharedInformerFactory informers.SharedInformerFactory
ctx context.Context
cancelFunc context.CancelFunc
@@ -203,11 +205,23 @@ func newServer(namespace, baseName, pluginDir, metricsAddr string, logger *logru
}
func (s *server) run() error {
defer s.pluginManager.CleanupClients()
defer s.pluginManager.CloseAllClients()
signals.CancelOnShutdown(s.cancelFunc, s.logger)
if err := s.ensureArkNamespace(); err != nil {
// Since s.namespace, which specifies where backups/restores/schedules/etc. should live,
// *could* be different from the namespace where the Ark server pod runs, check to make
// sure it exists, and fail fast if it doesn't.
if err := s.namespaceExists(s.namespace); err != nil {
return err
}
if err := s.initDiscoveryHelper(); err != nil {
return err
}
// check to ensure all Ark CRDs exist
if err := s.arkResourcesExist(); err != nil {
return err
}
@@ -244,22 +258,78 @@ func (s *server) run() error {
return nil
}
func (s *server) ensureArkNamespace() error {
logContext := s.logger.WithField("namespace", s.namespace)
// namespaceExists returns nil if namespace can be successfully
// gotten from the kubernetes API, or an error otherwise.
func (s *server) namespaceExists(namespace string) error {
s.logger.WithField("namespace", namespace).Info("Checking existence of namespace")
logContext.Info("Ensuring namespace exists for backups")
defaultNamespace := v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: s.namespace,
},
if _, err := s.kubeClient.CoreV1().Namespaces().Get(namespace, metav1.GetOptions{}); err != nil {
return errors.WithStack(err)
}
if created, err := kube.EnsureNamespaceExists(&defaultNamespace, s.kubeClient.CoreV1().Namespaces()); created {
logContext.Info("Namespace created")
} else if err != nil {
s.logger.WithField("namespace", namespace).Info("Namespace exists")
return nil
}
// initDiscoveryHelper instantiates the server's discovery helper and spawns a
// goroutine to call Refresh() every 5 minutes.
func (s *server) initDiscoveryHelper() error {
discoveryHelper, err := arkdiscovery.NewHelper(s.discoveryClient, s.logger)
if err != nil {
return err
}
logContext.Info("Namespace already exists")
s.discoveryHelper = discoveryHelper
go wait.Until(
func() {
if err := discoveryHelper.Refresh(); err != nil {
s.logger.WithError(err).Error("Error refreshing discovery")
}
},
5*time.Minute,
s.ctx.Done(),
)
return nil
}
// arkResourcesExist checks for the existence of each Ark CRD via discovery
// and returns an error if any of them don't exist.
func (s *server) arkResourcesExist() error {
s.logger.Info("Checking existence of Ark custom resource definitions")
var arkGroupVersion *metav1.APIResourceList
for _, gv := range s.discoveryHelper.Resources() {
if gv.GroupVersion == api.SchemeGroupVersion.String() {
arkGroupVersion = gv
break
}
}
if arkGroupVersion == nil {
return errors.Errorf("Ark API group %s not found", api.SchemeGroupVersion)
}
foundResources := sets.NewString()
for _, resource := range arkGroupVersion.APIResources {
foundResources.Insert(resource.Kind)
}
var errs []error
for kind := range api.CustomResources() {
if foundResources.Has(kind) {
s.logger.WithField("kind", kind).Debug("Found custom resource")
continue
}
errs = append(errs, errors.Errorf("custom resource %s not found in Ark API group %s", kind, api.SchemeGroupVersion))
}
if len(errs) > 0 {
return kubeerrs.NewAggregate(errs)
}
s.logger.Info("All Ark custom resource definitions exist")
return nil
}
@@ -541,27 +611,13 @@ func (s *server) runControllers(config *api.Config) error {
wg.Done()
}()
discoveryHelper, err := arkdiscovery.NewHelper(s.discoveryClient, s.logger)
if err != nil {
return err
}
go wait.Until(
func() {
if err := discoveryHelper.Refresh(); err != nil {
s.logger.WithError(err).Error("Error refreshing discovery")
}
},
5*time.Minute,
ctx.Done(),
)
if config.RestoreOnlyMode {
s.logger.Info("Restore only mode - not starting the backup, schedule, delete-backup, or GC controllers")
} else {
backupTracker := controller.NewBackupTracker()
backupper, err := backup.NewKubernetesBackupper(
discoveryHelper,
s.discoveryHelper,
client.NewDynamicFactory(s.clientPool),
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
s.snapshotService,
@@ -595,6 +651,7 @@ func (s *server) runControllers(config *api.Config) error {
s.sharedInformerFactory.Ark().V1().Schedules(),
config.ScheduleSyncPeriod.Duration,
s.logger,
s.metrics,
)
wg.Add(1)
go func() {
@@ -605,6 +662,7 @@ func (s *server) runControllers(config *api.Config) error {
gcController := controller.NewGCController(
s.logger,
s.sharedInformerFactory.Ark().V1().Backups(),
s.sharedInformerFactory.Ark().V1().DeleteBackupRequests(),
s.arkClient.ArkV1(),
config.GCSyncPeriod.Duration,
)
@@ -637,7 +695,7 @@ func (s *server) runControllers(config *api.Config) error {
}
restorer, err := restore.NewKubernetesRestorer(
discoveryHelper,
s.discoveryHelper,
client.NewDynamicFactory(s.clientPool),
s.backupService,
s.snapshotService,

View File

@@ -22,6 +22,8 @@ import (
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/heptio/ark/pkg/apis/ark/v1"
arktest "github.com/heptio/ark/pkg/util/test"
)
@@ -51,3 +53,47 @@ func TestApplyConfigDefaults(t *testing.T) {
assert.Equal(t, 3*time.Minute, c.ScheduleSyncPeriod.Duration)
assert.Equal(t, []string{"a", "b"}, c.ResourcePriorities)
}
func TestArkResourcesExist(t *testing.T) {
var (
fakeDiscoveryHelper = &arktest.FakeDiscoveryHelper{}
server = &server{
logger: arktest.NewLogger(),
discoveryHelper: fakeDiscoveryHelper,
}
)
// Ark API group doesn't exist in discovery: should error
fakeDiscoveryHelper.ResourceList = []*metav1.APIResourceList{
{
GroupVersion: "foo/v1",
APIResources: []metav1.APIResource{
{
Name: "Backups",
Kind: "Backup",
},
},
},
}
assert.Error(t, server.arkResourcesExist())
// Ark API group doesn't contain any custom resources: should error
arkAPIResourceList := &metav1.APIResourceList{
GroupVersion: v1.SchemeGroupVersion.String(),
}
fakeDiscoveryHelper.ResourceList = append(fakeDiscoveryHelper.ResourceList, arkAPIResourceList)
assert.Error(t, server.arkResourcesExist())
// Ark API group contains all custom resources: should not error
for kind := range v1.CustomResources() {
arkAPIResourceList.APIResources = append(arkAPIResourceList.APIResources, metav1.APIResource{
Kind: kind,
})
}
assert.NoError(t, server.arkResourcesExist())
// Ark API group contains some but not all custom resources: should error
arkAPIResourceList.APIResources = arkAPIResourceList.APIResources[:3]
assert.Error(t, server.arkResourcesExist())
}

View File

@@ -204,7 +204,7 @@ func TestProcessBackup(t *testing.T) {
cloudBackups.On("UploadBackup", "bucket", backup.Name, mock.Anything, mock.Anything, mock.Anything).Return(nil)
pluginManager.On("GetBackupItemActions", backup.Name).Return(nil, nil)
pluginManager.On("CloseBackupItemActions", backup.Name).Return(nil)
pluginManager.On("CloseBackupItemActions", backup.Name).Return()
}
// this is necessary so the Patch() call returns the appropriate object
@@ -321,17 +321,9 @@ type MockManager struct {
}
// CloseBackupItemActions provides a mock function with given fields: backupName
func (_m *MockManager) CloseBackupItemActions(backupName string) error {
ret := _m.Called(backupName)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(backupName)
} else {
r0 = ret.Error(0)
}
return r0
func (_m *MockManager) CloseBackupItemActions(backupName string) {
_ = _m.Called(backupName)
return
}
// GetBackupItemActions provides a mock function with given fields: backupName, logger, level
@@ -358,17 +350,9 @@ func (_m *MockManager) GetBackupItemActions(backupName string) ([]backup.ItemAct
}
// CloseRestoreItemActions provides a mock function with given fields: restoreName
func (_m *MockManager) CloseRestoreItemActions(restoreName string) error {
ret := _m.Called(restoreName)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(restoreName)
} else {
r0 = ret.Error(0)
}
return r0
func (_m *MockManager) CloseRestoreItemActions(restoreName string) {
_ = _m.Called(restoreName)
return
}
// GetRestoreItemActions provides a mock function with given fields: restoreName, logger, level
@@ -440,8 +424,8 @@ func (_m *MockManager) GetObjectStore(name string) (cloudprovider.ObjectStore, e
return r0, r1
}
// CleanupClients provides a mock function
func (_m *MockManager) CleanupClients() {
// CloseAllClients provides a mock function
func (_m *MockManager) CloseAllClients() {
_ = _m.Called()
return
}

View File

@@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/cache"
)
@@ -103,8 +104,7 @@ func NewBackupDeletionController(
deleteBackupRequestInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueue,
UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) },
AddFunc: c.enqueue,
},
)
@@ -162,6 +162,12 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e
return err
}
// Remove any existing deletion requests for this backup so we only have
// one at a time
if errs := c.deleteExistingDeletionRequests(req, log); errs != nil {
return kubeerrs.NewAggregate(errs)
}
// Don't allow deleting an in-progress backup
if c.backupTracker.Contains(req.Namespace, req.Spec.BackupName) {
_, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) {
@@ -303,6 +309,30 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e
return nil
}
func (c *backupDeletionController) deleteExistingDeletionRequests(req *v1.DeleteBackupRequest, log logrus.FieldLogger) []error {
log.Info("Removing existing deletion requests for backup")
selector := labels.SelectorFromSet(labels.Set(map[string]string{
v1.BackupNameLabel: req.Spec.BackupName,
}))
dbrs, err := c.deleteBackupRequestLister.DeleteBackupRequests(req.Namespace).List(selector)
if err != nil {
return []error{errors.Wrap(err, "error listing existing DeleteBackupRequests for backup")}
}
var errs []error
for _, dbr := range dbrs {
if dbr.Name == req.Name {
continue
}
if err := c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).Delete(dbr.Name, nil); err != nil {
errs = append(errs, errors.WithStack(err))
}
}
return errs
}
func (c *backupDeletionController) deleteResticSnapshots(backup *v1.Backup) []error {
if c.resticMgr == nil {
return nil

View File

@@ -17,7 +17,6 @@ limitations under the License.
package controller
import (
"context"
"fmt"
"testing"
"time"
@@ -26,7 +25,6 @@ import (
pkgbackup "github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/util/kube"
arktest "github.com/heptio/ark/pkg/util/test"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
@@ -36,74 +34,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
core "k8s.io/client-go/testing"
)
func TestBackupDeletionControllerControllerHasUpdateFunc(t *testing.T) {
req := pkgbackup.NewDeleteBackupRequest("foo", "uid")
req.Namespace = "heptio-ark"
expected := kube.NamespaceAndName(req)
client := fake.NewSimpleClientset(req)
fakeWatch := watch.NewFake()
defer fakeWatch.Stop()
client.PrependWatchReactor("deletebackuprequests", core.DefaultWatchReactor(fakeWatch, nil))
sharedInformers := informers.NewSharedInformerFactory(client, 0)
controller := NewBackupDeletionController(
arktest.NewLogger(),
sharedInformers.Ark().V1().DeleteBackupRequests(),
client.ArkV1(), // deleteBackupRequestClient
client.ArkV1(), // backupClient
nil, // snapshotService
nil, // backupService
"bucket",
sharedInformers.Ark().V1().Restores(),
client.ArkV1(), // restoreClient
NewBackupTracker(),
nil, // restic repository manager
sharedInformers.Ark().V1().PodVolumeBackups(),
).(*backupDeletionController)
// disable resync handler since we don't want to test it here
controller.resyncFunc = nil
keys := make(chan string)
controller.syncHandler = func(key string) error {
keys <- key
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go sharedInformers.Start(ctx.Done())
go controller.Run(ctx, 1)
// wait for the AddFunc
select {
case <-ctx.Done():
t.Fatal("test timed out waiting for AddFunc")
case key := <-keys:
assert.Equal(t, expected, key)
}
req.Status.Phase = v1.DeleteBackupRequestPhaseProcessed
fakeWatch.Add(req)
// wait for the UpdateFunc
select {
case <-ctx.Done():
t.Fatal("test timed out waiting for UpdateFunc")
case key := <-keys:
assert.Equal(t, expected, key)
}
}
func TestBackupDeletionControllerProcessQueueItem(t *testing.T) {
client := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(client, 0)
@@ -236,6 +169,62 @@ func TestBackupDeletionControllerProcessRequest(t *testing.T) {
assert.Equal(t, expectedActions, td.client.Actions())
})
t.Run("existing deletion requests for the backup are deleted", func(t *testing.T) {
td := setupBackupDeletionControllerTest()
defer td.backupService.AssertExpectations(t)
// add the backup to the tracker so the execution of processRequest doesn't progress
// past checking for an in-progress backup. this makes validation easier.
td.controller.backupTracker.Add(td.req.Namespace, td.req.Spec.BackupName)
require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(td.req))
existing := &v1.DeleteBackupRequest{
ObjectMeta: metav1.ObjectMeta{
Namespace: td.req.Namespace,
Name: "bar",
Labels: map[string]string{
v1.BackupNameLabel: td.req.Spec.BackupName,
},
},
Spec: v1.DeleteBackupRequestSpec{
BackupName: td.req.Spec.BackupName,
},
}
require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(existing))
_, err := td.client.ArkV1().DeleteBackupRequests(td.req.Namespace).Create(existing)
require.NoError(t, err)
require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(
&v1.DeleteBackupRequest{
ObjectMeta: metav1.ObjectMeta{
Namespace: td.req.Namespace,
Name: "bar-2",
Labels: map[string]string{
v1.BackupNameLabel: "some-other-backup",
},
},
Spec: v1.DeleteBackupRequestSpec{
BackupName: "some-other-backup",
},
},
))
assert.NoError(t, td.controller.processRequest(td.req))
expectedDeleteAction := core.NewDeleteAction(
v1.SchemeGroupVersion.WithResource("deletebackuprequests"),
td.req.Namespace,
"bar",
)
// first action is the Create of an existing DBR for the backup as part of test data setup
// second action is the Delete of the existing DBR, which we're validating
// third action is the Patch of the DBR to set it to processed with an error
require.Len(t, td.client.Actions(), 3)
assert.Equal(t, expectedDeleteAction, td.client.Actions()[1])
})
t.Run("deleting an in progress backup isn't allowed", func(t *testing.T) {
td := setupBackupDeletionControllerTest()
defer td.backupService.AssertExpectations(t)

View File

@@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
@@ -39,6 +40,7 @@ type gcController struct {
logger logrus.FieldLogger
backupLister listers.BackupLister
deleteBackupRequestLister listers.DeleteBackupRequestLister
deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter
syncPeriod time.Duration
@@ -49,6 +51,7 @@ type gcController struct {
func NewGCController(
logger logrus.FieldLogger,
backupInformer informers.BackupInformer,
deleteBackupRequestInformer informers.DeleteBackupRequestInformer,
deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter,
syncPeriod time.Duration,
) Interface {
@@ -62,12 +65,16 @@ func NewGCController(
syncPeriod: syncPeriod,
clock: clock.RealClock{},
backupLister: backupInformer.Lister(),
deleteBackupRequestLister: deleteBackupRequestInformer.Lister(),
deleteBackupRequestClient: deleteBackupRequestClient,
logger: logger,
}
c.syncHandler = c.processQueueItem
c.cacheSyncWaiters = append(c.cacheSyncWaiters, backupInformer.Informer().HasSynced)
c.cacheSyncWaiters = append(c.cacheSyncWaiters,
backupInformer.Informer().HasSynced,
deleteBackupRequestInformer.Informer().HasSynced,
)
c.resyncPeriod = syncPeriod
c.resyncFunc = c.enqueueAllBackups
@@ -130,12 +137,32 @@ func (c *gcController) processQueueItem(key string) error {
return nil
}
log.Info("Backup has expired. Creating a DeleteBackupRequest.")
log.Info("Backup has expired")
selector := labels.SelectorFromSet(labels.Set(map[string]string{
arkv1api.BackupNameLabel: backup.Name,
arkv1api.BackupUIDLabel: string(backup.UID),
}))
dbrs, err := c.deleteBackupRequestLister.DeleteBackupRequests(ns).List(selector)
if err != nil {
return errors.Wrap(err, "error listing existing DeleteBackupRequests for backup")
}
// if there's an existing unprocessed deletion request for this backup, don't create
// another one
for _, dbr := range dbrs {
switch dbr.Status.Phase {
case "", arkv1api.DeleteBackupRequestPhaseNew, arkv1api.DeleteBackupRequestPhaseInProgress:
log.Info("Backup already has a pending deletion request")
return nil
}
}
log.Info("Creating a new deletion request")
req := pkgbackup.NewDeleteBackupRequest(backup.Name, string(backup.UID))
_, err = c.deleteBackupRequestClient.DeleteBackupRequests(ns).Create(req)
if err != nil {
if _, err = c.deleteBackupRequestClient.DeleteBackupRequests(ns).Create(req); err != nil {
return errors.Wrap(err, "error creating DeleteBackupRequest")
}

View File

@@ -25,7 +25,9 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/watch"
@@ -46,6 +48,7 @@ func TestGCControllerEnqueueAllBackups(t *testing.T) {
controller = NewGCController(
arktest.NewLogger(),
sharedInformers.Ark().V1().Backups(),
sharedInformers.Ark().V1().DeleteBackupRequests(),
client.ArkV1(),
1*time.Millisecond,
).(*gcController)
@@ -109,6 +112,7 @@ func TestGCControllerHasUpdateFunc(t *testing.T) {
controller := NewGCController(
arktest.NewLogger(),
sharedInformers.Ark().V1().Backups(),
sharedInformers.Ark().V1().DeleteBackupRequests(),
client.ArkV1(),
1*time.Millisecond,
).(*gcController)
@@ -152,6 +156,7 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
tests := []struct {
name string
backup *api.Backup
deleteBackupRequests []*api.DeleteBackupRequest
expectDeletion bool
createDeleteBackupRequestError bool
expectError bool
@@ -160,19 +165,63 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
name: "can't find backup - no error",
},
{
name: "expired backup is deleted",
name: "unexpired backup is not deleted",
backup: arktest.NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(1 * time.Minute)).
Backup,
expectDeletion: false,
},
{
name: "expired backup with no pending deletion requests is deleted",
backup: arktest.NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(-1 * time.Second)).
Backup,
expectDeletion: true,
},
{
name: "unexpired backup is not deleted",
name: "expired backup with a pending deletion request is not deleted",
backup: arktest.NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(1 * time.Minute)).
WithExpiration(fakeClock.Now().Add(-1 * time.Second)).
Backup,
deleteBackupRequests: []*api.DeleteBackupRequest{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: api.DefaultNamespace,
Name: "foo",
Labels: map[string]string{
api.BackupNameLabel: "backup-1",
api.BackupUIDLabel: "",
},
},
Status: api.DeleteBackupRequestStatus{
Phase: api.DeleteBackupRequestPhaseInProgress,
},
},
},
expectDeletion: false,
},
{
name: "expired backup with only processed deletion requests is deleted",
backup: arktest.NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(-1 * time.Second)).
Backup,
deleteBackupRequests: []*api.DeleteBackupRequest{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: api.DefaultNamespace,
Name: "foo",
Labels: map[string]string{
api.BackupNameLabel: "backup-1",
api.BackupUIDLabel: "",
},
},
Status: api.DeleteBackupRequestStatus{
Phase: api.DeleteBackupRequestPhaseProcessed,
},
},
},
expectDeletion: true,
},
{
name: "create DeleteBackupRequest error returns an error",
backup: arktest.NewTestBackup().WithName("backup-1").
@@ -194,6 +243,7 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
controller := NewGCController(
arktest.NewLogger(),
sharedInformers.Ark().V1().Backups(),
sharedInformers.Ark().V1().DeleteBackupRequests(),
client.ArkV1(),
1*time.Millisecond,
).(*gcController)
@@ -205,6 +255,10 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup)
}
for _, dbr := range test.deleteBackupRequests {
sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(dbr)
}
if test.createDeleteBackupRequestError {
client.PrependReactor("create", "deletebackuprequests", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.New("foo")
@@ -216,7 +270,12 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
assert.Equal(t, test.expectError, gotErr)
if test.expectDeletion {
assert.Len(t, client.Actions(), 1)
require.Len(t, client.Actions(), 1)
createAction, ok := client.Actions()[0].(core.CreateAction)
require.True(t, ok)
assert.Equal(t, "deletebackuprequests", createAction.GetResource().Resource)
} else {
assert.Len(t, client.Actions(), 0)
}

View File

@@ -377,7 +377,7 @@ func TestProcessRestore(t *testing.T) {
if test.restore != nil {
pluginManager.On("GetRestoreItemActions", test.restore.Name).Return(nil, nil)
pluginManager.On("CloseRestoreItemActions", test.restore.Name).Return(nil)
pluginManager.On("CloseRestoreItemActions", test.restore.Name).Return()
}
err = c.processRestore(key)

View File

@@ -41,6 +41,7 @@ import (
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
"github.com/heptio/ark/pkg/metrics"
kubeutil "github.com/heptio/ark/pkg/util/kube"
)
@@ -55,6 +56,7 @@ type scheduleController struct {
syncPeriod time.Duration
clock clock.Clock
logger logrus.FieldLogger
metrics *metrics.ServerMetrics
}
func NewScheduleController(
@@ -64,6 +66,7 @@ func NewScheduleController(
schedulesInformer informers.ScheduleInformer,
syncPeriod time.Duration,
logger logrus.FieldLogger,
metrics *metrics.ServerMetrics,
) *scheduleController {
if syncPeriod < time.Minute {
logger.WithField("syncPeriod", syncPeriod).Info("Provided schedule sync period is too short. Setting to 1 minute")
@@ -80,6 +83,7 @@ func NewScheduleController(
syncPeriod: syncPeriod,
clock: clock.RealClock{},
logger: logger,
metrics: metrics,
}
c.syncHandler = c.processSchedule
@@ -106,6 +110,10 @@ func NewScheduleController(
return
}
c.queue.Add(key)
scheduleName := schedule.GetName()
c.logger.Info("Creating schedule ", scheduleName)
//Init Prometheus metrics to 0 to have them flowing up
metrics.InitSchedule(scheduleName)
},
},
)

View File

@@ -34,6 +34,7 @@ import (
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/metrics"
"github.com/heptio/ark/pkg/util/collections"
arktest "github.com/heptio/ark/pkg/util/test"
)
@@ -129,6 +130,7 @@ func TestProcessSchedule(t *testing.T) {
sharedInformers.Ark().V1().Schedules(),
time.Duration(0),
logger,
metrics.NewServerMetrics(),
)
var (

View File

@@ -45,6 +45,10 @@ type Helper interface {
// Refresh pulls an updated set of Ark-backuppable resources from the
// discovery API.
Refresh() error
// APIGroups gets the current set of supported APIGroups
// in the cluster.
APIGroups() []metav1.APIGroup
}
type helper struct {
@@ -56,6 +60,7 @@ type helper struct {
mapper meta.RESTMapper
resources []*metav1.APIResourceList
resourcesMap map[schema.GroupVersionResource]metav1.APIResource
apiGroups []metav1.APIGroup
}
var _ Helper = &helper{}
@@ -127,6 +132,12 @@ func (h *helper) Refresh() error {
}
}
apiGroupList, err := h.discoveryClient.ServerGroups()
if err != nil {
return errors.WithStack(err)
}
h.apiGroups = apiGroupList.Groups
return nil
}
@@ -165,3 +176,9 @@ func (h *helper) Resources() []*metav1.APIResourceList {
defer h.lock.RUnlock()
return h.resources
}
func (h *helper) APIGroups() []metav1.APIGroup {
h.lock.RLock()
defer h.lock.RUnlock()
return h.apiGroups
}

View File

@@ -27,17 +27,13 @@ import (
// CRDs returns a list of the CRD types for all of the required Ark CRDs
func CRDs() []*apiextv1beta1.CustomResourceDefinition {
return []*apiextv1beta1.CustomResourceDefinition{
crd("Backup", "backups"),
crd("Schedule", "schedules"),
crd("Restore", "restores"),
crd("Config", "configs"),
crd("DownloadRequest", "downloadrequests"),
crd("DeleteBackupRequest", "deletebackuprequests"),
crd("PodVolumeBackup", "podvolumebackups"),
crd("PodVolumeRestore", "podvolumerestores"),
crd("ResticRepository", "resticrepositories"),
var crds []*apiextv1beta1.CustomResourceDefinition
for kind, typeInfo := range arkv1.CustomResources() {
crds = append(crds, crd(kind, typeInfo.PluralName))
}
return crds
}
func crd(kind, plural string) *apiextv1beta1.CustomResourceDefinition {

View File

@@ -106,6 +106,18 @@ func (m *ServerMetrics) RegisterAllMetrics() {
}
}
func (m *ServerMetrics) InitSchedule(scheduleName string) {
if c, ok := m.metrics[backupAttemptCount].(*prometheus.CounterVec); ok {
c.WithLabelValues(scheduleName).Set(0)
}
if c, ok := m.metrics[backupSuccessCount].(*prometheus.CounterVec); ok {
c.WithLabelValues(scheduleName).Set(0)
}
if c, ok := m.metrics[backupFailureCount].(*prometheus.CounterVec); ok {
c.WithLabelValues(scheduleName).Set(0)
}
}
// SetBackupTarballSizeBytesGauge records the size, in bytes, of a backup tarball.
func (m *ServerMetrics) SetBackupTarballSizeBytesGauge(backupSchedule string, size int64) {
if g, ok := m.metrics[backupTarballSizeBytesGauge].(*prometheus.GaugeVec); ok {

View File

@@ -38,6 +38,6 @@ func (b *clientBuilder) withCommand(name string, args ...string) *clientBuilder
return b
}
func (b *clientBuilder) client() *hcplugin.Client {
func (b *clientBuilder) client() pluginClient {
return hcplugin.NewClient(b.config)
}

View File

@@ -3,7 +3,6 @@ package plugin
import (
"sync"
plugin "github.com/hashicorp/go-plugin"
"github.com/pkg/errors"
)
@@ -20,7 +19,7 @@ type clientKey struct {
func newClientStore() *clientStore {
return &clientStore{
clients: make(map[clientKey]map[string]*plugin.Client),
clients: make(map[clientKey]map[string]pluginClient),
lock: &sync.RWMutex{},
}
}
@@ -33,13 +32,13 @@ type clientStore struct {
// kind and scope (e.g. all BackupItemActions for a given
// backup), and efficient lookup by kind+name+scope (e.g.
// the AWS ObjectStore.)
clients map[clientKey]map[string]*plugin.Client
clients map[clientKey]map[string]pluginClient
lock *sync.RWMutex
}
// get returns a plugin client for the given kind/name/scope, or an error if none
// is found.
func (s *clientStore) get(kind PluginKind, name, scope string) (*plugin.Client, error) {
func (s *clientStore) get(kind PluginKind, name, scope string) (pluginClient, error) {
s.lock.RLock()
defer s.lock.RUnlock()
@@ -54,12 +53,12 @@ func (s *clientStore) get(kind PluginKind, name, scope string) (*plugin.Client,
// list returns all plugin clients for the given kind/scope, or an
// error if none are found.
func (s *clientStore) list(kind PluginKind, scope string) ([]*plugin.Client, error) {
func (s *clientStore) list(kind PluginKind, scope string) ([]pluginClient, error) {
s.lock.RLock()
defer s.lock.RUnlock()
if forScope, found := s.clients[clientKey{kind, scope}]; found {
var clients []*plugin.Client
var clients []pluginClient
for _, client := range forScope {
clients = append(clients, client)
@@ -71,15 +70,31 @@ func (s *clientStore) list(kind PluginKind, scope string) ([]*plugin.Client, err
return nil, errors.New("clients not found")
}
// listAll returns all plugin clients for all kinds/scopes, or a
// zero-valued slice if there are none.
func (s *clientStore) listAll() []pluginClient {
s.lock.RLock()
defer s.lock.RUnlock()
var clients []pluginClient
for _, pluginsByName := range s.clients {
for name := range pluginsByName {
clients = append(clients, pluginsByName[name])
}
}
return clients
}
// add stores a plugin client for the given kind/name/scope.
func (s *clientStore) add(client *plugin.Client, kind PluginKind, name, scope string) {
func (s *clientStore) add(client pluginClient, kind PluginKind, name, scope string) {
s.lock.Lock()
defer s.lock.Unlock()
key := clientKey{kind, scope}
if _, found := s.clients[key]; !found {
s.clients[key] = make(map[string]*plugin.Client)
s.clients[key] = make(map[string]pluginClient)
}
s.clients[key][name] = client
@@ -103,3 +118,11 @@ func (s *clientStore) deleteAll(kind PluginKind, scope string) {
delete(s.clients, clientKey{kind, scope})
}
// clear removes all clients for all kinds/scopes from the store.
func (s *clientStore) clear() {
s.lock.Lock()
defer s.lock.Unlock()
s.clients = make(map[clientKey]map[string]pluginClient)
}

View File

@@ -44,7 +44,6 @@ func baseConfig() *plugin.ClientConfig {
return &plugin.ClientConfig{
HandshakeConfig: Handshake,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Managed: true,
}
}
@@ -112,7 +111,7 @@ type Manager interface {
// CloseBackupItemActions terminates the plugin sub-processes that
// are hosting BackupItemAction plugins for the given backup name.
CloseBackupItemActions(backupName string) error
CloseBackupItemActions(backupName string)
// GetRestoreItemActions returns all restore.ItemAction plugins.
// These plugin instances should ONLY be used for a single restore
@@ -123,10 +122,10 @@ type Manager interface {
// CloseRestoreItemActions terminates the plugin sub-processes that
// are hosting RestoreItemAction plugins for the given restore name.
CloseRestoreItemActions(restoreName string) error
CloseRestoreItemActions(restoreName string)
// CleanupClients kills all plugin subprocesses.
CleanupClients()
// CloseAllClients terminates all plugin subprocesses.
CloseAllClients()
}
type manager struct {
@@ -165,7 +164,12 @@ func pluginForKind(kind PluginKind) plugin.Plugin {
}
}
func getPluginInstance(client *plugin.Client, kind PluginKind) (interface{}, error) {
type pluginClient interface {
Client() (plugin.ClientProtocol, error)
Kill()
}
func getPluginInstance(client pluginClient, kind PluginKind) (interface{}, error) {
protocolClient, err := client.Client()
if err != nil {
return nil, errors.WithStack(err)
@@ -349,8 +353,8 @@ func (m *manager) GetBackupItemActions(backupName string) ([]backup.ItemAction,
// CloseBackupItemActions terminates the plugin sub-processes that
// are hosting BackupItemAction plugins for the given backup name.
func (m *manager) CloseBackupItemActions(backupName string) error {
return closeAll(m.clientStore, PluginKindBackupItemAction, backupName)
func (m *manager) CloseBackupItemActions(backupName string) {
closeAll(m.clientStore, PluginKindBackupItemAction, backupName)
}
func (m *manager) GetRestoreItemActions(restoreName string) ([]restore.ItemAction, error) {
@@ -398,14 +402,18 @@ func (m *manager) GetRestoreItemActions(restoreName string) ([]restore.ItemActio
// CloseRestoreItemActions terminates the plugin sub-processes that
// are hosting RestoreItemAction plugins for the given restore name.
func (m *manager) CloseRestoreItemActions(restoreName string) error {
return closeAll(m.clientStore, PluginKindRestoreItemAction, restoreName)
func (m *manager) CloseRestoreItemActions(restoreName string) {
closeAll(m.clientStore, PluginKindRestoreItemAction, restoreName)
}
func closeAll(store *clientStore, kind PluginKind, scope string) error {
func closeAll(store *clientStore, kind PluginKind, scope string) {
clients, err := store.list(kind, scope)
if err != nil {
return err
// store.list(...) only returns an error if no clients are
// found for the specified kind and scope. We don't need
// to treat this as an error when trying to close all clients,
// because this means there are no clients to close.
return
}
for _, client := range clients {
@@ -413,10 +421,12 @@ func closeAll(store *clientStore, kind PluginKind, scope string) error {
}
store.deleteAll(kind, scope)
return nil
}
func (m *manager) CleanupClients() {
plugin.CleanupClients()
func (m *manager) CloseAllClients() {
for _, client := range m.clientStore.listAll() {
client.Kill()
}
m.clientStore.clear()
}

179
pkg/plugin/manager_test.go Normal file
View File

@@ -0,0 +1,179 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"testing"
plugin "github.com/hashicorp/go-plugin"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type fakePluginClient struct {
terminated map[*fakePluginClient]bool
}
func (c *fakePluginClient) Kill() {
c.terminated[c] = true
}
func (c *fakePluginClient) Client() (plugin.ClientProtocol, error) {
panic("not implemented")
}
func TestCloseAllClients(t *testing.T) {
var (
store = newClientStore()
terminated = make(map[*fakePluginClient]bool)
clientCount = 0
)
for _, kind := range AllPluginKinds {
for _, scope := range []string{"scope-1", "scope-2"} {
for _, name := range []string{"name-1", "name-2"} {
client := &fakePluginClient{terminated: terminated}
terminated[client] = false
store.add(client, kind, name, scope)
clientCount++
}
}
}
// verify setup
require.Len(t, terminated, clientCount)
for _, status := range terminated {
require.False(t, status)
}
m := &manager{clientStore: store}
m.CloseAllClients()
// we should have no additions to or removals from the `terminated` map
assert.Len(t, terminated, clientCount)
// all clients should have their entry in the `terminated` map flipped to true
for _, status := range terminated {
assert.True(t, status)
}
// the store's `clients` map should be empty
assert.Len(t, store.clients, 0)
}
func TestCloseBackupItemActions(t *testing.T) {
var (
store = newClientStore()
terminated = make(map[*fakePluginClient]bool)
clientCount = 0
expectedTerminations = make(map[*fakePluginClient]bool)
backupName = "backup-1"
)
for _, kind := range AllPluginKinds {
for _, scope := range []string{"backup-1", "backup-2"} {
for _, name := range []string{"name-1", "name-2"} {
client := &fakePluginClient{terminated: terminated}
terminated[client] = false
store.add(client, kind, name, scope)
clientCount++
if kind == PluginKindBackupItemAction && scope == backupName {
expectedTerminations[client] = true
}
}
}
}
// verify setup
require.Len(t, terminated, clientCount)
for _, status := range terminated {
require.False(t, status)
}
m := &manager{clientStore: store}
m.CloseBackupItemActions(backupName)
// we should have no additions to or removals from the `terminated` map
assert.Len(t, terminated, clientCount)
// only those clients that we expected to be terminated should have
// their entry in the `terminated` map flipped to true
for client, status := range terminated {
_, ok := expectedTerminations[client]
assert.Equal(t, ok, status)
}
// clients for the kind/scope should have been removed
_, err := store.list(PluginKindBackupItemAction, backupName)
assert.EqualError(t, err, "clients not found")
// total number of clients should decrease by the number of terminated
// clients
assert.Len(t, store.listAll(), clientCount-len(expectedTerminations))
}
func TestCloseRestoreItemActions(t *testing.T) {
var (
store = newClientStore()
terminated = make(map[*fakePluginClient]bool)
clientCount = 0
expectedTerminations = make(map[*fakePluginClient]bool)
restoreName = "restore-2"
)
for _, kind := range AllPluginKinds {
for _, scope := range []string{"restore-1", "restore-2"} {
for _, name := range []string{"name-1", "name-2"} {
client := &fakePluginClient{terminated: terminated}
terminated[client] = false
store.add(client, kind, name, scope)
clientCount++
if kind == PluginKindRestoreItemAction && scope == restoreName {
expectedTerminations[client] = true
}
}
}
}
// verify setup
require.Len(t, terminated, clientCount)
for _, status := range terminated {
require.False(t, status)
}
m := &manager{clientStore: store}
m.CloseRestoreItemActions(restoreName)
// we should have no additions to or removals from the `terminated` map
assert.Len(t, terminated, clientCount)
// only those clients that we expected to be terminated should have
// their entry in the `terminated` map flipped to true
for client, status := range terminated {
_, ok := expectedTerminations[client]
assert.Equal(t, ok, status)
}
// clients for the kind/scope should have been removed
_, err := store.list(PluginKindRestoreItemAction, restoreName)
assert.EqualError(t, err, "clients not found")
// total number of clients should decrease by the number of terminated
// clients
assert.Len(t, store.listAll(), clientCount-len(expectedTerminations))
}

View File

@@ -0,0 +1,38 @@
// Code generated by mockery v1.0.0
package mocks
import corev1 "k8s.io/api/core/v1"
import logrus "github.com/sirupsen/logrus"
import mock "github.com/stretchr/testify/mock"
import v1 "github.com/heptio/ark/pkg/apis/ark/v1"
// Backupper is an autogenerated mock type for the Backupper type
type Backupper struct {
mock.Mock
}
// BackupPodVolumes provides a mock function with given fields: backup, pod, log
func (_m *Backupper) BackupPodVolumes(backup *v1.Backup, pod *corev1.Pod, log logrus.FieldLogger) (map[string]string, []error) {
ret := _m.Called(backup, pod, log)
var r0 map[string]string
if rf, ok := ret.Get(0).(func(*v1.Backup, *corev1.Pod, logrus.FieldLogger) map[string]string); ok {
r0 = rf(backup, pod, log)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]string)
}
}
var r1 []error
if rf, ok := ret.Get(1).(func(*v1.Backup, *corev1.Pod, logrus.FieldLogger) []error); ok {
r1 = rf(backup, pod, log)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).([]error)
}
}
return r0, r1
}

View File

@@ -58,9 +58,9 @@ func mergeServiceAccounts(fromCluster, fromBackup *unstructured.Unstructured) (*
desired.ImagePullSecrets = mergeLocalObjectReferenceSlices(desired.ImagePullSecrets, backupSA.ImagePullSecrets)
collections.MergeMaps(desired.Labels, backupSA.Labels)
desired.Labels = collections.MergeMaps(desired.Labels, backupSA.Labels)
collections.MergeMaps(desired.Annotations, backupSA.Annotations)
desired.Annotations = collections.MergeMaps(desired.Annotations, backupSA.Annotations)
desiredUnstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(desired)
if err != nil {

View File

@@ -125,11 +125,18 @@ func Exists(root map[string]interface{}, path string) bool {
// MergeMaps takes two map[string]string and merges missing keys from the second into the first.
// If a key already exists, its value is not overwritten.
func MergeMaps(first, second map[string]string) {
func MergeMaps(first, second map[string]string) map[string]string {
// If the first map passed in is empty, just use all of the second map's data
if first == nil {
first = map[string]string{}
}
for k, v := range second {
_, ok := first[k]
if !ok {
first[k] = v
}
}
return first
}

View File

@@ -18,6 +18,8 @@ package collections
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestGetString(t *testing.T) {
@@ -44,3 +46,57 @@ func TestGetString(t *testing.T) {
}
}
}
func TestMergeMaps(t *testing.T) {
var testCases = []struct {
name string
source map[string]string
destination map[string]string
expected map[string]string
}{
{
name: "nil destination should result in source being copied",
destination: nil,
source: map[string]string{
"k1": "v1",
},
expected: map[string]string{
"k1": "v1",
},
},
{
name: "keys missing from destination should be copied from source",
destination: map[string]string{
"k2": "v2",
},
source: map[string]string{
"k1": "v1",
},
expected: map[string]string{
"k1": "v1",
"k2": "v2",
},
},
{
name: "matching key should not have value copied from source",
destination: map[string]string{
"k1": "v1",
},
source: map[string]string{
"k1": "v2",
},
expected: map[string]string{
"k1": "v1",
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := MergeMaps(tc.destination, tc.source)
assert.Equal(t, tc.expected, result)
})
}
}

View File

@@ -29,6 +29,7 @@ type FakeDiscoveryHelper struct {
ResourceList []*metav1.APIResourceList
Mapper meta.RESTMapper
AutoReturnResource bool
APIGroupsList []metav1.APIGroup
}
func NewFakeDiscoveryHelper(autoReturnResource bool, resources map[schema.GroupVersionResource]schema.GroupVersionResource) *FakeDiscoveryHelper {
@@ -54,6 +55,14 @@ func NewFakeDiscoveryHelper(autoReturnResource bool, resources map[schema.GroupV
}
apiResourceMap[gvString] = append(apiResourceMap[gvString], metav1.APIResource{Name: gvr.Resource})
helper.APIGroupsList = append(helper.APIGroupsList,
metav1.APIGroup{
Name: gvr.Group,
PreferredVersion: metav1.GroupVersionForDiscovery{
GroupVersion: gvString,
Version: gvr.Version,
},
})
}
for group, resources := range apiResourceMap {
@@ -110,3 +119,7 @@ func (dh *FakeDiscoveryHelper) ResourceFor(input schema.GroupVersionResource) (s
return schema.GroupVersionResource{}, metav1.APIResource{}, errors.New("APIResource not found")
}
func (dh *FakeDiscoveryHelper) APIGroups() []metav1.APIGroup {
return dh.APIGroupsList
}