update restic to support multiple backup storage locations

Signed-off-by: Steve Kriss <steve@heptio.com>
Steve Kriss
2018-09-25 14:20:58 -06:00
parent 257917767f
commit d009163b67
11 changed files with 85 additions and 44 deletions

View File

@@ -40,4 +40,8 @@ const (
// StorageLocationLabel is the label key used to identify the storage
// location of a backup.
StorageLocationLabel = "ark.heptio.com/storage-location"
+// ResticVolumeNamespaceLabel is the label key used to identify which
+// namespace a restic repository stores pod volume backups for.
+ResticVolumeNamespaceLabel = "ark.heptio.com/volume-namespace"
)
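
The two keys together identify a repository by its (volume namespace, storage location) pair. Below is a minimal, runnable sketch (illustration only, not part of the commit) of how the two labels combine into a single selector, as the repository ensurer later in this commit does:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

const (
	storageLocationLabel       = "ark.heptio.com/storage-location"
	resticVolumeNamespaceLabel = "ark.heptio.com/volume-namespace"
)

func main() {
	// Both labels must match for a repository to be selected.
	selector := labels.SelectorFromSet(labels.Set{
		resticVolumeNamespaceLabel: "ns-1",
		storageLocationLabel:       "default",
	})

	// Prints: ark.heptio.com/storage-location=default,ark.heptio.com/volume-namespace=ns-1
	fmt.Println(selector.String())
}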

View File

@@ -22,12 +22,20 @@ import (
// ResticRepositorySpec is the specification for a ResticRepository.
type ResticRepositorySpec struct {
-// MaintenanceFrequency is how often maintenance should be run.
-MaintenanceFrequency metav1.Duration `json:"maintenanceFrequency"`
+// VolumeNamespace is the namespace this restic repository contains
+// pod volume backups for.
+VolumeNamespace string `json:"volumeNamespace"`
+// BackupStorageLocation is the name of the BackupStorageLocation
+// that should contain this repository.
+BackupStorageLocation string `json:"backupStorageLocation"`
+// ResticIdentifier is the full restic-compatible string for identifying
+// this repository.
+ResticIdentifier string `json:"resticIdentifier"`
+// MaintenanceFrequency is how often maintenance should be run.
+MaintenanceFrequency metav1.Duration `json:"maintenanceFrequency"`
}
// ResticRepositoryPhase represents the lifecycle phase of a ResticRepository.
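
ResticIdentifier stores the full repository string that the restic CLI accepts. As a hypothetical sketch only (the authoritative format is produced by restic.GetRepoIdentifier elsewhere in pkg/restic; the region, bucket, and path layout here are made up for illustration), an S3-backed identifier has roughly this shape:

package main

import "fmt"

// hypotheticalS3Identifier mimics the shape of a restic S3 repository string;
// the real logic lives in restic.GetRepoIdentifier and varies by provider.
func hypotheticalS3Identifier(region, bucket, volumeNamespace string) string {
	return fmt.Sprintf("s3:s3-%s.amazonaws.com/%s/restic/%s", region, bucket, volumeNamespace)
}

func main() {
	// e.g. "s3:s3-us-east-1.amazonaws.com/my-bucket/restic/ns-1"
	fmt.Println(hypotheticalS3Identifier("us-east-1", "my-bucket", "ns-1"))
}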

View File

@@ -753,7 +753,7 @@ func (s *server) runControllers(config *api.Config, defaultBackupLocation *api.B
s.logger,
s.sharedInformerFactory.Ark().V1().ResticRepositories(),
s.arkClient.ArkV1(),
-defaultBackupLocation,
+s.sharedInformerFactory.Ark().V1().BackupStorageLocations(),
s.resticManager,
)
wg.Add(1)

View File

@@ -32,7 +32,6 @@ import (
"k8s.io/client-go/tools/cache"
"github.com/heptio/ark/pkg/apis/ark/v1"
-arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
@@ -44,7 +43,7 @@ type resticRepositoryController struct {
resticRepositoryClient arkv1client.ResticRepositoriesGetter
resticRepositoryLister listers.ResticRepositoryLister
-storageLocation *arkv1api.BackupStorageLocation
+backupLocationLister listers.BackupStorageLocationLister
repositoryManager restic.RepositoryManager
clock clock.Clock
@@ -55,20 +54,20 @@ func NewResticRepositoryController(
logger logrus.FieldLogger,
resticRepositoryInformer informers.ResticRepositoryInformer,
resticRepositoryClient arkv1client.ResticRepositoriesGetter,
-storageLocation *arkv1api.BackupStorageLocation,
+backupLocationInformer informers.BackupStorageLocationInformer,
repositoryManager restic.RepositoryManager,
) Interface {
c := &resticRepositoryController{
genericController: newGenericController("restic-repository", logger),
resticRepositoryClient: resticRepositoryClient,
resticRepositoryLister: resticRepositoryInformer.Lister(),
-storageLocation: storageLocation,
+backupLocationLister: backupLocationInformer.Lister(),
repositoryManager: repositoryManager,
clock: &clock.RealClock{},
}
c.syncHandler = c.processQueueItem
-c.cacheSyncWaiters = append(c.cacheSyncWaiters, resticRepositoryInformer.Informer().HasSynced)
+c.cacheSyncWaiters = append(c.cacheSyncWaiters, resticRepositoryInformer.Informer().HasSynced, backupLocationInformer.Informer().HasSynced)
resticRepositoryInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
@@ -137,9 +136,15 @@ func (c *resticRepositoryController) processQueueItem(key string) error {
func (c *resticRepositoryController) initializeRepo(req *v1.ResticRepository, log logrus.FieldLogger) error {
log.Info("Initializing restic repository")
+// confirm the repo's BackupStorageLocation is valid
+loc, err := c.backupLocationLister.BackupStorageLocations(req.Namespace).Get(req.Spec.BackupStorageLocation)
+if err != nil {
+return c.patchResticRepository(req, repoNotReady(err.Error()))
+}
// defaulting - if the patch fails, return an error so the item is returned to the queue
if err := c.patchResticRepository(req, func(r *v1.ResticRepository) {
-r.Spec.ResticIdentifier = restic.GetRepoIdentifier(c.storageLocation, r.Name)
+r.Spec.ResticIdentifier = restic.GetRepoIdentifier(loc, r.Spec.VolumeNamespace)
if r.Spec.MaintenanceFrequency.Duration <= 0 {
r.Spec.MaintenanceFrequency = metav1.Duration{Duration: restic.DefaultMaintenanceFrequency}

View File

@@ -96,15 +96,15 @@ func (b *backupper) BackupPodVolumes(backup *arkv1api.Backup, pod *corev1api.Pod
return nil, nil
}
-repo, err := b.repoEnsurer.EnsureRepo(b.ctx, backup.Namespace, pod.Namespace)
+repo, err := b.repoEnsurer.EnsureRepo(b.ctx, backup.Namespace, pod.Namespace, backup.Spec.StorageLocation)
if err != nil {
return nil, []error{err}
}
// get a single non-exclusive lock since we'll wait for all individual
// backups to be complete before releasing it.
-b.repoManager.repoLocker.Lock(pod.Namespace)
-defer b.repoManager.repoLocker.Unlock(pod.Namespace)
+b.repoManager.repoLocker.Lock(repo.Name)
+defer b.repoManager.repoLocker.Unlock(repo.Name)
resultsChan := make(chan *arkv1api.PodVolumeBackup)
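
With one repository per (volume namespace, storage location) pair, a pod namespace no longer identifies a single repository, so the lock key becomes the repository's name. A self-contained sketch of a per-key read lock in the spirit of repoLocker (type and field names are assumed; only the Lock/Unlock-by-name usage is shown in the diff):

package main

import (
	"fmt"
	"sync"
)

// keyedLocker hands out one RWMutex per repository name.
type keyedLocker struct {
	mu    sync.Mutex
	locks map[string]*sync.RWMutex
}

func (l *keyedLocker) get(key string) *sync.RWMutex {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.locks == nil {
		l.locks = make(map[string]*sync.RWMutex)
	}
	if _, ok := l.locks[key]; !ok {
		l.locks[key] = &sync.RWMutex{}
	}
	return l.locks[key]
}

// Lock takes a non-exclusive (read) lock, so many pod volume backups
// against the same repository can run concurrently.
func (l *keyedLocker) Lock(key string)   { l.get(key).RLock() }
func (l *keyedLocker) Unlock(key string) { l.get(key).RUnlock() }

func main() {
	var locker keyedLocker
	locker.Lock("ns-1-default-abc12")
	defer locker.Unlock("ns-1-default-abc12")
	fmt.Println("holding non-exclusive lock on repo ns-1-default-abc12")
}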

View File

@@ -105,11 +105,15 @@ func GetVolumesToBackup(obj metav1.Object) []string {
// SnapshotIdentifier uniquely identifies a restic snapshot
// taken by Ark.
type SnapshotIdentifier struct {
-// Repo is the name of the restic repository where the
-// snapshot is located
-Repo string
+// VolumeNamespace is the namespace of the pod/volume that
+// the restic snapshot is for.
+VolumeNamespace string
-// SnapshotID is the short ID of the restic snapshot
+// BackupStorageLocation is the backup's storage location
+// name.
+BackupStorageLocation string
+// SnapshotID is the short ID of the restic snapshot.
SnapshotID string
}
@@ -131,8 +135,9 @@ func GetSnapshotsInBackup(backup *arkv1api.Backup, podVolumeBackupLister arkv1li
continue
}
res = append(res, SnapshotIdentifier{
-Repo: item.Spec.Pod.Namespace,
-SnapshotID: item.Status.SnapshotID,
+VolumeNamespace: item.Spec.Pod.Namespace,
+BackupStorageLocation: backup.Spec.StorageLocation,
+SnapshotID: item.Status.SnapshotID,
})
}

View File

@@ -287,12 +287,12 @@ func TestGetSnapshotsInBackup(t *testing.T) {
},
expected: []SnapshotIdentifier{
{
-Repo: "ns-1",
-SnapshotID: "snap-3",
+VolumeNamespace: "ns-1",
+SnapshotID: "snap-3",
},
{
-Repo: "ns-1",
-SnapshotID: "snap-4",
+VolumeNamespace: "ns-1",
+SnapshotID: "snap-4",
},
},
},
@@ -319,10 +319,10 @@ func TestGetSnapshotsInBackup(t *testing.T) {
// sort to ensure good compare of slices
less := func(snapshots []SnapshotIdentifier) func(i, j int) bool {
return func(i, j int) bool {
-if snapshots[i].Repo == snapshots[j].Repo {
+if snapshots[i].VolumeNamespace == snapshots[j].VolumeNamespace {
return snapshots[i].SnapshotID < snapshots[j].SnapshotID
}
-return snapshots[i].Repo < snapshots[j].Repo
+return snapshots[i].VolumeNamespace < snapshots[j].VolumeNamespace
}
}
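
The comparator orders by VolumeNamespace first and breaks ties on SnapshotID, so the expected and actual slices compare deterministically. The same pattern, runnable with sort.Slice (illustration only; the test's own types are simplified here):

package main

import (
	"fmt"
	"sort"
)

type snapshotIdentifier struct {
	VolumeNamespace string
	SnapshotID      string
}

func main() {
	snapshots := []snapshotIdentifier{
		{"ns-2", "snap-1"},
		{"ns-1", "snap-4"},
		{"ns-1", "snap-3"},
	}

	sort.Slice(snapshots, func(i, j int) bool {
		// primary key: VolumeNamespace; secondary key: SnapshotID
		if snapshots[i].VolumeNamespace == snapshots[j].VolumeNamespace {
			return snapshots[i].SnapshotID < snapshots[j].SnapshotID
		}
		return snapshots[i].VolumeNamespace < snapshots[j].VolumeNamespace
	})

	fmt.Println(snapshots) // [{ns-1 snap-3} {ns-1 snap-4} {ns-2 snap-1}]
}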

View File

@@ -18,13 +18,14 @@ package restic
import (
"context"
+"fmt"
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
-apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
@@ -59,7 +60,8 @@ func newRepositoryEnsurer(repoInformer arkv1informers.ResticRepositoryInformer,
r.readyChansLock.Lock()
defer r.readyChansLock.Unlock()
-readyChan, ok := r.readyChans[newObj.Name]
+key := repoLabels(newObj.Spec.VolumeNamespace, newObj.Spec.BackupStorageLocation).String()
+readyChan, ok := r.readyChans[key]
if !ok {
log.Errorf("No ready channel found for repository %s/%s", newObj.Namespace, newObj.Name)
return
@@ -75,30 +77,47 @@ func newRepositoryEnsurer(repoInformer arkv1informers.ResticRepositoryInformer,
return r
}
-func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, name string) (*arkv1api.ResticRepository, error) {
-if repo, err := r.repoLister.ResticRepositories(namespace).Get(name); err != nil && !apierrors.IsNotFound(err) {
+func repoLabels(volumeNamespace, backupLocation string) labels.Set {
+return map[string]string{
+arkv1api.ResticVolumeNamespaceLabel: volumeNamespace,
+arkv1api.StorageLocationLabel: backupLocation,
+}
+}
+func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNamespace, backupLocation string) (*arkv1api.ResticRepository, error) {
+selector := labels.SelectorFromSet(repoLabels(volumeNamespace, backupLocation))
+repos, err := r.repoLister.ResticRepositories(namespace).List(selector)
+if err != nil {
return nil, errors.WithStack(err)
-} else if err == nil {
-if repo.Status.Phase != arkv1api.ResticRepositoryPhaseReady {
+}
+if len(repos) > 1 {
+return nil, errors.Errorf("more than one ResticRepository found for workload namespace %q, backup storage location %q", volumeNamespace, backupLocation)
+}
+if len(repos) == 1 {
+if repos[0].Status.Phase != arkv1api.ResticRepositoryPhaseReady {
return nil, errors.New("restic repository is not ready")
}
-return repo, nil
+return repos[0], nil
}
-// if we're here, it means we got an IsNotFound error, meaning we need to create a new
-// repo and wait for it to be ready
+// if we're here, it means we didn't find a repo, meaning we need to create a new
+// one and wait for it to be ready
repo := &arkv1api.ResticRepository{
ObjectMeta: metav1.ObjectMeta{
-Namespace: namespace,
-Name: name,
+Namespace: namespace,
+GenerateName: fmt.Sprintf("%s-%s-", volumeNamespace, backupLocation),
+Labels: repoLabels(volumeNamespace, backupLocation),
},
Spec: arkv1api.ResticRepositorySpec{
-MaintenanceFrequency: metav1.Duration{Duration: DefaultMaintenanceFrequency},
+VolumeNamespace: volumeNamespace,
+BackupStorageLocation: backupLocation,
+MaintenanceFrequency: metav1.Duration{Duration: DefaultMaintenanceFrequency},
},
}
-readyChan := r.getReadyChan(name)
+readyChan := r.getReadyChan(selector.String())
defer close(readyChan)
if _, err := r.repoClient.ResticRepositories(namespace).Create(repo); err != nil {
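
Since repositories are now found by label set rather than by a fixed name, the ready-channel map is keyed by the selector string as well: EnsureRepo registers a channel under that key before creating the repo, and the informer's update handler looks the channel up under the same key once the repository reaches Ready. A simplified, runnable sketch of that keyed wait pattern (details assumed: here the handler just closes the channel, whereas the real handler passes along the updated repository object):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type ensurer struct {
	mu         sync.Mutex
	readyChans map[string]chan struct{}
}

// getReadyChan registers a channel under the selector-string key.
func (e *ensurer) getReadyChan(key string) chan struct{} {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.readyChans == nil {
		e.readyChans = make(map[string]chan struct{})
	}
	e.readyChans[key] = make(chan struct{})
	return e.readyChans[key]
}

// markReady stands in for the informer update handler firing on phase Ready.
func (e *ensurer) markReady(key string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if ch, ok := e.readyChans[key]; ok {
		close(ch)
		delete(e.readyChans, key)
	}
}

func main() {
	e := &ensurer{}
	key := "ark.heptio.com/storage-location=default,ark.heptio.com/volume-namespace=ns-1"
	ready := e.getReadyChan(key)

	go func() {
		time.Sleep(100 * time.Millisecond)
		e.markReady(key)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	select {
	case <-ready:
		fmt.Println("repository ready")
	case <-ctx.Done():
		fmt.Println("timed out waiting for repository")
	}
}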

View File

@@ -188,7 +188,7 @@ func (rm *repositoryManager) Forget(ctx context.Context, snapshot SnapshotIdenti
return errors.New("timed out waiting for cache to sync")
}
-repo, err := rm.repoEnsurer.EnsureRepo(ctx, rm.namespace, snapshot.Repo)
+repo, err := rm.repoEnsurer.EnsureRepo(ctx, rm.namespace, snapshot.VolumeNamespace, snapshot.BackupStorageLocation)
if err != nil {
return err
}

View File

@@ -34,7 +34,7 @@ import (
// Restorer can execute restic restores of volumes in a pod.
type Restorer interface {
// RestorePodVolumes restores all annotated volumes in a pod.
-RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.Pod, sourceNamespace string, log logrus.FieldLogger) []error
+RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.Pod, sourceNamespace, backupLocation string, log logrus.FieldLogger) []error
}
type restorer struct {
@@ -84,22 +84,22 @@ func newRestorer(
return r
}
-func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.Pod, sourceNamespace string, log logrus.FieldLogger) []error {
+func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.Pod, sourceNamespace, backupLocation string, log logrus.FieldLogger) []error {
// get volumes to restore from pod's annotations
volumesToRestore := GetPodSnapshotAnnotations(pod)
if len(volumesToRestore) == 0 {
return nil
}
-repo, err := r.repoEnsurer.EnsureRepo(r.ctx, restore.Namespace, sourceNamespace)
+repo, err := r.repoEnsurer.EnsureRepo(r.ctx, restore.Namespace, sourceNamespace, backupLocation)
if err != nil {
return []error{err}
}
// get a single non-exclusive lock since we'll wait for all individual
// restores to be complete before releasing it.
-r.repoManager.repoLocker.Lock(pod.Namespace)
-defer r.repoManager.repoLocker.Unlock(pod.Namespace)
+r.repoManager.repoLocker.Lock(repo.Name)
+defer r.repoManager.repoLocker.Unlock(repo.Name)
resultsChan := make(chan *arkv1api.PodVolumeRestore)

View File

@@ -828,7 +828,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
return []error{err}
}
-if errs := ctx.resticRestorer.RestorePodVolumes(ctx.restore, pod, originalNamespace, ctx.log); errs != nil {
+if errs := ctx.resticRestorer.RestorePodVolumes(ctx.restore, pod, originalNamespace, ctx.backup.Spec.StorageLocation, ctx.log); errs != nil {
ctx.log.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully complete restic restores of pod's volumes")
return errs
}