Merge pull request #7619 from allenxu404/post_restore_hook_enhancement
Wait for results of restore exec hook executions in Finalizing phase instead of InProgress phase
New file: changelogs/unreleased/7619-allenxu404 (1 line)
@@ -0,0 +1 @@
Wait for results of restore exec hook executions in Finalizing phase instead of InProgress phase
@@ -26,8 +26,8 @@ const (
	HookSourceSpec = "spec"
)

// hookTrackerKey identifies a backup/restore hook
type hookTrackerKey struct {
// hookKey identifies a backup/restore hook
type hookKey struct {
	// PodNamespace indicates the namespace of pod where hooks are executed.
	// For hooks specified in the backup/restore spec, this field is the namespace of an applicable pod.
	// For hooks specified in pod annotation, this field is the namespace of pod where hooks are annotated.
@@ -48,37 +48,46 @@ type hookTrackerKey struct {
	container string
}

// hookTrackerVal records the execution status of a specific hook.
// hookTrackerVal is extensible to accommodate additional fields as needs develop.
type hookTrackerVal struct {
// hookStatus records the execution status of a specific hook.
// hookStatus is extensible to accommodate additional fields as needs develop.
type hookStatus struct {
	// hookFailed indicates if the hook failed to execute.
	hookFailed bool
	// hookExecuted indicates if the hook has already been executed.
	hookExecuted bool
}

// HookTracker tracks all hooks' execution status
// HookTracker tracks all hooks' execution status in a single backup/restore.
type HookTracker struct {
	lock *sync.RWMutex
	tracker map[hookTrackerKey]hookTrackerVal
	lock *sync.RWMutex
	// tracker records all hook info for a single backup/restore.
	tracker map[hookKey]hookStatus
	// hookAttemptedCnt indicates the number of attempted hooks.
	hookAttemptedCnt int
	// hookFailedCnt indicates the number of failed hooks.
	hookFailedCnt int
	// hookExecutedCnt indicates the number of executed hooks.
	hookExecutedCnt int
	// hookErrs records hook execution errors if any.
	hookErrs []HookErrInfo
}

// NewHookTracker creates a hookTracker.
// NewHookTracker creates a hookTracker instance.
func NewHookTracker() *HookTracker {
	return &HookTracker{
		lock: &sync.RWMutex{},
		tracker: make(map[hookTrackerKey]hookTrackerVal),
		tracker: make(map[hookKey]hookStatus),
	}
}

// Add adds a hook to the tracker
// Add adds a hook to the hook tracker
// Add must precede the Record for each individual hook.
// In other words, a hook must be added to the tracker before its execution result is recorded.
func (ht *HookTracker) Add(podNamespace, podName, container, source, hookName string, hookPhase hookPhase) {
	ht.lock.Lock()
	defer ht.lock.Unlock()

	key := hookTrackerKey{
	key := hookKey{
		podNamespace: podNamespace,
		podName: podName,
		hookSource: source,
@@ -88,21 +97,22 @@ func (ht *HookTracker) Add(podNamespace, podName, container, source, hookName st
	}

	if _, ok := ht.tracker[key]; !ok {
		ht.tracker[key] = hookTrackerVal{
		ht.tracker[key] = hookStatus{
			hookFailed: false,
			hookExecuted: false,
		}
		ht.hookAttemptedCnt++
	}
}

// Record records the hook's execution status
// Add must precede the Record for each individual hook.
// In other words, a hook must be added to the tracker before its execution result is recorded.
func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName string, hookPhase hookPhase, hookFailed bool) error {
func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName string, hookPhase hookPhase, hookFailed bool, hookErr error) error {
	ht.lock.Lock()
	defer ht.lock.Unlock()

	key := hookTrackerKey{
	key := hookKey{
		podNamespace: podNamespace,
		podName: podName,
		hookSource: source,
@@ -111,38 +121,125 @@ func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName
		hookName: hookName,
	}

	var err error
	if _, ok := ht.tracker[key]; ok {
		ht.tracker[key] = hookTrackerVal{
	if _, ok := ht.tracker[key]; !ok {
		return fmt.Errorf("hook not exist in hook tracker, hook: %+v", key)
	}

	if !ht.tracker[key].hookExecuted {
		ht.tracker[key] = hookStatus{
			hookFailed: hookFailed,
			hookExecuted: true,
		}
		ht.hookExecutedCnt++
		if hookFailed {
			ht.hookFailedCnt++
			ht.hookErrs = append(ht.hookErrs, HookErrInfo{Namespace: key.podNamespace, Err: hookErr})
		}
	}
	return nil
}

// Stat returns the number of attempted hooks and failed hooks
func (ht *HookTracker) Stat() (hookAttemptedCnt int, hookFailedCnt int) {
	ht.lock.RLock()
	defer ht.lock.RUnlock()

	return ht.hookAttemptedCnt, ht.hookFailedCnt
}

// IsComplete returns whether the execution of all hooks has finished or not
func (ht *HookTracker) IsComplete() bool {
	ht.lock.RLock()
	defer ht.lock.RUnlock()

	return ht.hookAttemptedCnt == ht.hookExecutedCnt
}

// HookErrs returns hook execution errors
func (ht *HookTracker) HookErrs() []HookErrInfo {
	ht.lock.RLock()
	defer ht.lock.RUnlock()

	return ht.hookErrs
}

// MultiHookTracker tracks all hooks' execution status for multiple backups/restores.
type MultiHookTracker struct {
	lock *sync.RWMutex
	// trackers is a map that uses the backup/restore name as the key and stores a HookTracker as value.
	trackers map[string]*HookTracker
}

// NewMultiHookTracker creates a multiHookTracker instance.
func NewMultiHookTracker() *MultiHookTracker {
	return &MultiHookTracker{
		lock: &sync.RWMutex{},
		trackers: make(map[string]*HookTracker),
	}
}

// Add adds a backup/restore hook to the tracker
func (mht *MultiHookTracker) Add(name, podNamespace, podName, container, source, hookName string, hookPhase hookPhase) {
	mht.lock.Lock()
	defer mht.lock.Unlock()

	if _, ok := mht.trackers[name]; !ok {
		mht.trackers[name] = NewHookTracker()
	}
	mht.trackers[name].Add(podNamespace, podName, container, source, hookName, hookPhase)
}

// Record records a backup/restore hook execution status
func (mht *MultiHookTracker) Record(name, podNamespace, podName, container, source, hookName string, hookPhase hookPhase, hookFailed bool, hookErr error) error {
	mht.lock.RLock()
	defer mht.lock.RUnlock()

	var err error
	if _, ok := mht.trackers[name]; ok {
		err = mht.trackers[name].Record(podNamespace, podName, container, source, hookName, hookPhase, hookFailed, hookErr)
	} else {
		err = fmt.Errorf("hook not exist in hooks tracker, hook key: %v", key)
		err = fmt.Errorf("the backup/restore not exist in hook tracker, backup/restore name: %s", name)
	}
	return err
}

// Stat calculates the number of attempted hooks and failed hooks
func (ht *HookTracker) Stat() (hookAttemptedCnt int, hookFailed int) {
	ht.lock.RLock()
	defer ht.lock.RUnlock()
// Stat returns the number of attempted hooks and failed hooks for a particular backup/restore
func (mht *MultiHookTracker) Stat(name string) (hookAttemptedCnt int, hookFailedCnt int) {
	mht.lock.RLock()
	defer mht.lock.RUnlock()

	for _, hookInfo := range ht.tracker {
		if hookInfo.hookExecuted {
			hookAttemptedCnt++
			if hookInfo.hookFailed {
				hookFailed++
			}
		}
	if _, ok := mht.trackers[name]; ok {
		return mht.trackers[name].Stat()
	}
	return
}

// GetTracker gets the tracker inside HookTracker
func (ht *HookTracker) GetTracker() map[hookTrackerKey]hookTrackerVal {
	ht.lock.RLock()
	defer ht.lock.RUnlock()
// Delete removes the hook data for a particular backup/restore
func (mht *MultiHookTracker) Delete(name string) {
	mht.lock.Lock()
	defer mht.lock.Unlock()

	return ht.tracker
	delete(mht.trackers, name)
}

// IsComplete returns whether the execution of all hooks for a particular backup/restore has finished or not
func (mht *MultiHookTracker) IsComplete(name string) bool {
	mht.lock.RLock()
	defer mht.lock.RUnlock()

	if _, ok := mht.trackers[name]; ok {
		return mht.trackers[name].IsComplete()
	}
	return true
}

// HookErrs returns hook execution errors for a particular backup/restore
func (mht *MultiHookTracker) HookErrs(name string) []HookErrInfo {
	mht.lock.RLock()
	defer mht.lock.RUnlock()

	if _, ok := mht.trackers[name]; ok {
		return mht.trackers[name].HookErrs()
	}
	return nil
}
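For orientation (not part of the commit), here is a minimal sketch of how the MultiHookTracker API above is meant to be used. It assumes the code sits alongside the internal/hook package (for example in a _test.go file) with "errors" and "fmt" imported; the restore name, namespaces, and hook names are made up for the example.

// Illustrative sketch only; hooks are keyed by the owning restore's name.
func demoMultiHookTracker() {
	mht := NewMultiHookTracker()

	// Restore phase: register the hook under the restore's name while the pod is restored.
	mht.Add("restore1", "ns1", "pod1", "container1", HookSourceSpec, "my-hook", "")

	// Later, when the exec hook finishes (possibly on another goroutine), record its result.
	_ = mht.Record("restore1", "ns1", "pod1", "container1", HookSourceSpec, "my-hook", "", true, errors.New("command failed"))

	// Finalizing phase: once every added hook has been recorded, read the results and clean up.
	if mht.IsComplete("restore1") {
		attempted, failed := mht.Stat("restore1")
		fmt.Printf("attempted=%d failed=%d errs=%d\n", attempted, failed, len(mht.HookErrs("restore1")))
		mht.Delete("restore1")
	}
}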
@@ -17,6 +17,7 @@ limitations under the License.
package hook

import (
	"fmt"
	"testing"

	"github.com/stretchr/testify/assert"
@@ -32,13 +33,13 @@ func TestNewHookTracker(t *testing.T) {
func TestHookTracker_Add(t *testing.T) {
	tracker := NewHookTracker()

	tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre)
	tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "")

	key := hookTrackerKey{
	key := hookKey{
		podNamespace: "ns1",
		podName: "pod1",
		container: "container1",
		hookPhase: PhasePre,
		hookPhase: "",
		hookSource: HookSourceAnnotation,
		hookName: "h1",
	}
@@ -49,44 +50,148 @@ func TestHookTracker_Add(t *testing.T) {

func TestHookTracker_Record(t *testing.T) {
	tracker := NewHookTracker()
	tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre)
	err := tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre, true)
	tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "")
	err := tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err"))

	key := hookTrackerKey{
	key := hookKey{
		podNamespace: "ns1",
		podName: "pod1",
		container: "container1",
		hookPhase: PhasePre,
		hookPhase: "",
		hookSource: HookSourceAnnotation,
		hookName: "h1",
	}

	info := tracker.tracker[key]
	assert.True(t, info.hookFailed)
	assert.True(t, info.hookExecuted)
	assert.Nil(t, err)

	err = tracker.Record("ns2", "pod2", "container1", HookSourceAnnotation, "h1", PhasePre, true)
	err = tracker.Record("ns2", "pod2", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err"))
	assert.NotNil(t, err)

	err = tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", false, nil)
	assert.Nil(t, err)
	assert.True(t, info.hookFailed)
}

func TestHookTracker_Stat(t *testing.T) {
	tracker := NewHookTracker()

	tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre)
	tracker.Add("ns2", "pod2", "container1", HookSourceAnnotation, "h2", PhasePre)
	tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre, true)
	tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "")
	tracker.Add("ns2", "pod2", "container1", HookSourceAnnotation, "h2", "")
	tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err"))

	attempted, failed := tracker.Stat()
	assert.Equal(t, 1, attempted)
	assert.Equal(t, 2, attempted)
	assert.Equal(t, 1, failed)
}

func TestHookTracker_Get(t *testing.T) {
func TestHookTracker_IsComplete(t *testing.T) {
	tracker := NewHookTracker()
	tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre)
	tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre, true, fmt.Errorf("err"))
	assert.True(t, tracker.IsComplete())

	tr := tracker.GetTracker()
	assert.NotNil(t, tr)

	t.Logf("tracker :%+v", tr)
	tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "")
	assert.False(t, tracker.IsComplete())
}

func TestHookTracker_HookErrs(t *testing.T) {
	tracker := NewHookTracker()
	tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "")
	tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err"))

	hookErrs := tracker.HookErrs()
	assert.Len(t, hookErrs, 1)
}

func TestMultiHookTracker_Add(t *testing.T) {
	mht := NewMultiHookTracker()

	mht.Add("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "")

	key := hookKey{
		podNamespace: "ns1",
		podName: "pod1",
		container: "container1",
		hookPhase: "",
		hookSource: HookSourceAnnotation,
		hookName: "h1",
	}

	_, ok := mht.trackers["restore1"].tracker[key]
	assert.True(t, ok)
}

func TestMultiHookTracker_Record(t *testing.T) {
	mht := NewMultiHookTracker()
	mht.Add("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "")
	err := mht.Record("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err"))

	key := hookKey{
		podNamespace: "ns1",
		podName: "pod1",
		container: "container1",
		hookPhase: "",
		hookSource: HookSourceAnnotation,
		hookName: "h1",
	}

	info := mht.trackers["restore1"].tracker[key]
	assert.True(t, info.hookFailed)
	assert.True(t, info.hookExecuted)
	assert.Nil(t, err)

	err = mht.Record("restore1", "ns2", "pod2", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err"))
	assert.NotNil(t, err)

	err = mht.Record("restore2", "ns2", "pod2", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err"))
	assert.NotNil(t, err)
}

func TestMultiHookTracker_Stat(t *testing.T) {
	mht := NewMultiHookTracker()

	mht.Add("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "")
	mht.Add("restore1", "ns2", "pod2", "container1", HookSourceAnnotation, "h2", "")
	mht.Record("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err"))
	mht.Record("restore1", "ns2", "pod2", "container1", HookSourceAnnotation, "h2", "", false, nil)

	attempted, failed := mht.Stat("restore1")
	assert.Equal(t, 2, attempted)
	assert.Equal(t, 1, failed)
}

func TestMultiHookTracker_Delete(t *testing.T) {
	mht := NewMultiHookTracker()
	mht.Add("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "")
	mht.Delete("restore1")

	_, ok := mht.trackers["restore1"]
	assert.False(t, ok)
}

func TestMultiHookTracker_IsComplete(t *testing.T) {
	mht := NewMultiHookTracker()
	mht.Add("backup1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre)
	mht.Record("backup1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre, true, fmt.Errorf("err"))
	assert.True(t, mht.IsComplete("backup1"))

	mht.Add("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "")
	assert.False(t, mht.IsComplete("restore1"))

	assert.True(t, mht.IsComplete("restore2"))
}

func TestMultiHookTracker_HookErrs(t *testing.T) {
	mht := NewMultiHookTracker()
	mht.Add("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "")
	mht.Record("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err"))

	hookErrs := mht.HookErrs("restore1")
	assert.Len(t, hookErrs, 1)

	hookErrs2 := mht.HookErrs("restore2")
	assert.Empty(t, hookErrs2)
}
@@ -239,7 +239,7 @@ func (h *DefaultItemHookHandler) HandleHooks(
	hookLog.WithError(errExec).Error("Error executing hook")
	hookFailed = true
}
errTracker := hookTracker.Record(namespace, name, hookFromAnnotations.Container, HookSourceAnnotation, "", phase, hookFailed)
errTracker := hookTracker.Record(namespace, name, hookFromAnnotations.Container, HookSourceAnnotation, "", phase, hookFailed, errExec)
if errTracker != nil {
	hookLog.WithError(errTracker).Warn("Error recording the hook in hook tracker")
}
@@ -291,7 +291,7 @@ func (h *DefaultItemHookHandler) HandleHooks(
		modeFailError = err
	}
}
errTracker := hookTracker.Record(namespace, name, hook.Exec.Container, HookSourceSpec, resourceHook.Name, phase, hookFailed)
errTracker := hookTracker.Record(namespace, name, hook.Exec.Container, HookSourceSpec, resourceHook.Name, phase, hookFailed, err)
if errTracker != nil {
	hookLog.WithError(errTracker).Warn("Error recording the hook in hook tracker")
}
@@ -540,10 +540,11 @@ type PodExecRestoreHook struct {
// container name. If an exec hook is defined in annotation that is used, else applicable exec
// hooks from the restore resource are accumulated.
func GroupRestoreExecHooks(
	restoreName string,
	resourceRestoreHooks []ResourceRestoreHook,
	pod *corev1api.Pod,
	log logrus.FieldLogger,
	hookTrack *HookTracker,
	hookTrack *MultiHookTracker,
) (map[string][]PodExecRestoreHook, error) {
	byContainer := map[string][]PodExecRestoreHook{}

@@ -560,7 +561,7 @@ func GroupRestoreExecHooks(
	if hookFromAnnotation.Container == "" {
		hookFromAnnotation.Container = pod.Spec.Containers[0].Name
	}
	hookTrack.Add(metadata.GetNamespace(), metadata.GetName(), hookFromAnnotation.Container, HookSourceAnnotation, "<from-annotation>", hookPhase(""))
	hookTrack.Add(restoreName, metadata.GetNamespace(), metadata.GetName(), hookFromAnnotation.Container, HookSourceAnnotation, "<from-annotation>", hookPhase(""))
	byContainer[hookFromAnnotation.Container] = []PodExecRestoreHook{
		{
			HookName: "<from-annotation>",
@@ -595,7 +596,7 @@ func GroupRestoreExecHooks(
	if named.Hook.Container == "" {
		named.Hook.Container = pod.Spec.Containers[0].Name
	}
	hookTrack.Add(metadata.GetNamespace(), metadata.GetName(), named.Hook.Container, HookSourceSpec, rrh.Name, hookPhase(""))
	hookTrack.Add(restoreName, metadata.GetNamespace(), metadata.GetName(), named.Hook.Container, HookSourceSpec, rrh.Name, hookPhase(""))
	byContainer[named.Hook.Container] = append(byContainer[named.Hook.Container], named)
	}
}
@@ -1195,10 +1195,10 @@ func TestGroupRestoreExecHooks(t *testing.T) {
		},
	}

	hookTracker := NewHookTracker()
	hookTracker := NewMultiHookTracker()
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			actual, err := GroupRestoreExecHooks(tc.resourceRestoreHooks, tc.pod, velerotest.NewLogger(), hookTracker)
			actual, err := GroupRestoreExecHooks("restore1", tc.resourceRestoreHooks, tc.pod, velerotest.NewLogger(), hookTracker)
			assert.Nil(t, err)
			assert.Equal(t, tc.expected, actual)
		})
@@ -2108,7 +2108,7 @@ func TestBackupHookTracker(t *testing.T) {
	phase: PhasePre,
	groupResource: "pods",
	hookTracker: NewHookTracker(),
	expectedHookAttempted: 3,
	expectedHookAttempted: 4,
	expectedHookFailed: 2,
	pods: []podWithHook{
		{
@@ -2364,14 +2364,14 @@ func TestRestoreHookTrackerAdd(t *testing.T) {
	name string
	resourceRestoreHooks []ResourceRestoreHook
	pod *corev1api.Pod
	hookTracker *HookTracker
	hookTracker *MultiHookTracker
	expectedCnt int
}{
	{
		name: "neither spec hooks nor annotations hooks are set",
		resourceRestoreHooks: nil,
		pod: builder.ForPod("default", "my-pod").Result(),
		hookTracker: NewHookTracker(),
		hookTracker: NewMultiHookTracker(),
		expectedCnt: 0,
	},
	{
@@ -2390,7 +2390,7 @@ func TestRestoreHookTrackerAdd(t *testing.T) {
			Name: "container1",
		}).
		Result(),
		hookTracker: NewHookTracker(),
		hookTracker: NewMultiHookTracker(),
		expectedCnt: 1,
	},
	{
@@ -2428,7 +2428,7 @@ func TestRestoreHookTrackerAdd(t *testing.T) {
			Name: "container2",
		}).
		Result(),
		hookTracker: NewHookTracker(),
		hookTracker: NewMultiHookTracker(),
		expectedCnt: 2,
	},
	{
@@ -2463,15 +2463,18 @@ func TestRestoreHookTrackerAdd(t *testing.T) {
			Name: "container1",
		}).
		Result(),
		hookTracker: NewHookTracker(),
		hookTracker: NewMultiHookTracker(),
		expectedCnt: 1,
	},
}

for _, tc := range testCases {
	t.Run(tc.name, func(t *testing.T) {
		_, _ = GroupRestoreExecHooks(tc.resourceRestoreHooks, tc.pod, velerotest.NewLogger(), tc.hookTracker)
		tracker := tc.hookTracker.GetTracker()
		_, _ = GroupRestoreExecHooks("restore1", tc.resourceRestoreHooks, tc.pod, velerotest.NewLogger(), tc.hookTracker)
		if _, ok := tc.hookTracker.trackers["restore1"]; !ok {
			return
		}
		tracker := tc.hookTracker.trackers["restore1"].tracker
		assert.Len(t, tracker, tc.expectedCnt)
	})
}
@@ -39,7 +39,8 @@ type WaitExecHookHandler interface {
	log logrus.FieldLogger,
	pod *v1.Pod,
	byContainer map[string][]PodExecRestoreHook,
	hookTrack *HookTracker,
	multiHookTracker *MultiHookTracker,
	restoreName string,
	) []error
}

@@ -74,7 +75,8 @@ func (e *DefaultWaitExecHookHandler) HandleHooks(
	log logrus.FieldLogger,
	pod *v1.Pod,
	byContainer map[string][]PodExecRestoreHook,
	hookTracker *HookTracker,
	multiHookTracker *MultiHookTracker,
	restoreName string,
) []error {
	if pod == nil {
		return nil
@@ -167,7 +169,7 @@ func (e *DefaultWaitExecHookHandler) HandleHooks(
	hookLog.Error(err)
	errors = append(errors, err)

	errTracker := hookTracker.Record(newPod.Namespace, newPod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), true)
	errTracker := multiHookTracker.Record(restoreName, newPod.Namespace, newPod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), true, err)
	if errTracker != nil {
		hookLog.WithError(errTracker).Warn("Error recording the hook in hook tracker")
	}
@@ -193,7 +195,7 @@ func (e *DefaultWaitExecHookHandler) HandleHooks(
		hookFailed = true
	}

	errTracker := hookTracker.Record(newPod.Namespace, newPod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), hookFailed)
	errTracker := multiHookTracker.Record(restoreName, newPod.Namespace, newPod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), hookFailed, hookErr)
	if errTracker != nil {
		hookLog.WithError(errTracker).Warn("Error recording the hook in hook tracker")
	}
@@ -245,7 +247,7 @@ func (e *DefaultWaitExecHookHandler) HandleHooks(
		},
	)

	errTracker := hookTracker.Record(pod.Namespace, pod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), true)
	errTracker := multiHookTracker.Record(restoreName, pod.Namespace, pod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), true, err)
	if errTracker != nil {
		hookLog.WithError(errTracker).Warn("Error recording the hook in hook tracker")
	}
@@ -743,8 +743,8 @@ func TestWaitExecHandleHooks(t *testing.T) {
		defer ctxCancel()
	}

	hookTracker := NewHookTracker()
	errs := h.HandleHooks(ctx, velerotest.NewLogger(), test.initialPod, test.byContainer, hookTracker)
	hookTracker := NewMultiHookTracker()
	errs := h.HandleHooks(ctx, velerotest.NewLogger(), test.initialPod, test.byContainer, hookTracker, "restore1")

	// for i, ee := range test.expectedErrors {
	require.Len(t, errs, len(test.expectedErrors))
@@ -1011,15 +1011,18 @@ func TestRestoreHookTrackerUpdate(t *testing.T) {
	pod *v1.Pod
}

hookTracker1 := NewHookTracker()
hookTracker1.Add("default", "my-pod", "container1", HookSourceAnnotation, "<from-annotation>", hookPhase(""))
hookTracker1 := NewMultiHookTracker()
hookTracker1.Add("restore1", "default", "my-pod", "container1", HookSourceAnnotation, "<from-annotation>", hookPhase(""))

hookTracker2 := NewHookTracker()
hookTracker2.Add("default", "my-pod", "container1", HookSourceSpec, "my-hook-1", hookPhase(""))
hookTracker2 := NewMultiHookTracker()
hookTracker2.Add("restore1", "default", "my-pod", "container1", HookSourceSpec, "my-hook-1", hookPhase(""))

hookTracker3 := NewHookTracker()
hookTracker3.Add("default", "my-pod", "container1", HookSourceSpec, "my-hook-1", hookPhase(""))
hookTracker3.Add("default", "my-pod", "container2", HookSourceSpec, "my-hook-2", hookPhase(""))
hookTracker3 := NewMultiHookTracker()
hookTracker3.Add("restore1", "default", "my-pod", "container1", HookSourceSpec, "my-hook-1", hookPhase(""))
hookTracker3.Add("restore1", "default", "my-pod", "container2", HookSourceSpec, "my-hook-2", hookPhase(""))

hookTracker4 := NewMultiHookTracker()
hookTracker4.Add("restore1", "default", "my-pod", "container1", HookSourceSpec, "my-hook-1", hookPhase(""))

tests1 := []struct {
	name string
@@ -1027,7 +1030,7 @@ func TestRestoreHookTrackerUpdate(t *testing.T) {
	groupResource string
	byContainer map[string][]PodExecRestoreHook
	expectedExecutions []expectedExecution
	hookTracker *HookTracker
	hookTracker *MultiHookTracker
	expectedFailed int
}{
	{
@@ -1159,7 +1162,7 @@ func TestRestoreHookTrackerUpdate(t *testing.T) {
			},
		},
	},
	hookTracker: hookTracker2,
	hookTracker: hookTracker4,
	expectedFailed: 1,
	},
	{
@@ -1243,7 +1246,7 @@ func TestRestoreHookTrackerUpdate(t *testing.T) {
			},
		},
	},
	hookTracker: NewHookTracker(),
	hookTracker: NewMultiHookTracker(),
	expectedFailed: 0,
	},
}
@@ -1271,8 +1274,8 @@ func TestRestoreHookTrackerUpdate(t *testing.T) {
	}

	ctx := context.Background()
	_ = h.HandleHooks(ctx, velerotest.NewLogger(), test.initialPod, test.byContainer, test.hookTracker)
	_, actualFailed := test.hookTracker.Stat()
	_ = h.HandleHooks(ctx, velerotest.NewLogger(), test.initialPod, test.byContainer, test.hookTracker, "restore1")
	_, actualFailed := test.hookTracker.Stat("restore1")
	assert.Equal(t, test.expectedFailed, actualFailed)
	})
}
@@ -467,7 +467,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
	updated.Status.HookStatus = &velerov1api.HookStatus{}
}
updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed = itemBackupper.hookTracker.Stat()
log.Infof("hookTracker: %+v, hookAttempted: %d, hookFailed: %d", itemBackupper.hookTracker.GetTracker(), updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed)
log.Debugf("hookAttempted: %d, hookFailed: %d", updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed)

if err := kube.PatchResource(backupRequest.Backup, updated, kb.kbClient); err != nil {
	log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress and hook status")
@@ -54,6 +54,7 @@ import (
	"sigs.k8s.io/controller-runtime/pkg/manager"

	"github.com/vmware-tanzu/velero/internal/credentials"
	"github.com/vmware-tanzu/velero/internal/hook"
	"github.com/vmware-tanzu/velero/internal/storage"
	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
@@ -943,6 +944,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
	s.logger.Fatal(err, "fail to get controller-runtime informer from manager for PVR")
}

multiHookTracker := hook.NewMultiHookTracker()

if _, ok := enabledRuntimeControllers[controller.Restore]; ok {
	restorer, err := restore.NewKubernetesRestorer(
		s.discoveryHelper,
@@ -965,6 +968,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
		s.kubeClient.CoreV1().RESTClient(),
		s.credentialFileStore,
		s.mgr.GetClient(),
		multiHookTracker,
	)

	cmd.CheckError(err)
@@ -1017,6 +1021,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
	backupStoreGetter,
	s.metrics,
	s.crClient,
	multiHookTracker,
).SetupWithManager(s.mgr); err != nil {
	s.logger.Fatal(err, "unable to create controller", "controller", controller.RestoreFinalizer)
}
@@ -33,6 +33,7 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/clock"

	"github.com/vmware-tanzu/velero/internal/hook"
	"github.com/vmware-tanzu/velero/internal/volume"
	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	"github.com/vmware-tanzu/velero/pkg/metrics"
@@ -55,6 +56,7 @@ type restoreFinalizerReconciler struct {
	metrics *metrics.ServerMetrics
	clock clock.WithTickerAndDelayedExecution
	crClient client.Client
	multiHookTracker *hook.MultiHookTracker
}

func NewRestoreFinalizerReconciler(
@@ -65,6 +67,7 @@ func NewRestoreFinalizerReconciler(
	backupStoreGetter persistence.ObjectBackupStoreGetter,
	metrics *metrics.ServerMetrics,
	crClient client.Client,
	multiHookTracker *hook.MultiHookTracker,
) *restoreFinalizerReconciler {
	return &restoreFinalizerReconciler{
		Client: client,
@@ -75,6 +78,7 @@ func NewRestoreFinalizerReconciler(
		metrics: metrics,
		clock: &clock.RealClock{},
		crClient: crClient,
		multiHookTracker: multiHookTracker,
	}
}

@@ -151,11 +155,12 @@ func (r *restoreFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Req
	restoredPVCList := volume.RestoredPVCFromRestoredResourceList(restoredResourceList)

	finalizerCtx := &finalizerContext{
		logger: log,
		restore: restore,
		crClient: r.crClient,
		volumeInfo: volumeInfo,
		restoredPVCList: restoredPVCList,
		logger: log,
		restore: restore,
		crClient: r.crClient,
		volumeInfo: volumeInfo,
		restoredPVCList: restoredPVCList,
		multiHookTracker: r.multiHookTracker,
	}
	warnings, errs := finalizerCtx.execute()

@@ -233,11 +238,12 @@ func (r *restoreFinalizerReconciler) finishProcessing(restorePhase velerov1api.R
// finalizerContext includes all the dependencies required by finalization tasks and
// a function execute() to orderly implement task logic.
type finalizerContext struct {
	logger logrus.FieldLogger
	restore *velerov1api.Restore
	crClient client.Client
	volumeInfo []*volume.BackupVolumeInfo
	restoredPVCList map[string]struct{}
	logger logrus.FieldLogger
	restore *velerov1api.Restore
	crClient client.Client
	volumeInfo []*volume.BackupVolumeInfo
	restoredPVCList map[string]struct{}
	multiHookTracker *hook.MultiHookTracker
}

func (ctx *finalizerContext) execute() (results.Result, results.Result) { //nolint:unparam //temporarily ignore the lint report: result 0 is always nil (unparam)
@@ -247,6 +253,9 @@ func (ctx *finalizerContext) execute() (results.Result, results.Result) { //noli
	pdpErrs := ctx.patchDynamicPVWithVolumeInfo()
	errs.Merge(&pdpErrs)

	rehErrs := ctx.WaitRestoreExecHook()
	errs.Merge(&rehErrs)

	return warnings, errs
}

@@ -373,3 +382,45 @@ func needPatch(newPV *v1.PersistentVolume, pvInfo *volume.PVInfo) bool {

	return false
}

// WaitRestoreExecHook waits for restore exec hooks to finish and then updates the hook execution results
func (ctx *finalizerContext) WaitRestoreExecHook() (errs results.Result) {
	log := ctx.logger.WithField("restore", ctx.restore.Name)
	log.Info("Waiting for restore exec hooks starts")

	// wait for restore exec hooks to finish
	err := wait.PollUntilContextCancel(context.Background(), 1*time.Second, true, func(context.Context) (bool, error) {
		log.Debug("Checking the progress of hooks execution")
		if ctx.multiHookTracker.IsComplete(ctx.restore.Name) {
			return true, nil
		}
		return false, nil
	})
	if err != nil {
		errs.Add(ctx.restore.Namespace, err)
		return errs
	}
	log.Info("Done waiting for restore exec hooks starts")

	for _, ei := range ctx.multiHookTracker.HookErrs(ctx.restore.Name) {
		errs.Add(ei.Namespace, ei.Err)
	}

	// update hooks execution status
	updated := ctx.restore.DeepCopy()
	if updated.Status.HookStatus == nil {
		updated.Status.HookStatus = &velerov1api.HookStatus{}
	}
	updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed = ctx.multiHookTracker.Stat(ctx.restore.Name)
	log.Debugf("hookAttempted: %d, hookFailed: %d", updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed)

	if err := kubeutil.PatchResource(ctx.restore, updated, ctx.crClient); err != nil {
		log.WithError(errors.WithStack((err))).Error("Updating restore status")
		errs.Add(ctx.restore.Namespace, err)
	}

	// delete the hook data for this restore
	ctx.multiHookTracker.Delete(ctx.restore.Name)

	return errs
}
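The core of the change is the wait above: the Finalizing phase blocks until every hook registered for the restore has reported a result, then folds the failures into the restore's errors and hook status. The following is a stripped-down restatement of that loop, illustrative only and not the controller code itself; it assumes it lives inside the Velero module with context, time, logrus, k8s.io/apimachinery/pkg/util/wait, and internal/hook imported.

// Illustrative restatement of the Finalizing-phase wait.
func waitForRestoreExecHooks(ctx context.Context, log logrus.FieldLogger, mht *hook.MultiHookTracker, restoreName string) []hook.HookErrInfo {
	// Poll until every hook added for this restore has been recorded, or the context is canceled.
	_ = wait.PollUntilContextCancel(ctx, time.Second, true, func(context.Context) (bool, error) {
		return mht.IsComplete(restoreName), nil
	})

	// Collect per-hook failures; the real controller adds these to the restore's errors,
	// copies Stat() into restore.Status.HookStatus, and finally Delete()s the entry.
	attempted, failed := mht.Stat(restoreName)
	log.Debugf("hookAttempted: %d, hookFailed: %d", attempted, failed)
	hookErrs := mht.HookErrs(restoreName)
	mht.Delete(restoreName)
	return hookErrs
}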
@@ -18,6 +18,7 @@ package controller

import (
	"context"
	"fmt"
	"testing"
	"time"

@@ -34,6 +35,7 @@ import (
	corev1api "k8s.io/api/core/v1"
	crclient "sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/vmware-tanzu/velero/internal/hook"
	"github.com/vmware-tanzu/velero/internal/volume"
	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	"github.com/vmware-tanzu/velero/pkg/builder"
@@ -135,6 +137,7 @@ func TestRestoreFinalizerReconcile(t *testing.T) {
	NewFakeSingleObjectBackupStoreGetter(backupStore),
	metrics.NewServerMetrics(),
	fakeClient,
	hook.NewMultiHookTracker(),
)
r.clock = testclocks.NewFakeClock(now)

@@ -196,6 +199,7 @@ func TestUpdateResult(t *testing.T) {
	NewFakeSingleObjectBackupStoreGetter(backupStore),
	metrics.NewServerMetrics(),
	fakeClient,
	hook.NewMultiHookTracker(),
)
restore := builder.ForRestore(velerov1api.DefaultNamespace, "restore-1").Result()
res := map[string]results.Result{"warnings": {}, "errors": {}}
@@ -454,3 +458,99 @@ func TestPatchDynamicPVWithVolumeInfo(t *testing.T) {
		}
	}
}

func TestWaitRestoreExecHook(t *testing.T) {
	hookTracker1 := hook.NewMultiHookTracker()
	restoreName1 := "restore1"

	hookTracker2 := hook.NewMultiHookTracker()
	restoreName2 := "restore2"
	hookTracker2.Add(restoreName2, "ns", "pod", "con1", "s1", "h1", "")
	hookTracker2.Record(restoreName2, "ns", "pod", "con1", "s1", "h1", "", false, nil)

	hookTracker3 := hook.NewMultiHookTracker()
	restoreName3 := "restore3"
	podNs, podName, container, source, hookName := "ns", "pod", "con1", "s1", "h1"
	hookFailed, hookErr := true, fmt.Errorf("hook failed")
	hookTracker3.Add(restoreName3, podNs, podName, container, source, hookName, hook.PhasePre)

	tests := []struct {
		name string
		hookTracker *hook.MultiHookTracker
		restore *velerov1api.Restore
		expectedHooksAttempted int
		expectedHooksFailed int
		expectedHookErrs int
		waitSec int
		podName string
		podNs string
		Container string
		Source string
		hookName string
		hookFailed bool
		hookErr error
	}{
		{
			name: "no restore exec hooks",
			hookTracker: hookTracker1,
			restore: builder.ForRestore(velerov1api.DefaultNamespace, restoreName1).Result(),
			expectedHooksAttempted: 0,
			expectedHooksFailed: 0,
			expectedHookErrs: 0,
		},
		{
			name: "1 restore exec hook having been executed",
			hookTracker: hookTracker2,
			restore: builder.ForRestore(velerov1api.DefaultNamespace, restoreName2).Result(),
			expectedHooksAttempted: 1,
			expectedHooksFailed: 0,
			expectedHookErrs: 0,
		},
		{
			name: "1 restore exec hook to be executed",
			hookTracker: hookTracker3,
			restore: builder.ForRestore(velerov1api.DefaultNamespace, restoreName3).Result(),
			waitSec: 2,
			expectedHooksAttempted: 1,
			expectedHooksFailed: 1,
			expectedHookErrs: 1,
			podName: podName,
			podNs: podNs,
			Container: container,
			Source: source,
			hookName: hookName,
			hookFailed: hookFailed,
			hookErr: hookErr,
		},
	}

	for _, tc := range tests {
		var (
			fakeClient = velerotest.NewFakeControllerRuntimeClientBuilder(t).Build()
			logger = velerotest.NewLogger()
		)
		ctx := &finalizerContext{
			logger: logger,
			crClient: fakeClient,
			restore: tc.restore,
			multiHookTracker: tc.hookTracker,
		}
		require.NoError(t, ctx.crClient.Create(context.Background(), tc.restore))

		if tc.waitSec > 0 {
			go func() {
				time.Sleep(time.Second * time.Duration(tc.waitSec))
				tc.hookTracker.Record(tc.restore.Name, tc.podNs, tc.podName, tc.Container, tc.Source, tc.hookName, hook.PhasePre, tc.hookFailed, tc.hookErr)
			}()
		}

		errs := ctx.WaitRestoreExecHook()
		assert.Len(t, errs.Namespaces, tc.expectedHookErrs)

		updated := &velerov1api.Restore{}
		err := ctx.crClient.Get(context.Background(), crclient.ObjectKey{Namespace: velerov1api.DefaultNamespace, Name: tc.restore.Name}, updated)
		assert.NoError(t, err)
		assert.Equal(t, tc.expectedHooksAttempted, updated.Status.HookStatus.HooksAttempted)
		assert.Equal(t, tc.expectedHooksFailed, updated.Status.HookStatus.HooksFailed)
	}
}
@@ -114,6 +114,7 @@ type kubernetesRestorer struct {
	podGetter cache.Getter
	credentialFileStore credentials.FileStore
	kbClient crclient.Client
	multiHookTracker *hook.MultiHookTracker
}

// NewKubernetesRestorer creates a new kubernetesRestorer.
@@ -131,6 +132,7 @@ func NewKubernetesRestorer(
	podGetter cache.Getter,
	credentialStore credentials.FileStore,
	kbClient crclient.Client,
	multiHookTracker *hook.MultiHookTracker,
) (Restorer, error) {
	return &kubernetesRestorer{
		discoveryHelper: discoveryHelper,
@@ -155,6 +157,7 @@ func NewKubernetesRestorer(
		podGetter: podGetter,
		credentialFileStore: credentialStore,
		kbClient: kbClient,
		multiHookTracker: multiHookTracker,
	}, nil
}

@@ -252,11 +255,6 @@ func (kr *kubernetesRestorer) RestoreWithResolvers(
	}
}

resourceRestoreHooks, err := hook.GetRestoreHooksFromSpec(&req.Restore.Spec.Hooks)
if err != nil {
	return results.Result{}, results.Result{Velero: []string{err.Error()}}
}
hooksCtx, hooksCancelFunc := go_context.WithCancel(go_context.Background())
waitExecHookHandler := &hook.DefaultWaitExecHookHandler{
	PodCommandExecutor: kr.podCommandExecutor,
	ListWatchFactory: &hook.DefaultListWatchFactory{
@@ -264,6 +262,11 @@ func (kr *kubernetesRestorer) RestoreWithResolvers(
	},
}

hooksWaitExecutor, err := newHooksWaitExecutor(req.Restore, waitExecHookHandler)
if err != nil {
	return results.Result{}, results.Result{Velero: []string{err.Error()}}
}

pvRestorer := &pvRestorer{
	logger: req.Log,
	backup: req.Backup,
@@ -310,18 +313,14 @@ func (kr *kubernetesRestorer) RestoreWithResolvers(
	pvRenamer: kr.pvRenamer,
	discoveryHelper: kr.discoveryHelper,
	resourcePriorities: kr.resourcePriorities,
	resourceRestoreHooks: resourceRestoreHooks,
	hooksErrs: make(chan hook.HookErrInfo),
	waitExecHookHandler: waitExecHookHandler,
	hooksContext: hooksCtx,
	hooksCancelFunc: hooksCancelFunc,
	kbClient: kr.kbClient,
	itemOperationsList: req.GetItemOperationsList(),
	resourceModifiers: req.ResourceModifiers,
	disableInformerCache: req.DisableInformerCache,
	hookTracker: hook.NewHookTracker(),
	multiHookTracker: kr.multiHookTracker,
	backupVolumeInfoMap: req.BackupVolumeInfoMap,
	restoreVolumeInfoTracker: req.RestoreVolumeInfoTracker,
	hooksWaitExecutor: hooksWaitExecutor,
}

return restoreCtx.execute()
@@ -362,19 +361,14 @@ type restoreContext struct {
	pvRenamer func(string) (string, error)
	discoveryHelper discovery.Helper
	resourcePriorities Priorities
	hooksWaitGroup sync.WaitGroup
	hooksErrs chan hook.HookErrInfo
	resourceRestoreHooks []hook.ResourceRestoreHook
	waitExecHookHandler hook.WaitExecHookHandler
	hooksContext go_context.Context
	hooksCancelFunc go_context.CancelFunc
	kbClient crclient.Client
	itemOperationsList *[]*itemoperation.RestoreOperation
	resourceModifiers *resourcemodifiers.ResourceModifiers
	disableInformerCache bool
	hookTracker *hook.HookTracker
	multiHookTracker *hook.MultiHookTracker
	backupVolumeInfoMap map[string]volume.BackupVolumeInfo
	restoreVolumeInfoTracker *volume.RestoreVolumeInfoTracker
	hooksWaitExecutor *hooksWaitExecutor
}

type resourceClientKey struct {
@@ -650,6 +644,12 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) {
	updated.Status.Progress.TotalItems = len(ctx.restoredItems)
	updated.Status.Progress.ItemsRestored = len(ctx.restoredItems)

	// patch the restore
	err = kube.PatchResource(ctx.restore, updated, ctx.kbClient)
	if err != nil {
		ctx.log.WithError(errors.WithStack((err))).Warn("Updating restore status")
	}

	// Wait for all of the pod volume restore goroutines to be done, which is
	// only possible once all of their errors have been received by the loop
	// below, then close the podVolumeErrs channel so the loop terminates.
@@ -672,31 +672,6 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) {
	}
	ctx.log.Info("Done waiting for all pod volume restores to complete")

	// Wait for all post-restore exec hooks with same logic as pod volume wait above.
	go func() {
		ctx.log.Info("Waiting for all post-restore-exec hooks to complete")

		ctx.hooksWaitGroup.Wait()
		close(ctx.hooksErrs)
	}()
	for errInfo := range ctx.hooksErrs {
		errs.Add(errInfo.Namespace, errInfo.Err)
	}
	ctx.log.Info("Done waiting for all post-restore exec hooks to complete")

	// update hooks execution status
	if updated.Status.HookStatus == nil {
		updated.Status.HookStatus = &velerov1api.HookStatus{}
	}
	updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed = ctx.hookTracker.Stat()
	ctx.log.Infof("hookTracker: %+v, hookAttempted: %d, hookFailed: %d", ctx.hookTracker.GetTracker(), updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed)

	// patch the restore status
	err = kube.PatchResource(ctx.restore, updated, ctx.kbClient)
	if err != nil {
		ctx.log.WithError(errors.WithStack((err))).Warn("Updating restore status")
	}

	return warnings, errs
}

@@ -1730,8 +1705,24 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
	}
}

// Asynchronously executes restore exec hooks if any.
// Velero will wait for all the asynchronous hook operations to finish in the Finalizing phase, using the hook tracker to track the execution progress.
if groupResource == kuberesource.Pods {
	ctx.waitExec(createdObj)
	pod := new(v1.Pod)
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdObj.UnstructuredContent(), &pod); err != nil {
		ctx.log.Errorf("error converting pod %s: %v", kube.NamespaceAndName(obj), err)
		errs.Add(namespace, err)
		return warnings, errs, itemExists
	}

	execHooksByContainer, err := ctx.hooksWaitExecutor.groupHooks(ctx.restore.Name, pod, ctx.multiHookTracker)
	if err != nil {
		ctx.log.Errorf("error grouping hooks from pod %s: %v", kube.NamespaceAndName(obj), err)
		errs.Add(namespace, err)
		return warnings, errs, itemExists
	}

	ctx.hooksWaitExecutor.exec(execHooksByContainer, pod, ctx.multiHookTracker, ctx.restore.Name)
}

// Wait for a CRD to be available for instantiating resources
@@ -1898,41 +1889,48 @@ func restorePodVolumeBackups(ctx *restoreContext, createdObj *unstructured.Unstr
	}
}

// waitExec executes hooks in a restored pod's containers when they become ready.
func (ctx *restoreContext) waitExec(createdObj *unstructured.Unstructured) {
	ctx.hooksWaitGroup.Add(1)
// hooksWaitExecutor collects the fields required to asynchronously execute restore exec hooks.
// Note that these fields are shared across the different pods within a specific restore,
// and a separate hooksWaitExecutor instance is created for each restore, so restores do not interfere with each other.
type hooksWaitExecutor struct {
	log logrus.FieldLogger
	hooksContext go_context.Context
	hooksCancelFunc go_context.CancelFunc
	resourceRestoreHooks []hook.ResourceRestoreHook
	waitExecHookHandler hook.WaitExecHookHandler
}

func newHooksWaitExecutor(restore *velerov1api.Restore, waitExecHookHandler hook.WaitExecHookHandler) (*hooksWaitExecutor, error) {
	resourceRestoreHooks, err := hook.GetRestoreHooksFromSpec(&restore.Spec.Hooks)
	if err != nil {
		return nil, err
	}
	hooksCtx, hooksCancelFunc := go_context.WithCancel(go_context.Background())
	hwe := &hooksWaitExecutor{
		log: logrus.WithField("restore", restore.Name),
		hooksContext: hooksCtx,
		hooksCancelFunc: hooksCancelFunc,
		resourceRestoreHooks: resourceRestoreHooks,
		waitExecHookHandler: waitExecHookHandler,
	}
	return hwe, nil
}

// groupHooks returns a list of hooks to be executed in a pod, grouped by container name.
func (hwe *hooksWaitExecutor) groupHooks(restoreName string, pod *v1.Pod, multiHookTracker *hook.MultiHookTracker) (map[string][]hook.PodExecRestoreHook, error) {
	execHooksByContainer, err := hook.GroupRestoreExecHooks(restoreName, hwe.resourceRestoreHooks, pod, hwe.log, multiHookTracker)
	return execHooksByContainer, err
}

// exec asynchronously executes hooks in a restored pod's containers when they become ready.
// The goroutine within this function continues running until the hook executions are complete.
// Velero waits for the goroutine to finish in the Finalizing phase, using the hook tracker to track the progress.
// To optimize memory usage, ensure that the variables used in this function are kept to a minimum to prevent unnecessary retention in memory.
func (hwe *hooksWaitExecutor) exec(execHooksByContainer map[string][]hook.PodExecRestoreHook, pod *v1.Pod, multiHookTracker *hook.MultiHookTracker, restoreName string) {
	go func() {
		// Done() will only be called after all errors have been successfully sent
		// on the ctx.podVolumeErrs channel.
		defer ctx.hooksWaitGroup.Done()

		podNs := createdObj.GetNamespace()
		pod := new(v1.Pod)
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdObj.UnstructuredContent(), &pod); err != nil {
			ctx.log.WithError(err).Error("error converting unstructured pod")
			ctx.hooksErrs <- hook.HookErrInfo{Namespace: podNs, Err: err}
			return
		}
		execHooksByContainer, err := hook.GroupRestoreExecHooks(
			ctx.resourceRestoreHooks,
			pod,
			ctx.log,
			ctx.hookTracker,
		)
		if err != nil {
			ctx.log.WithError(err).Errorf("error getting exec hooks for pod %s/%s", pod.Namespace, pod.Name)
			ctx.hooksErrs <- hook.HookErrInfo{Namespace: podNs, Err: err}
			return
		}

		if errs := ctx.waitExecHookHandler.HandleHooks(ctx.hooksContext, ctx.log, pod, execHooksByContainer, ctx.hookTracker); len(errs) > 0 {
			ctx.log.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully execute post-restore hooks")
			ctx.hooksCancelFunc()

			for _, err := range errs {
				// Errors are already logged in the HandleHooks method.
				ctx.hooksErrs <- hook.HookErrInfo{Namespace: podNs, Err: err}
			}
		if errs := hwe.waitExecHookHandler.HandleHooks(hwe.hooksContext, hwe.log, pod, execHooksByContainer, multiHookTracker, restoreName); len(errs) > 0 {
			hwe.log.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully execute post-restore hooks")
			hwe.hooksCancelFunc()
		}
	}()
}
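To summarize the new control flow (a hedged sketch, not code from the commit): during the InProgress phase the restorer only launches the exec hooks and registers them in the shared tracker; the Finalizing phase is what waits for their results. Reduced to the tracker alone, and assuming the internal/hook package plus "fmt" and "time" imports, the pattern looks like this, with the goroutine standing in for hooksWaitExecutor.exec and the polling loop standing in for WaitRestoreExecHook:

// Illustrative sketch of "record asynchronously, wait in Finalizing".
func demoAsyncHookFlow() {
	mht := NewMultiHookTracker()

	// InProgress phase: groupHooks Add()s the hook, then exec() fires it asynchronously.
	mht.Add("restore1", "ns1", "pod1", "container1", HookSourceSpec, "post-restore", "")
	go func() {
		time.Sleep(100 * time.Millisecond) // stand-in for the real exec hook running in the pod
		_ = mht.Record("restore1", "ns1", "pod1", "container1", HookSourceSpec, "post-restore", "", false, nil)
	}()

	// Finalizing phase: poll IsComplete, then report Stat and HookErrs and drop the entry.
	for !mht.IsComplete("restore1") {
		time.Sleep(10 * time.Millisecond)
	}
	attempted, failed := mht.Stat("restore1")
	fmt.Printf("attempted=%d failed=%d errs=%d\n", attempted, failed, len(mht.HookErrs("restore1")))
	mht.Delete("restore1")
}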