use repo identifier from ResticRepo CR when backing up/restoring

Signed-off-by: Steve Kriss <steve@heptio.com>
This commit is contained in:
Steve Kriss
2018-06-14 20:24:01 -07:00
parent 8306566216
commit f2072e5868
12 changed files with 148 additions and 99 deletions

View File

@@ -33,9 +33,8 @@ type PodVolumeBackupSpec struct {
// up.
Volume string `json:"volume"`
// RepoPrefix is the restic repository prefix (i.e. not containing
// the repository name itself).
RepoPrefix string `json:"repoPrefix"`
// RepoIdentifier is the restic repository identifier.
RepoIdentifier string `json:"repoIdentifier"`
// Tags are a map of key-value pairs that should be applied to the
// volume backup as tags.

View File

@@ -29,9 +29,8 @@ type PodVolumeRestoreSpec struct {
// Volume is the name of the volume within the Pod to be restored.
Volume string `json:"volume"`
// RepoPrefix is the restic repository prefix (i.e. not containing
// the repository name itself).
RepoPrefix string `json:"repoPrefix"`
// RepoIdentifier is the restic repository identifier.
RepoIdentifier string `json:"repoIdentifier"`
// SnapshotID is the ID of the volume snapshot to be restored.
SnapshotID string `json:"snapshotID"`

View File

@@ -476,7 +476,7 @@ func (s *server) initRestic(config api.ObjectStorageProviderConfig) error {
res, err := restic.NewRepositoryManager(
s.ctx,
config,
s.namespace,
s.arkClient,
secretsInformer,
s.kubeClient.CoreV1(),

View File

@@ -173,8 +173,7 @@ func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup)
defer os.Remove(file)
resticCmd := restic.BackupCommand(
req.Spec.RepoPrefix,
req.Spec.Pod.Namespace,
req.Spec.RepoIdentifier,
file,
path,
req.Spec.Tags,
@@ -188,7 +187,7 @@ func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup)
}
log.Debugf("Ran command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)
snapshotID, err := restic.GetSnapshotID(req.Spec.RepoPrefix, req.Spec.Pod.Namespace, file, req.Spec.Tags)
snapshotID, err := restic.GetSnapshotID(req.Spec.RepoIdentifier, file, req.Spec.Tags)
if err != nil {
log.WithError(err).Error("Error getting SnapshotID")
return c.fail(req, errors.Wrap(err, "error getting snapshot id").Error(), log)

View File

@@ -300,8 +300,7 @@ func restorePodVolume(req *arkv1api.PodVolumeRestore, credsFile, volumeDir strin
}
resticCmd := restic.RestoreCommand(
req.Spec.RepoPrefix,
req.Spec.Pod.Namespace,
req.Spec.RepoIdentifier,
credsFile,
req.Spec.SnapshotID,
volumePath,

View File

@@ -84,6 +84,31 @@ func resultsKey(ns, name string) string {
return fmt.Sprintf("%s/%s", ns, name)
}
func getRepo(repoLister arkv1listers.ResticRepositoryLister, ns, name string) (*arkv1api.ResticRepository, error) {
repo, err := repoLister.ResticRepositories(ns).Get(name)
if apierrors.IsNotFound(err) {
return nil, errors.Wrapf(err, "restic repository not found")
}
if err != nil {
return nil, errors.Wrapf(err, "error getting restic repository")
}
return repo, nil
}
func getReadyRepo(repoLister arkv1listers.ResticRepositoryLister, ns, name string) (*arkv1api.ResticRepository, error) {
repo, err := getRepo(repoLister, ns, name)
if err != nil {
return nil, err
}
if repo.Status.Phase != arkv1api.ResticRepositoryPhaseReady {
return nil, errors.New("restic repository not ready")
}
return repo, nil
}
func (b *backupper) BackupPodVolumes(backup *arkv1api.Backup, pod *corev1api.Pod, log logrus.FieldLogger) (map[string]string, []error) {
// get volumes to backup from pod's annotations
volumesToBackup := GetVolumesToBackup(pod)
@@ -91,15 +116,9 @@ func (b *backupper) BackupPodVolumes(backup *arkv1api.Backup, pod *corev1api.Pod
return nil, nil
}
repo, err := b.repoLister.ResticRepositories(backup.Namespace).Get(pod.Namespace)
if apierrors.IsNotFound(err) {
return nil, []error{errors.Wrapf(err, "restic repository not found")}
}
repo, err := getReadyRepo(b.repoLister, backup.Namespace, pod.Namespace)
if err != nil {
return nil, []error{errors.Wrapf(err, "error getting restic repository")}
}
if repo.Status.Phase != arkv1api.ResticRepositoryPhaseReady {
return nil, []error{errors.New("restic repository not ready")}
return nil, []error{err}
}
resultsChan := make(chan *arkv1api.PodVolumeBackup)
@@ -117,7 +136,7 @@ func (b *backupper) BackupPodVolumes(backup *arkv1api.Backup, pod *corev1api.Pod
b.repoManager.repoLocker.Lock(pod.Namespace)
defer b.repoManager.repoLocker.Unlock(pod.Namespace)
volumeBackup := newPodVolumeBackup(backup, pod, volumeName, b.repoManager.repoPrefix)
volumeBackup := newPodVolumeBackup(backup, pod, volumeName, repo.Spec.ResticIdentifier)
if err := errorOnly(b.repoManager.arkClient.ArkV1().PodVolumeBackups(volumeBackup.Namespace).Create(volumeBackup)); err != nil {
errs = append(errs, err)
@@ -151,7 +170,7 @@ ForEachVolume:
return volumeSnapshots, errs
}
func newPodVolumeBackup(backup *arkv1api.Backup, pod *corev1api.Pod, volumeName, repoPrefix string) *arkv1api.PodVolumeBackup {
func newPodVolumeBackup(backup *arkv1api.Backup, pod *corev1api.Pod, volumeName, repoIdentifier string) *arkv1api.PodVolumeBackup {
return &arkv1api.PodVolumeBackup{
ObjectMeta: metav1.ObjectMeta{
Namespace: backup.Namespace,
@@ -187,7 +206,7 @@ func newPodVolumeBackup(backup *arkv1api.Backup, pod *corev1api.Pod, volumeName,
"ns": pod.Namespace,
"volume": volumeName,
},
RepoPrefix: repoPrefix,
RepoIdentifier: repoIdentifier,
},
}
}

View File

@@ -24,20 +24,27 @@ import (
// Command represents a restic command.
type Command struct {
Command string
RepoPrefix string
Repo string
PasswordFile string
Dir string
Args []string
ExtraFlags []string
Command string
RepoIdentifier string
PasswordFile string
Dir string
Args []string
ExtraFlags []string
}
func (c *Command) RepoName() string {
if c.RepoIdentifier == "" {
return ""
}
return c.RepoIdentifier[strings.LastIndex(c.RepoIdentifier, "/")+1:]
}
// StringSlice returns the command as a slice of strings.
func (c *Command) StringSlice() []string {
res := []string{"restic"}
res = append(res, c.Command, repoFlag(c.RepoPrefix, c.Repo))
res = append(res, c.Command, repoFlag(c.RepoIdentifier))
if c.PasswordFile != "" {
res = append(res, passwordFlag(c.PasswordFile))
}
@@ -61,8 +68,8 @@ func (c *Command) Cmd() *exec.Cmd {
return cmd
}
func repoFlag(prefix, repo string) string {
return fmt.Sprintf("--repo=%s/%s", prefix, repo)
func repoFlag(repoIdentifier string) string {
return fmt.Sprintf("--repo=%s", repoIdentifier)
}
func passwordFlag(file string) string {

View File

@@ -6,15 +6,14 @@ import (
)
// BackupCommand returns a Command for running a restic backup.
func BackupCommand(repoPrefix, repo, passwordFile, path string, tags map[string]string) *Command {
func BackupCommand(repoIdentifier, passwordFile, path string, tags map[string]string) *Command {
return &Command{
Command: "backup",
RepoPrefix: repoPrefix,
Repo: repo,
PasswordFile: passwordFile,
Dir: path,
Args: []string{"."},
ExtraFlags: backupTagFlags(tags),
Command: "backup",
RepoIdentifier: repoIdentifier,
PasswordFile: passwordFile,
Dir: path,
Args: []string{"."},
ExtraFlags: backupTagFlags(tags),
}
}
@@ -27,26 +26,24 @@ func backupTagFlags(tags map[string]string) []string {
}
// RestoreCommand returns a Command for running a restic restore.
func RestoreCommand(repoPrefix, repo, passwordFile, snapshotID, target string) *Command {
func RestoreCommand(repoIdentifier, passwordFile, snapshotID, target string) *Command {
return &Command{
Command: "restore",
RepoPrefix: repoPrefix,
Repo: repo,
PasswordFile: passwordFile,
Dir: target,
Args: []string{snapshotID},
ExtraFlags: []string{"--target=."},
Command: "restore",
RepoIdentifier: repoIdentifier,
PasswordFile: passwordFile,
Dir: target,
Args: []string{snapshotID},
ExtraFlags: []string{"--target=."},
}
}
// GetSnapshotCommand returns a Command for running a restic (get) snapshots.
func GetSnapshotCommand(repoPrefix, repo, passwordFile string, tags map[string]string) *Command {
func GetSnapshotCommand(repoIdentifier, passwordFile string, tags map[string]string) *Command {
return &Command{
Command: "snapshots",
RepoPrefix: repoPrefix,
Repo: repo,
PasswordFile: passwordFile,
ExtraFlags: []string{"--json", "--last", getSnapshotTagFlag(tags)},
Command: "snapshots",
RepoIdentifier: repoIdentifier,
PasswordFile: passwordFile,
ExtraFlags: []string{"--json", "--last", getSnapshotTagFlag(tags)},
}
}
@@ -59,35 +56,31 @@ func getSnapshotTagFlag(tags map[string]string) string {
return fmt.Sprintf("--tag=%s", strings.Join(tagFilters, ","))
}
func InitCommand(repoPrefix, repo string) *Command {
func InitCommand(repoIdentifier string) *Command {
return &Command{
Command: "init",
RepoPrefix: repoPrefix,
Repo: repo,
Command: "init",
RepoIdentifier: repoIdentifier,
}
}
func CheckCommand(repoPrefix, repo string) *Command {
func CheckCommand(repoIdentifier string) *Command {
return &Command{
Command: "check",
RepoPrefix: repoPrefix,
Repo: repo,
Command: "check",
RepoIdentifier: repoIdentifier,
}
}
func PruneCommand(repoPrefix, repo string) *Command {
func PruneCommand(repoIdentifier string) *Command {
return &Command{
Command: "prune",
RepoPrefix: repoPrefix,
Repo: repo,
Command: "prune",
RepoIdentifier: repoIdentifier,
}
}
func ForgetCommand(repoPrefix, repo, snapshotID string) *Command {
func ForgetCommand(repoIdentifier, snapshotID string) *Command {
return &Command{
Command: "forget",
RepoPrefix: repoPrefix,
Repo: repo,
Args: []string{snapshotID},
Command: "forget",
RepoIdentifier: repoIdentifier,
Args: []string{snapshotID},
}
}

View File

@@ -23,6 +23,14 @@ import (
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
)
type BackendType string
const (
AWSBackend BackendType = "aws"
AzureBackend BackendType = "azure"
GCPBackend BackendType = "gcp"
)
// getRepoPrefix returns the prefix of the value of the --repo flag for
// restic commands, i.e. everything except the "/<repo-name>".
func getRepoPrefix(config arkv1api.ObjectStorageProviderConfig) string {

View File

@@ -10,8 +10,8 @@ import (
// GetSnapshotID runs a 'restic snapshots' command to get the ID of the snapshot
// in the specified repo matching the set of provided tags, or an error if a
// unique snapshot cannot be identified.
func GetSnapshotID(repoPrefix, repo, passwordFile string, tags map[string]string) (string, error) {
output, err := GetSnapshotCommand(repoPrefix, repo, passwordFile, tags).Cmd().Output()
func GetSnapshotID(repoIdentifier, passwordFile string, tags map[string]string) (string, error) {
output, err := GetSnapshotCommand(repoIdentifier, passwordFile, tags).Cmd().Output()
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
return "", errors.Wrapf(err, "error running command, stderr=%s", exitErr.Stderr)

View File

@@ -70,16 +70,8 @@ type RestorerFactory interface {
NewRestorer(context.Context, *arkv1api.Restore) (Restorer, error)
}
type BackendType string
const (
AWSBackend BackendType = "aws"
AzureBackend BackendType = "azure"
GCPBackend BackendType = "gcp"
)
type repositoryManager struct {
repoPrefix string
namespace string
arkClient clientset.Interface
secretsLister corev1listers.SecretLister
secretsClient corev1client.SecretsGetter
@@ -92,7 +84,7 @@ type repositoryManager struct {
// NewRepositoryManager constructs a RepositoryManager.
func NewRepositoryManager(
ctx context.Context,
config arkv1api.ObjectStorageProviderConfig,
namespace string,
arkClient clientset.Interface,
secretsInformer cache.SharedIndexInformer,
secretsClient corev1client.SecretsGetter,
@@ -100,7 +92,7 @@ func NewRepositoryManager(
log logrus.FieldLogger,
) (RepositoryManager, error) {
rm := &repositoryManager{
repoPrefix: getRepoPrefix(config),
namespace: namespace,
arkClient: arkClient,
secretsLister: corev1listers.NewSecretLister(secretsInformer.GetIndexer()),
secretsClient: secretsClient,
@@ -149,7 +141,7 @@ func (rm *repositoryManager) NewRestorer(ctx context.Context, restore *arkv1api.
},
)
r := newRestorer(ctx, rm, informer)
r := newRestorer(ctx, rm, informer, rm.repoLister)
go informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
@@ -160,41 +152,61 @@ func (rm *repositoryManager) NewRestorer(ctx context.Context, restore *arkv1api.
}
func (rm *repositoryManager) InitRepo(name string) error {
repo, err := getRepo(rm.repoLister, rm.namespace, name)
if err != nil {
return err
}
rm.repoLocker.LockExclusive(name)
defer rm.repoLocker.UnlockExclusive(name)
return rm.exec(InitCommand(rm.repoPrefix, name))
return rm.exec(InitCommand(repo.Spec.ResticIdentifier))
}
func (rm *repositoryManager) CheckRepo(name string) error {
repo, err := getRepo(rm.repoLister, rm.namespace, name)
if err != nil {
return err
}
rm.repoLocker.LockExclusive(name)
defer rm.repoLocker.UnlockExclusive(name)
cmd := CheckCommand(rm.repoPrefix, name)
cmd := CheckCommand(repo.Spec.ResticIdentifier)
return rm.exec(cmd)
}
func (rm *repositoryManager) PruneRepo(name string) error {
repo, err := getReadyRepo(rm.repoLister, rm.namespace, name)
if err != nil {
return err
}
rm.repoLocker.LockExclusive(name)
defer rm.repoLocker.UnlockExclusive(name)
cmd := PruneCommand(rm.repoPrefix, name)
cmd := PruneCommand(repo.Spec.ResticIdentifier)
return rm.exec(cmd)
}
func (rm *repositoryManager) Forget(snapshot SnapshotIdentifier) error {
repo, err := getReadyRepo(rm.repoLister, rm.namespace, snapshot.Repo)
if err != nil {
return err
}
rm.repoLocker.LockExclusive(snapshot.Repo)
defer rm.repoLocker.UnlockExclusive(snapshot.Repo)
cmd := ForgetCommand(rm.repoPrefix, snapshot.Repo, snapshot.SnapshotID)
cmd := ForgetCommand(repo.Spec.ResticIdentifier, snapshot.SnapshotID)
return rm.exec(cmd)
}
func (rm *repositoryManager) exec(cmd *Command) error {
file, err := TempCredentialsFile(rm.secretsLister, cmd.Repo)
file, err := TempCredentialsFile(rm.secretsLister, cmd.RepoName())
if err != nil {
return err
}
@@ -205,7 +217,7 @@ func (rm *repositoryManager) exec(cmd *Command) error {
stdout, stderr, err := arkexec.RunCommand(cmd.Cmd())
rm.log.WithFields(logrus.Fields{
"repository": cmd.Repo,
"repository": cmd.RepoName(),
"command": cmd.String(),
"stdout": stdout,
"stderr": stderr,

View File

@@ -28,6 +28,7 @@ import (
"k8s.io/client-go/tools/cache"
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
arkv1listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
"github.com/heptio/ark/pkg/util/boolptr"
)
@@ -40,16 +41,24 @@ type Restorer interface {
type restorer struct {
ctx context.Context
repoManager *repositoryManager
repoLister arkv1listers.ResticRepositoryLister
resultsLock sync.Mutex
results map[string]chan *arkv1api.PodVolumeRestore
}
func newRestorer(ctx context.Context, rm *repositoryManager, podVolumeRestoreInformer cache.SharedIndexInformer) *restorer {
func newRestorer(
ctx context.Context,
rm *repositoryManager,
podVolumeRestoreInformer cache.SharedIndexInformer,
repoLister arkv1listers.ResticRepositoryLister,
) *restorer {
r := &restorer{
ctx: ctx,
repoManager: rm,
results: make(map[string]chan *arkv1api.PodVolumeRestore),
repoLister: repoLister,
results: make(map[string]chan *arkv1api.PodVolumeRestore),
}
podVolumeRestoreInformer.AddEventHandler(
@@ -76,6 +85,11 @@ func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.P
return nil
}
repo, err := getReadyRepo(r.repoLister, restore.Namespace, pod.Namespace)
if err != nil {
return []error{err}
}
resultsChan := make(chan *arkv1api.PodVolumeRestore)
r.resultsLock.Lock()
@@ -91,7 +105,7 @@ func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.P
r.repoManager.repoLocker.Lock(pod.Namespace)
defer r.repoManager.repoLocker.Unlock(pod.Namespace)
volumeRestore := newPodVolumeRestore(restore, pod, volume, snapshot, r.repoManager.repoPrefix)
volumeRestore := newPodVolumeRestore(restore, pod, volume, snapshot, repo.Spec.ResticIdentifier)
if err := errorOnly(r.repoManager.arkClient.ArkV1().PodVolumeRestores(volumeRestore.Namespace).Create(volumeRestore)); err != nil {
errs = append(errs, errors.WithStack(err))
@@ -120,7 +134,7 @@ ForEachVolume:
return errs
}
func newPodVolumeRestore(restore *arkv1api.Restore, pod *corev1api.Pod, volume, snapshot, repoPrefix string) *arkv1api.PodVolumeRestore {
func newPodVolumeRestore(restore *arkv1api.Restore, pod *corev1api.Pod, volume, snapshot, repoIdentifier string) *arkv1api.PodVolumeRestore {
return &arkv1api.PodVolumeRestore{
ObjectMeta: metav1.ObjectMeta{
Namespace: restore.Namespace,
@@ -147,9 +161,9 @@ func newPodVolumeRestore(restore *arkv1api.Restore, pod *corev1api.Pod, volume,
Name: pod.Name,
UID: pod.UID,
},
Volume: volume,
SnapshotID: snapshot,
RepoPrefix: repoPrefix,
Volume: volume,
SnapshotID: snapshot,
RepoIdentifier: repoIdentifier,
},
}
}