mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-01-05 04:55:22 +00:00
add azure-specific code to support multi-location restic
Signed-off-by: Steve Kriss <steve@heptio.com>
This commit is contained in:
@@ -33,6 +33,10 @@ type PodVolumeBackupSpec struct {
|
||||
// up.
|
||||
Volume string `json:"volume"`
|
||||
|
||||
// BackupStorageLocation is the name of the backup storage location
|
||||
// where the restic repository is stored.
|
||||
BackupStorageLocation string `json:"backupStorageLocation"`
|
||||
|
||||
// RepoIdentifier is the restic repository identifier.
|
||||
RepoIdentifier string `json:"repoIdentifier"`
|
||||
|
||||
|
||||
@@ -29,6 +29,10 @@ type PodVolumeRestoreSpec struct {
|
||||
// Volume is the name of the volume within the Pod to be restored.
|
||||
Volume string `json:"volume"`
|
||||
|
||||
// BackupStorageLocation is the name of the backup storage location
|
||||
// where the restic repository is stored.
|
||||
BackupStorageLocation string `json:"backupStorageLocation"`
|
||||
|
||||
// RepoIdentifier is the restic repository identifier.
|
||||
RepoIdentifier string `json:"repoIdentifier"`
|
||||
|
||||
|
||||
@@ -17,7 +17,6 @@ limitations under the License.
|
||||
package azure
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/Azure/go-autorest/autorest/adal"
|
||||
@@ -32,19 +31,19 @@ const (
|
||||
clientSecretEnvVar = "AZURE_CLIENT_SECRET"
|
||||
)
|
||||
|
||||
// SetResticEnvVars sets the environment variables that restic
|
||||
// GetResticEnvVars gets the environment variables that restic
|
||||
// relies on (AZURE_ACCOUNT_NAME and AZURE_ACCOUNT_KEY) based
|
||||
// on info in the provided object storage location config map.
|
||||
func SetResticEnvVars(config map[string]string) error {
|
||||
os.Setenv("AZURE_ACCOUNT_NAME", config[storageAccountConfigKey])
|
||||
|
||||
func GetResticEnvVars(config map[string]string) (map[string]string, error) {
|
||||
storageAccountKey, err := getStorageAccountKey(config)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
os.Setenv("AZURE_ACCOUNT_KEY", storageAccountKey)
|
||||
|
||||
return nil
|
||||
return map[string]string{
|
||||
"AZURE_ACCOUNT_NAME": config[storageAccountConfigKey],
|
||||
"AZURE_ACCOUNT_KEY": storageAccountKey,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newServicePrincipalToken(tenantID, clientID, clientSecret, scope string) (*adal.ServicePrincipalToken, error) {
|
||||
|
||||
@@ -34,7 +34,6 @@ import (
|
||||
|
||||
"github.com/heptio/ark/pkg/buildinfo"
|
||||
"github.com/heptio/ark/pkg/client"
|
||||
"github.com/heptio/ark/pkg/cloudprovider/azure"
|
||||
"github.com/heptio/ark/pkg/cmd"
|
||||
"github.com/heptio/ark/pkg/cmd/util/signals"
|
||||
"github.com/heptio/ark/pkg/controller"
|
||||
@@ -45,12 +44,9 @@ import (
|
||||
)
|
||||
|
||||
func NewServerCommand(f client.Factory) *cobra.Command {
|
||||
var (
|
||||
logLevelFlag = logging.LogLevelFlag(logrus.InfoLevel)
|
||||
location = "default"
|
||||
)
|
||||
logLevelFlag := logging.LogLevelFlag(logrus.InfoLevel)
|
||||
|
||||
var command = &cobra.Command{
|
||||
command := &cobra.Command{
|
||||
Use: "server",
|
||||
Short: "Run the ark restic server",
|
||||
Long: "Run the ark restic server",
|
||||
@@ -61,7 +57,7 @@ func NewServerCommand(f client.Factory) *cobra.Command {
|
||||
logger := logging.DefaultLogger(logLevel)
|
||||
logger.Infof("Starting Ark restic server %s", buildinfo.FormattedGitSHA())
|
||||
|
||||
s, err := newResticServer(logger, fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()), location)
|
||||
s, err := newResticServer(logger, fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
|
||||
cmd.CheckError(err)
|
||||
|
||||
s.run()
|
||||
@@ -69,7 +65,6 @@ func NewServerCommand(f client.Factory) *cobra.Command {
|
||||
}
|
||||
|
||||
command.Flags().Var(logLevelFlag, "log-level", fmt.Sprintf("the level at which to log. Valid values are %s.", strings.Join(logLevelFlag.AllowedValues(), ", ")))
|
||||
command.Flags().StringVar(&location, "default-backup-storage-location", location, "name of the default backup storage location")
|
||||
|
||||
return command
|
||||
}
|
||||
@@ -86,7 +81,7 @@ type resticServer struct {
|
||||
cancelFunc context.CancelFunc
|
||||
}
|
||||
|
||||
func newResticServer(logger logrus.FieldLogger, baseName, locationName string) (*resticServer, error) {
|
||||
func newResticServer(logger logrus.FieldLogger, baseName string) (*resticServer, error) {
|
||||
clientConfig, err := client.Config("", "", baseName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -102,17 +97,6 @@ func newResticServer(logger logrus.FieldLogger, baseName, locationName string) (
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
location, err := arkClient.ArkV1().BackupStorageLocations(os.Getenv("HEPTIO_ARK_NAMESPACE")).Get(locationName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if location.Spec.Provider == "azure" {
|
||||
if err := azure.SetResticEnvVars(location.Spec.Config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// use a stand-alone pod informer because we want to use a field selector to
|
||||
// filter to only pods scheduled on this node.
|
||||
podInformer := corev1informers.NewFilteredPodInformer(
|
||||
@@ -170,6 +154,7 @@ func (s *resticServer) run() {
|
||||
s.podInformer,
|
||||
s.secretInformer,
|
||||
s.kubeInformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
s.arkInformerFactory.Ark().V1().BackupStorageLocations(),
|
||||
os.Getenv("NODE_NAME"),
|
||||
)
|
||||
wg.Add(1)
|
||||
@@ -185,6 +170,7 @@ func (s *resticServer) run() {
|
||||
s.podInformer,
|
||||
s.secretInformer,
|
||||
s.kubeInformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
s.arkInformerFactory.Ark().V1().BackupStorageLocations(),
|
||||
os.Getenv("NODE_NAME"),
|
||||
)
|
||||
wg.Add(1)
|
||||
|
||||
@@ -54,7 +54,6 @@ import (
|
||||
"github.com/heptio/ark/pkg/buildinfo"
|
||||
"github.com/heptio/ark/pkg/client"
|
||||
"github.com/heptio/ark/pkg/cloudprovider"
|
||||
"github.com/heptio/ark/pkg/cloudprovider/azure"
|
||||
"github.com/heptio/ark/pkg/cmd"
|
||||
"github.com/heptio/ark/pkg/cmd/util/signals"
|
||||
"github.com/heptio/ark/pkg/controller"
|
||||
@@ -538,13 +537,6 @@ func (s *server) initRestic(location *api.BackupStorageLocation) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// set the env vars that restic uses for creds purposes
|
||||
if location.Spec.Provider == string(restic.AzureBackend) {
|
||||
if err := azure.SetResticEnvVars(location.Spec.Config); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// use a stand-alone secrets informer so we can filter to only the restic credentials
|
||||
// secret(s) within the heptio-ark namespace
|
||||
//
|
||||
@@ -569,6 +561,7 @@ func (s *server) initRestic(location *api.BackupStorageLocation) error {
|
||||
secretsInformer,
|
||||
s.sharedInformerFactory.Ark().V1().ResticRepositories(),
|
||||
s.arkClient.ArkV1(),
|
||||
s.sharedInformerFactory.Ark().V1().BackupStorageLocations(),
|
||||
s.logger,
|
||||
)
|
||||
if err != nil {
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
jsonpatch "github.com/evanphx/json-patch"
|
||||
"github.com/pkg/errors"
|
||||
@@ -50,6 +51,7 @@ type podVolumeBackupController struct {
|
||||
secretLister corev1listers.SecretLister
|
||||
podLister corev1listers.PodLister
|
||||
pvcLister corev1listers.PersistentVolumeClaimLister
|
||||
backupLocationLister listers.BackupStorageLocationLister
|
||||
nodeName string
|
||||
|
||||
processBackupFunc func(*arkv1api.PodVolumeBackup) error
|
||||
@@ -64,6 +66,7 @@ func NewPodVolumeBackupController(
|
||||
podInformer cache.SharedIndexInformer,
|
||||
secretInformer cache.SharedIndexInformer,
|
||||
pvcInformer corev1informers.PersistentVolumeClaimInformer,
|
||||
backupLocationInformer informers.BackupStorageLocationInformer,
|
||||
nodeName string,
|
||||
) Interface {
|
||||
c := &podVolumeBackupController{
|
||||
@@ -73,6 +76,7 @@ func NewPodVolumeBackupController(
|
||||
podLister: corev1listers.NewPodLister(podInformer.GetIndexer()),
|
||||
secretLister: corev1listers.NewSecretLister(secretInformer.GetIndexer()),
|
||||
pvcLister: pvcInformer.Lister(),
|
||||
backupLocationLister: backupLocationInformer.Lister(),
|
||||
nodeName: nodeName,
|
||||
|
||||
fileSystem: filesystem.NewFileSystem(),
|
||||
@@ -85,6 +89,7 @@ func NewPodVolumeBackupController(
|
||||
podInformer.HasSynced,
|
||||
secretInformer.HasSynced,
|
||||
pvcInformer.Informer().HasSynced,
|
||||
backupLocationInformer.Informer().HasSynced,
|
||||
)
|
||||
c.processBackupFunc = c.processBackup
|
||||
|
||||
@@ -213,6 +218,15 @@ func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup)
|
||||
req.Spec.Tags,
|
||||
)
|
||||
|
||||
// if this is azure, set resticCmd.Env appropriately
|
||||
var env []string
|
||||
if strings.HasPrefix(req.Spec.RepoIdentifier, "azure") {
|
||||
if env, err = restic.AzureCmdEnv(c.backupLocationLister, req.Namespace, req.Spec.BackupStorageLocation); err != nil {
|
||||
return c.fail(req, errors.Wrap(err, "error setting restic cmd env").Error(), log)
|
||||
}
|
||||
resticCmd.Env = env
|
||||
}
|
||||
|
||||
var stdout, stderr string
|
||||
|
||||
if stdout, stderr, err = arkexec.RunCommand(resticCmd.Cmd()); err != nil {
|
||||
@@ -221,7 +235,7 @@ func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup)
|
||||
}
|
||||
log.Debugf("Ran command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)
|
||||
|
||||
snapshotID, err := restic.GetSnapshotID(req.Spec.RepoIdentifier, file, req.Spec.Tags)
|
||||
snapshotID, err := restic.GetSnapshotID(req.Spec.RepoIdentifier, file, req.Spec.Tags, env)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error getting SnapshotID")
|
||||
return c.fail(req, errors.Wrap(err, "error getting snapshot id").Error(), log)
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
jsonpatch "github.com/evanphx/json-patch"
|
||||
"github.com/pkg/errors"
|
||||
@@ -54,6 +55,7 @@ type podVolumeRestoreController struct {
|
||||
podLister corev1listers.PodLister
|
||||
secretLister corev1listers.SecretLister
|
||||
pvcLister corev1listers.PersistentVolumeClaimLister
|
||||
backupLocationLister listers.BackupStorageLocationLister
|
||||
nodeName string
|
||||
|
||||
processRestoreFunc func(*arkv1api.PodVolumeRestore) error
|
||||
@@ -68,6 +70,7 @@ func NewPodVolumeRestoreController(
|
||||
podInformer cache.SharedIndexInformer,
|
||||
secretInformer cache.SharedIndexInformer,
|
||||
pvcInformer corev1informers.PersistentVolumeClaimInformer,
|
||||
backupLocationInformer informers.BackupStorageLocationInformer,
|
||||
nodeName string,
|
||||
) Interface {
|
||||
c := &podVolumeRestoreController{
|
||||
@@ -77,6 +80,7 @@ func NewPodVolumeRestoreController(
|
||||
podLister: corev1listers.NewPodLister(podInformer.GetIndexer()),
|
||||
secretLister: corev1listers.NewSecretLister(secretInformer.GetIndexer()),
|
||||
pvcLister: pvcInformer.Lister(),
|
||||
backupLocationLister: backupLocationInformer.Lister(),
|
||||
nodeName: nodeName,
|
||||
|
||||
fileSystem: filesystem.NewFileSystem(),
|
||||
@@ -89,6 +93,7 @@ func NewPodVolumeRestoreController(
|
||||
podInformer.HasSynced,
|
||||
secretInformer.HasSynced,
|
||||
pvcInformer.Informer().HasSynced,
|
||||
backupLocationInformer.Informer().HasSynced,
|
||||
)
|
||||
c.processRestoreFunc = c.processRestore
|
||||
|
||||
@@ -281,7 +286,7 @@ func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeResto
|
||||
defer os.Remove(credsFile)
|
||||
|
||||
// execute the restore process
|
||||
if err := restorePodVolume(req, credsFile, volumeDir, log); err != nil {
|
||||
if err := c.restorePodVolume(req, credsFile, volumeDir, log); err != nil {
|
||||
log.WithError(err).Error("Error restoring volume")
|
||||
return c.failRestore(req, errors.Wrap(err, "error restoring volume").Error(), log)
|
||||
}
|
||||
@@ -297,7 +302,7 @@ func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeResto
|
||||
return nil
|
||||
}
|
||||
|
||||
func restorePodVolume(req *arkv1api.PodVolumeRestore, credsFile, volumeDir string, log logrus.FieldLogger) error {
|
||||
func (c *podVolumeRestoreController) restorePodVolume(req *arkv1api.PodVolumeRestore, credsFile, volumeDir string, log logrus.FieldLogger) error {
|
||||
// Get the full path of the new volume's directory as mounted in the daemonset pod, which
|
||||
// will look like: /host_pods/<new-pod-uid>/volumes/<volume-plugin-name>/<volume-dir>
|
||||
volumePath, err := singlePathMatch(fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(req.Spec.Pod.UID), volumeDir))
|
||||
@@ -312,6 +317,15 @@ func restorePodVolume(req *arkv1api.PodVolumeRestore, credsFile, volumeDir strin
|
||||
volumePath,
|
||||
)
|
||||
|
||||
// if this is azure, set resticCmd.Env appropriately
|
||||
if strings.HasPrefix(req.Spec.RepoIdentifier, "azure") {
|
||||
env, err := restic.AzureCmdEnv(c.backupLocationLister, req.Namespace, req.Spec.BackupStorageLocation)
|
||||
if err != nil {
|
||||
return c.failRestore(req, errors.Wrap(err, "error setting restic cmd env").Error(), log)
|
||||
}
|
||||
resticCmd.Env = env
|
||||
}
|
||||
|
||||
var stdout, stderr string
|
||||
|
||||
if stdout, stderr, err = arkexec.RunCommand(resticCmd.Cmd()); err != nil {
|
||||
|
||||
@@ -153,7 +153,7 @@ func (c *resticRepositoryController) initializeRepo(req *v1.ResticRepository, lo
|
||||
return err
|
||||
}
|
||||
|
||||
if err := ensureRepo(req.Name, req.Spec.ResticIdentifier, c.repositoryManager); err != nil {
|
||||
if err := ensureRepo(req, c.repositoryManager); err != nil {
|
||||
return c.patchResticRepository(req, repoNotReady(err.Error()))
|
||||
}
|
||||
|
||||
@@ -165,12 +165,12 @@ func (c *resticRepositoryController) initializeRepo(req *v1.ResticRepository, lo
|
||||
|
||||
// ensureRepo first checks the repo, and returns if check passes. If it fails,
|
||||
// attempts to init the repo, and returns the result.
|
||||
func ensureRepo(name, identifier string, repoManager restic.RepositoryManager) error {
|
||||
if repoManager.CheckRepo(name, identifier) == nil {
|
||||
func ensureRepo(repo *v1.ResticRepository, repoManager restic.RepositoryManager) error {
|
||||
if repoManager.CheckRepo(repo) == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return repoManager.InitRepo(name, identifier)
|
||||
return repoManager.InitRepo(repo)
|
||||
}
|
||||
|
||||
func (c *resticRepositoryController) runMaintenanceIfDue(req *v1.ResticRepository, log logrus.FieldLogger) error {
|
||||
@@ -186,14 +186,14 @@ func (c *resticRepositoryController) runMaintenanceIfDue(req *v1.ResticRepositor
|
||||
log.Info("Running maintenance on restic repository")
|
||||
|
||||
log.Debug("Checking repo before prune")
|
||||
if err := c.repositoryManager.CheckRepo(req.Name, req.Spec.ResticIdentifier); err != nil {
|
||||
if err := c.repositoryManager.CheckRepo(req); err != nil {
|
||||
return c.patchResticRepository(req, repoNotReady(err.Error()))
|
||||
}
|
||||
|
||||
// prune failures should be displayed in the `.status.message` field but
|
||||
// should not cause the repo to move to `NotReady`.
|
||||
log.Debug("Pruning repo")
|
||||
if err := c.repositoryManager.PruneRepo(req.Name, req.Spec.ResticIdentifier); err != nil {
|
||||
if err := c.repositoryManager.PruneRepo(req); err != nil {
|
||||
log.WithError(err).Warn("error pruning repository")
|
||||
if patchErr := c.patchResticRepository(req, func(r *v1.ResticRepository) {
|
||||
r.Status.Message = err.Error()
|
||||
@@ -203,7 +203,7 @@ func (c *resticRepositoryController) runMaintenanceIfDue(req *v1.ResticRepositor
|
||||
}
|
||||
|
||||
log.Debug("Checking repo after prune")
|
||||
if err := c.repositoryManager.CheckRepo(req.Name, req.Spec.ResticIdentifier); err != nil {
|
||||
if err := c.repositoryManager.CheckRepo(req); err != nil {
|
||||
return c.patchResticRepository(req, repoNotReady(err.Error()))
|
||||
}
|
||||
|
||||
@@ -221,7 +221,7 @@ func (c *resticRepositoryController) checkNotReadyRepo(req *v1.ResticRepository,
|
||||
|
||||
// we need to ensure it (first check, if check fails, attempt to init)
|
||||
// because we don't know if it's been successfully initialized yet.
|
||||
if err := ensureRepo(req.Name, req.Spec.ResticIdentifier, c.repositoryManager); err != nil {
|
||||
if err := ensureRepo(req, c.repositoryManager); err != nil {
|
||||
return c.patchResticRepository(req, repoNotReady(err.Error()))
|
||||
}
|
||||
|
||||
|
||||
@@ -220,7 +220,8 @@ func newPodVolumeBackup(backup *arkv1api.Backup, pod *corev1api.Pod, volumeName,
|
||||
"ns": pod.Namespace,
|
||||
"volume": volumeName,
|
||||
},
|
||||
RepoIdentifier: repoIdentifier,
|
||||
BackupStorageLocation: backup.Spec.StorageLocation,
|
||||
RepoIdentifier: repoIdentifier,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ type Command struct {
|
||||
Dir string
|
||||
Args []string
|
||||
ExtraFlags []string
|
||||
Env []string
|
||||
}
|
||||
|
||||
func (c *Command) RepoName() string {
|
||||
@@ -75,6 +76,10 @@ func (c *Command) Cmd() *exec.Cmd {
|
||||
cmd := exec.Command(parts[0], parts[1:]...)
|
||||
cmd.Dir = c.Dir
|
||||
|
||||
if len(c.Env) > 0 {
|
||||
cmd.Env = c.Env
|
||||
}
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ package restic
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -28,6 +29,7 @@ import (
|
||||
corev1listers "k8s.io/client-go/listers/core/v1"
|
||||
|
||||
arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
|
||||
"github.com/heptio/ark/pkg/cloudprovider/azure"
|
||||
arkv1listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
|
||||
"github.com/heptio/ark/pkg/util/filesystem"
|
||||
)
|
||||
@@ -196,3 +198,26 @@ func NewPodVolumeRestoreListOptions(name, uid string) metav1.ListOptions {
|
||||
LabelSelector: fmt.Sprintf("%s=%s,%s=%s", arkv1api.RestoreNameLabel, name, arkv1api.RestoreUIDLabel, uid),
|
||||
}
|
||||
}
|
||||
|
||||
// AzureCmdEnv returns a list of environment variables (in the format var=val) that
|
||||
// should be used when running a restic command for an Azure backend. This list is
|
||||
// the current environment, plus the Azure-specific variables restic needs, namely
|
||||
// a storage account name and key.
|
||||
func AzureCmdEnv(backupLocationLister arkv1listers.BackupStorageLocationLister, namespace, backupLocation string) ([]string, error) {
|
||||
loc, err := backupLocationLister.BackupStorageLocations(namespace).Get(backupLocation)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error getting backup storage location")
|
||||
}
|
||||
|
||||
azureVars, err := azure.GetResticEnvVars(loc.Spec.Config)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error getting azure restic env vars")
|
||||
}
|
||||
|
||||
env := os.Environ()
|
||||
for k, v := range azureVars {
|
||||
env = append(env, fmt.Sprintf("%s=%s", k, v))
|
||||
}
|
||||
|
||||
return env, nil
|
||||
}
|
||||
|
||||
@@ -27,8 +27,13 @@ import (
|
||||
// GetSnapshotID runs a 'restic snapshots' command to get the ID of the snapshot
|
||||
// in the specified repo matching the set of provided tags, or an error if a
|
||||
// unique snapshot cannot be identified.
|
||||
func GetSnapshotID(repoIdentifier, passwordFile string, tags map[string]string) (string, error) {
|
||||
stdout, stderr, err := exec.RunCommand(GetSnapshotCommand(repoIdentifier, passwordFile, tags).Cmd())
|
||||
func GetSnapshotID(repoIdentifier, passwordFile string, tags map[string]string, env []string) (string, error) {
|
||||
cmd := GetSnapshotCommand(repoIdentifier, passwordFile, tags)
|
||||
if len(env) > 0 {
|
||||
cmd.Env = env
|
||||
}
|
||||
|
||||
stdout, stderr, err := exec.RunCommand(cmd.Cmd())
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "error running command, stderr=%s", stderr)
|
||||
}
|
||||
|
||||
@@ -101,8 +101,7 @@ func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
|
||||
return repos[0], nil
|
||||
}
|
||||
|
||||
// if we're here, it means we didn't find a repo, meaning we need to create a new
|
||||
// one and wait for it to be ready
|
||||
// no repo found: create one and wait for it to be ready
|
||||
|
||||
repo := &arkv1api.ResticRepository{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -40,13 +41,13 @@ import (
|
||||
// RepositoryManager executes commands against restic repositories.
|
||||
type RepositoryManager interface {
|
||||
// InitRepo initializes a repo with the specified name and identifier.
|
||||
InitRepo(name, identifier string) error
|
||||
InitRepo(repo *arkv1api.ResticRepository) error
|
||||
|
||||
// CheckRepo checks the specified repo for errors.
|
||||
CheckRepo(name, identifier string) error
|
||||
CheckRepo(repo *arkv1api.ResticRepository) error
|
||||
|
||||
// PruneRepo deletes unused data from a repo.
|
||||
PruneRepo(name, identifier string) error
|
||||
PruneRepo(repo *arkv1api.ResticRepository) error
|
||||
|
||||
// Forget removes a snapshot from the list of
|
||||
// available snapshots in a repo.
|
||||
@@ -72,15 +73,18 @@ type RestorerFactory interface {
|
||||
}
|
||||
|
||||
type repositoryManager struct {
|
||||
namespace string
|
||||
arkClient clientset.Interface
|
||||
secretsLister corev1listers.SecretLister
|
||||
repoLister arkv1listers.ResticRepositoryLister
|
||||
repoInformerSynced cache.InformerSynced
|
||||
log logrus.FieldLogger
|
||||
repoLocker *repoLocker
|
||||
repoEnsurer *repositoryEnsurer
|
||||
fileSystem filesystem.Interface
|
||||
namespace string
|
||||
arkClient clientset.Interface
|
||||
secretsLister corev1listers.SecretLister
|
||||
repoLister arkv1listers.ResticRepositoryLister
|
||||
repoInformerSynced cache.InformerSynced
|
||||
backupLocationLister arkv1listers.BackupStorageLocationLister
|
||||
backupLocationInformerSynced cache.InformerSynced
|
||||
log logrus.FieldLogger
|
||||
repoLocker *repoLocker
|
||||
repoEnsurer *repositoryEnsurer
|
||||
fileSystem filesystem.Interface
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// NewRepositoryManager constructs a RepositoryManager.
|
||||
@@ -91,15 +95,19 @@ func NewRepositoryManager(
|
||||
secretsInformer cache.SharedIndexInformer,
|
||||
repoInformer arkv1informers.ResticRepositoryInformer,
|
||||
repoClient arkv1client.ResticRepositoriesGetter,
|
||||
backupLocationInformer arkv1informers.BackupStorageLocationInformer,
|
||||
log logrus.FieldLogger,
|
||||
) (RepositoryManager, error) {
|
||||
rm := &repositoryManager{
|
||||
namespace: namespace,
|
||||
arkClient: arkClient,
|
||||
secretsLister: corev1listers.NewSecretLister(secretsInformer.GetIndexer()),
|
||||
repoLister: repoInformer.Lister(),
|
||||
repoInformerSynced: repoInformer.Informer().HasSynced,
|
||||
log: log,
|
||||
namespace: namespace,
|
||||
arkClient: arkClient,
|
||||
secretsLister: corev1listers.NewSecretLister(secretsInformer.GetIndexer()),
|
||||
repoLister: repoInformer.Lister(),
|
||||
repoInformerSynced: repoInformer.Informer().HasSynced,
|
||||
backupLocationLister: backupLocationInformer.Lister(),
|
||||
backupLocationInformerSynced: backupLocationInformer.Informer().HasSynced,
|
||||
log: log,
|
||||
ctx: ctx,
|
||||
|
||||
repoLocker: newRepoLocker(),
|
||||
repoEnsurer: newRepositoryEnsurer(repoInformer, repoClient, log),
|
||||
@@ -155,28 +163,28 @@ func (rm *repositoryManager) NewRestorer(ctx context.Context, restore *arkv1api.
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (rm *repositoryManager) InitRepo(name, identifier string) error {
|
||||
func (rm *repositoryManager) InitRepo(repo *arkv1api.ResticRepository) error {
|
||||
// restic init requires an exclusive lock
|
||||
rm.repoLocker.LockExclusive(name)
|
||||
defer rm.repoLocker.UnlockExclusive(name)
|
||||
rm.repoLocker.LockExclusive(repo.Name)
|
||||
defer rm.repoLocker.UnlockExclusive(repo.Name)
|
||||
|
||||
return rm.exec(InitCommand(identifier))
|
||||
return rm.exec(InitCommand(repo.Spec.ResticIdentifier), repo.Spec.BackupStorageLocation)
|
||||
}
|
||||
|
||||
func (rm *repositoryManager) CheckRepo(name, identifier string) error {
|
||||
func (rm *repositoryManager) CheckRepo(repo *arkv1api.ResticRepository) error {
|
||||
// restic check requires an exclusive lock
|
||||
rm.repoLocker.LockExclusive(name)
|
||||
defer rm.repoLocker.UnlockExclusive(name)
|
||||
rm.repoLocker.LockExclusive(repo.Name)
|
||||
defer rm.repoLocker.UnlockExclusive(repo.Name)
|
||||
|
||||
return rm.exec(CheckCommand(identifier))
|
||||
return rm.exec(CheckCommand(repo.Spec.ResticIdentifier), repo.Spec.BackupStorageLocation)
|
||||
}
|
||||
|
||||
func (rm *repositoryManager) PruneRepo(name, identifier string) error {
|
||||
func (rm *repositoryManager) PruneRepo(repo *arkv1api.ResticRepository) error {
|
||||
// restic prune requires an exclusive lock
|
||||
rm.repoLocker.LockExclusive(name)
|
||||
defer rm.repoLocker.UnlockExclusive(name)
|
||||
rm.repoLocker.LockExclusive(repo.Name)
|
||||
defer rm.repoLocker.UnlockExclusive(repo.Name)
|
||||
|
||||
return rm.exec(PruneCommand(identifier))
|
||||
return rm.exec(PruneCommand(repo.Spec.ResticIdentifier), repo.Spec.BackupStorageLocation)
|
||||
}
|
||||
|
||||
func (rm *repositoryManager) Forget(ctx context.Context, snapshot SnapshotIdentifier) error {
|
||||
@@ -197,10 +205,10 @@ func (rm *repositoryManager) Forget(ctx context.Context, snapshot SnapshotIdenti
|
||||
rm.repoLocker.LockExclusive(repo.Name)
|
||||
defer rm.repoLocker.UnlockExclusive(repo.Name)
|
||||
|
||||
return rm.exec(ForgetCommand(repo.Spec.ResticIdentifier, snapshot.SnapshotID))
|
||||
return rm.exec(ForgetCommand(repo.Spec.ResticIdentifier, snapshot.SnapshotID), repo.Spec.BackupStorageLocation)
|
||||
}
|
||||
|
||||
func (rm *repositoryManager) exec(cmd *Command) error {
|
||||
func (rm *repositoryManager) exec(cmd *Command, backupLocation string) error {
|
||||
file, err := TempCredentialsFile(rm.secretsLister, rm.namespace, cmd.RepoName(), rm.fileSystem)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -210,6 +218,18 @@ func (rm *repositoryManager) exec(cmd *Command) error {
|
||||
|
||||
cmd.PasswordFile = file
|
||||
|
||||
if strings.HasPrefix(cmd.RepoIdentifier, "azure") {
|
||||
if !cache.WaitForCacheSync(rm.ctx.Done(), rm.backupLocationInformerSynced) {
|
||||
return errors.New("timed out waiting for cache to sync")
|
||||
}
|
||||
|
||||
env, err := AzureCmdEnv(rm.backupLocationLister, rm.namespace, backupLocation)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd.Env = env
|
||||
}
|
||||
|
||||
stdout, stderr, err := arkexec.RunCommand(cmd.Cmd())
|
||||
rm.log.WithFields(logrus.Fields{
|
||||
"repository": cmd.RepoName(),
|
||||
|
||||
@@ -113,7 +113,7 @@ func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.P
|
||||
)
|
||||
|
||||
for volume, snapshot := range volumesToRestore {
|
||||
volumeRestore := newPodVolumeRestore(restore, pod, volume, snapshot, repo.Spec.ResticIdentifier)
|
||||
volumeRestore := newPodVolumeRestore(restore, pod, volume, snapshot, backupLocation, repo.Spec.ResticIdentifier)
|
||||
|
||||
if err := errorOnly(r.repoManager.arkClient.ArkV1().PodVolumeRestores(volumeRestore.Namespace).Create(volumeRestore)); err != nil {
|
||||
errs = append(errs, errors.WithStack(err))
|
||||
@@ -142,7 +142,7 @@ ForEachVolume:
|
||||
return errs
|
||||
}
|
||||
|
||||
func newPodVolumeRestore(restore *arkv1api.Restore, pod *corev1api.Pod, volume, snapshot, repoIdentifier string) *arkv1api.PodVolumeRestore {
|
||||
func newPodVolumeRestore(restore *arkv1api.Restore, pod *corev1api.Pod, volume, snapshot, backupLocation, repoIdentifier string) *arkv1api.PodVolumeRestore {
|
||||
return &arkv1api.PodVolumeRestore{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: restore.Namespace,
|
||||
@@ -169,9 +169,10 @@ func newPodVolumeRestore(restore *arkv1api.Restore, pod *corev1api.Pod, volume,
|
||||
Name: pod.Name,
|
||||
UID: pod.UID,
|
||||
},
|
||||
Volume: volume,
|
||||
SnapshotID: snapshot,
|
||||
RepoIdentifier: repoIdentifier,
|
||||
Volume: volume,
|
||||
SnapshotID: snapshot,
|
||||
BackupStorageLocation: backupLocation,
|
||||
RepoIdentifier: repoIdentifier,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user