add restic integration for doing pod volume backups/restores
Signed-off-by: Steve Kriss <steve@heptio.com>
@@ -27,6 +27,7 @@ import (
	arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
	informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
	listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
	"github.com/heptio/ark/pkg/restic"
	"github.com/heptio/ark/pkg/util/kube"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
@@ -50,6 +51,8 @@ type backupDeletionController struct {
	restoreLister         listers.RestoreLister
	restoreClient         arkv1client.RestoresGetter
	backupTracker         BackupTracker
	resticMgr             restic.RepositoryManager
	podvolumeBackupLister listers.PodVolumeBackupLister

	processRequestFunc func(*v1.DeleteBackupRequest) error
	clock              clock.Clock
@@ -67,6 +70,8 @@ func NewBackupDeletionController(
	restoreInformer informers.RestoreInformer,
	restoreClient arkv1client.RestoresGetter,
	backupTracker BackupTracker,
	resticMgr restic.RepositoryManager,
	podvolumeBackupInformer informers.PodVolumeBackupInformer,
) Interface {
	c := &backupDeletionController{
		genericController: newGenericController("backup-deletion", logger),
@@ -79,11 +84,18 @@ func NewBackupDeletionController(
		restoreLister:         restoreInformer.Lister(),
		restoreClient:         restoreClient,
		backupTracker:         backupTracker,
		clock:                 &clock.RealClock{},
		resticMgr:             resticMgr,
		podvolumeBackupLister: podvolumeBackupInformer.Lister(),
		clock:                 &clock.RealClock{},
	}

	c.syncHandler = c.processQueueItem
	c.cacheSyncWaiters = append(c.cacheSyncWaiters, deleteBackupRequestInformer.Informer().HasSynced, restoreInformer.Informer().HasSynced)
	c.cacheSyncWaiters = append(
		c.cacheSyncWaiters,
		deleteBackupRequestInformer.Informer().HasSynced,
		restoreInformer.Informer().HasSynced,
		podvolumeBackupInformer.Informer().HasSynced,
	)
	c.processRequestFunc = c.processRequest

	deleteBackupRequestInformer.Informer().AddEventHandler(
@@ -225,6 +237,18 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e
		}
	}

	// Try to delete restic snapshots
	log.Info("Removing restic snapshots")
	if snapshots, err := restic.GetSnapshotsInBackup(backup, c.podvolumeBackupLister); err != nil {
		errs = append(errs, err.Error())
	} else {
		for _, snapshot := range snapshots {
			if err := c.resticMgr.Forget(snapshot); err != nil {
				errs = append(errs, err.Error())
			}
		}
	}

	// Try to delete backup from object storage
	log.Info("Removing backup from object storage")
	if err := c.backupService.DeleteBackupDir(c.bucket, backup.Name); err != nil {

@@ -64,6 +64,8 @@ func TestBackupDeletionControllerControllerHasUpdateFunc(t *testing.T) {
		sharedInformers.Ark().V1().Restores(),
		client.ArkV1(), // restoreClient
		NewBackupTracker(),
		nil, // restic repository manager
		sharedInformers.Ark().V1().PodVolumeBackups(),
	).(*backupDeletionController)

	// disable resync handler since we don't want to test it here
@@ -117,6 +119,8 @@ func TestBackupDeletionControllerProcessQueueItem(t *testing.T) {
		sharedInformers.Ark().V1().Restores(),
		client.ArkV1(), // restoreClient
		NewBackupTracker(),
		nil, // restic repository manager
		sharedInformers.Ark().V1().PodVolumeBackups(),
	).(*backupDeletionController)

	// Error splitting key
@@ -198,6 +202,8 @@ func setupBackupDeletionControllerTest(objects ...runtime.Object) *backupDeletio
		sharedInformers.Ark().V1().Restores(),
		client.ArkV1(), // restoreClient
		NewBackupTracker(),
		nil, // restic repository manager
		sharedInformers.Ark().V1().PodVolumeBackups(),
	).(*backupDeletionController),

	req: req,
@@ -577,6 +583,8 @@ func TestBackupDeletionControllerDeleteExpiredRequests(t *testing.T) {
		sharedInformers.Ark().V1().Restores(),
		client.ArkV1(), // restoreClient
		NewBackupTracker(),
		nil,
		sharedInformers.Ark().V1().PodVolumeBackups(),
	).(*backupDeletionController)

	fakeClock := &clock.FakeClock{}
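
The processRequest hunk above now forgets any restic snapshots recorded for a backup before removing the backup from object storage, and it collects every failure instead of stopping at the first one. Below is a minimal standalone sketch of that error-aggregation pattern; the deleteResticSnapshots helper and the string snapshot IDs are illustrative stand-ins, and only the Forget-and-append loop mirrors the diff.

package main

import (
	"errors"
	"fmt"
)

// deleteResticSnapshots is a hypothetical helper mirroring the loop in the
// processRequest hunk above: every failure is collected rather than aborting
// early, so deletion is still attempted for all snapshots.
func deleteResticSnapshots(snapshots []string, forget func(string) error) []string {
	var errs []string
	for _, snapshot := range snapshots {
		if err := forget(snapshot); err != nil {
			errs = append(errs, err.Error())
		}
	}
	return errs
}

func main() {
	forget := func(id string) error {
		if id == "bad" {
			return errors.New("snapshot not found")
		}
		return nil
	}
	// Prints the collected error messages for the snapshot that failed.
	fmt.Println(deleteResticSnapshots([]string{"good", "bad"}, forget))
}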
pkg/controller/pod_volume_backup_controller.go (new file, 299 lines)
@@ -0,0 +1,299 @@
/*
Copyright 2018 the Heptio Ark contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"os/exec"
	"path/filepath"

	jsonpatch "github.com/evanphx/json-patch"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	corev1informers "k8s.io/client-go/informers/core/v1"
	corev1listers "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"

	arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
	arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
	informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
	listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
	"github.com/heptio/ark/pkg/restic"
	"github.com/heptio/ark/pkg/util/kube"
)

type podVolumeBackupController struct {
	*genericController

	podVolumeBackupClient arkv1client.PodVolumeBackupsGetter
	podVolumeBackupLister listers.PodVolumeBackupLister
	secretLister          corev1listers.SecretLister
	podLister             corev1listers.PodLister
	pvcLister             corev1listers.PersistentVolumeClaimLister
	nodeName              string

	processBackupFunc func(*arkv1api.PodVolumeBackup) error
}

// NewPodVolumeBackupController creates a new pod volume backup controller.
func NewPodVolumeBackupController(
	logger logrus.FieldLogger,
	podVolumeBackupInformer informers.PodVolumeBackupInformer,
	podVolumeBackupClient arkv1client.PodVolumeBackupsGetter,
	podInformer cache.SharedIndexInformer,
	secretInformer corev1informers.SecretInformer,
	pvcInformer corev1informers.PersistentVolumeClaimInformer,
	nodeName string,
) Interface {
	c := &podVolumeBackupController{
		genericController:     newGenericController("pod-volume-backup", logger),
		podVolumeBackupClient: podVolumeBackupClient,
		podVolumeBackupLister: podVolumeBackupInformer.Lister(),
		podLister:             corev1listers.NewPodLister(podInformer.GetIndexer()),
		secretLister:          secretInformer.Lister(),
		pvcLister:             pvcInformer.Lister(),
		nodeName:              nodeName,
	}

	c.syncHandler = c.processQueueItem
	c.cacheSyncWaiters = append(
		c.cacheSyncWaiters,
		podVolumeBackupInformer.Informer().HasSynced,
		secretInformer.Informer().HasSynced,
		podInformer.HasSynced,
		pvcInformer.Informer().HasSynced,
	)
	c.processBackupFunc = c.processBackup

	podVolumeBackupInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.enqueue,
			UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) },
		},
	)

	return c
}

func (c *podVolumeBackupController) processQueueItem(key string) error {
	log := c.logger.WithField("key", key)
	log.Debug("Running processItem")

	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		log.WithError(err).Error("error splitting queue key")
		return nil
	}

	req, err := c.podVolumeBackupLister.PodVolumeBackups(ns).Get(name)
	if apierrors.IsNotFound(err) {
		log.Debug("Unable to find PodVolumeBackup")
		return nil
	}
	if err != nil {
		return errors.Wrap(err, "error getting PodVolumeBackup")
	}

	// only process new items
	switch req.Status.Phase {
	case "", arkv1api.PodVolumeBackupPhaseNew:
	default:
		return nil
	}

	// only process items for this node
	if req.Spec.Node != c.nodeName {
		return nil
	}

	// Don't mutate the shared cache
	reqCopy := req.DeepCopy()
	return c.processBackupFunc(reqCopy)
}

func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup) error {
	log := c.logger.WithFields(logrus.Fields{
		"namespace": req.Namespace,
		"name":      req.Name,
	})

	var err error

	// update status to InProgress
	req, err = c.patchPodVolumeBackup(req, updatePhaseFunc(arkv1api.PodVolumeBackupPhaseInProgress))
	if err != nil {
		log.WithError(err).Error("Error setting phase to InProgress")
		return errors.WithStack(err)
	}

	pod, err := c.podLister.Pods(req.Spec.Pod.Namespace).Get(req.Spec.Pod.Name)
	if err != nil {
		log.WithError(err).Errorf("Error getting pod %s/%s", req.Spec.Pod.Namespace, req.Spec.Pod.Name)
		return c.fail(req, errors.Wrap(err, "error getting pod").Error(), log)
	}

	volumeDir, err := kube.GetVolumeDirectory(pod, req.Spec.Volume, c.pvcLister)
	if err != nil {
		log.WithError(err).Error("Error getting volume directory name")
		return c.fail(req, errors.Wrap(err, "error getting volume directory name").Error(), log)
	}

	path, err := singlePathMatch(fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(req.Spec.Pod.UID), volumeDir))
	if err != nil {
		log.WithError(err).Error("Error uniquely identifying volume path")
		return c.fail(req, errors.Wrap(err, "error getting volume path on host").Error(), log)
	}

	// temp creds
	file, err := restic.TempCredentialsFile(c.secretLister, req.Spec.Pod.Namespace)
	if err != nil {
		log.WithError(err).Error("Error creating temp restic credentials file")
		return c.fail(req, errors.Wrap(err, "error creating temp restic credentials file").Error(), log)
	}
	// ignore error since there's nothing we can do and it's a temp file.
	defer os.Remove(file)

	resticCmd := restic.BackupCommand(
		req.Spec.RepoPrefix,
		req.Spec.Pod.Namespace,
		file,
		path,
		req.Spec.Tags,
	)

	var stdout, stderr string

	if stdout, stderr, err = runCommand(resticCmd.Cmd()); err != nil {
		log.WithError(errors.WithStack(err)).Errorf("Error running command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)
		return c.fail(req, fmt.Sprintf("error running restic backup, stderr=%s: %s", stderr, err.Error()), log)
	}
	log.Debugf("Ran command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)

	snapshotID, err := restic.GetSnapshotID(req.Spec.RepoPrefix, req.Spec.Pod.Namespace, file, req.Spec.Tags)
	if err != nil {
		log.WithError(err).Error("Error getting SnapshotID")
		return c.fail(req, errors.Wrap(err, "error getting snapshot id").Error(), log)
	}

	// update status to Completed with path & snapshot id
	req, err = c.patchPodVolumeBackup(req, func(r *arkv1api.PodVolumeBackup) {
		r.Status.Path = path
		r.Status.SnapshotID = snapshotID
		r.Status.Phase = arkv1api.PodVolumeBackupPhaseCompleted
	})
	if err != nil {
		log.WithError(err).Error("Error setting phase to Completed")
		return err
	}

	return nil
}
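
// Illustrative note, not part of the original commit: given the inputs above
// (repo prefix, pod namespace, temp credentials file, host path, tags),
// restic.BackupCommand is assumed to assemble an invocation roughly like
//
//	restic backup <path> -r <repoPrefix>/<namespace> --password-file <file> --tag <k>=<v>
//
// The exact flag set is an assumption; only the inputs are taken from
// processBackup above.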

// runCommand runs a command and returns its stdout, stderr, and its returned
// error (if any). If there are errors reading stdout or stderr, their return
// value(s) will contain the error as a string.
func runCommand(cmd *exec.Cmd) (string, string, error) {
	stdoutBuf := new(bytes.Buffer)
	stderrBuf := new(bytes.Buffer)

	cmd.Stdout = stdoutBuf
	cmd.Stderr = stderrBuf

	runErr := cmd.Run()

	var stdout, stderr string

	if res, readErr := ioutil.ReadAll(stdoutBuf); readErr != nil {
		stdout = errors.Wrap(readErr, "error reading command's stdout").Error()
	} else {
		stdout = string(res)
	}

	if res, readErr := ioutil.ReadAll(stderrBuf); readErr != nil {
		stderr = errors.Wrap(readErr, "error reading command's stderr").Error()
	} else {
		stderr = string(res)
	}

	return stdout, stderr, runErr
}
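
// Usage sketch (illustrative, not part of the original commit): because the
// command's streams are wired to in-memory buffers, a caller gets both outputs
// back even when the command fails, e.g.:
//
//	stdout, stderr, err := runCommand(exec.Command("restic", "version"))
//	if err != nil {
//		fmt.Printf("command failed: %v (stderr: %s)\n", err, stderr)
//	}
//	fmt.Println(stdout)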

func (c *podVolumeBackupController) patchPodVolumeBackup(req *arkv1api.PodVolumeBackup, mutate func(*arkv1api.PodVolumeBackup)) (*arkv1api.PodVolumeBackup, error) {
	// Record original json
	oldData, err := json.Marshal(req)
	if err != nil {
		return nil, errors.Wrap(err, "error marshalling original PodVolumeBackup")
	}

	// Mutate
	mutate(req)

	// Record new json
	newData, err := json.Marshal(req)
	if err != nil {
		return nil, errors.Wrap(err, "error marshalling updated PodVolumeBackup")
	}

	patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
	if err != nil {
		return nil, errors.Wrap(err, "error creating json merge patch for PodVolumeBackup")
	}

	req, err = c.podVolumeBackupClient.PodVolumeBackups(req.Namespace).Patch(req.Name, types.MergePatchType, patchBytes)
	if err != nil {
		return nil, errors.Wrap(err, "error patching PodVolumeBackup")
	}

	return req, nil
}
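
// Illustration (not part of the original commit): CreateMergePatch keeps only
// the fields that differ between the two marshalled documents, so mutating
// just the phase yields a minimal RFC 7386 merge patch along the lines of
//
//	{"status":{"phase":"InProgress"}}
//
// which the client then submits with types.MergePatchType.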

func (c *podVolumeBackupController) fail(req *arkv1api.PodVolumeBackup, msg string, log logrus.FieldLogger) error {
	if _, err := c.patchPodVolumeBackup(req, func(r *arkv1api.PodVolumeBackup) {
		r.Status.Phase = arkv1api.PodVolumeBackupPhaseFailed
		r.Status.Message = msg
	}); err != nil {
		log.WithError(err).Error("Error setting phase to Failed")
		return err
	}
	return nil
}

func updatePhaseFunc(phase arkv1api.PodVolumeBackupPhase) func(r *arkv1api.PodVolumeBackup) {
	return func(r *arkv1api.PodVolumeBackup) {
		r.Status.Phase = phase
	}
}

func singlePathMatch(path string) (string, error) {
	matches, err := filepath.Glob(path)
	if err != nil {
		return "", errors.WithStack(err)
	}

	if len(matches) != 1 {
		return "", errors.Errorf("expected one matching path, got %d", len(matches))
	}

	return matches[0], nil
}
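
The /host_pods/<pod-UID>/volumes/*/<volume-dir> pattern built in processBackup relies on singlePathMatch resolving to exactly one directory on the host. A small, self-contained sketch of that check follows; the pod UID, volume name, and /host_pods mount are hypothetical, and the wildcard is assumed to cover the kubelet's volume plugin directory (e.g. kubernetes.io~empty-dir).

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// Hypothetical pod UID and volume directory, mirroring the pattern built in
	// processBackup above; the wildcard stands in for the volume plugin
	// directory that sits between the pod UID and the volume name on disk.
	pattern := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "1234-abcd", "my-volume")

	matches, err := filepath.Glob(pattern)
	if err != nil {
		fmt.Println("bad pattern:", err)
		return
	}
	if len(matches) != 1 {
		fmt.Printf("expected one matching path, got %d\n", len(matches))
		return
	}
	fmt.Println("volume path on host:", matches[0])
}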
pkg/controller/pod_volume_restore_controller.go (new file, 356 lines)
@@ -0,0 +1,356 @@
/*
Copyright 2018 the Heptio Ark contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"encoding/json"
	"fmt"
	"os"
	"os/exec"
	"strings"

	jsonpatch "github.com/evanphx/json-patch"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"

	corev1api "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	corev1informers "k8s.io/client-go/informers/core/v1"
	corev1listers "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"

	arkv1api "github.com/heptio/ark/pkg/apis/ark/v1"
	arkv1client "github.com/heptio/ark/pkg/generated/clientset/versioned/typed/ark/v1"
	informers "github.com/heptio/ark/pkg/generated/informers/externalversions/ark/v1"
	listers "github.com/heptio/ark/pkg/generated/listers/ark/v1"
	"github.com/heptio/ark/pkg/restic"
	"github.com/heptio/ark/pkg/util/boolptr"
	"github.com/heptio/ark/pkg/util/kube"
)

type podVolumeRestoreController struct {
	*genericController

	podVolumeRestoreClient arkv1client.PodVolumeRestoresGetter
	podVolumeRestoreLister listers.PodVolumeRestoreLister
	secretLister           corev1listers.SecretLister
	podLister              corev1listers.PodLister
	pvcLister              corev1listers.PersistentVolumeClaimLister
	nodeName               string

	processRestoreFunc func(*arkv1api.PodVolumeRestore) error
}

// NewPodVolumeRestoreController creates a new pod volume restore controller.
func NewPodVolumeRestoreController(
	logger logrus.FieldLogger,
	podVolumeRestoreInformer informers.PodVolumeRestoreInformer,
	podVolumeRestoreClient arkv1client.PodVolumeRestoresGetter,
	podInformer cache.SharedIndexInformer,
	secretInformer corev1informers.SecretInformer,
	pvcInformer corev1informers.PersistentVolumeClaimInformer,
	nodeName string,
) Interface {
	c := &podVolumeRestoreController{
		genericController:      newGenericController("pod-volume-restore", logger),
		podVolumeRestoreClient: podVolumeRestoreClient,
		podVolumeRestoreLister: podVolumeRestoreInformer.Lister(),
		podLister:              corev1listers.NewPodLister(podInformer.GetIndexer()),
		secretLister:           secretInformer.Lister(),
		pvcLister:              pvcInformer.Lister(),
		nodeName:               nodeName,
	}

	c.syncHandler = c.processQueueItem
	c.cacheSyncWaiters = append(
		c.cacheSyncWaiters,
		podVolumeRestoreInformer.Informer().HasSynced,
		secretInformer.Informer().HasSynced,
		podInformer.HasSynced,
		pvcInformer.Informer().HasSynced,
	)
	c.processRestoreFunc = c.processRestore

	podVolumeRestoreInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: c.pvrHandler,
			UpdateFunc: func(_, obj interface{}) {
				c.pvrHandler(obj)
			},
		},
	)

	podInformer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: c.podHandler,
			UpdateFunc: func(_, obj interface{}) {
				c.podHandler(obj)
			},
		},
	)

	return c
}

func (c *podVolumeRestoreController) pvrHandler(obj interface{}) {
	pvr := obj.(*arkv1api.PodVolumeRestore)
	log := c.logger.WithField("key", kube.NamespaceAndName(pvr))

	if !shouldEnqueuePVR(pvr, c.podLister, c.nodeName, log) {
		return
	}

	log.Debug("enqueueing")
	c.enqueue(obj)
}

func (c *podVolumeRestoreController) podHandler(obj interface{}) {
	pod := obj.(*corev1api.Pod)
	log := c.logger.WithField("key", kube.NamespaceAndName(pod))

	for _, pvr := range pvrsToEnqueueForPod(pod, c.podVolumeRestoreLister, c.nodeName, log) {
		c.enqueue(pvr)
	}
}

func shouldProcessPod(pod *corev1api.Pod, nodeName string, log logrus.FieldLogger) bool {
	// if the pod lister being used is filtered to pods on this node, this is superfluous,
	// but retaining for safety.
	if pod.Spec.NodeName != nodeName {
		log.Debugf("Pod is scheduled on node %s, not enqueueing.", pod.Spec.NodeName)
		return false
	}

	// only process items for pods that have the restic initContainer running
	if !isPodWaiting(pod) {
		log.Debugf("Pod is not running restic initContainer, not enqueueing.")
		return false
	}

	return true
}

func shouldProcessPVR(pvr *arkv1api.PodVolumeRestore, log logrus.FieldLogger) bool {
	// only process new items
	if pvr.Status.Phase != "" && pvr.Status.Phase != arkv1api.PodVolumeRestorePhaseNew {
		log.Debugf("Item has phase %s, not enqueueing.", pvr.Status.Phase)
		return false
	}

	return true
}

func pvrsToEnqueueForPod(pod *corev1api.Pod, pvrLister listers.PodVolumeRestoreLister, nodeName string, log logrus.FieldLogger) []*arkv1api.PodVolumeRestore {
	if !shouldProcessPod(pod, nodeName, log) {
		return nil
	}

	selector, err := labels.Parse(fmt.Sprintf("%s=%s", arkv1api.PodUIDLabel, pod.UID))
	if err != nil {
		log.WithError(err).Error("Unable to parse label selector %s", fmt.Sprintf("%s=%s", arkv1api.PodUIDLabel, pod.UID))
		return nil
	}

	pvrs, err := pvrLister.List(selector)
	if err != nil {
		log.WithError(err).Error("Unable to list pod volume restores")
		return nil
	}

	var res []*arkv1api.PodVolumeRestore
	for i, pvr := range pvrs {
		if shouldProcessPVR(pvr, log) {
			res = append(res, pvrs[i])
		}
	}

	return res
}

func shouldEnqueuePVR(pvr *arkv1api.PodVolumeRestore, podLister corev1listers.PodLister, nodeName string, log logrus.FieldLogger) bool {
	if !shouldProcessPVR(pvr, log) {
		return false
	}

	pod, err := podLister.Pods(pvr.Spec.Pod.Namespace).Get(pvr.Spec.Pod.Name)
	if err != nil {
		log.WithError(err).Errorf("Unable to get item's pod %s/%s, not enqueueing.", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)
		return false
	}

	if !shouldProcessPod(pod, nodeName, log) {
		return false
	}

	return true
}

func isPodWaiting(pod *corev1api.Pod) bool {
	return len(pod.Spec.InitContainers) == 0 ||
		pod.Spec.InitContainers[0].Name != restic.InitContainer ||
		len(pod.Status.InitContainerStatuses) == 0 ||
		pod.Status.InitContainerStatuses[0].State.Running == nil
}
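
// Descriptive note (added for clarity, not part of the original commit):
// isPodWaiting returns true when the pod has no init containers, when its
// first init container is not the restic init container, or when that
// container has no recorded status or is not yet in the Running state.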

func (c *podVolumeRestoreController) processQueueItem(key string) error {
	log := c.logger.WithField("key", key)
	log.Debug("Running processItem")

	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		log.WithError(errors.WithStack(err)).Error("error splitting queue key")
		return nil
	}

	req, err := c.podVolumeRestoreLister.PodVolumeRestores(ns).Get(name)
	if apierrors.IsNotFound(err) {
		log.Debug("Unable to find PodVolumeRestore")
		return nil
	}
	if err != nil {
		return errors.Wrap(err, "error getting PodVolumeRestore")
	}

	// Don't mutate the shared cache
	reqCopy := req.DeepCopy()
	return c.processRestoreFunc(reqCopy)
}

func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeRestore) error {
	log := c.logger.WithFields(logrus.Fields{
		"namespace": req.Namespace,
		"name":      req.Name,
	})

	var err error

	// update status to InProgress
	req, err = c.patchPodVolumeRestore(req, updatePodVolumeRestorePhaseFunc(arkv1api.PodVolumeRestorePhaseInProgress))
	if err != nil {
		log.WithError(err).Error("Error setting phase to InProgress")
		return errors.WithStack(err)
	}

	pod, err := c.podLister.Pods(req.Spec.Pod.Namespace).Get(req.Spec.Pod.Name)
	if err != nil {
		log.WithError(err).Errorf("Error getting pod %s/%s", req.Spec.Pod.Namespace, req.Spec.Pod.Name)
		return c.fail(req, errors.Wrap(err, "error getting pod").Error(), log)
	}

	volumeDir, err := kube.GetVolumeDirectory(pod, req.Spec.Volume, c.pvcLister)
	if err != nil {
		log.WithError(err).Error("Error getting volume directory name")
		return c.fail(req, errors.Wrap(err, "error getting volume directory name").Error(), log)
	}

	// temp creds
	file, err := restic.TempCredentialsFile(c.secretLister, req.Spec.Pod.Namespace)
	if err != nil {
		log.WithError(err).Error("Error creating temp restic credentials file")
		return c.fail(req, errors.Wrap(err, "error creating temp restic credentials file").Error(), log)
	}
	// ignore error since there's nothing we can do and it's a temp file.
	defer os.Remove(file)

	resticCmd := restic.RestoreCommand(
		req.Spec.RepoPrefix,
		req.Spec.Pod.Namespace,
		file,
		string(req.Spec.Pod.UID),
		req.Spec.SnapshotID,
	)

	var stdout, stderr string

	if stdout, stderr, err = runCommand(resticCmd.Cmd()); err != nil {
		log.WithError(errors.WithStack(err)).Errorf("Error running command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)
		return c.fail(req, fmt.Sprintf("error running restic restore, stderr=%s: %s", stderr, err.Error()), log)
	}
	log.Debugf("Ran command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)

	var restoreUID types.UID
	for _, owner := range req.OwnerReferences {
		if boolptr.IsSetToTrue(owner.Controller) {
			restoreUID = owner.UID
			break
		}
	}

	cmd := exec.Command("/bin/sh", "-c", strings.Join([]string{"/complete-restore.sh", string(req.Spec.Pod.UID), volumeDir, string(restoreUID)}, " "))

	if stdout, stderr, err = runCommand(cmd); err != nil {
		log.WithError(errors.WithStack(err)).Errorf("Error running command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)
		return c.fail(req, fmt.Sprintf("error running restic restore, stderr=%s: %s", stderr, err.Error()), log)
	}
	log.Debugf("Ran command=%s, stdout=%s, stderr=%s", resticCmd.String(), stdout, stderr)

	// update status to Completed
	if _, err = c.patchPodVolumeRestore(req, updatePodVolumeRestorePhaseFunc(arkv1api.PodVolumeRestorePhaseCompleted)); err != nil {
		log.WithError(err).Error("Error setting phase to Completed")
		return err
	}

	return nil
}

func (c *podVolumeRestoreController) patchPodVolumeRestore(req *arkv1api.PodVolumeRestore, mutate func(*arkv1api.PodVolumeRestore)) (*arkv1api.PodVolumeRestore, error) {
	// Record original json
	oldData, err := json.Marshal(req)
	if err != nil {
		return nil, errors.Wrap(err, "error marshalling original PodVolumeRestore")
	}

	// Mutate
	mutate(req)

	// Record new json
	newData, err := json.Marshal(req)
	if err != nil {
		return nil, errors.Wrap(err, "error marshalling updated PodVolumeRestore")
	}

	patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
	if err != nil {
		return nil, errors.Wrap(err, "error creating json merge patch for PodVolumeRestore")
	}

	req, err = c.podVolumeRestoreClient.PodVolumeRestores(req.Namespace).Patch(req.Name, types.MergePatchType, patchBytes)
	if err != nil {
		return nil, errors.Wrap(err, "error patching PodVolumeRestore")
	}

	return req, nil
}

func (c *podVolumeRestoreController) fail(req *arkv1api.PodVolumeRestore, msg string, log logrus.FieldLogger) error {
	if _, err := c.patchPodVolumeRestore(req, func(pvr *arkv1api.PodVolumeRestore) {
		pvr.Status.Phase = arkv1api.PodVolumeRestorePhaseFailed
		pvr.Status.Message = msg
	}); err != nil {
		log.WithError(err).Error("Error setting phase to Failed")
		return err
	}
	return nil
}

func updatePodVolumeRestorePhaseFunc(phase arkv1api.PodVolumeRestorePhase) func(r *arkv1api.PodVolumeRestore) {
	return func(r *arkv1api.PodVolumeRestore) {
		r.Status.Phase = phase
	}
}
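
Both new controllers drive their custom resources through the same lifecycle: an item starts with an empty or New phase, is patched to InProgress before the restic command runs, and ends in Completed or Failed. The sketch below shows the skip rule applied by processQueueItem; the phase names come from the code above, while the string values and the shouldSkip helper are assumptions for illustration only.

package main

import "fmt"

// Phase names as used by the pod volume backup/restore controllers above;
// the concrete string values here are assumed for illustration.
const (
	phaseNew        = "New"
	phaseInProgress = "InProgress"
	phaseCompleted  = "Completed"
	phaseFailed     = "Failed"
)

// shouldSkip reports whether a controller would ignore an item: only items
// with an empty or New phase are picked up for processing.
func shouldSkip(phase string) bool {
	return phase != "" && phase != phaseNew
}

func main() {
	for _, p := range []string{"", phaseNew, phaseInProgress, phaseCompleted, phaseFailed} {
		fmt.Printf("phase %q -> skip: %v\n", p, shouldSkip(p))
	}
}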