Restore hooks exec (#2804)

* Exec hooks in restored pods

Signed-off-by: Andrew Reed <andrew@replicated.com>

* WaitExecHookHandler implements ItemHookHandler

This required adding a context.Context argument to the ItemHookHandler
interface which is unused by the DefaultItemHookHandler implementation.
It also means passing nil for the []ResourceHook argument since that
holds BackupResourceHook.

Signed-off-by: Andrew Reed <andrew@replicated.com>

* WaitExecHookHandler unit tests

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Changelog and go fmt

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Fix double import

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Default to first container in pod

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Use constants for hook error modes in tests

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Revert to separate WaitExecHookHandler interface

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Negative tests for invalid timeout annotations

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Rename NamedExecRestoreHook to PodExecRestoreHook

Also make field names more descriptive.

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Cleanup test names

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Separate maxHookWait and add unit tests

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Comment on maxWait <= 0

Also info log container is not running for hooks to execute in.
Also add context error to hooks not executed errors.

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Remove log about default for invalid timeout

There is no default wait or exec timeout.

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Linting

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Fix log message and rename controller to podWatcher

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Comment on exactly-once semantics for handler

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Fix logging and comments

Use field logger for pod in handler.
Add comment about pod changes in unit tests.
Use kube util NamespaceAndName in messages.

Signed-off-by: Andrew Reed <andrew@replicated.com>

* Fix maxHookWait

Signed-off-by: Andrew Reed <andrew@replicated.com>
This commit is contained in:
Andrew Reed
2020-09-08 11:33:15 -07:00
committed by GitHub
parent a179ae01ca
commit 0547b1d945
9 changed files with 1925 additions and 1 deletions

View File

@@ -34,6 +34,14 @@ func WithName(val string) func(obj metav1.Object) {
}
}
// WithResourceVersion is a functional option that applies the specified
// resourceVersion to an object.
func WithResourceVersion(val string) func(obj metav1.Object) {
	setVersion := func(obj metav1.Object) {
		obj.SetResourceVersion(val)
	}
	return setVersion
}
// WithLabels is a functional option that applies the specified
// label keys/values to an object.
func WithLabels(vals ...string) func(obj metav1.Object) {

View File

@@ -82,3 +82,17 @@ func (b *PodBuilder) InitContainers(containers ...*corev1api.Container) *PodBuil
}
return b
}
// Containers is a builder method that appends each of the given containers
// to the pod's spec and returns the builder for chaining.
func (b *PodBuilder) Containers(containers ...*corev1api.Container) *PodBuilder {
	for i := range containers {
		b.object.Spec.Containers = append(b.object.Spec.Containers, *containers[i])
	}
	return b
}
// ContainerStatuses is a builder method that appends each of the given
// container statuses to the pod's status and returns the builder for chaining.
func (b *PodBuilder) ContainerStatuses(containerStatuses ...*corev1api.ContainerStatus) *PodBuilder {
	for i := range containerStatuses {
		b.object.Status.ContainerStatuses = append(b.object.Status.ContainerStatuses, *containerStatuses[i])
	}
	return b
}

View File

@@ -720,6 +720,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
s.config.podVolumeOperationTimeout,
s.config.resourceTerminatingTimeout,
s.logger,
podexec.NewPodCommandExecutor(s.kubeClientConfig, s.kubeClient.CoreV1().RESTClient()),
s.kubeClient.CoreV1().RESTClient(),
)
cmd.CheckError(err)

View File

@@ -42,7 +42,9 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"github.com/vmware-tanzu/velero/internal/hook"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/archive"
"github.com/vmware-tanzu/velero/pkg/client"
@@ -51,6 +53,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/kuberesource"
"github.com/vmware-tanzu/velero/pkg/label"
"github.com/vmware-tanzu/velero/pkg/plugin/velero"
"github.com/vmware-tanzu/velero/pkg/podexec"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/collections"
@@ -95,6 +98,8 @@ type kubernetesRestorer struct {
fileSystem filesystem.Interface
pvRenamer func(string) (string, error)
logger logrus.FieldLogger
podCommandExecutor podexec.PodCommandExecutor
podGetter cache.Getter
}
// NewKubernetesRestorer creates a new kubernetesRestorer.
@@ -107,6 +112,8 @@ func NewKubernetesRestorer(
resticTimeout time.Duration,
resourceTerminatingTimeout time.Duration,
logger logrus.FieldLogger,
podCommandExecutor podexec.PodCommandExecutor,
podGetter cache.Getter,
) (Restorer, error) {
return &kubernetesRestorer{
discoveryHelper: discoveryHelper,
@@ -125,7 +132,9 @@ func NewKubernetesRestorer(
veleroCloneName := "velero-clone-" + veleroCloneUuid.String()
return veleroCloneName, nil
},
fileSystem: filesystem.NewFileSystem(),
fileSystem: filesystem.NewFileSystem(),
podCommandExecutor: podCommandExecutor,
podGetter: podGetter,
}, nil
}
@@ -186,6 +195,18 @@ func (kr *kubernetesRestorer) Restore(
}
}
resourceRestoreHooks, err := hook.GetRestoreHooksFromSpec(&req.Restore.Spec.Hooks)
if err != nil {
return Result{}, Result{Velero: []string{err.Error()}}
}
hooksCtx, hooksCancelFunc := go_context.WithCancel(go_context.Background())
waitExecHookHandler := &hook.DefaultWaitExecHookHandler{
PodCommandExecutor: kr.podCommandExecutor,
ListWatchFactory: &hook.DefaultListWatchFactory{
PodsGetter: kr.podGetter,
},
}
pvRestorer := &pvRestorer{
logger: req.Log,
backup: req.Backup,
@@ -222,6 +243,11 @@ func (kr *kubernetesRestorer) Restore(
pvRenamer: kr.pvRenamer,
discoveryHelper: kr.discoveryHelper,
resourcePriorities: kr.resourcePriorities,
resourceRestoreHooks: resourceRestoreHooks,
hooksErrs: make(chan error),
waitExecHookHandler: waitExecHookHandler,
hooksContext: hooksCtx,
hooksCancelFunc: hooksCancelFunc,
}
return restoreCtx.execute()
@@ -297,6 +323,10 @@ type restoreContext struct {
resourcePriorities []string
hooksWaitGroup sync.WaitGroup
hooksErrs chan error
resourceRestoreHooks []hook.ResourceRestoreHook
waitExecHookHandler hook.WaitExecHookHandler
hooksContext go_context.Context
hooksCancelFunc go_context.CancelFunc
}
type resourceClientKey struct {
@@ -465,6 +495,18 @@ func (ctx *restoreContext) execute() (Result, Result) {
}
ctx.log.Info("Done waiting for all restic restores to complete")
// wait for all post-restore exec hooks with same logic as restic wait above
go func() {
ctx.log.Info("Waiting for all post-restore-exec hooks to complete")
ctx.hooksWaitGroup.Wait()
close(ctx.hooksErrs)
}()
for err := range ctx.hooksErrs {
errs.Velero = append(errs.Velero, err.Error())
}
ctx.log.Info("Done waiting for all post-restore exec hooks to complete")
return warnings, errs
}
@@ -1116,6 +1158,10 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
restorePodVolumeBackups(ctx, createdObj, originalNamespace)
}
if groupResource == kuberesource.Pods {
ctx.waitExec(createdObj)
}
// Wait for a CRD to be available for instantiating resources
// before continuing.
if groupResource == kuberesource.CustomResourceDefinitions {
@@ -1205,6 +1251,43 @@ func restorePodVolumeBackups(ctx *restoreContext, createdObj *unstructured.Unstr
}
}
// waitExec spawns a goroutine that executes post-restore exec hooks in a
// restored pod's containers once those containers become ready. Any errors
// encountered (conversion failures, hook grouping failures, or hook execution
// failures) are sent on ctx.hooksErrs, which the caller drains after
// ctx.hooksWaitGroup completes.
func (ctx *restoreContext) waitExec(createdObj *unstructured.Unstructured) {
	ctx.hooksWaitGroup.Add(1)
	go func() {
		// Done() will only be called after all errors have been successfully sent
		// on the ctx.hooksErrs channel
		defer ctx.hooksWaitGroup.Done()

		pod := new(v1.Pod)
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdObj.UnstructuredContent(), &pod); err != nil {
			ctx.log.WithError(err).Error("error converting unstructured pod")
			ctx.hooksErrs <- err
			return
		}
		// Determine which exec hooks apply to each container of this pod,
		// based on the restore spec's hooks and the pod's annotations.
		execHooksByContainer, err := hook.GroupRestoreExecHooks(
			ctx.resourceRestoreHooks,
			pod,
			ctx.log,
		)
		if err != nil {
			ctx.log.WithError(err).Errorf("error getting exec hooks for pod %s/%s", pod.Namespace, pod.Name)
			ctx.hooksErrs <- err
			return
		}

		if errs := ctx.waitExecHookHandler.HandleHooks(ctx.hooksContext, ctx.log, pod, execHooksByContainer); len(errs) > 0 {
			ctx.log.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully execute post-restore hooks")
			// Cancel the shared hooks context so hook handling for other pods
			// stops as well.
			ctx.hooksCancelFunc()

			for _, err := range errs {
				// Errors are already logged in the HandleHooks method
				ctx.hooksErrs <- err
			}
		}
	}()
}
func hasSnapshot(pvName string, snapshots []*volume.Snapshot) bool {
for _, snapshot := range snapshots {
if snapshot.Spec.PersistentVolumeName == pvName {