mirror of https://github.com/vmware-tanzu/velero.git
synced 2026-01-05 04:55:22 +00:00
remove restore code that waits for a PV to become Available
Signed-off-by: Steve Kriss <krisss@vmware.com>
changelogs/unreleased/1254-skriss (new file, 1 line):
@@ -0,0 +1 @@
+remove restore code that waits for a PV to become Available
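This removes the machinery that, after creating a PersistentVolume object during a restore, opened a watch on PVs and blocked until the object reported phase Available or a one-minute timeout fired: the waitForReady and isPVReady helpers, the resourceWaitGroup and resourceWatches bookkeeping on the restore context, and the tests covering them.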
pkg/restore/restore.go:

@@ -25,7 +25,6 @@ import (
 	"os"
 	"path/filepath"
 	"sort"
 	"sync"
-	"time"
 
 	"github.com/pkg/errors"
@@ -41,7 +40,6 @@ import (
 	kubeerrs "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/apimachinery/pkg/watch"
 	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 
 	api "github.com/heptio/velero/pkg/apis/velero/v1"
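Both dropped imports lose their last users in this file: "time" served only the one-minute readiness timeout, and the watch package only the PV watches torn out below.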
@@ -76,9 +74,6 @@ type Restorer interface {
 	) (api.RestoreResult, api.RestoreResult)
 }
 
-type gvString string
-type kindString string
-
 // kubernetesRestorer implements Restorer for restoring into a Kubernetes cluster.
 type kubernetesRestorer struct {
 	discoveryHelper discovery.Helper
@@ -187,7 +182,6 @@ func (kr *kubernetesRestorer) Restore(
 	snapshotLocationLister listers.VolumeSnapshotLocationLister,
 	blockStoreGetter BlockStoreGetter,
 ) (api.RestoreResult, api.RestoreResult) {
-
 	// metav1.LabelSelectorAsSelector converts a nil LabelSelector to a
 	// Nothing Selector, i.e. a selector that matches nothing. We want
 	// a selector that matches everything. This can be accomplished by
@@ -346,8 +340,6 @@ type context struct {
 	blockStoreGetter  BlockStoreGetter
 	resticRestorer    restic.Restorer
 	globalWaitGroup   velerosync.ErrorGroup
-	resourceWaitGroup sync.WaitGroup
-	resourceWatches   []watch.Interface
 	pvsToProvision    sets.String
 	pvRestorer        PVRestorer
 	volumeSnapshots   []*volume.Snapshot
@@ -396,7 +388,6 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
 	}
 
 	resourceDirsMap := make(map[string]os.FileInfo)
-
 	for _, rscDir := range resourceDirs {
 		rscName := rscDir.Name()
 		resourceDirsMap[rscName] = rscDir
@@ -404,16 +395,6 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
 
 	existingNamespaces := sets.NewString()
 
-	// TODO this is not optimal since it'll keep watches open for all resources/namespaces
-	// until the very end of the restore. This should be done per resource type. Deferring
-	// refactoring for now since this may be able to be removed entirely if we eliminate
-	// waiting for PV snapshot restores.
-	defer func() {
-		for _, watch := range ctx.resourceWatches {
-			watch.Stop()
-		}
-	}()
-
 	for _, resource := range ctx.prioritizedResources {
 		// we don't want to explicitly restore namespace API objs because we'll handle
 		// them as a special case prior to restoring anything into them
@@ -496,11 +477,6 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
 			merge(&warnings, &w)
 			merge(&errs, &e)
 		}
-
-		// TODO timeout?
-		ctx.log.Debugf("Waiting on resource wait group for resource=%s", resource.String())
-		ctx.resourceWaitGroup.Wait()
-		ctx.log.Debugf("Done waiting on resource wait group for resource=%s", resource.String())
 	}
 
 	// TODO timeout?
@@ -708,7 +684,6 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
 		resourceClient    client.Dynamic
 		groupResource     = schema.ParseGroupResource(resource)
 		applicableActions []resolvedAction
-		resourceWatch     watch.Interface
 	)
 
 	// pre-filter the actions based on namespace & resource includes/excludes since
@@ -813,24 +788,6 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
 				continue
 			}
 			obj = updatedObj
-
-			if resourceWatch == nil {
-				resourceWatch, err = resourceClient.Watch(metav1.ListOptions{})
-				if err != nil {
-					addToResult(&errs, namespace, fmt.Errorf("error watching for namespace %q, resource %q: %v", namespace, &groupResource, err))
-					return warnings, errs
-				}
-				ctx.resourceWatches = append(ctx.resourceWatches, resourceWatch)
-				ctx.resourceWaitGroup.Add(1)
-				go func() {
-					defer ctx.resourceWaitGroup.Done()
-
-					if _, err := waitForReady(resourceWatch.ResultChan(), name, isPVReady, time.Minute, ctx.log); err != nil {
-						ctx.log.Warnf("Timeout reached waiting for persistent volume %s to become ready", name)
-						addVeleroError(&warnings, fmt.Errorf("timeout reached waiting for persistent volume %s to become ready", name))
-					}
-				}()
-			}
 		} else if err != nil {
 			addToResult(&errs, namespace, fmt.Errorf("error checking existence for PV %s: %v", name, err))
 			continue
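The deleted block above, together with the resourceWaitGroup.Wait() removed in the earlier hunk, implemented a fan-out-and-wait idiom: one goroutine per watched PV blocked until a readiness event arrived, and the restore waited on the group before moving to the next resource type. A minimal, self-contained sketch of that idiom (hypothetical names, a plain channel standing in for the watch):

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitReady plays the role of the removed waitForReady: block until the
// expected event arrives on the channel, or give up when the timeout fires.
func waitReady(events <-chan string, name string, timeout time.Duration) error {
	deadline := time.After(timeout)
	for {
		select {
		case got := <-events:
			if got == name {
				return nil
			}
		case <-deadline:
			return fmt.Errorf("timeout waiting for %s to become ready", name)
		}
	}
}

func main() {
	var wg sync.WaitGroup
	events := make(chan string, 1)

	// One goroutine per created PV, one Add per goroutine, as in the
	// removed resourceWaitGroup usage.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := waitReady(events, "pv-1", time.Minute); err != nil {
			fmt.Println("warning:", err) // the real code recorded a restore warning here
		}
	}()

	events <- "pv-1" // stand-in for the watch delivering the Available event
	wg.Wait()        // the restore blocked here before the next resource type
}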
@@ -999,51 +956,6 @@ func hasDeleteReclaimPolicy(obj map[string]interface{}) bool {
 	return policy == string(v1.PersistentVolumeReclaimDelete)
 }
 
-func waitForReady(
-	watchChan <-chan watch.Event,
-	name string,
-	ready func(runtime.Unstructured) bool,
-	timeout time.Duration,
-	log logrus.FieldLogger,
-) (*unstructured.Unstructured, error) {
-	var timeoutChan <-chan time.Time
-	if timeout != 0 {
-		timeoutChan = time.After(timeout)
-	} else {
-		timeoutChan = make(chan time.Time)
-	}
-
-	for {
-		select {
-		case event := <-watchChan:
-			if event.Type != watch.Added && event.Type != watch.Modified {
-				continue
-			}
-
-			obj, ok := event.Object.(*unstructured.Unstructured)
-			switch {
-			case !ok:
-				log.Errorf("Unexpected type %T", event.Object)
-				continue
-			case obj.GetName() != name:
-				continue
-			case !ready(obj):
-				log.Debugf("Item %s is not ready yet", name)
-				continue
-			default:
-				return obj, nil
-			}
-		case <-timeoutChan:
-			return nil, errors.New("failed to observe item becoming ready within the timeout")
-		}
-	}
-}
-
-func isPVReady(obj runtime.Unstructured) bool {
-	phase, _, _ := unstructured.NestedString(obj.UnstructuredContent(), "status", "phase")
-	return phase == string(v1.VolumeAvailable)
-}
-
 func resetMetadataAndStatus(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
 	res, ok := obj.Object["metadata"]
 	if !ok {
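To make the deleted helpers' behavior concrete, here is a small runnable sketch, a hypothetical driver rather than the real call site, that feeds the same event loop from a hand-built channel: a not-yet-ready event is skipped, the first ready event returns, and a timeout would surface as an error. It uses only apimachinery packages that appear in the hunk, with the v1.VolumeAvailable constant inlined as "Available":

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/watch"
)

// isPVReady, reproduced from the deleted code with the phase constant inlined.
func isPVReady(obj *unstructured.Unstructured) bool {
	phase, _, _ := unstructured.NestedString(obj.UnstructuredContent(), "status", "phase")
	return phase == "Available"
}

func main() {
	ch := make(chan watch.Event, 2)

	pending := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "PersistentVolume",
		"metadata":   map[string]interface{}{"name": "pv-1"},
		"status":     map[string]interface{}{"phase": "Pending"},
	}}
	ready := pending.DeepCopy()
	_ = unstructured.SetNestedField(ready.Object, "Available", "status", "phase")

	ch <- watch.Event{Type: watch.Modified, Object: pending} // skipped: not ready yet
	ch <- watch.Event{Type: watch.Modified, Object: ready}   // first ready event: returns

	timeoutChan := time.After(time.Minute)
	for {
		select {
		case event := <-ch:
			// Same filtering order as the deleted switch statement.
			if event.Type != watch.Added && event.Type != watch.Modified {
				continue
			}
			obj, ok := event.Object.(*unstructured.Unstructured)
			if !ok || obj.GetName() != "pv-1" || !isPVReady(obj) {
				continue
			}
			fmt.Println("observed PV pv-1 become Available")
			return
		case <-timeoutChan:
			fmt.Println("failed to observe item becoming ready within the timeout")
			return
		}
	}
}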
pkg/restore/restore_test.go:

@@ -1012,9 +1012,6 @@ status:
 			})
 		}
 
-		pvWatch := new(mockWatch)
-		defer pvWatch.AssertExpectations(t)
-
 		unstructuredPVMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pvObj)
 		require.NoError(t, err)
 		unstructuredPV := &unstructured.Unstructured{Object: unstructuredPVMap}
@@ -1049,16 +1046,6 @@ status:
 			if test.expectPVCreation {
 				createdPV := unstructuredPV.DeepCopy()
 				pvClient.On("Create", unstructuredPV).Return(createdPV, nil)
-
-				pvClient.On("Watch", metav1.ListOptions{}).Return(pvWatch, nil)
-				pvWatchChan := make(chan watch.Event, 1)
-				readyPV := restoredPV.DeepCopy()
-				require.NoError(t, unstructured.SetNestedField(readyPV.UnstructuredContent(), string(v1.VolumeAvailable), "status", "phase"))
-				pvWatchChan <- watch.Event{
-					Type:   watch.Modified,
-					Object: readyPV,
-				}
-				pvWatch.On("ResultChan").Return(pvWatchChan)
 			}

 			// Restore PV
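The deleted test stubbed the watch with a testify mock (mockWatch) plus a buffered channel carrying a single Modified event whose status.phase was pre-set to Available. A hand-rolled fake satisfying watch.Interface is an equivalent arrangement; the sketch below uses hypothetical names and is not the mock the real test used:

package fakes

import (
	"k8s.io/apimachinery/pkg/watch"
)

// fakeWatch is a minimal stand-in for the deleted mockWatch: it satisfies
// watch.Interface by returning a channel the test pre-loads with events.
type fakeWatch struct {
	ch chan watch.Event
}

func (f *fakeWatch) Stop()                          {}
func (f *fakeWatch) ResultChan() <-chan watch.Event { return f.ch }

// newFakeWatch returns a watch whose channel already holds the given events,
// mirroring the pvWatchChan setup the hunk above removes.
func newFakeWatch(events ...watch.Event) watch.Interface {
	ch := make(chan watch.Event, len(events))
	for _, e := range events {
		ch <- e
	}
	return &fakeWatch{ch: ch}
}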
@@ -1099,8 +1086,6 @@ status:
 			assert.Empty(t, warnings.Cluster)
 			assert.Empty(t, warnings.Namespaces)
 			assert.Equal(t, api.RestoreResult{}, errors)
-
-			ctx.resourceWaitGroup.Wait()
 		})
 	}
 }
@@ -1304,46 +1289,6 @@ func TestIsCompleted(t *testing.T) {
 		}
 	}
 }
 
-func TestIsPVReady(t *testing.T) {
-	tests := []struct {
-		name     string
-		obj      *unstructured.Unstructured
-		expected bool
-	}{
-		{
-			name:     "no status returns not ready",
-			obj:      NewTestUnstructured().Unstructured,
-			expected: false,
-		},
-		{
-			name:     "no status.phase returns not ready",
-			obj:      NewTestUnstructured().WithStatus().Unstructured,
-			expected: false,
-		},
-		{
-			name:     "empty status.phase returns not ready",
-			obj:      NewTestUnstructured().WithStatusField("phase", "").Unstructured,
-			expected: false,
-		},
-		{
-			name:     "non-Available status.phase returns not ready",
-			obj:      NewTestUnstructured().WithStatusField("phase", "foo").Unstructured,
-			expected: false,
-		},
-		{
-			name:     "Available status.phase returns ready",
-			obj:      NewTestUnstructured().WithStatusField("phase", "Available").Unstructured,
-			expected: true,
-		},
-	}
-
-	for _, test := range tests {
-		t.Run(test.name, func(t *testing.T) {
-			assert.Equal(t, test.expected, isPVReady(test.obj))
-		})
-	}
-}
-
 func TestShouldRestore(t *testing.T) {
 	pv := `apiVersion: v1
 kind: PersistentVolume