Compare commits

...

43 Commits

Author SHA1 Message Date
KubeKween
78141e4735 Merge pull request #919 from skriss/v0.9.8-beta.1-cherrypicks
V0.9.8 beta.1 cherrypicks
2018-10-10 09:13:32 -07:00
Steve Kriss
7d9a7c80ae CHANGELOG entry for v0.9.8-beta.1
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-10-10 09:58:59 -06:00
James Powis
ddca555639 Drop volumeMounts from initContainers if SAToken
Signed-off-by: James Powis <powisj@gmail.com>
2018-10-10 09:58:45 -06:00
Wayne Witzel III
caa962fa22 Merge pull request #908 from skriss/v0.9.7-cherrypicks
V0.9.7 cherrypicks
2018-10-04 14:43:16 -04:00
Steve Kriss
ee00ce46ef add node port fix to v0.9.7 changelog
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-10-04 12:28:06 -06:00
Wayne Witzel III
39f097813f Update CHANGELOG.md for v0.9.7
Signed-off-by: Wayne Witzel III <wayne@riotousliving.com>
2018-10-04 12:27:13 -06:00
Steve Kriss
7bf9adb92e bug: fix restic restores when using namespace mappings
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-10-04 12:21:50 -06:00
Marc Tudurí
31bf26b2e9 Change CreationTimestamp by StartTimestamp in backup list
Signed-off-by: Marc Tudurí <marctc@gmail.com>
2018-10-04 12:21:28 -06:00
Steve Kriss
1dfe75a0c8 remove restore log helper for accurate line #'s
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-10-04 12:20:13 -06:00
Michal Wieczorek
226a687c01 Enable restoring resources with ownerReference set
Signed-off-by: Michal Wieczorek <wieczorek-michal@wp.pl>
2018-10-04 12:17:53 -06:00
Shubheksha Jalan
ccfef26ef3 move code dealing with node ports into a separate function
Signed-off-by: Shubheksha Jalan <jshubheksha@gmail.com>
2018-10-04 12:15:26 -06:00
Shubheksha Jalan
e62afa8b61 ignore spec.ports not being there for services of type ExternalName
Signed-off-by: Shubheksha Jalan <jshubheksha@gmail.com>
2018-10-04 12:15:22 -06:00
Shubheksha Jalan
b51b3c27ce fix error during restore when spec.ports are not found
Signed-off-by: Shubheksha Jalan <jshubheksha@gmail.com>
2018-10-04 12:15:16 -06:00
Timo Reimann
856e632109 Preserve node ports during restore when annotations hold specification.
This is to better reflect the intent of the user when node ports are
specified explicitly (as opposed to being assigned by Kubernetes). The
`last-applied-configuration` annotation added by `kubectl apply` is one
such indicator we are now leveraging.

We still default to omitting the node ports when the annotation is
missing.

Signed-off-by: Timo Reimann <ttr314@googlemail.com>
2018-10-04 12:14:03 -06:00
KubeKween
ad61989beb Merge pull request #861 from skriss/v0.9.6-cherrypicks
v0.9.6 cherrypicks
2018-09-21 09:07:22 -07:00
Steve Kriss
e353beb6c4 update CHANGELOG for v0.9.6
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-09-21 10:00:51 -06:00
Nolan Brubaker
93926d9025 Bump alpine image for security fix
This change includes the fix at
6484ed9849

Signed-off-by: Nolan Brubaker <nolan@heptio.com>
2018-09-21 09:50:27 -06:00
James Powis
81e1997002 Change from regex matching default-token to prefix SA-token-
Signed-off-by: James Powis <powisj@gmail.com>
2018-09-21 09:49:54 -06:00
Nolan Brubaker
908900e9dd Merge pull request #850 from carlisia/release-0.9
Cherry picks for release 0.9.5
2018-09-17 14:31:09 -04:00
Steve Kriss
73e0d3a62f v0.9.5 changelog entry
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-09-17 11:25:05 -07:00
Steve Kriss
bf59d76a3f get a new metadata accessor after calling backup item actions
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-09-17 11:24:54 -07:00
Steve Kriss
d2ebc6ca88 Merge pull request #813 from nrb/v0.9.4-changelog
Update CHANGELOG for v0.9.4
2018-09-05 09:12:00 -07:00
Nolan Brubaker
79504e7fee Update CHANGELOG for v0.9.4
Signed-off-by: Nolan Brubaker <nolan@heptio.com>
2018-09-05 11:21:36 -04:00
Steve Kriss
8369745070 Merge pull request #812 from nrb/0.9.4-cherry-picks
Fix map merging logic
2018-08-30 14:00:37 -07:00
Nolan Brubaker
1cb098760e Fix map merging logic
Fixes #777

Signed-off-by: Nolan Brubaker <nolan@heptio.com>
2018-08-30 15:48:08 -04:00
Nolan Brubaker
ca428c7301 Merge pull request #797 from skriss/plugins-fix-0.9
terminate plugin clients explicitly instead of using managed ones
2018-08-30 15:47:12 -04:00
Steve Kriss
1e59553d90 plugins: add unit tests for close client methods
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-08-27 11:29:25 -07:00
Steve Kriss
721b629f0e terminate plugin clients explicitly instead of using managed ones
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-08-27 10:44:40 -07:00
Carlisia
659a852c8c Merge pull request #760 from skriss/v0.9.3-cherrypicks
V0.9.3 cherrypicks
2018-08-10 11:16:49 -07:00
Nolan Brubaker
ca8ae18020 Add v0.9.3 changelog entry
Signed-off-by: Nolan Brubaker <nolan@heptio.com>
2018-08-10 10:12:02 -07:00
Alex Lemaresquier
9f80f01c2a Initialize schedule Prometheus metrics to have them created beforehand (see https://prometheus.io/docs/practices/instrumentation/#avoid-missing-metrics)
Signed-off-by: Alex Lemaresquier <alex+github@lemaresquier.org>
2018-08-10 10:11:42 -07:00
Carlisia
0acd368291 Merge pull request #707 from skriss/release-v0.9.2
Release v0.9.2
2018-07-26 14:19:04 -07:00
Steve Kriss
0640cdab06 update changelog for v0.9.2
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-26 14:06:57 -07:00
Steve Kriss
d21ce48db1 fix bug preventing backup item action item updates from saving
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-26 14:02:01 -07:00
Carlisia
10a1fe2bfa Merge pull request #695 from skriss/release-0.9
cherry-pick commits for v0.9.1
2018-07-23 13:37:00 -07:00
Steve Kriss
07ce4988e3 update CHANGELOG.md for v0.9.1
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-23 12:13:27 -07:00
Steve Kriss
89e4611d1b cleanup service account action log statement
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-23 12:13:20 -07:00
Nolan Brubaker
7d6bebadc4 Add RBAC support for 1.7 clusters
Signed-off-by: Nolan Brubaker <nolan@heptio.com>
2018-07-23 12:01:40 -07:00
Steve Kriss
84f872e4d5 delete old deletion requests for backup when processing a new one
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-23 10:43:39 -07:00
Steve Kriss
b566a7c101 return nil error if 404 encountered when deleting snapshots
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-23 10:43:32 -07:00
Steve Kriss
b4f8d7cb5f fix tagging latest by using make's ifeq
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-20 12:06:29 -07:00
Steve Kriss
c23d9dd7c5 exit server if not all Ark CRDs exist at startup
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-20 11:57:16 -07:00
Steve Kriss
400e8a165b require namespace for backups/etc. to exist at server startup
Signed-off-by: Steve Kriss <steve@heptio.com>
2018-07-20 11:53:25 -07:00
45 changed files with 1923 additions and 370 deletions

View File

@@ -1,5 +1,56 @@
# Changelog
#### [v0.9.8-beta.1](https://github.com/heptio/ark/releases/tag/v0.9.8-beta.1) - 2018-10-10
#### Bug Fixes
* Discard service account token volume mounts from init containers on restore (#910, @james-powis)
#### [v0.9.7](https://github.com/heptio/ark/releases/tag/v0.9.7) - 2018-10-04
#### Bug Fixes
* Preserve explicitly-specified node ports during restore (#712, @timoreimann)
* Enable restoring resources with ownerReference set (#837, @mwieczorek)
* Fix error when restoring ExternalName services (#869, @shubheksha)
* remove restore log helper for accurate line numbers (#891, @skriss)
* Display backup StartTimestamp in `ark backup get` output (#894, @marctc)
* Fix restic restores when using namespace mappings (#900, @skriss)
#### [v0.9.6](https://github.com/heptio/ark/releases/tag/v0.9.6) - 2018-09-21
#### Bug Fixes
* Discard service account tokens from non-default service accounts on restore (#843, @james-powis)
* Update Docker images to use `alpine:3.8` (#852, @nrb)
#### [v0.9.5](https://github.com/heptio/ark/releases/tag/v0.9.5) - 2018-09-17
#### Bug Fixes
* Fix issue causing restic restores not to work (#834, @skriss)
#### [v0.9.4](https://github.com/heptio/ark/releases/tag/v0.9.4) - 2018-09-05
#### Bug Fixes
* Terminate plugin clients to resolve memory leaks (#797, @skriss)
* Fix nil map errors when merging annotations (#812, @nrb)
#### [v0.9.3](https://github.com/heptio/ark/releases/tag/v0.9.3) - 2018-08-10
#### Bug Fixes
* Initalize Prometheus metrics when creating a new schedule (#689, @lemaral)
#### [v0.9.2](https://github.com/heptio/ark/releases/tag/v0.9.2) - 2018-07-26
##### Bug Fixes:
* Fix issue where modifications made by backup item actions were not being saved to backup tarball (#704, @skriss)
#### [v0.9.1](https://github.com/heptio/ark/releases/tag/v0.9.1) - 2018-07-23
##### Bug Fixes:
* Require namespace for Ark's CRDs to already exist at server startup (#676, @skriss)
* Require all Ark CRDs to exist at server startup (#683, @skriss)
* Fix `latest` tagging in Makefile (#690, @skriss)
* Make Ark compatible with clusters that don't have the `rbac.authorization.k8s.io/v1` API group (#682, @nrb)
* Don't consider missing snapshots an error during backup deletion, limit backup deletion requests per backup to 1 (#687, @skriss)
#### [v0.9.0](https://github.com/heptio/ark/releases/tag/v0.9.0) - 2018-07-06
##### Highlights:

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
FROM alpine:3.7
FROM alpine:3.8
MAINTAINER Steve Kriss <steve@heptio.com>
@@ -20,4 +20,4 @@ ADD /bin/linux/amd64/ark-restic-restore-helper .
USER nobody:nobody
ENTRYPOINT [ "/ark-restic-restore-helper" ]
ENTRYPOINT [ "/ark-restic-restore-helper" ]

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
FROM alpine:3.7
FROM alpine:3.8
MAINTAINER Andy Goldstein <andy@heptio.com>

View File

@@ -138,10 +138,10 @@ all-push:
push: .push-$(DOTFILE_IMAGE) push-name
.push-$(DOTFILE_IMAGE): .container-$(DOTFILE_IMAGE)
@docker push $(IMAGE):$(VERSION)
@if [[ "$(TAG_LATEST)" == "true" ]]; then \
docker tag $(IMAGE):$(VERSION) $(IMAGE):latest; \
docker push $(IMAGE):latest; \
fi
ifeq ($(TAG_LATEST), true)
docker tag $(IMAGE):$(VERSION) $(IMAGE):latest
docker push $(IMAGE):latest
endif
@docker images -q $(IMAGE):$(VERSION) > $@
push-name:

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
FROM golang:1.10-alpine3.7
FROM golang:1.10-alpine3.8
RUN apk add --update --no-cache git bash && \
mkdir -p /go/src/k8s.io && \

View File

@@ -41,27 +41,41 @@ func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
type typeInfo struct {
PluralName string
ItemType runtime.Object
ItemListType runtime.Object
}
func newTypeInfo(pluralName string, itemType, itemListType runtime.Object) typeInfo {
return typeInfo{
PluralName: pluralName,
ItemType: itemType,
ItemListType: itemListType,
}
}
// CustomResources returns a map of all custom resources within the Ark
// API group, keyed on Kind.
func CustomResources() map[string]typeInfo {
return map[string]typeInfo{
"Backup": newTypeInfo("backups", &Backup{}, &BackupList{}),
"Restore": newTypeInfo("restores", &Restore{}, &RestoreList{}),
"Schedule": newTypeInfo("schedules", &Schedule{}, &ScheduleList{}),
"Config": newTypeInfo("configs", &Config{}, &ConfigList{}),
"DownloadRequest": newTypeInfo("downloadrequests", &DownloadRequest{}, &DownloadRequestList{}),
"DeleteBackupRequest": newTypeInfo("deletebackuprequests", &DeleteBackupRequest{}, &DeleteBackupRequestList{}),
"PodVolumeBackup": newTypeInfo("podvolumebackups", &PodVolumeBackup{}, &PodVolumeBackupList{}),
"PodVolumeRestore": newTypeInfo("podvolumerestores", &PodVolumeRestore{}, &PodVolumeRestoreList{}),
"ResticRepository": newTypeInfo("resticrepositories", &ResticRepository{}, &ResticRepositoryList{}),
}
}
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&Backup{},
&BackupList{},
&Schedule{},
&ScheduleList{},
&Restore{},
&RestoreList{},
&Config{},
&ConfigList{},
&DownloadRequest{},
&DownloadRequestList{},
&DeleteBackupRequest{},
&DeleteBackupRequestList{},
&PodVolumeBackup{},
&PodVolumeBackupList{},
&PodVolumeRestore{},
&PodVolumeRestoreList{},
&ResticRepository{},
&ResticRepositoryList{},
)
for _, typeInfo := range CustomResources() {
scheme.AddKnownTypes(SchemeGroupVersion, typeInfo.ItemType, typeInfo.ItemListType)
}
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}

View File

@@ -204,9 +204,22 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
}
}
if err := ib.executeActions(log, obj, groupResource, name, namespace, metadata); err != nil {
updatedObj, err := ib.executeActions(log, obj, groupResource, name, namespace, metadata)
if err != nil {
log.WithError(err).Error("Error executing item actions")
backupErrs = append(backupErrs, err)
// if there was an error running actions, execute post hooks and return
log.Debug("Executing post hooks")
if err := ib.itemHookHandler.handleHooks(log, groupResource, obj, ib.resourceHooks, hookPhasePost); err != nil {
backupErrs = append(backupErrs, err)
}
return kubeerrs.NewAggregate(backupErrs)
}
obj = updatedObj
if metadata, err = meta.Accessor(obj); err != nil {
return errors.WithStack(err)
}
if groupResource == kuberesource.PersistentVolumes {
@@ -285,7 +298,13 @@ func (ib *defaultItemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *co
return ib.resticBackupper.BackupPodVolumes(ib.backup, pod, log)
}
func (ib *defaultItemBackupper) executeActions(log logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource, name, namespace string, metadata metav1.Object) error {
func (ib *defaultItemBackupper) executeActions(
log logrus.FieldLogger,
obj runtime.Unstructured,
groupResource schema.GroupResource,
name, namespace string,
metadata metav1.Object,
) (runtime.Unstructured, error) {
for _, action := range ib.actions {
if !action.resourceIncludesExcludes.ShouldInclude(groupResource.String()) {
log.Debug("Skipping action because it does not apply to this resource")
@@ -308,40 +327,40 @@ func (ib *defaultItemBackupper) executeActions(log logrus.FieldLogger, obj runti
logSetter.SetLog(log)
}
if updatedItem, additionalItemIdentifiers, err := action.Execute(obj, ib.backup); err == nil {
obj = updatedItem
for _, additionalItem := range additionalItemIdentifiers {
gvr, resource, err := ib.discoveryHelper.ResourceFor(additionalItem.GroupResource.WithVersion(""))
if err != nil {
return err
}
client, err := ib.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), resource, additionalItem.Namespace)
if err != nil {
return err
}
additionalItem, err := client.Get(additionalItem.Name, metav1.GetOptions{})
if err != nil {
return err
}
if err = ib.additionalItemBackupper.backupItem(log, additionalItem, gvr.GroupResource()); err != nil {
return err
}
}
} else {
updatedItem, additionalItemIdentifiers, err := action.Execute(obj, ib.backup)
if err != nil {
// We want this to show up in the log file at the place where the error occurs. When we return
// the error, it get aggregated with all the other ones at the end of the backup, making it
// harder to tell when it happened.
log.WithError(err).Error("error executing custom action")
return errors.Wrapf(err, "error executing custom action (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name)
return nil, errors.Wrapf(err, "error executing custom action (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name)
}
obj = updatedItem
for _, additionalItem := range additionalItemIdentifiers {
gvr, resource, err := ib.discoveryHelper.ResourceFor(additionalItem.GroupResource.WithVersion(""))
if err != nil {
return nil, err
}
client, err := ib.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), resource, additionalItem.Namespace)
if err != nil {
return nil, err
}
additionalItem, err := client.Get(additionalItem.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
if err = ib.additionalItemBackupper.backupItem(log, additionalItem, gvr.GroupResource()); err != nil {
return nil, err
}
}
}
return nil
return obj, nil
}
// zoneLabel is the label that stores availability-zone info

View File

@@ -26,6 +26,7 @@ import (
"github.com/heptio/ark/pkg/apis/ark/v1"
api "github.com/heptio/ark/pkg/apis/ark/v1"
resticmocks "github.com/heptio/ark/pkg/restic/mocks"
"github.com/heptio/ark/pkg/util/collections"
arktest "github.com/heptio/ark/pkg/util/test"
"github.com/pkg/errors"
@@ -34,6 +35,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
corev1api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
@@ -539,6 +541,152 @@ func TestBackupItemNoSkips(t *testing.T) {
}
}
type addAnnotationAction struct{}
func (a *addAnnotationAction) Execute(item runtime.Unstructured, backup *v1.Backup) (runtime.Unstructured, []ResourceIdentifier, error) {
// since item actions run out-of-proc, do a deep-copy here to simulate passing data
// across a process boundary.
copy := item.(*unstructured.Unstructured).DeepCopy()
metadata, err := meta.Accessor(copy)
if err != nil {
return copy, nil, nil
}
annotations := metadata.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations["foo"] = "bar"
metadata.SetAnnotations(annotations)
return copy, nil, nil
}
func (a *addAnnotationAction) AppliesTo() (ResourceSelector, error) {
panic("not implemented")
}
func TestItemActionModificationsToItemPersist(t *testing.T) {
var (
w = &fakeTarWriter{}
obj = &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"namespace": "myns",
"name": "bar",
},
},
}
actions = []resolvedAction{
{
ItemAction: &addAnnotationAction{},
namespaceIncludesExcludes: collections.NewIncludesExcludes(),
resourceIncludesExcludes: collections.NewIncludesExcludes(),
selector: labels.Everything(),
},
}
b = (&defaultItemBackupperFactory{}).newItemBackupper(
&v1.Backup{},
collections.NewIncludesExcludes(),
collections.NewIncludesExcludes(),
make(map[itemKey]struct{}),
actions,
nil,
w,
nil,
&arktest.FakeDynamicFactory{},
arktest.NewFakeDiscoveryHelper(true, nil),
nil,
nil,
newPVCSnapshotTracker(),
).(*defaultItemBackupper)
)
// our expected backed-up object is the passed-in object plus the annotation
// that the backup item action adds.
expected := obj.DeepCopy()
expected.SetAnnotations(map[string]string{"foo": "bar"})
// method under test
require.NoError(t, b.backupItem(arktest.NewLogger(), obj, schema.ParseGroupResource("resource.group")))
// get the actual backed-up item
require.Len(t, w.data, 1)
actual, err := arktest.GetAsMap(string(w.data[0]))
require.NoError(t, err)
assert.EqualValues(t, expected.Object, actual)
}
func TestResticAnnotationsPersist(t *testing.T) {
var (
w = &fakeTarWriter{}
obj = &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"namespace": "myns",
"name": "bar",
"annotations": map[string]interface{}{
"backup.ark.heptio.com/backup-volumes": "volume-1,volume-2",
},
},
},
}
actions = []resolvedAction{
{
ItemAction: &addAnnotationAction{},
namespaceIncludesExcludes: collections.NewIncludesExcludes(),
resourceIncludesExcludes: collections.NewIncludesExcludes(),
selector: labels.Everything(),
},
}
resticBackupper = &resticmocks.Backupper{}
b = (&defaultItemBackupperFactory{}).newItemBackupper(
&v1.Backup{},
collections.NewIncludesExcludes(),
collections.NewIncludesExcludes(),
make(map[itemKey]struct{}),
actions,
nil,
w,
nil,
&arktest.FakeDynamicFactory{},
arktest.NewFakeDiscoveryHelper(true, nil),
nil,
resticBackupper,
newPVCSnapshotTracker(),
).(*defaultItemBackupper)
)
resticBackupper.
On("BackupPodVolumes", mock.Anything, mock.Anything, mock.Anything).
Return(map[string]string{"volume-1": "snapshot-1", "volume-2": "snapshot-2"}, nil)
// our expected backed-up object is the passed-in object, plus the annotation
// that the backup item action adds, plus the annotations that the restic
// backupper adds
expected := obj.DeepCopy()
annotations := expected.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations["foo"] = "bar"
annotations["snapshot.ark.heptio.com/volume-1"] = "snapshot-1"
annotations["snapshot.ark.heptio.com/volume-2"] = "snapshot-2"
expected.SetAnnotations(annotations)
// method under test
require.NoError(t, b.backupItem(arktest.NewLogger(), obj, schema.ParseGroupResource("pods")))
// get the actual backed-up item
require.Len(t, w.data, 1)
actual, err := arktest.GetAsMap(string(w.data[0]))
require.NoError(t, err)
assert.EqualValues(t, expected.Object, actual)
}
func TestTakePVSnapshot(t *testing.T) {
iops := int64(1000)

140
pkg/backup/rbac.go Normal file
View File

@@ -0,0 +1,140 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package backup
import (
"github.com/pkg/errors"
rbac "k8s.io/api/rbac/v1"
rbacbeta "k8s.io/api/rbac/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
rbacclient "k8s.io/client-go/kubernetes/typed/rbac/v1"
rbacbetaclient "k8s.io/client-go/kubernetes/typed/rbac/v1beta1"
)
// ClusterRoleBindingLister allows for listing ClusterRoleBindings in a version-independent way.
type ClusterRoleBindingLister interface {
// List returns a slice of ClusterRoleBindings which can represent either v1 or v1beta1 ClusterRoleBindings.
List() ([]ClusterRoleBinding, error)
}
// noopClusterRoleBindingLister exists to handle clusters where RBAC is disabled.
type noopClusterRoleBindingLister struct {
}
func (noop noopClusterRoleBindingLister) List() ([]ClusterRoleBinding, error) {
return []ClusterRoleBinding{}, nil
}
type v1ClusterRoleBindingLister struct {
client rbacclient.ClusterRoleBindingInterface
}
func (v1 v1ClusterRoleBindingLister) List() ([]ClusterRoleBinding, error) {
crbList, err := v1.client.List(metav1.ListOptions{})
if err != nil {
return nil, errors.WithStack(err)
}
var crbs []ClusterRoleBinding
for _, crb := range crbList.Items {
crbs = append(crbs, v1ClusterRoleBinding{crb: crb})
}
return crbs, nil
}
type v1beta1ClusterRoleBindingLister struct {
client rbacbetaclient.ClusterRoleBindingInterface
}
func (v1beta1 v1beta1ClusterRoleBindingLister) List() ([]ClusterRoleBinding, error) {
crbList, err := v1beta1.client.List(metav1.ListOptions{})
if err != nil {
return nil, errors.WithStack(err)
}
var crbs []ClusterRoleBinding
for _, crb := range crbList.Items {
crbs = append(crbs, v1beta1ClusterRoleBinding{crb: crb})
}
return crbs, nil
}
// NewClusterRoleBindingListerMap creates a map of RBAC version strings to their associated
// ClusterRoleBindingLister structs.
// Necessary so that callers to the ClusterRoleBindingLister interfaces don't need the kubernetes.Interface.
func NewClusterRoleBindingListerMap(clientset kubernetes.Interface) map[string]ClusterRoleBindingLister {
return map[string]ClusterRoleBindingLister{
rbac.SchemeGroupVersion.Version: v1ClusterRoleBindingLister{client: clientset.RbacV1().ClusterRoleBindings()},
rbacbeta.SchemeGroupVersion.Version: v1beta1ClusterRoleBindingLister{client: clientset.RbacV1beta1().ClusterRoleBindings()},
"": noopClusterRoleBindingLister{},
}
}
// ClusterRoleBinding abstracts access to ClusterRoleBindings whether they're v1 or v1beta1.
type ClusterRoleBinding interface {
// Name returns the name of a ClusterRoleBinding.
Name() string
// ServiceAccountSubjects returns the names of subjects that are service accounts in the given namespace.
ServiceAccountSubjects(namespace string) []string
// RoleRefName returns the name of a ClusterRoleBinding's RoleRef.
RoleRefName() string
}
type v1ClusterRoleBinding struct {
crb rbac.ClusterRoleBinding
}
func (c v1ClusterRoleBinding) Name() string {
return c.crb.Name
}
func (c v1ClusterRoleBinding) RoleRefName() string {
return c.crb.RoleRef.Name
}
func (c v1ClusterRoleBinding) ServiceAccountSubjects(namespace string) []string {
var saSubjects []string
for _, s := range c.crb.Subjects {
if s.Kind == rbac.ServiceAccountKind && s.Namespace == namespace {
saSubjects = append(saSubjects, s.Name)
}
}
return saSubjects
}
type v1beta1ClusterRoleBinding struct {
crb rbacbeta.ClusterRoleBinding
}
func (c v1beta1ClusterRoleBinding) Name() string {
return c.crb.Name
}
func (c v1beta1ClusterRoleBinding) RoleRefName() string {
return c.crb.RoleRef.Name
}
func (c v1beta1ClusterRoleBinding) ServiceAccountSubjects(namespace string) []string {
var saSubjects []string
for _, s := range c.crb.Subjects {
if s.Kind == rbac.ServiceAccountKind && s.Namespace == namespace {
saSubjects = append(saSubjects, s.Name)
}
}
return saSubjects
}

View File

@@ -25,28 +25,41 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
rbacclient "k8s.io/client-go/kubernetes/typed/rbac/v1"
"github.com/heptio/ark/pkg/apis/ark/v1"
arkdiscovery "github.com/heptio/ark/pkg/discovery"
"github.com/heptio/ark/pkg/kuberesource"
)
// serviceAccountAction implements ItemAction.
type serviceAccountAction struct {
log logrus.FieldLogger
clusterRoleBindings []rbac.ClusterRoleBinding
clusterRoleBindings []ClusterRoleBinding
}
// NewServiceAccountAction creates a new ItemAction for service accounts.
func NewServiceAccountAction(log logrus.FieldLogger, client rbacclient.ClusterRoleBindingInterface) (ItemAction, error) {
clusterRoleBindings, err := client.List(metav1.ListOptions{})
func NewServiceAccountAction(log logrus.FieldLogger, clusterRoleBindingListers map[string]ClusterRoleBindingLister, discoveryHelper arkdiscovery.Helper) (ItemAction, error) {
// Look up the supported RBAC version
var supportedAPI metav1.GroupVersionForDiscovery
for _, ag := range discoveryHelper.APIGroups() {
if ag.Name == rbac.GroupName {
supportedAPI = ag.PreferredVersion
break
}
}
crbLister := clusterRoleBindingListers[supportedAPI.Version]
// This should be safe because the List call will return a 0-item slice
// if there's no matching API version.
crbs, err := crbLister.List()
if err != nil {
return nil, errors.WithStack(err)
return nil, err
}
return &serviceAccountAction{
log: log,
clusterRoleBindings: clusterRoleBindings.Items,
clusterRoleBindings: crbs,
}, nil
}
@@ -76,14 +89,14 @@ func (a *serviceAccountAction) Execute(item runtime.Unstructured, backup *v1.Bac
roles = sets.NewString()
)
for _, clusterRoleBinding := range a.clusterRoleBindings {
for _, subj := range clusterRoleBinding.Subjects {
if subj.Kind == rbac.ServiceAccountKind && subj.Namespace == namespace && subj.Name == name {
for _, crb := range a.clusterRoleBindings {
for _, s := range crb.ServiceAccountSubjects(namespace) {
if s == name {
a.log.Infof("Adding clusterrole %s and clusterrolebinding %s to additionalItems since serviceaccount %s/%s is a subject",
clusterRoleBinding.RoleRef.Name, clusterRoleBinding.Name, namespace, name)
crb.RoleRefName(), crb.Name(), namespace, name)
bindings.Insert(clusterRoleBinding.Name)
roles.Insert(clusterRoleBinding.RoleRef.Name)
bindings.Insert(crb.Name())
roles.Insert(crb.RoleRefName())
break
}
}

View File

@@ -24,29 +24,61 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/api/rbac/v1"
rbac "k8s.io/api/rbac/v1"
rbacbeta "k8s.io/api/rbac/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
rbacclient "k8s.io/client-go/kubernetes/typed/rbac/v1"
"github.com/heptio/ark/pkg/kuberesource"
arktest "github.com/heptio/ark/pkg/util/test"
)
type fakeClusterRoleBindingClient struct {
clusterRoleBindings []v1.ClusterRoleBinding
func newV1ClusterRoleBindingList(rbacCRBList []rbac.ClusterRoleBinding) []ClusterRoleBinding {
var crbs []ClusterRoleBinding
for _, c := range rbacCRBList {
crbs = append(crbs, v1ClusterRoleBinding{crb: c})
}
rbacclient.ClusterRoleBindingInterface
return crbs
}
func (c *fakeClusterRoleBindingClient) List(opts metav1.ListOptions) (*v1.ClusterRoleBindingList, error) {
return &v1.ClusterRoleBindingList{
Items: c.clusterRoleBindings,
}, nil
func newV1beta1ClusterRoleBindingList(rbacCRBList []rbacbeta.ClusterRoleBinding) []ClusterRoleBinding {
var crbs []ClusterRoleBinding
for _, c := range rbacCRBList {
crbs = append(crbs, v1beta1ClusterRoleBinding{crb: c})
}
return crbs
}
type FakeV1ClusterRoleBindingLister struct {
v1crbs []rbac.ClusterRoleBinding
}
func (f FakeV1ClusterRoleBindingLister) List() ([]ClusterRoleBinding, error) {
var crbs []ClusterRoleBinding
for _, c := range f.v1crbs {
crbs = append(crbs, v1ClusterRoleBinding{crb: c})
}
return crbs, nil
}
type FakeV1beta1ClusterRoleBindingLister struct {
v1beta1crbs []rbacbeta.ClusterRoleBinding
}
func (f FakeV1beta1ClusterRoleBindingLister) List() ([]ClusterRoleBinding, error) {
var crbs []ClusterRoleBinding
for _, c := range f.v1beta1crbs {
crbs = append(crbs, v1beta1ClusterRoleBinding{crb: c})
}
return crbs, nil
}
func TestServiceAccountActionAppliesTo(t *testing.T) {
a, _ := NewServiceAccountAction(arktest.NewLogger(), &fakeClusterRoleBindingClient{})
// Instantiating the struct directly since using
// NewServiceAccountAction requires a full kubernetes clientset
a := &serviceAccountAction{}
actual, err := a.AppliesTo()
require.NoError(t, err)
@@ -57,11 +89,119 @@ func TestServiceAccountActionAppliesTo(t *testing.T) {
assert.Equal(t, expected, actual)
}
func TestNewServiceAccountAction(t *testing.T) {
tests := []struct {
name string
version string
expectedCRBs []ClusterRoleBinding
}{
{
name: "rbac v1 API instantiates an saAction",
version: rbac.SchemeGroupVersion.Version,
expectedCRBs: []ClusterRoleBinding{
v1ClusterRoleBinding{
crb: rbac.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "v1crb-1",
},
},
},
v1ClusterRoleBinding{
crb: rbac.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "v1crb-2",
},
},
},
},
},
{
name: "rbac v1beta1 API instantiates an saAction",
version: rbacbeta.SchemeGroupVersion.Version,
expectedCRBs: []ClusterRoleBinding{
v1beta1ClusterRoleBinding{
crb: rbacbeta.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "v1beta1crb-1",
},
},
},
v1beta1ClusterRoleBinding{
crb: rbacbeta.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "v1beta1crb-2",
},
},
},
},
},
{
name: "no RBAC API instantiates an saAction with empty slice",
version: "",
expectedCRBs: []ClusterRoleBinding{},
},
}
// Set up all of our fakes outside the test loop
discoveryHelper := arktest.FakeDiscoveryHelper{}
logger := arktest.NewLogger()
v1crbs := []rbac.ClusterRoleBinding{
{
ObjectMeta: metav1.ObjectMeta{
Name: "v1crb-1",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "v1crb-2",
},
},
}
v1beta1crbs := []rbacbeta.ClusterRoleBinding{
{
ObjectMeta: metav1.ObjectMeta{
Name: "v1beta1crb-1",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "v1beta1crb-2",
},
},
}
clusterRoleBindingListers := map[string]ClusterRoleBindingLister{
rbac.SchemeGroupVersion.Version: FakeV1ClusterRoleBindingLister{v1crbs: v1crbs},
rbacbeta.SchemeGroupVersion.Version: FakeV1beta1ClusterRoleBindingLister{v1beta1crbs: v1beta1crbs},
"": noopClusterRoleBindingLister{},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// We only care about the preferred version, nothing else in the list
discoveryHelper.APIGroupsList = []metav1.APIGroup{
{
Name: rbac.GroupName,
PreferredVersion: metav1.GroupVersionForDiscovery{
Version: test.version,
},
},
}
action, err := NewServiceAccountAction(logger, clusterRoleBindingListers, &discoveryHelper)
require.NoError(t, err)
saAction, ok := action.(*serviceAccountAction)
require.True(t, ok)
assert.Equal(t, test.expectedCRBs, saAction.clusterRoleBindings)
})
}
}
func TestServiceAccountActionExecute(t *testing.T) {
tests := []struct {
name string
serviceAccount runtime.Unstructured
crbs []v1.ClusterRoleBinding
crbs []rbac.ClusterRoleBinding
expectedAdditionalItems []ResourceIdentifier
}{
{
@@ -91,9 +231,9 @@ func TestServiceAccountActionExecute(t *testing.T) {
}
}
`),
crbs: []v1.ClusterRoleBinding{
crbs: []rbac.ClusterRoleBinding{
{
Subjects: []v1.Subject{
Subjects: []rbac.Subject{
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
@@ -105,17 +245,17 @@ func TestServiceAccountActionExecute(t *testing.T) {
Name: "ark",
},
{
Kind: v1.ServiceAccountKind,
Kind: rbac.ServiceAccountKind,
Namespace: "non-matching-ns",
Name: "ark",
},
{
Kind: v1.ServiceAccountKind,
Kind: rbac.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "non-matching-name",
},
},
RoleRef: v1.RoleRef{
RoleRef: rbac.RoleRef{
Name: "role",
},
},
@@ -134,19 +274,19 @@ func TestServiceAccountActionExecute(t *testing.T) {
}
}
`),
crbs: []v1.ClusterRoleBinding{
crbs: []rbac.ClusterRoleBinding{
{
ObjectMeta: metav1.ObjectMeta{
Name: "crb-1",
},
Subjects: []v1.Subject{
Subjects: []rbac.Subject{
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
Name: "non-matching-name",
},
},
RoleRef: v1.RoleRef{
RoleRef: rbac.RoleRef{
Name: "role-1",
},
},
@@ -154,19 +294,19 @@ func TestServiceAccountActionExecute(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "crb-2",
},
Subjects: []v1.Subject{
Subjects: []rbac.Subject{
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
Name: "non-matching-name",
},
{
Kind: v1.ServiceAccountKind,
Kind: rbac.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "ark",
},
},
RoleRef: v1.RoleRef{
RoleRef: rbac.RoleRef{
Name: "role-2",
},
},
@@ -174,14 +314,14 @@ func TestServiceAccountActionExecute(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "crb-3",
},
Subjects: []v1.Subject{
Subjects: []rbac.Subject{
{
Kind: v1.ServiceAccountKind,
Kind: rbac.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "ark",
},
},
RoleRef: v1.RoleRef{
RoleRef: rbac.RoleRef{
Name: "role-3",
},
},
@@ -189,9 +329,9 @@ func TestServiceAccountActionExecute(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "crb-4",
},
Subjects: []v1.Subject{
Subjects: []rbac.Subject{
{
Kind: v1.ServiceAccountKind,
Kind: rbac.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "ark",
},
@@ -201,7 +341,7 @@ func TestServiceAccountActionExecute(t *testing.T) {
Name: "non-matching-name",
},
},
RoleRef: v1.RoleRef{
RoleRef: rbac.RoleRef{
Name: "role-4",
},
},
@@ -235,14 +375,221 @@ func TestServiceAccountActionExecute(t *testing.T) {
},
}
crbClient := &fakeClusterRoleBindingClient{}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
crbClient.clusterRoleBindings = test.crbs
action, err := NewServiceAccountAction(arktest.NewLogger(), crbClient)
require.Nil(t, err)
// Create the action struct directly so we don't need to mock a clientset
action := &serviceAccountAction{
log: arktest.NewLogger(),
clusterRoleBindings: newV1ClusterRoleBindingList(test.crbs),
}
res, additional, err := action.Execute(test.serviceAccount, nil)
assert.Equal(t, test.serviceAccount, res)
assert.Nil(t, err)
// ensure slices are ordered for valid comparison
sort.Slice(test.expectedAdditionalItems, func(i, j int) bool {
return fmt.Sprintf("%s.%s", test.expectedAdditionalItems[i].GroupResource.String(), test.expectedAdditionalItems[i].Name) <
fmt.Sprintf("%s.%s", test.expectedAdditionalItems[j].GroupResource.String(), test.expectedAdditionalItems[j].Name)
})
sort.Slice(additional, func(i, j int) bool {
return fmt.Sprintf("%s.%s", additional[i].GroupResource.String(), additional[i].Name) <
fmt.Sprintf("%s.%s", additional[j].GroupResource.String(), additional[j].Name)
})
assert.Equal(t, test.expectedAdditionalItems, additional)
})
}
}
func TestServiceAccountActionExecuteOnBeta1(t *testing.T) {
tests := []struct {
name string
serviceAccount runtime.Unstructured
crbs []rbacbeta.ClusterRoleBinding
expectedAdditionalItems []ResourceIdentifier
}{
{
name: "no crbs",
serviceAccount: arktest.UnstructuredOrDie(`
{
"apiVersion": "v1",
"kind": "ServiceAccount",
"metadata": {
"namespace": "heptio-ark",
"name": "ark"
}
}
`),
crbs: nil,
expectedAdditionalItems: nil,
},
{
name: "no matching crbs",
serviceAccount: arktest.UnstructuredOrDie(`
{
"apiVersion": "v1",
"kind": "ServiceAccount",
"metadata": {
"namespace": "heptio-ark",
"name": "ark"
}
}
`),
crbs: []rbacbeta.ClusterRoleBinding{
{
Subjects: []rbacbeta.Subject{
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
Name: "non-matching-name",
},
{
Kind: "non-matching-kind",
Namespace: "heptio-ark",
Name: "ark",
},
{
Kind: rbacbeta.ServiceAccountKind,
Namespace: "non-matching-ns",
Name: "ark",
},
{
Kind: rbacbeta.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "non-matching-name",
},
},
RoleRef: rbacbeta.RoleRef{
Name: "role",
},
},
},
expectedAdditionalItems: nil,
},
{
name: "some matching crbs",
serviceAccount: arktest.UnstructuredOrDie(`
{
"apiVersion": "v1",
"kind": "ServiceAccount",
"metadata": {
"namespace": "heptio-ark",
"name": "ark"
}
}
`),
crbs: []rbacbeta.ClusterRoleBinding{
{
ObjectMeta: metav1.ObjectMeta{
Name: "crb-1",
},
Subjects: []rbacbeta.Subject{
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
Name: "non-matching-name",
},
},
RoleRef: rbacbeta.RoleRef{
Name: "role-1",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "crb-2",
},
Subjects: []rbacbeta.Subject{
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
Name: "non-matching-name",
},
{
Kind: rbacbeta.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "ark",
},
},
RoleRef: rbacbeta.RoleRef{
Name: "role-2",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "crb-3",
},
Subjects: []rbacbeta.Subject{
{
Kind: rbacbeta.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "ark",
},
},
RoleRef: rbacbeta.RoleRef{
Name: "role-3",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "crb-4",
},
Subjects: []rbacbeta.Subject{
{
Kind: rbacbeta.ServiceAccountKind,
Namespace: "heptio-ark",
Name: "ark",
},
{
Kind: "non-matching-kind",
Namespace: "non-matching-ns",
Name: "non-matching-name",
},
},
RoleRef: rbacbeta.RoleRef{
Name: "role-4",
},
},
},
expectedAdditionalItems: []ResourceIdentifier{
{
GroupResource: kuberesource.ClusterRoleBindings,
Name: "crb-2",
},
{
GroupResource: kuberesource.ClusterRoleBindings,
Name: "crb-3",
},
{
GroupResource: kuberesource.ClusterRoleBindings,
Name: "crb-4",
},
{
GroupResource: kuberesource.ClusterRoles,
Name: "role-2",
},
{
GroupResource: kuberesource.ClusterRoles,
Name: "role-3",
},
{
GroupResource: kuberesource.ClusterRoles,
Name: "role-4",
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Create the action struct directly so we don't need to mock a clientset
action := &serviceAccountAction{
log: arktest.NewLogger(),
clusterRoleBindings: newV1beta1ClusterRoleBindingList(test.crbs),
}
res, additional, err := action.Execute(test.serviceAccount, nil)

View File

@@ -20,6 +20,7 @@ import (
"regexp"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/pkg/errors"
@@ -219,7 +220,17 @@ func (b *blockStore) DeleteSnapshot(snapshotID string) error {
_, err := b.ec2.DeleteSnapshot(req)
return errors.WithStack(err)
// if it's a NotFound error, we don't need to return an error
// since the snapshot is not there.
// see https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "InvalidSnapshot.NotFound" {
return nil
}
if err != nil {
return errors.WithStack(err)
}
return nil
}
var ebsVolumeIDRegex = regexp.MustCompile("vol-.*")

View File

@@ -19,6 +19,7 @@ package azure
import (
"context"
"fmt"
"net/http"
"os"
"regexp"
"strings"
@@ -280,7 +281,16 @@ func (b *blockStore) DeleteSnapshot(snapshotID string) error {
err = <-errChan
return errors.WithStack(err)
// if it's a 404 (not found) error, we don't need to return an error
// since the snapshot is not there.
if azureErr, ok := err.(autorest.DetailedError); ok && azureErr.StatusCode == http.StatusNotFound {
return nil
}
if err != nil {
return errors.WithStack(err)
}
return nil
}
func getComputeResourceName(subscription, resourceGroup, resource, name string) string {

View File

@@ -19,6 +19,7 @@ package gcp
import (
"encoding/json"
"io/ioutil"
"net/http"
"os"
"github.com/pkg/errors"
@@ -27,6 +28,7 @@ import (
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
"k8s.io/apimachinery/pkg/runtime"
@@ -212,7 +214,16 @@ func getSnapshotTags(arkTags map[string]string, diskDescription string, log logr
func (b *blockStore) DeleteSnapshot(snapshotID string) error {
_, err := b.gce.Snapshots.Delete(b.project, snapshotID).Do()
return errors.WithStack(err)
// if it's a 404 (not found) error, we don't need to return an error
// since the snapshot is not there.
if gcpErr, ok := err.(*googleapi.Error); ok && gcpErr.Code == http.StatusNotFound {
return nil
}
if err != nil {
return errors.WithStack(err)
}
return nil
}
func (b *blockStore) GetVolumeID(pv runtime.Unstructured) (string, error) {

View File

@@ -28,6 +28,7 @@ import (
"github.com/heptio/ark/pkg/cloudprovider/azure"
"github.com/heptio/ark/pkg/cloudprovider/gcp"
"github.com/heptio/ark/pkg/cmd"
arkdiscovery "github.com/heptio/ark/pkg/discovery"
arkplugin "github.com/heptio/ark/pkg/plugin"
"github.com/heptio/ark/pkg/restore"
)
@@ -87,7 +88,13 @@ func NewCommand(f client.Factory) *cobra.Command {
clientset, err := f.KubeClient()
cmd.CheckError(err)
action, err = backup.NewServiceAccountAction(logger, clientset.RbacV1().ClusterRoleBindings())
discoveryHelper, err := arkdiscovery.NewHelper(clientset.Discovery(), logger)
cmd.CheckError(err)
action, err = backup.NewServiceAccountAction(
logger,
backup.NewClusterRoleBindingListerMap(clientset),
discoveryHelper)
cmd.CheckError(err)
default:
logger.Fatal("Unrecognized plugin name")

View File

@@ -34,11 +34,12 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
@@ -152,6 +153,7 @@ type server struct {
snapshotService cloudprovider.SnapshotService
discoveryClient discovery.DiscoveryInterface
clientPool dynamic.ClientPool
discoveryHelper arkdiscovery.Helper
sharedInformerFactory informers.SharedInformerFactory
ctx context.Context
cancelFunc context.CancelFunc
@@ -203,11 +205,23 @@ func newServer(namespace, baseName, pluginDir, metricsAddr string, logger *logru
}
func (s *server) run() error {
defer s.pluginManager.CleanupClients()
defer s.pluginManager.CloseAllClients()
signals.CancelOnShutdown(s.cancelFunc, s.logger)
if err := s.ensureArkNamespace(); err != nil {
// Since s.namespace, which specifies where backups/restores/schedules/etc. should live,
// *could* be different from the namespace where the Ark server pod runs, check to make
// sure it exists, and fail fast if it doesn't.
if err := s.namespaceExists(s.namespace); err != nil {
return err
}
if err := s.initDiscoveryHelper(); err != nil {
return err
}
// check to ensure all Ark CRDs exist
if err := s.arkResourcesExist(); err != nil {
return err
}
@@ -244,22 +258,78 @@ func (s *server) run() error {
return nil
}
func (s *server) ensureArkNamespace() error {
logContext := s.logger.WithField("namespace", s.namespace)
// namespaceExists returns nil if namespace can be successfully
// gotten from the kubernetes API, or an error otherwise.
func (s *server) namespaceExists(namespace string) error {
s.logger.WithField("namespace", namespace).Info("Checking existence of namespace")
logContext.Info("Ensuring namespace exists for backups")
defaultNamespace := v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: s.namespace,
},
if _, err := s.kubeClient.CoreV1().Namespaces().Get(namespace, metav1.GetOptions{}); err != nil {
return errors.WithStack(err)
}
if created, err := kube.EnsureNamespaceExists(&defaultNamespace, s.kubeClient.CoreV1().Namespaces()); created {
logContext.Info("Namespace created")
} else if err != nil {
s.logger.WithField("namespace", namespace).Info("Namespace exists")
return nil
}
// initDiscoveryHelper instantiates the server's discovery helper and spawns a
// goroutine to call Refresh() every 5 minutes.
func (s *server) initDiscoveryHelper() error {
discoveryHelper, err := arkdiscovery.NewHelper(s.discoveryClient, s.logger)
if err != nil {
return err
}
logContext.Info("Namespace already exists")
s.discoveryHelper = discoveryHelper
go wait.Until(
func() {
if err := discoveryHelper.Refresh(); err != nil {
s.logger.WithError(err).Error("Error refreshing discovery")
}
},
5*time.Minute,
s.ctx.Done(),
)
return nil
}
// arkResourcesExist checks for the existence of each Ark CRD via discovery
// and returns an error if any of them don't exist.
func (s *server) arkResourcesExist() error {
s.logger.Info("Checking existence of Ark custom resource definitions")
var arkGroupVersion *metav1.APIResourceList
for _, gv := range s.discoveryHelper.Resources() {
if gv.GroupVersion == api.SchemeGroupVersion.String() {
arkGroupVersion = gv
break
}
}
if arkGroupVersion == nil {
return errors.Errorf("Ark API group %s not found", api.SchemeGroupVersion)
}
foundResources := sets.NewString()
for _, resource := range arkGroupVersion.APIResources {
foundResources.Insert(resource.Kind)
}
var errs []error
for kind := range api.CustomResources() {
if foundResources.Has(kind) {
s.logger.WithField("kind", kind).Debug("Found custom resource")
continue
}
errs = append(errs, errors.Errorf("custom resource %s not found in Ark API group %s", kind, api.SchemeGroupVersion))
}
if len(errs) > 0 {
return kubeerrs.NewAggregate(errs)
}
s.logger.Info("All Ark custom resource definitions exist")
return nil
}
@@ -311,6 +381,7 @@ var defaultResourcePriorities = []string{
"serviceaccounts",
"limitranges",
"pods",
"replicaset",
}
func applyConfigDefaults(c *api.Config, logger logrus.FieldLogger) {
@@ -541,27 +612,13 @@ func (s *server) runControllers(config *api.Config) error {
wg.Done()
}()
discoveryHelper, err := arkdiscovery.NewHelper(s.discoveryClient, s.logger)
if err != nil {
return err
}
go wait.Until(
func() {
if err := discoveryHelper.Refresh(); err != nil {
s.logger.WithError(err).Error("Error refreshing discovery")
}
},
5*time.Minute,
ctx.Done(),
)
if config.RestoreOnlyMode {
s.logger.Info("Restore only mode - not starting the backup, schedule, delete-backup, or GC controllers")
} else {
backupTracker := controller.NewBackupTracker()
backupper, err := backup.NewKubernetesBackupper(
discoveryHelper,
s.discoveryHelper,
client.NewDynamicFactory(s.clientPool),
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
s.snapshotService,
@@ -595,6 +652,7 @@ func (s *server) runControllers(config *api.Config) error {
s.sharedInformerFactory.Ark().V1().Schedules(),
config.ScheduleSyncPeriod.Duration,
s.logger,
s.metrics,
)
wg.Add(1)
go func() {
@@ -605,6 +663,7 @@ func (s *server) runControllers(config *api.Config) error {
gcController := controller.NewGCController(
s.logger,
s.sharedInformerFactory.Ark().V1().Backups(),
s.sharedInformerFactory.Ark().V1().DeleteBackupRequests(),
s.arkClient.ArkV1(),
config.GCSyncPeriod.Duration,
)
@@ -637,7 +696,7 @@ func (s *server) runControllers(config *api.Config) error {
}
restorer, err := restore.NewKubernetesRestorer(
discoveryHelper,
s.discoveryHelper,
client.NewDynamicFactory(s.clientPool),
s.backupService,
s.snapshotService,

View File

@@ -22,6 +22,8 @@ import (
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/heptio/ark/pkg/apis/ark/v1"
arktest "github.com/heptio/ark/pkg/util/test"
)
@@ -51,3 +53,47 @@ func TestApplyConfigDefaults(t *testing.T) {
assert.Equal(t, 3*time.Minute, c.ScheduleSyncPeriod.Duration)
assert.Equal(t, []string{"a", "b"}, c.ResourcePriorities)
}
func TestArkResourcesExist(t *testing.T) {
var (
fakeDiscoveryHelper = &arktest.FakeDiscoveryHelper{}
server = &server{
logger: arktest.NewLogger(),
discoveryHelper: fakeDiscoveryHelper,
}
)
// Ark API group doesn't exist in discovery: should error
fakeDiscoveryHelper.ResourceList = []*metav1.APIResourceList{
{
GroupVersion: "foo/v1",
APIResources: []metav1.APIResource{
{
Name: "Backups",
Kind: "Backup",
},
},
},
}
assert.Error(t, server.arkResourcesExist())
// Ark API group doesn't contain any custom resources: should error
arkAPIResourceList := &metav1.APIResourceList{
GroupVersion: v1.SchemeGroupVersion.String(),
}
fakeDiscoveryHelper.ResourceList = append(fakeDiscoveryHelper.ResourceList, arkAPIResourceList)
assert.Error(t, server.arkResourcesExist())
// Ark API group contains all custom resources: should not error
for kind := range v1.CustomResources() {
arkAPIResourceList.APIResources = append(arkAPIResourceList.APIResources, metav1.APIResource{
Kind: kind,
})
}
assert.NoError(t, server.arkResourcesExist())
// Ark API group contains some but not all custom resources: should error
arkAPIResourceList.APIResources = arkAPIResourceList.APIResources[:3]
assert.Error(t, server.arkResourcesExist())
}

View File

@@ -92,7 +92,7 @@ func printBackup(backup *v1.Backup, w io.Writer, options printers.PrintOptions)
status = "Deleting"
}
if _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s", name, status, backup.CreationTimestamp.Time, humanReadableTimeFromNow(expiration), metav1.FormatLabelSelector(backup.Spec.LabelSelector)); err != nil {
if _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s", name, status, backup.Status.StartTimestamp.Time, humanReadableTimeFromNow(expiration), metav1.FormatLabelSelector(backup.Spec.LabelSelector)); err != nil {
return err
}

View File

@@ -204,7 +204,7 @@ func TestProcessBackup(t *testing.T) {
cloudBackups.On("UploadBackup", "bucket", backup.Name, mock.Anything, mock.Anything, mock.Anything).Return(nil)
pluginManager.On("GetBackupItemActions", backup.Name).Return(nil, nil)
pluginManager.On("CloseBackupItemActions", backup.Name).Return(nil)
pluginManager.On("CloseBackupItemActions", backup.Name).Return()
}
// this is necessary so the Patch() call returns the appropriate object
@@ -321,17 +321,9 @@ type MockManager struct {
}
// CloseBackupItemActions provides a mock function with given fields: backupName
func (_m *MockManager) CloseBackupItemActions(backupName string) error {
ret := _m.Called(backupName)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(backupName)
} else {
r0 = ret.Error(0)
}
return r0
func (_m *MockManager) CloseBackupItemActions(backupName string) {
_ = _m.Called(backupName)
return
}
// GetBackupItemActions provides a mock function with given fields: backupName, logger, level
@@ -358,17 +350,9 @@ func (_m *MockManager) GetBackupItemActions(backupName string) ([]backup.ItemAct
}
// CloseRestoreItemActions provides a mock function with given fields: restoreName
func (_m *MockManager) CloseRestoreItemActions(restoreName string) error {
ret := _m.Called(restoreName)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(restoreName)
} else {
r0 = ret.Error(0)
}
return r0
func (_m *MockManager) CloseRestoreItemActions(restoreName string) {
_ = _m.Called(restoreName)
return
}
// GetRestoreItemActions provides a mock function with given fields: restoreName, logger, level
@@ -440,8 +424,8 @@ func (_m *MockManager) GetObjectStore(name string) (cloudprovider.ObjectStore, e
return r0, r1
}
// CleanupClients provides a mock function
func (_m *MockManager) CleanupClients() {
// CloseAllClients provides a mock function
func (_m *MockManager) CloseAllClients() {
_ = _m.Called()
return
}

View File

@@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/cache"
)
@@ -103,8 +104,7 @@ func NewBackupDeletionController(
deleteBackupRequestInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueue,
UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) },
AddFunc: c.enqueue,
},
)
@@ -162,6 +162,12 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e
return err
}
// Remove any existing deletion requests for this backup so we only have
// one at a time
if errs := c.deleteExistingDeletionRequests(req, log); errs != nil {
return kubeerrs.NewAggregate(errs)
}
// Don't allow deleting an in-progress backup
if c.backupTracker.Contains(req.Namespace, req.Spec.BackupName) {
_, err = c.patchDeleteBackupRequest(req, func(r *v1.DeleteBackupRequest) {
@@ -303,6 +309,30 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e
return nil
}
func (c *backupDeletionController) deleteExistingDeletionRequests(req *v1.DeleteBackupRequest, log logrus.FieldLogger) []error {
log.Info("Removing existing deletion requests for backup")
selector := labels.SelectorFromSet(labels.Set(map[string]string{
v1.BackupNameLabel: req.Spec.BackupName,
}))
dbrs, err := c.deleteBackupRequestLister.DeleteBackupRequests(req.Namespace).List(selector)
if err != nil {
return []error{errors.Wrap(err, "error listing existing DeleteBackupRequests for backup")}
}
var errs []error
for _, dbr := range dbrs {
if dbr.Name == req.Name {
continue
}
if err := c.deleteBackupRequestClient.DeleteBackupRequests(req.Namespace).Delete(dbr.Name, nil); err != nil {
errs = append(errs, errors.WithStack(err))
}
}
return errs
}
func (c *backupDeletionController) deleteResticSnapshots(backup *v1.Backup) []error {
if c.resticMgr == nil {
return nil

View File

@@ -17,7 +17,6 @@ limitations under the License.
package controller
import (
"context"
"fmt"
"testing"
"time"
@@ -26,7 +25,6 @@ import (
pkgbackup "github.com/heptio/ark/pkg/backup"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/util/kube"
arktest "github.com/heptio/ark/pkg/util/test"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
@@ -36,74 +34,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
core "k8s.io/client-go/testing"
)
func TestBackupDeletionControllerControllerHasUpdateFunc(t *testing.T) {
req := pkgbackup.NewDeleteBackupRequest("foo", "uid")
req.Namespace = "heptio-ark"
expected := kube.NamespaceAndName(req)
client := fake.NewSimpleClientset(req)
fakeWatch := watch.NewFake()
defer fakeWatch.Stop()
client.PrependWatchReactor("deletebackuprequests", core.DefaultWatchReactor(fakeWatch, nil))
sharedInformers := informers.NewSharedInformerFactory(client, 0)
controller := NewBackupDeletionController(
arktest.NewLogger(),
sharedInformers.Ark().V1().DeleteBackupRequests(),
client.ArkV1(), // deleteBackupRequestClient
client.ArkV1(), // backupClient
nil, // snapshotService
nil, // backupService
"bucket",
sharedInformers.Ark().V1().Restores(),
client.ArkV1(), // restoreClient
NewBackupTracker(),
nil, // restic repository manager
sharedInformers.Ark().V1().PodVolumeBackups(),
).(*backupDeletionController)
// disable resync handler since we don't want to test it here
controller.resyncFunc = nil
keys := make(chan string)
controller.syncHandler = func(key string) error {
keys <- key
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go sharedInformers.Start(ctx.Done())
go controller.Run(ctx, 1)
// wait for the AddFunc
select {
case <-ctx.Done():
t.Fatal("test timed out waiting for AddFunc")
case key := <-keys:
assert.Equal(t, expected, key)
}
req.Status.Phase = v1.DeleteBackupRequestPhaseProcessed
fakeWatch.Add(req)
// wait for the UpdateFunc
select {
case <-ctx.Done():
t.Fatal("test timed out waiting for UpdateFunc")
case key := <-keys:
assert.Equal(t, expected, key)
}
}
func TestBackupDeletionControllerProcessQueueItem(t *testing.T) {
client := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(client, 0)
@@ -236,6 +169,62 @@ func TestBackupDeletionControllerProcessRequest(t *testing.T) {
assert.Equal(t, expectedActions, td.client.Actions())
})
t.Run("existing deletion requests for the backup are deleted", func(t *testing.T) {
td := setupBackupDeletionControllerTest()
defer td.backupService.AssertExpectations(t)
// add the backup to the tracker so the execution of processRequest doesn't progress
// past checking for an in-progress backup. this makes validation easier.
td.controller.backupTracker.Add(td.req.Namespace, td.req.Spec.BackupName)
require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(td.req))
existing := &v1.DeleteBackupRequest{
ObjectMeta: metav1.ObjectMeta{
Namespace: td.req.Namespace,
Name: "bar",
Labels: map[string]string{
v1.BackupNameLabel: td.req.Spec.BackupName,
},
},
Spec: v1.DeleteBackupRequestSpec{
BackupName: td.req.Spec.BackupName,
},
}
require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(existing))
_, err := td.client.ArkV1().DeleteBackupRequests(td.req.Namespace).Create(existing)
require.NoError(t, err)
require.NoError(t, td.sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(
&v1.DeleteBackupRequest{
ObjectMeta: metav1.ObjectMeta{
Namespace: td.req.Namespace,
Name: "bar-2",
Labels: map[string]string{
v1.BackupNameLabel: "some-other-backup",
},
},
Spec: v1.DeleteBackupRequestSpec{
BackupName: "some-other-backup",
},
},
))
assert.NoError(t, td.controller.processRequest(td.req))
expectedDeleteAction := core.NewDeleteAction(
v1.SchemeGroupVersion.WithResource("deletebackuprequests"),
td.req.Namespace,
"bar",
)
// first action is the Create of an existing DBR for the backup as part of test data setup
// second action is the Delete of the existing DBR, which we're validating
// third action is the Patch of the DBR to set it to processed with an error
require.Len(t, td.client.Actions(), 3)
assert.Equal(t, expectedDeleteAction, td.client.Actions()[1])
})
t.Run("deleting an in progress backup isn't allowed", func(t *testing.T) {
td := setupBackupDeletionControllerTest()
defer td.backupService.AssertExpectations(t)

View File

@@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
@@ -39,6 +40,7 @@ type gcController struct {
logger logrus.FieldLogger
backupLister listers.BackupLister
deleteBackupRequestLister listers.DeleteBackupRequestLister
deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter
syncPeriod time.Duration
@@ -49,6 +51,7 @@ type gcController struct {
func NewGCController(
logger logrus.FieldLogger,
backupInformer informers.BackupInformer,
deleteBackupRequestInformer informers.DeleteBackupRequestInformer,
deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter,
syncPeriod time.Duration,
) Interface {
@@ -62,12 +65,16 @@ func NewGCController(
syncPeriod: syncPeriod,
clock: clock.RealClock{},
backupLister: backupInformer.Lister(),
deleteBackupRequestLister: deleteBackupRequestInformer.Lister(),
deleteBackupRequestClient: deleteBackupRequestClient,
logger: logger,
}
c.syncHandler = c.processQueueItem
c.cacheSyncWaiters = append(c.cacheSyncWaiters, backupInformer.Informer().HasSynced)
c.cacheSyncWaiters = append(c.cacheSyncWaiters,
backupInformer.Informer().HasSynced,
deleteBackupRequestInformer.Informer().HasSynced,
)
c.resyncPeriod = syncPeriod
c.resyncFunc = c.enqueueAllBackups
@@ -130,12 +137,32 @@ func (c *gcController) processQueueItem(key string) error {
return nil
}
log.Info("Backup has expired. Creating a DeleteBackupRequest.")
log.Info("Backup has expired")
selector := labels.SelectorFromSet(labels.Set(map[string]string{
arkv1api.BackupNameLabel: backup.Name,
arkv1api.BackupUIDLabel: string(backup.UID),
}))
dbrs, err := c.deleteBackupRequestLister.DeleteBackupRequests(ns).List(selector)
if err != nil {
return errors.Wrap(err, "error listing existing DeleteBackupRequests for backup")
}
// if there's an existing unprocessed deletion request for this backup, don't create
// another one
for _, dbr := range dbrs {
switch dbr.Status.Phase {
case "", arkv1api.DeleteBackupRequestPhaseNew, arkv1api.DeleteBackupRequestPhaseInProgress:
log.Info("Backup already has a pending deletion request")
return nil
}
}
log.Info("Creating a new deletion request")
req := pkgbackup.NewDeleteBackupRequest(backup.Name, string(backup.UID))
_, err = c.deleteBackupRequestClient.DeleteBackupRequests(ns).Create(req)
if err != nil {
if _, err = c.deleteBackupRequestClient.DeleteBackupRequests(ns).Create(req); err != nil {
return errors.Wrap(err, "error creating DeleteBackupRequest")
}

View File

@@ -25,7 +25,9 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/watch"
@@ -46,6 +48,7 @@ func TestGCControllerEnqueueAllBackups(t *testing.T) {
controller = NewGCController(
arktest.NewLogger(),
sharedInformers.Ark().V1().Backups(),
sharedInformers.Ark().V1().DeleteBackupRequests(),
client.ArkV1(),
1*time.Millisecond,
).(*gcController)
@@ -109,6 +112,7 @@ func TestGCControllerHasUpdateFunc(t *testing.T) {
controller := NewGCController(
arktest.NewLogger(),
sharedInformers.Ark().V1().Backups(),
sharedInformers.Ark().V1().DeleteBackupRequests(),
client.ArkV1(),
1*time.Millisecond,
).(*gcController)
@@ -152,6 +156,7 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
tests := []struct {
name string
backup *api.Backup
deleteBackupRequests []*api.DeleteBackupRequest
expectDeletion bool
createDeleteBackupRequestError bool
expectError bool
@@ -160,19 +165,63 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
name: "can't find backup - no error",
},
{
name: "expired backup is deleted",
name: "unexpired backup is not deleted",
backup: arktest.NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(1 * time.Minute)).
Backup,
expectDeletion: false,
},
{
name: "expired backup with no pending deletion requests is deleted",
backup: arktest.NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(-1 * time.Second)).
Backup,
expectDeletion: true,
},
{
name: "unexpired backup is not deleted",
name: "expired backup with a pending deletion request is not deleted",
backup: arktest.NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(1 * time.Minute)).
WithExpiration(fakeClock.Now().Add(-1 * time.Second)).
Backup,
deleteBackupRequests: []*api.DeleteBackupRequest{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: api.DefaultNamespace,
Name: "foo",
Labels: map[string]string{
api.BackupNameLabel: "backup-1",
api.BackupUIDLabel: "",
},
},
Status: api.DeleteBackupRequestStatus{
Phase: api.DeleteBackupRequestPhaseInProgress,
},
},
},
expectDeletion: false,
},
{
name: "expired backup with only processed deletion requests is deleted",
backup: arktest.NewTestBackup().WithName("backup-1").
WithExpiration(fakeClock.Now().Add(-1 * time.Second)).
Backup,
deleteBackupRequests: []*api.DeleteBackupRequest{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: api.DefaultNamespace,
Name: "foo",
Labels: map[string]string{
api.BackupNameLabel: "backup-1",
api.BackupUIDLabel: "",
},
},
Status: api.DeleteBackupRequestStatus{
Phase: api.DeleteBackupRequestPhaseProcessed,
},
},
},
expectDeletion: true,
},
{
name: "create DeleteBackupRequest error returns an error",
backup: arktest.NewTestBackup().WithName("backup-1").
@@ -194,6 +243,7 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
controller := NewGCController(
arktest.NewLogger(),
sharedInformers.Ark().V1().Backups(),
sharedInformers.Ark().V1().DeleteBackupRequests(),
client.ArkV1(),
1*time.Millisecond,
).(*gcController)
@@ -205,6 +255,10 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
sharedInformers.Ark().V1().Backups().Informer().GetStore().Add(test.backup)
}
for _, dbr := range test.deleteBackupRequests {
sharedInformers.Ark().V1().DeleteBackupRequests().Informer().GetStore().Add(dbr)
}
if test.createDeleteBackupRequestError {
client.PrependReactor("create", "deletebackuprequests", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.New("foo")
@@ -216,7 +270,12 @@ func TestGCControllerProcessQueueItem(t *testing.T) {
assert.Equal(t, test.expectError, gotErr)
if test.expectDeletion {
assert.Len(t, client.Actions(), 1)
require.Len(t, client.Actions(), 1)
createAction, ok := client.Actions()[0].(core.CreateAction)
require.True(t, ok)
assert.Equal(t, "deletebackuprequests", createAction.GetResource().Resource)
} else {
assert.Len(t, client.Actions(), 0)
}

View File

@@ -377,7 +377,7 @@ func TestProcessRestore(t *testing.T) {
if test.restore != nil {
pluginManager.On("GetRestoreItemActions", test.restore.Name).Return(nil, nil)
pluginManager.On("CloseRestoreItemActions", test.restore.Name).Return(nil)
pluginManager.On("CloseRestoreItemActions", test.restore.Name).Return()
}
err = c.processRestore(key)

View File

@@ -41,6 +41,7 @@ import (
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
"github.com/heptio/ark/pkg/metrics"
kubeutil "github.com/heptio/ark/pkg/util/kube"
)
@@ -55,6 +56,7 @@ type scheduleController struct {
syncPeriod time.Duration
clock clock.Clock
logger logrus.FieldLogger
metrics *metrics.ServerMetrics
}
func NewScheduleController(
@@ -64,6 +66,7 @@ func NewScheduleController(
schedulesInformer informers.ScheduleInformer,
syncPeriod time.Duration,
logger logrus.FieldLogger,
metrics *metrics.ServerMetrics,
) *scheduleController {
if syncPeriod < time.Minute {
logger.WithField("syncPeriod", syncPeriod).Info("Provided schedule sync period is too short. Setting to 1 minute")
@@ -80,6 +83,7 @@ func NewScheduleController(
syncPeriod: syncPeriod,
clock: clock.RealClock{},
logger: logger,
metrics: metrics,
}
c.syncHandler = c.processSchedule
@@ -106,6 +110,10 @@ func NewScheduleController(
return
}
c.queue.Add(key)
scheduleName := schedule.GetName()
c.logger.Info("Creating schedule ", scheduleName)
//Init Prometheus metrics to 0 to have them flowing up
metrics.InitSchedule(scheduleName)
},
},
)

View File

@@ -34,6 +34,7 @@ import (
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/generated/clientset/versioned/fake"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions"
"github.com/heptio/ark/pkg/metrics"
"github.com/heptio/ark/pkg/util/collections"
arktest "github.com/heptio/ark/pkg/util/test"
)
@@ -129,6 +130,7 @@ func TestProcessSchedule(t *testing.T) {
sharedInformers.Ark().V1().Schedules(),
time.Duration(0),
logger,
metrics.NewServerMetrics(),
)
var (

View File

@@ -45,6 +45,10 @@ type Helper interface {
// Refresh pulls an updated set of Ark-backuppable resources from the
// discovery API.
Refresh() error
// APIGroups gets the current set of supported APIGroups
// in the cluster.
APIGroups() []metav1.APIGroup
}
type helper struct {
@@ -56,6 +60,7 @@ type helper struct {
mapper meta.RESTMapper
resources []*metav1.APIResourceList
resourcesMap map[schema.GroupVersionResource]metav1.APIResource
apiGroups []metav1.APIGroup
}
var _ Helper = &helper{}
@@ -127,6 +132,12 @@ func (h *helper) Refresh() error {
}
}
apiGroupList, err := h.discoveryClient.ServerGroups()
if err != nil {
return errors.WithStack(err)
}
h.apiGroups = apiGroupList.Groups
return nil
}
@@ -165,3 +176,9 @@ func (h *helper) Resources() []*metav1.APIResourceList {
defer h.lock.RUnlock()
return h.resources
}
func (h *helper) APIGroups() []metav1.APIGroup {
h.lock.RLock()
defer h.lock.RUnlock()
return h.apiGroups
}

View File

@@ -27,17 +27,13 @@ import (
// CRDs returns a list of the CRD types for all of the required Ark CRDs
func CRDs() []*apiextv1beta1.CustomResourceDefinition {
return []*apiextv1beta1.CustomResourceDefinition{
crd("Backup", "backups"),
crd("Schedule", "schedules"),
crd("Restore", "restores"),
crd("Config", "configs"),
crd("DownloadRequest", "downloadrequests"),
crd("DeleteBackupRequest", "deletebackuprequests"),
crd("PodVolumeBackup", "podvolumebackups"),
crd("PodVolumeRestore", "podvolumerestores"),
crd("ResticRepository", "resticrepositories"),
var crds []*apiextv1beta1.CustomResourceDefinition
for kind, typeInfo := range arkv1.CustomResources() {
crds = append(crds, crd(kind, typeInfo.PluralName))
}
return crds
}
func crd(kind, plural string) *apiextv1beta1.CustomResourceDefinition {

View File

@@ -106,6 +106,18 @@ func (m *ServerMetrics) RegisterAllMetrics() {
}
}
func (m *ServerMetrics) InitSchedule(scheduleName string) {
if c, ok := m.metrics[backupAttemptCount].(*prometheus.CounterVec); ok {
c.WithLabelValues(scheduleName).Set(0)
}
if c, ok := m.metrics[backupSuccessCount].(*prometheus.CounterVec); ok {
c.WithLabelValues(scheduleName).Set(0)
}
if c, ok := m.metrics[backupFailureCount].(*prometheus.CounterVec); ok {
c.WithLabelValues(scheduleName).Set(0)
}
}
// SetBackupTarballSizeBytesGauge records the size, in bytes, of a backup tarball.
func (m *ServerMetrics) SetBackupTarballSizeBytesGauge(backupSchedule string, size int64) {
if g, ok := m.metrics[backupTarballSizeBytesGauge].(*prometheus.GaugeVec); ok {

View File

@@ -38,6 +38,6 @@ func (b *clientBuilder) withCommand(name string, args ...string) *clientBuilder
return b
}
func (b *clientBuilder) client() *hcplugin.Client {
func (b *clientBuilder) client() pluginClient {
return hcplugin.NewClient(b.config)
}

View File

@@ -3,7 +3,6 @@ package plugin
import (
"sync"
plugin "github.com/hashicorp/go-plugin"
"github.com/pkg/errors"
)
@@ -20,7 +19,7 @@ type clientKey struct {
func newClientStore() *clientStore {
return &clientStore{
clients: make(map[clientKey]map[string]*plugin.Client),
clients: make(map[clientKey]map[string]pluginClient),
lock: &sync.RWMutex{},
}
}
@@ -33,13 +32,13 @@ type clientStore struct {
// kind and scope (e.g. all BackupItemActions for a given
// backup), and efficient lookup by kind+name+scope (e.g.
// the AWS ObjectStore.)
clients map[clientKey]map[string]*plugin.Client
clients map[clientKey]map[string]pluginClient
lock *sync.RWMutex
}
// get returns a plugin client for the given kind/name/scope, or an error if none
// is found.
func (s *clientStore) get(kind PluginKind, name, scope string) (*plugin.Client, error) {
func (s *clientStore) get(kind PluginKind, name, scope string) (pluginClient, error) {
s.lock.RLock()
defer s.lock.RUnlock()
@@ -54,12 +53,12 @@ func (s *clientStore) get(kind PluginKind, name, scope string) (*plugin.Client,
// list returns all plugin clients for the given kind/scope, or an
// error if none are found.
func (s *clientStore) list(kind PluginKind, scope string) ([]*plugin.Client, error) {
func (s *clientStore) list(kind PluginKind, scope string) ([]pluginClient, error) {
s.lock.RLock()
defer s.lock.RUnlock()
if forScope, found := s.clients[clientKey{kind, scope}]; found {
var clients []*plugin.Client
var clients []pluginClient
for _, client := range forScope {
clients = append(clients, client)
@@ -71,15 +70,31 @@ func (s *clientStore) list(kind PluginKind, scope string) ([]*plugin.Client, err
return nil, errors.New("clients not found")
}
// listAll returns all plugin clients for all kinds/scopes, or a
// zero-valued slice if there are none.
func (s *clientStore) listAll() []pluginClient {
s.lock.RLock()
defer s.lock.RUnlock()
var clients []pluginClient
for _, pluginsByName := range s.clients {
for name := range pluginsByName {
clients = append(clients, pluginsByName[name])
}
}
return clients
}
// add stores a plugin client for the given kind/name/scope.
func (s *clientStore) add(client *plugin.Client, kind PluginKind, name, scope string) {
func (s *clientStore) add(client pluginClient, kind PluginKind, name, scope string) {
s.lock.Lock()
defer s.lock.Unlock()
key := clientKey{kind, scope}
if _, found := s.clients[key]; !found {
s.clients[key] = make(map[string]*plugin.Client)
s.clients[key] = make(map[string]pluginClient)
}
s.clients[key][name] = client
@@ -103,3 +118,11 @@ func (s *clientStore) deleteAll(kind PluginKind, scope string) {
delete(s.clients, clientKey{kind, scope})
}
// clear removes all clients for all kinds/scopes from the store.
func (s *clientStore) clear() {
s.lock.Lock()
defer s.lock.Unlock()
s.clients = make(map[clientKey]map[string]pluginClient)
}

View File

@@ -44,7 +44,6 @@ func baseConfig() *plugin.ClientConfig {
return &plugin.ClientConfig{
HandshakeConfig: Handshake,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Managed: true,
}
}
@@ -112,7 +111,7 @@ type Manager interface {
// CloseBackupItemActions terminates the plugin sub-processes that
// are hosting BackupItemAction plugins for the given backup name.
CloseBackupItemActions(backupName string) error
CloseBackupItemActions(backupName string)
// GetRestoreItemActions returns all restore.ItemAction plugins.
// These plugin instances should ONLY be used for a single restore
@@ -123,10 +122,10 @@ type Manager interface {
// CloseRestoreItemActions terminates the plugin sub-processes that
// are hosting RestoreItemAction plugins for the given restore name.
CloseRestoreItemActions(restoreName string) error
CloseRestoreItemActions(restoreName string)
// CleanupClients kills all plugin subprocesses.
CleanupClients()
// CloseAllClients terminates all plugin subprocesses.
CloseAllClients()
}
type manager struct {
@@ -165,7 +164,12 @@ func pluginForKind(kind PluginKind) plugin.Plugin {
}
}
func getPluginInstance(client *plugin.Client, kind PluginKind) (interface{}, error) {
type pluginClient interface {
Client() (plugin.ClientProtocol, error)
Kill()
}
func getPluginInstance(client pluginClient, kind PluginKind) (interface{}, error) {
protocolClient, err := client.Client()
if err != nil {
return nil, errors.WithStack(err)
@@ -349,8 +353,8 @@ func (m *manager) GetBackupItemActions(backupName string) ([]backup.ItemAction,
// CloseBackupItemActions terminates the plugin sub-processes that
// are hosting BackupItemAction plugins for the given backup name.
func (m *manager) CloseBackupItemActions(backupName string) error {
return closeAll(m.clientStore, PluginKindBackupItemAction, backupName)
func (m *manager) CloseBackupItemActions(backupName string) {
closeAll(m.clientStore, PluginKindBackupItemAction, backupName)
}
func (m *manager) GetRestoreItemActions(restoreName string) ([]restore.ItemAction, error) {
@@ -398,14 +402,18 @@ func (m *manager) GetRestoreItemActions(restoreName string) ([]restore.ItemActio
// CloseRestoreItemActions terminates the plugin sub-processes that
// are hosting RestoreItemAction plugins for the given restore name.
func (m *manager) CloseRestoreItemActions(restoreName string) error {
return closeAll(m.clientStore, PluginKindRestoreItemAction, restoreName)
func (m *manager) CloseRestoreItemActions(restoreName string) {
closeAll(m.clientStore, PluginKindRestoreItemAction, restoreName)
}
func closeAll(store *clientStore, kind PluginKind, scope string) error {
func closeAll(store *clientStore, kind PluginKind, scope string) {
clients, err := store.list(kind, scope)
if err != nil {
return err
// store.list(...) only returns an error if no clients are
// found for the specified kind and scope. We don't need
// to treat this as an error when trying to close all clients,
// because this means there are no clients to close.
return
}
for _, client := range clients {
@@ -413,10 +421,12 @@ func closeAll(store *clientStore, kind PluginKind, scope string) error {
}
store.deleteAll(kind, scope)
return nil
}
func (m *manager) CleanupClients() {
plugin.CleanupClients()
func (m *manager) CloseAllClients() {
for _, client := range m.clientStore.listAll() {
client.Kill()
}
m.clientStore.clear()
}

179
pkg/plugin/manager_test.go Normal file
View File

@@ -0,0 +1,179 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"testing"
plugin "github.com/hashicorp/go-plugin"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type fakePluginClient struct {
terminated map[*fakePluginClient]bool
}
func (c *fakePluginClient) Kill() {
c.terminated[c] = true
}
func (c *fakePluginClient) Client() (plugin.ClientProtocol, error) {
panic("not implemented")
}
func TestCloseAllClients(t *testing.T) {
var (
store = newClientStore()
terminated = make(map[*fakePluginClient]bool)
clientCount = 0
)
for _, kind := range AllPluginKinds {
for _, scope := range []string{"scope-1", "scope-2"} {
for _, name := range []string{"name-1", "name-2"} {
client := &fakePluginClient{terminated: terminated}
terminated[client] = false
store.add(client, kind, name, scope)
clientCount++
}
}
}
// verify setup
require.Len(t, terminated, clientCount)
for _, status := range terminated {
require.False(t, status)
}
m := &manager{clientStore: store}
m.CloseAllClients()
// we should have no additions to or removals from the `terminated` map
assert.Len(t, terminated, clientCount)
// all clients should have their entry in the `terminated` map flipped to true
for _, status := range terminated {
assert.True(t, status)
}
// the store's `clients` map should be empty
assert.Len(t, store.clients, 0)
}
func TestCloseBackupItemActions(t *testing.T) {
var (
store = newClientStore()
terminated = make(map[*fakePluginClient]bool)
clientCount = 0
expectedTerminations = make(map[*fakePluginClient]bool)
backupName = "backup-1"
)
for _, kind := range AllPluginKinds {
for _, scope := range []string{"backup-1", "backup-2"} {
for _, name := range []string{"name-1", "name-2"} {
client := &fakePluginClient{terminated: terminated}
terminated[client] = false
store.add(client, kind, name, scope)
clientCount++
if kind == PluginKindBackupItemAction && scope == backupName {
expectedTerminations[client] = true
}
}
}
}
// verify setup
require.Len(t, terminated, clientCount)
for _, status := range terminated {
require.False(t, status)
}
m := &manager{clientStore: store}
m.CloseBackupItemActions(backupName)
// we should have no additions to or removals from the `terminated` map
assert.Len(t, terminated, clientCount)
// only those clients that we expected to be terminated should have
// their entry in the `terminated` map flipped to true
for client, status := range terminated {
_, ok := expectedTerminations[client]
assert.Equal(t, ok, status)
}
// clients for the kind/scope should have been removed
_, err := store.list(PluginKindBackupItemAction, backupName)
assert.EqualError(t, err, "clients not found")
// total number of clients should decrease by the number of terminated
// clients
assert.Len(t, store.listAll(), clientCount-len(expectedTerminations))
}
func TestCloseRestoreItemActions(t *testing.T) {
var (
store = newClientStore()
terminated = make(map[*fakePluginClient]bool)
clientCount = 0
expectedTerminations = make(map[*fakePluginClient]bool)
restoreName = "restore-2"
)
for _, kind := range AllPluginKinds {
for _, scope := range []string{"restore-1", "restore-2"} {
for _, name := range []string{"name-1", "name-2"} {
client := &fakePluginClient{terminated: terminated}
terminated[client] = false
store.add(client, kind, name, scope)
clientCount++
if kind == PluginKindRestoreItemAction && scope == restoreName {
expectedTerminations[client] = true
}
}
}
}
// verify setup
require.Len(t, terminated, clientCount)
for _, status := range terminated {
require.False(t, status)
}
m := &manager{clientStore: store}
m.CloseRestoreItemActions(restoreName)
// we should have no additions to or removals from the `terminated` map
assert.Len(t, terminated, clientCount)
// only those clients that we expected to be terminated should have
// their entry in the `terminated` map flipped to true
for client, status := range terminated {
_, ok := expectedTerminations[client]
assert.Equal(t, ok, status)
}
// clients for the kind/scope should have been removed
_, err := store.list(PluginKindRestoreItemAction, restoreName)
assert.EqualError(t, err, "clients not found")
// total number of clients should decrease by the number of terminated
// clients
assert.Len(t, store.listAll(), clientCount-len(expectedTerminations))
}

View File

@@ -0,0 +1,38 @@
// Code generated by mockery v1.0.0
package mocks
import corev1 "k8s.io/api/core/v1"
import logrus "github.com/sirupsen/logrus"
import mock "github.com/stretchr/testify/mock"
import v1 "github.com/heptio/ark/pkg/apis/ark/v1"
// Backupper is an autogenerated mock type for the Backupper type
type Backupper struct {
mock.Mock
}
// BackupPodVolumes provides a mock function with given fields: backup, pod, log
func (_m *Backupper) BackupPodVolumes(backup *v1.Backup, pod *corev1.Pod, log logrus.FieldLogger) (map[string]string, []error) {
ret := _m.Called(backup, pod, log)
var r0 map[string]string
if rf, ok := ret.Get(0).(func(*v1.Backup, *corev1.Pod, logrus.FieldLogger) map[string]string); ok {
r0 = rf(backup, pod, log)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]string)
}
}
var r1 []error
if rf, ok := ret.Get(1).(func(*v1.Backup, *corev1.Pod, logrus.FieldLogger) []error); ok {
r1 = rf(backup, pod, log)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).([]error)
}
}
return r0, r1
}

View File

@@ -34,7 +34,7 @@ import (
// Restorer can execute restic restores of volumes in a pod.
type Restorer interface {
// RestorePodVolumes restores all annotated volumes in a pod.
RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.Pod, log logrus.FieldLogger) []error
RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.Pod, sourceNamespace string, log logrus.FieldLogger) []error
}
type restorer struct {
@@ -84,14 +84,14 @@ func newRestorer(
return r
}
func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.Pod, log logrus.FieldLogger) []error {
func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.Pod, sourceNamespace string, log logrus.FieldLogger) []error {
// get volumes to restore from pod's annotations
volumesToRestore := GetPodSnapshotAnnotations(pod)
if len(volumesToRestore) == 0 {
return nil
}
repo, err := r.repoEnsurer.EnsureRepo(r.ctx, restore.Namespace, pod.Namespace)
repo, err := r.repoEnsurer.EnsureRepo(r.ctx, restore.Namespace, sourceNamespace)
if err != nil {
return []error{err}
}

View File

@@ -47,8 +47,8 @@ func mergeServiceAccounts(fromCluster, fromBackup *unstructured.Unstructured) (*
for i := len(backupSA.Secrets) - 1; i >= 0; i-- {
secret := &backupSA.Secrets[i]
if strings.HasPrefix(secret.Name, "default-token-") {
// Copy all secrets *except* default-token
if strings.HasPrefix(secret.Name, backupSA.Name+"-token-") {
// Copy all secrets *except* -token-
backupSA.Secrets = append(backupSA.Secrets[:i], backupSA.Secrets[i+1:]...)
break
}
@@ -58,9 +58,9 @@ func mergeServiceAccounts(fromCluster, fromBackup *unstructured.Unstructured) (*
desired.ImagePullSecrets = mergeLocalObjectReferenceSlices(desired.ImagePullSecrets, backupSA.ImagePullSecrets)
collections.MergeMaps(desired.Labels, backupSA.Labels)
desired.Labels = collections.MergeMaps(desired.Labels, backupSA.Labels)
collections.MergeMaps(desired.Annotations, backupSA.Annotations)
desired.Annotations = collections.MergeMaps(desired.Annotations, backupSA.Annotations)
desiredUnstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(desired)
if err != nil {

View File

@@ -17,7 +17,7 @@ limitations under the License.
package restore
import (
"regexp"
"strings"
"github.com/sirupsen/logrus"
@@ -43,10 +43,6 @@ func (a *podAction) AppliesTo() (ResourceSelector, error) {
}, nil
}
var (
defaultTokenRegex = regexp.MustCompile("default-token-.*")
)
func (a *podAction) Execute(obj runtime.Unstructured, restore *api.Restore) (runtime.Unstructured, error, error) {
a.logger.Debug("getting spec")
spec, err := collections.GetMap(obj.UnstructuredContent(), "spec")
@@ -57,6 +53,11 @@ func (a *podAction) Execute(obj runtime.Unstructured, restore *api.Restore) (run
a.logger.Debug("deleting spec.NodeName")
delete(spec, "nodeName")
serviceAccountName, err := collections.GetString(spec, "serviceAccountName")
if err != nil {
return nil, nil, err
}
newVolumes := make([]interface{}, 0)
a.logger.Debug("iterating over volumes")
err = collections.ForEach(spec, "volumes", func(volume map[string]interface{}) error {
@@ -66,11 +67,11 @@ func (a *podAction) Execute(obj runtime.Unstructured, restore *api.Restore) (run
}
a.logger.WithField("volumeName", name).Debug("Checking volume")
if !defaultTokenRegex.MatchString(name) {
if strings.HasPrefix(name, serviceAccountName+"-token-") {
a.logger.WithField("volumeName", name).Debug("Excluding volume")
} else {
a.logger.WithField("volumeName", name).Debug("Preserving volume")
newVolumes = append(newVolumes, volume)
} else {
a.logger.WithField("volumeName", name).Debug("Excluding volume")
}
return nil
@@ -92,11 +93,42 @@ func (a *podAction) Execute(obj runtime.Unstructured, restore *api.Restore) (run
}
a.logger.WithField("volumeMount", name).Debug("Checking volumeMount")
if !defaultTokenRegex.MatchString(name) {
if strings.HasPrefix(name, serviceAccountName+"-token-") {
a.logger.WithField("volumeMount", name).Debug("Excluding volumeMount")
} else {
a.logger.WithField("volumeMount", name).Debug("Preserving volumeMount")
newVolumeMounts = append(newVolumeMounts, volumeMount)
} else {
}
return nil
})
if err != nil {
return err
}
container["volumeMounts"] = newVolumeMounts
return nil
})
if err != nil {
return nil, nil, err
}
a.logger.Debug("iterating over init containers")
err = collections.ForEach(spec, "initContainers", func(container map[string]interface{}) error {
var newVolumeMounts []interface{}
err := collections.ForEach(container, "volumeMounts", func(volumeMount map[string]interface{}) error {
name, err := collections.GetString(volumeMount, "name")
if err != nil {
return err
}
a.logger.WithField("volumeMount", name).Debug("Checking volumeMount")
if strings.HasPrefix(name, serviceAccountName+"-token-") {
a.logger.WithField("volumeMount", name).Debug("Excluding volumeMount")
} else {
a.logger.WithField("volumeMount", name).Debug("Preserving volumeMount")
newVolumeMounts = append(newVolumeMounts, volumeMount)
}
return nil

View File

@@ -40,32 +40,42 @@ func TestPodActionExecute(t *testing.T) {
{
name: "nodeName (only) should be deleted from spec",
obj: NewTestUnstructured().WithName("pod-1").WithSpec("nodeName", "foo").
WithSpec("serviceAccountName", "foo").
WithSpecField("volumes", []interface{}{}).
WithSpecField("containers", []interface{}{}).
WithSpecField("initContainers", []interface{}{}).
Unstructured,
expectedErr: false,
expectedRes: NewTestUnstructured().WithName("pod-1").WithSpec("foo").
WithSpec("serviceAccountName", "foo").
WithSpecField("volumes", []interface{}{}).
WithSpecField("containers", []interface{}{}).
WithSpecField("initContainers", []interface{}{}).
Unstructured,
},
{
name: "volumes matching default-token regex should be deleted",
name: "volumes matching prefix ServiceAccount-token- should be deleted",
obj: NewTestUnstructured().WithName("pod-1").
WithSpec("serviceAccountName", "foo").
WithSpecField("initContainers", []interface{}{}).
WithSpecField("volumes", []interface{}{
map[string]interface{}{"name": "foo"},
map[string]interface{}{"name": "default-token-foo"},
map[string]interface{}{"name": "foo-token-foo"},
}).WithSpecField("containers", []interface{}{}).Unstructured,
expectedErr: false,
expectedRes: NewTestUnstructured().WithName("pod-1").
WithSpec("serviceAccountName", "foo").
WithSpecField("initContainers", []interface{}{}).
WithSpecField("volumes", []interface{}{
map[string]interface{}{"name": "foo"},
}).WithSpecField("containers", []interface{}{}).Unstructured,
},
{
name: "container volumeMounts matching default-token regex should be deleted",
name: "container volumeMounts matching prefix ServiceAccount-token- should be deleted",
obj: NewTestUnstructured().WithName("svc-1").
WithSpec("serviceAccountName", "foo").
WithSpecField("volumes", []interface{}{}).
WithSpecField("initContainers", []interface{}{}).
WithSpecField("containers", []interface{}{
map[string]interface{}{
"volumeMounts": []interface{}{
@@ -73,7 +83,7 @@ func TestPodActionExecute(t *testing.T) {
"name": "foo",
},
map[string]interface{}{
"name": "default-token-foo",
"name": "foo-token-foo",
},
},
},
@@ -81,7 +91,9 @@ func TestPodActionExecute(t *testing.T) {
Unstructured,
expectedErr: false,
expectedRes: NewTestUnstructured().WithName("svc-1").
WithSpec("serviceAccountName", "foo").
WithSpecField("volumes", []interface{}{}).
WithSpecField("initContainers", []interface{}{}).
WithSpecField("containers", []interface{}{
map[string]interface{}{
"volumeMounts": []interface{}{
@@ -93,6 +105,41 @@ func TestPodActionExecute(t *testing.T) {
}).
Unstructured,
},
{
name: "initContainer volumeMounts matching prefix ServiceAccount-token- should be deleted",
obj: NewTestUnstructured().WithName("svc-1").
WithSpec("serviceAccountName", "foo").
WithSpecField("volumes", []interface{}{}).
WithSpecField("containers", []interface{}{}).
WithSpecField("initContainers", []interface{}{
map[string]interface{}{
"volumeMounts": []interface{}{
map[string]interface{}{
"name": "foo",
},
map[string]interface{}{
"name": "foo-token-foo",
},
},
},
}).
Unstructured,
expectedErr: false,
expectedRes: NewTestUnstructured().WithName("svc-1").
WithSpec("serviceAccountName", "foo").
WithSpecField("volumes", []interface{}{}).
WithSpecField("containers", []interface{}{}).
WithSpecField("initContainers", []interface{}{
map[string]interface{}{
"volumeMounts": []interface{}{
map[string]interface{}{
"name": "foo",
},
},
},
}).
Unstructured,
},
}
for _, test := range tests {

View File

@@ -243,7 +243,7 @@ func (kr *kubernetesRestorer) Restore(restore *api.Restore, backup *api.Backup,
restore: restore,
prioritizedResources: prioritizedResources,
selector: selector,
logger: log,
log: log,
dynamicFactory: kr.dynamicFactory,
fileSystem: kr.fileSystem,
namespaceClient: kr.namespaceClient,
@@ -324,7 +324,7 @@ type context struct {
restore *api.Restore
prioritizedResources []schema.GroupResource
selector labels.Selector
logger logrus.FieldLogger
log logrus.FieldLogger
dynamicFactory client.DynamicFactory
fileSystem FileSystem
namespaceClient corev1.NamespaceInterface
@@ -338,16 +338,12 @@ type context struct {
pvRestorer PVRestorer
}
func (ctx *context) infof(msg string, args ...interface{}) {
ctx.logger.Infof(msg, args...)
}
func (ctx *context) execute() (api.RestoreResult, api.RestoreResult) {
ctx.infof("Starting restore of backup %s", kube.NamespaceAndName(ctx.backup))
ctx.log.Infof("Starting restore of backup %s", kube.NamespaceAndName(ctx.backup))
dir, err := ctx.unzipAndExtractBackup(ctx.backupReader)
if err != nil {
ctx.infof("error unzipping and extracting: %v", err)
ctx.log.Infof("error unzipping and extracting: %v", err)
return api.RestoreResult{}, api.RestoreResult{Ark: []string{err.Error()}}
}
defer ctx.fileSystem.RemoveAll(dir)
@@ -452,7 +448,7 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
nsPath := filepath.Join(nsSubDir, nsName)
if !namespaceFilter.ShouldInclude(nsName) {
ctx.infof("Skipping namespace %s", nsName)
ctx.log.Infof("Skipping namespace %s", nsName)
continue
}
@@ -467,7 +463,7 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
// (in order to get any backed-up metadata), but if we don't find it there,
// create a blank one.
if !existingNamespaces.Has(mappedNsName) {
logger := ctx.logger.WithField("namespace", nsName)
logger := ctx.log.WithField("namespace", nsName)
ns := getNamespace(logger, filepath.Join(dir, api.ResourcesDir, "namespaces", api.ClusterScopedDir, nsName+".json"), mappedNsName)
if _, err := kube.EnsureNamespaceExists(ns, ctx.namespaceClient); err != nil {
addArkError(&errs, err)
@@ -485,15 +481,15 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
}
// TODO timeout?
ctx.logger.Debugf("Waiting on resource wait group for resource=%s", resource.String())
ctx.log.Debugf("Waiting on resource wait group for resource=%s", resource.String())
ctx.resourceWaitGroup.Wait()
ctx.logger.Debugf("Done waiting on resource wait group for resource=%s", resource.String())
ctx.log.Debugf("Done waiting on resource wait group for resource=%s", resource.String())
}
// TODO timeout?
ctx.logger.Debug("Waiting on global wait group")
ctx.log.Debug("Waiting on global wait group")
waitErrs := ctx.globalWaitGroup.Wait()
ctx.logger.Debug("Done waiting on global wait group")
ctx.log.Debug("Done waiting on global wait group")
for _, err := range waitErrs {
// TODO not ideal to be adding these to Ark-level errors
@@ -579,14 +575,14 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
warnings, errs := api.RestoreResult{}, api.RestoreResult{}
if ctx.restore.Spec.IncludeClusterResources != nil && !*ctx.restore.Spec.IncludeClusterResources && namespace == "" {
ctx.infof("Skipping resource %s because it's cluster-scoped", resource)
ctx.log.Infof("Skipping resource %s because it's cluster-scoped", resource)
return warnings, errs
}
if namespace != "" {
ctx.infof("Restoring resource '%s' into namespace '%s' from: %s", resource, namespace, resourcePath)
ctx.log.Infof("Restoring resource '%s' into namespace '%s' from: %s", resource, namespace, resourcePath)
} else {
ctx.infof("Restoring cluster level resource '%s' from: %s", resource, resourcePath)
ctx.log.Infof("Restoring cluster level resource '%s' from: %s", resource, resourcePath)
}
files, err := ctx.fileSystem.ReadDir(resourcePath)
@@ -631,29 +627,20 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
continue
}
if hasControllerOwner(obj.GetOwnerReferences()) {
// non-pods with controller owners shouldn't be restored; pods with controller
// owners should only be restored if they have restic snapshots to restore
if groupResource != kuberesource.Pods || !restic.PodHasSnapshotAnnotation(obj) {
ctx.infof("%s has a controller owner - skipping", kube.NamespaceAndName(obj))
continue
}
}
complete, err := isCompleted(obj, groupResource)
if err != nil {
addToResult(&errs, namespace, fmt.Errorf("error checking completion %q: %v", fullPath, err))
continue
}
if complete {
ctx.infof("%s is complete - skipping", kube.NamespaceAndName(obj))
ctx.log.Infof("%s is complete - skipping", kube.NamespaceAndName(obj))
continue
}
if resourceClient == nil {
// initialize client for this Resource. we need
// metadata from an object to do this.
ctx.infof("Getting client for %v", obj.GroupVersionKind())
ctx.log.Infof("Getting client for %v", obj.GroupVersionKind())
resource := metav1.APIResource{
Namespaced: len(namespace) > 0,
@@ -672,7 +659,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
// TODO: move to restore item action if/when we add a ShouldRestore() method to the interface
if groupResource == kuberesource.Pods && obj.GetAnnotations()[v1.MirrorPodAnnotationKey] != "" {
ctx.infof("Not restoring pod because it's a mirror pod")
ctx.log.Infof("Not restoring pod because it's a mirror pod")
continue
}
@@ -680,7 +667,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
_, found := ctx.backup.Status.VolumeBackups[name]
reclaimPolicy, err := collections.GetString(obj.Object, "spec.persistentVolumeReclaimPolicy")
if err == nil && !found && reclaimPolicy == "Delete" {
ctx.infof("Not restoring PV because it doesn't have a snapshot and its reclaim policy is Delete.")
ctx.log.Infof("Not restoring PV because it doesn't have a snapshot and its reclaim policy is Delete.")
ctx.pvsToProvision.Insert(name)
@@ -706,8 +693,8 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
go func() {
defer ctx.resourceWaitGroup.Done()
if _, err := waitForReady(resourceWatch.ResultChan(), name, isPVReady, time.Minute, ctx.logger); err != nil {
ctx.logger.Warnf("Timeout reached waiting for persistent volume %s to become ready", name)
if _, err := waitForReady(resourceWatch.ResultChan(), name, isPVReady, time.Minute, ctx.log); err != nil {
ctx.log.Warnf("Timeout reached waiting for persistent volume %s to become ready", name)
addArkError(&warnings, fmt.Errorf("timeout reached waiting for persistent volume %s to become ready", name))
}
}()
@@ -722,7 +709,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
}
if volumeName, exists := spec["volumeName"]; exists && ctx.pvsToProvision.Has(volumeName.(string)) {
ctx.infof("Resetting PersistentVolumeClaim %s/%s for dynamic provisioning because its PV %v has a reclaim policy of Delete", namespace, name, volumeName)
ctx.log.Infof("Resetting PersistentVolumeClaim %s/%s for dynamic provisioning because its PV %v has a reclaim policy of Delete", namespace, name, volumeName)
delete(spec, "volumeName")
@@ -738,10 +725,10 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
continue
}
ctx.infof("Executing item action for %v", &groupResource)
ctx.log.Infof("Executing item action for %v", &groupResource)
if logSetter, ok := action.ItemAction.(logging.LogSetter); ok {
logSetter.SetLog(ctx.logger)
logSetter.SetLog(ctx.log)
}
updatedObj, warning, err := action.Execute(obj, ctx.restore)
@@ -770,6 +757,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
// necessary because we may have remapped the namespace
// if the namespace is blank, don't create the key
originalNamespace := obj.GetNamespace()
if namespace != "" {
obj.SetNamespace(namespace)
}
@@ -777,19 +765,19 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
// add an ark-restore label to each resource for easy ID
addLabel(obj, api.RestoreLabelKey, ctx.restore.Name)
ctx.infof("Restoring %s: %v", obj.GroupVersionKind().Kind, name)
ctx.log.Infof("Restoring %s: %v", obj.GroupVersionKind().Kind, name)
createdObj, restoreErr := resourceClient.Create(obj)
if apierrors.IsAlreadyExists(restoreErr) {
fromCluster, err := resourceClient.Get(name, metav1.GetOptions{})
if err != nil {
ctx.infof("Error retrieving cluster version of %s: %v", kube.NamespaceAndName(obj), err)
ctx.log.Infof("Error retrieving cluster version of %s: %v", kube.NamespaceAndName(obj), err)
addToResult(&warnings, namespace, err)
continue
}
// Remove insubstantial metadata
fromCluster, err = resetMetadataAndStatus(fromCluster)
if err != nil {
ctx.infof("Error trying to reset metadata for %s: %v", kube.NamespaceAndName(obj), err)
ctx.log.Infof("Error trying to reset metadata for %s: %v", kube.NamespaceAndName(obj), err)
addToResult(&warnings, namespace, err)
continue
}
@@ -804,14 +792,14 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
case kuberesource.ServiceAccounts:
desired, err := mergeServiceAccounts(fromCluster, obj)
if err != nil {
ctx.infof("error merging secrets for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
ctx.log.Infof("error merging secrets for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
addToResult(&warnings, namespace, err)
continue
}
patchBytes, err := generatePatch(fromCluster, desired)
if err != nil {
ctx.infof("error generating patch for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
ctx.log.Infof("error generating patch for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
addToResult(&warnings, namespace, err)
continue
}
@@ -825,7 +813,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
if err != nil {
addToResult(&warnings, namespace, err)
} else {
ctx.infof("ServiceAccount %s successfully updated", kube.NamespaceAndName(obj))
ctx.log.Infof("ServiceAccount %s successfully updated", kube.NamespaceAndName(obj))
}
default:
e := errors.Errorf("not restored: %s and is different from backed up version.", restoreErr)
@@ -836,24 +824,24 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
}
// Error was something other than an AlreadyExists
if restoreErr != nil {
ctx.infof("error restoring %s: %v", name, err)
ctx.log.Infof("error restoring %s: %v", name, err)
addToResult(&errs, namespace, fmt.Errorf("error restoring %s: %v", fullPath, restoreErr))
continue
}
if groupResource == kuberesource.Pods && len(restic.GetPodSnapshotAnnotations(obj)) > 0 {
if ctx.resticRestorer == nil {
ctx.logger.Warn("No restic restorer, not restoring pod's volumes")
ctx.log.Warn("No restic restorer, not restoring pod's volumes")
} else {
ctx.globalWaitGroup.GoErrorSlice(func() []error {
pod := new(v1.Pod)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdObj.UnstructuredContent(), &pod); err != nil {
ctx.logger.WithError(err).Error("error converting unstructured pod")
ctx.log.WithError(err).Error("error converting unstructured pod")
return []error{err}
}
if errs := ctx.resticRestorer.RestorePodVolumes(ctx.restore, pod, ctx.logger); errs != nil {
ctx.logger.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully complete restic restores of pod's volumes")
if errs := ctx.resticRestorer.RestorePodVolumes(ctx.restore, pod, originalNamespace, ctx.log); errs != nil {
ctx.log.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully complete restic restores of pod's volumes")
return errs
}
@@ -1084,7 +1072,7 @@ func (ctx *context) unmarshal(filePath string) (*unstructured.Unstructured, erro
func (ctx *context) unzipAndExtractBackup(src io.Reader) (string, error) {
gzr, err := gzip.NewReader(src)
if err != nil {
ctx.infof("error creating gzip reader: %v", err)
ctx.log.Infof("error creating gzip reader: %v", err)
return "", err
}
defer gzr.Close()
@@ -1097,7 +1085,7 @@ func (ctx *context) unzipAndExtractBackup(src io.Reader) (string, error) {
func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
dir, err := ctx.fileSystem.TempDir("", "")
if err != nil {
ctx.infof("error creating temp dir: %v", err)
ctx.log.Infof("error creating temp dir: %v", err)
return "", err
}
@@ -1108,7 +1096,7 @@ func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
break
}
if err != nil {
ctx.infof("error reading tar: %v", err)
ctx.log.Infof("error reading tar: %v", err)
return "", err
}
@@ -1118,7 +1106,7 @@ func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
case tar.TypeDir:
err := ctx.fileSystem.MkdirAll(target, header.FileInfo().Mode())
if err != nil {
ctx.infof("mkdirall error: %v", err)
ctx.log.Infof("mkdirall error: %v", err)
return "", err
}
@@ -1126,7 +1114,7 @@ func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
// make sure we have the directory created
err := ctx.fileSystem.MkdirAll(filepath.Dir(target), header.FileInfo().Mode())
if err != nil {
ctx.infof("mkdirall error: %v", err)
ctx.log.Infof("mkdirall error: %v", err)
return "", err
}
@@ -1138,7 +1126,7 @@ func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
defer file.Close()
if _, err := io.Copy(file, tarRdr); err != nil {
ctx.infof("error copying: %v", err)
ctx.log.Infof("error copying: %v", err)
return "", err
}
}

View File

@@ -189,7 +189,7 @@ func TestRestoreNamespaceFiltering(t *testing.T) {
restore: test.restore,
namespaceClient: &fakeNamespaceClient{},
fileSystem: test.fileSystem,
logger: log,
log: log,
prioritizedResources: test.prioritizedResources,
}
@@ -282,7 +282,7 @@ func TestRestorePriority(t *testing.T) {
namespaceClient: &fakeNamespaceClient{},
fileSystem: test.fileSystem,
prioritizedResources: test.prioritizedResources,
logger: log,
log: log,
}
warnings, errors := ctx.restoreFromDir(test.baseDir)
@@ -330,7 +330,7 @@ func TestNamespaceRemapping(t *testing.T) {
prioritizedResources: prioritizedResources,
restore: restore,
backup: &api.Backup{},
logger: arktest.NewLogger(),
log: arktest.NewLogger(),
}
warnings, errors := ctx.restoreFromDir(baseDir)
@@ -432,16 +432,6 @@ func TestRestoreResourceForNamespace(t *testing.T) {
labelSelector: labels.SelectorFromSet(labels.Set(map[string]string{"foo": "not-bar"})),
fileSystem: arktest.NewFakeFileSystem().WithFile("configmaps/cm-1.json", newTestConfigMap().WithLabels(map[string]string{"foo": "bar"}).ToJSON()),
},
{
name: "items with controller owner are skipped",
namespace: "ns-1",
resourcePath: "configmaps",
labelSelector: labels.NewSelector(),
fileSystem: arktest.NewFakeFileSystem().
WithFile("configmaps/cm-1.json", newTestConfigMap().WithControllerOwner().ToJSON()).
WithFile("configmaps/cm-2.json", newNamedTestConfigMap("cm-2").ToJSON()),
expectedObjs: toUnstructured(newNamedTestConfigMap("cm-2").WithArkLabel("my-restore").ConfigMap),
},
{
name: "namespace is remapped",
namespace: "ns-2",
@@ -628,7 +618,7 @@ func TestRestoreResourceForNamespace(t *testing.T) {
},
},
backup: &api.Backup{},
logger: arktest.NewLogger(),
log: arktest.NewLogger(),
pvRestorer: &pvRestorer{},
}
@@ -714,7 +704,7 @@ func TestRestoringExistingServiceAccount(t *testing.T) {
},
},
backup: &api.Backup{},
logger: arktest.NewLogger(),
log: arktest.NewLogger(),
}
warnings, errors := ctx.restoreResource("serviceaccounts", "ns-1", "foo/resources/serviceaccounts/namespaces/ns-1/")
@@ -900,7 +890,7 @@ status:
},
},
backup: backup,
logger: arktest.NewLogger(),
log: arktest.NewLogger(),
pvsToProvision: sets.NewString(),
pvRestorer: pvRestorer,
}
@@ -1404,9 +1394,18 @@ func (obj *testUnstructured) WithStatusField(field string, value interface{}) *t
}
func (obj *testUnstructured) WithAnnotations(fields ...string) *testUnstructured {
annotations := make(map[string]interface{})
vals := map[string]string{}
for _, field := range fields {
annotations[field] = "foo"
vals[field] = "foo"
}
return obj.WithAnnotationValues(vals)
}
func (obj *testUnstructured) WithAnnotationValues(fieldVals map[string]string) *testUnstructured {
annotations := make(map[string]interface{})
for field, val := range fieldVals {
annotations[field] = val
}
obj = obj.WithMetadataField("annotations", annotations)

View File

@@ -17,14 +17,21 @@ limitations under the License.
package restore
import (
"encoding/json"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/util/collections"
)
const annotationLastAppliedConfig = "kubectl.kubernetes.io/last-applied-configuration"
type serviceAction struct {
log logrus.FieldLogger
}
@@ -52,15 +59,55 @@ func (a *serviceAction) Execute(obj runtime.Unstructured, restore *api.Restore)
delete(spec, "clusterIP")
}
ports, err := collections.GetSlice(obj.UnstructuredContent(), "spec.ports")
err = deleteNodePorts(obj, &spec)
if err != nil {
return nil, nil, err
}
return obj, nil, nil
}
func getPreservedPorts(obj runtime.Unstructured) (map[string]bool, error) {
preservedPorts := map[string]bool{}
metadata, err := meta.Accessor(obj)
if err != nil {
return nil, errors.WithStack(err)
}
if lac, ok := metadata.GetAnnotations()[annotationLastAppliedConfig]; ok {
var svc corev1api.Service
if err := json.Unmarshal([]byte(lac), &svc); err != nil {
return nil, errors.WithStack(err)
}
for _, port := range svc.Spec.Ports {
if port.NodePort > 0 {
preservedPorts[port.Name] = true
}
}
}
return preservedPorts, nil
}
func deleteNodePorts(obj runtime.Unstructured, spec *map[string]interface{}) error {
preservedPorts, err := getPreservedPorts(obj)
if err != nil {
return err
}
ports, err := collections.GetSlice(obj.UnstructuredContent(), "spec.ports")
serviceType, _ := collections.GetString(*spec, "type")
if err != nil && serviceType != "ExternalName" {
return err
}
for _, port := range ports {
p := port.(map[string]interface{})
var name string
if nameVal, ok := p["name"]; ok {
name = nameVal.(string)
}
if preservedPorts[name] {
continue
}
delete(p, "nodePort")
}
return obj, nil, nil
return nil
}

View File

@@ -17,15 +17,33 @@ limitations under the License.
package restore
import (
"encoding/json"
"testing"
arktest "github.com/heptio/ark/pkg/util/test"
"github.com/stretchr/testify/assert"
corev1api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
)
func svcJSON(ports ...corev1api.ServicePort) string {
svc := corev1api.Service{
Spec: corev1api.ServiceSpec{
Ports: ports,
},
}
data, err := json.Marshal(svc)
if err != nil {
panic(err)
}
return string(data)
}
func TestServiceActionExecute(t *testing.T) {
tests := []struct {
name string
obj runtime.Unstructured
@@ -37,6 +55,11 @@ func TestServiceActionExecute(t *testing.T) {
obj: NewTestUnstructured().WithName("svc-1").Unstructured,
expectedErr: true,
},
{
name: "no spec ports should error",
obj: NewTestUnstructured().WithName("svc-1").WithSpec().Unstructured,
expectedErr: true,
},
{
name: "clusterIP (only) should be deleted from spec",
obj: NewTestUnstructured().WithName("svc-1").WithSpec("clusterIP", "foo").WithSpecField("ports", []interface{}{}).Unstructured,
@@ -63,6 +86,97 @@ func TestServiceActionExecute(t *testing.T) {
map[string]interface{}{"foo": "bar"},
}).Unstructured,
},
{
name: "unnamed nodePort should be deleted when missing in annotation",
obj: NewTestUnstructured().WithName("svc-1").
WithAnnotationValues(map[string]string{
annotationLastAppliedConfig: svcJSON(),
}).
WithSpecField("ports", []interface{}{
map[string]interface{}{"nodePort": 8080},
}).Unstructured,
expectedErr: false,
expectedRes: NewTestUnstructured().WithName("svc-1").
WithAnnotationValues(map[string]string{
annotationLastAppliedConfig: svcJSON(),
}).
WithSpecField("ports", []interface{}{
map[string]interface{}{},
}).Unstructured,
},
{
name: "unnamed nodePort should be preserved when specified in annotation",
obj: NewTestUnstructured().WithName("svc-1").
WithAnnotationValues(map[string]string{
annotationLastAppliedConfig: svcJSON(corev1api.ServicePort{NodePort: 8080}),
}).
WithSpecField("ports", []interface{}{
map[string]interface{}{
"nodePort": 8080,
},
}).Unstructured,
expectedErr: false,
expectedRes: NewTestUnstructured().WithName("svc-1").
WithAnnotationValues(map[string]string{
annotationLastAppliedConfig: svcJSON(corev1api.ServicePort{NodePort: 8080}),
}).
WithSpecField("ports", []interface{}{
map[string]interface{}{
"nodePort": 8080,
},
}).Unstructured,
},
{
name: "unnamed nodePort should be deleted when named nodePort specified in annotation",
obj: NewTestUnstructured().WithName("svc-1").
WithAnnotationValues(map[string]string{
annotationLastAppliedConfig: svcJSON(corev1api.ServicePort{Name: "http", NodePort: 8080}),
}).
WithSpecField("ports", []interface{}{
map[string]interface{}{
"nodePort": 8080,
},
}).Unstructured,
expectedErr: false,
expectedRes: NewTestUnstructured().WithName("svc-1").
WithAnnotationValues(map[string]string{
annotationLastAppliedConfig: svcJSON(corev1api.ServicePort{Name: "http", NodePort: 8080}),
}).
WithSpecField("ports", []interface{}{
map[string]interface{}{},
}).Unstructured,
},
{
name: "named nodePort should be preserved when specified in annotation",
obj: NewTestUnstructured().WithName("svc-1").
WithAnnotationValues(map[string]string{
annotationLastAppliedConfig: svcJSON(corev1api.ServicePort{Name: "http", NodePort: 8080}),
}).
WithSpecField("ports", []interface{}{
map[string]interface{}{
"name": "http",
"nodePort": 8080,
},
map[string]interface{}{
"name": "admin",
"nodePort": 9090,
},
}).Unstructured,
expectedErr: false,
expectedRes: NewTestUnstructured().WithName("svc-1").
WithAnnotationValues(map[string]string{
annotationLastAppliedConfig: svcJSON(corev1api.ServicePort{Name: "http", NodePort: 8080}),
}).
WithSpecField("ports", []interface{}{
map[string]interface{}{
"name": "http",
"nodePort": 8080,
},
map[string]interface{}{
"name": "admin",
},
}).Unstructured,
},
}
for _, test := range tests {

View File

@@ -125,11 +125,18 @@ func Exists(root map[string]interface{}, path string) bool {
// MergeMaps takes two map[string]string and merges missing keys from the second into the first.
// If a key already exists, its value is not overwritten.
func MergeMaps(first, second map[string]string) {
func MergeMaps(first, second map[string]string) map[string]string {
// If the first map passed in is empty, just use all of the second map's data
if first == nil {
first = map[string]string{}
}
for k, v := range second {
_, ok := first[k]
if !ok {
first[k] = v
}
}
return first
}

View File

@@ -18,6 +18,8 @@ package collections
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestGetString(t *testing.T) {
@@ -44,3 +46,57 @@ func TestGetString(t *testing.T) {
}
}
}
func TestMergeMaps(t *testing.T) {
var testCases = []struct {
name string
source map[string]string
destination map[string]string
expected map[string]string
}{
{
name: "nil destination should result in source being copied",
destination: nil,
source: map[string]string{
"k1": "v1",
},
expected: map[string]string{
"k1": "v1",
},
},
{
name: "keys missing from destination should be copied from source",
destination: map[string]string{
"k2": "v2",
},
source: map[string]string{
"k1": "v1",
},
expected: map[string]string{
"k1": "v1",
"k2": "v2",
},
},
{
name: "matching key should not have value copied from source",
destination: map[string]string{
"k1": "v1",
},
source: map[string]string{
"k1": "v2",
},
expected: map[string]string{
"k1": "v1",
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := MergeMaps(tc.destination, tc.source)
assert.Equal(t, tc.expected, result)
})
}
}

View File

@@ -29,6 +29,7 @@ type FakeDiscoveryHelper struct {
ResourceList []*metav1.APIResourceList
Mapper meta.RESTMapper
AutoReturnResource bool
APIGroupsList []metav1.APIGroup
}
func NewFakeDiscoveryHelper(autoReturnResource bool, resources map[schema.GroupVersionResource]schema.GroupVersionResource) *FakeDiscoveryHelper {
@@ -54,6 +55,14 @@ func NewFakeDiscoveryHelper(autoReturnResource bool, resources map[schema.GroupV
}
apiResourceMap[gvString] = append(apiResourceMap[gvString], metav1.APIResource{Name: gvr.Resource})
helper.APIGroupsList = append(helper.APIGroupsList,
metav1.APIGroup{
Name: gvr.Group,
PreferredVersion: metav1.GroupVersionForDiscovery{
GroupVersion: gvString,
Version: gvr.Version,
},
})
}
for group, resources := range apiResourceMap {
@@ -110,3 +119,7 @@ func (dh *FakeDiscoveryHelper) ResourceFor(input schema.GroupVersionResource) (s
return schema.GroupVersionResource{}, metav1.APIResource{}, errors.New("APIResource not found")
}
func (dh *FakeDiscoveryHelper) APIGroups() []metav1.APIGroup {
return dh.APIGroupsList
}