fix race condition in waiting for restic restores to complete

Signed-off-by: Steve Kriss <krisss@vmware.com>
Steve Kriss committed 2020-01-15 10:27:21 -07:00
parent aa44cf1c32
commit 3b80e00d62

2 changed files with 32 additions and 84 deletions


@@ -25,6 +25,7 @@ import (
 	"path/filepath"
 	"sort"
 	"strings"
+	"sync"
 	"time"

 	uuid "github.com/gofrs/uuid"
@@ -56,7 +57,6 @@ import (
 	"github.com/vmware-tanzu/velero/pkg/util/collections"
 	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
 	"github.com/vmware-tanzu/velero/pkg/util/kube"
-	velerosync "github.com/vmware-tanzu/velero/pkg/util/sync"
 	"github.com/vmware-tanzu/velero/pkg/volume"
 )
@@ -274,6 +274,7 @@ func (kr *kubernetesRestorer) Restore(
 		actions: resolvedActions,
 		volumeSnapshotterGetter: volumeSnapshotterGetter,
 		resticRestorer: resticRestorer,
+		resticErrs: make(chan error),
 		pvsToProvision: sets.NewString(),
 		pvRestorer: pvRestorer,
 		volumeSnapshots: req.VolumeSnapshots,
@@ -365,7 +366,8 @@ type context struct {
 	actions []resolvedAction
 	volumeSnapshotterGetter VolumeSnapshotterGetter
 	resticRestorer restic.Restorer
-	globalWaitGroup velerosync.ErrorGroup
+	resticWaitGroup sync.WaitGroup
+	resticErrs chan error
 	pvsToProvision sets.String
 	pvRestorer PVRestorer
 	volumeSnapshots []*volume.Snapshot
@@ -454,17 +456,27 @@ func (ctx *context) execute() (Result, Result) {
 		}
 	}

-	// TODO timeout?
-	ctx.log.Debug("Waiting on global wait group")
-	waitErrs := ctx.globalWaitGroup.Wait()
-	ctx.log.Debug("Done waiting on global wait group")
+	ctx.log.Info("Waiting for all restic restores to complete")

-	for _, err := range waitErrs {
+	// wait for all of the restic restore goroutines to be done, which is
+	// only possible once all of their errors have been received by the loop
+	// below, then close the resticErrs channel so the loop terminates.
+	go func() {
+		// TODO timeout?
+		ctx.resticWaitGroup.Wait()
+		close(ctx.resticErrs)
+	}()
+
+	// this loop will only terminate when the ctx.resticErrs channel is closed
+	// in the above goroutine, *after* all errors from the goroutines have been
+	// received by this loop.
+	for err := range ctx.resticErrs {
 		// TODO not ideal to be adding these to Velero-level errors
 		// rather than a specific namespace, but don't have a way
 		// to track the namespace right now.
 		errs.Velero = append(errs.Velero, err.Error())
 	}
+	ctx.log.Info("Done waiting for all restic restores to complete")

 	return warnings, errs
 }
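The synchronization the hunk above sets up is a standard WaitGroup-plus-channel fan-in: producers send each error on an unbuffered channel and only mark the WaitGroup done after their sends have been received; a helper goroutine waits on the WaitGroup and then closes the channel; the main goroutine drains the channel until it is closed, so it cannot return before every error has been collected. A minimal, self-contained sketch of that pattern (the worker count and error text are illustrative, not taken from the Velero code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	errCh := make(chan error) // unbuffered, like ctx.resticErrs

	// Producers: each send completes only when the draining loop below
	// receives it, and Done runs only after the sends are finished.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			errCh <- fmt.Errorf("restore %d failed", i)
		}(i)
	}

	// Closer: Wait returns only after every producer has called Done,
	// so the channel is closed strictly after the last send.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Drain: terminates only when errCh is closed, i.e. after every
	// error has been received here. Nothing is lost and nothing races.
	var collected []error
	for err := range errCh {
		collected = append(collected, err)
	}
	fmt.Println("collected", len(collected), "errors") // always prints 3
}

Because the channel is unbuffered, a restic goroutine that hits an error simply blocks on the send until execute() reaches the draining loop, which always runs before execute() returns.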
@@ -1148,11 +1160,17 @@ func restorePodVolumeBackups(ctx *context, createdObj *unstructured.Unstructured
 	if ctx.resticRestorer == nil {
 		ctx.log.Warn("No restic restorer, not restoring pod's volumes")
 	} else {
-		ctx.globalWaitGroup.GoErrorSlice(func() []error {
+		ctx.resticWaitGroup.Add(1)
+		go func() {
+			// Done() will only be called after all errors have been successfully sent
+			// on the ctx.resticErrs channel
+			defer ctx.resticWaitGroup.Done()
+
 			pod := new(v1.Pod)
 			if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdObj.UnstructuredContent(), &pod); err != nil {
 				ctx.log.WithError(err).Error("error converting unstructured pod")
-				return []error{err}
+				ctx.resticErrs <- err
+				return
 			}

 			data := restic.RestoreData{
@@ -1164,11 +1182,12 @@ func restorePodVolumeBackups(ctx *context, createdObj *unstructured.Unstructured
 			}

 			if errs := ctx.resticRestorer.RestorePodVolumes(data); errs != nil {
 				ctx.log.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully complete restic restores of pod's volumes")
-				return errs
-			}

-			return nil
-		})
+				for _, err := range errs {
+					ctx.resticErrs <- err
+				}
+			}
+		}()
 	}
 }
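The producer side above follows the matching discipline: Add(1) runs on the caller's goroutine before the worker is launched, so the closer's Wait cannot observe a zero counter prematurely, and Done is deferred so it fires only after every error has been sent (and therefore received by the draining loop). A compilable sketch of that shape, where restoreOneVolume and launchRestore are hypothetical stand-ins rather than Velero functions:

package restoresketch

import "sync"

// restoreOneVolume is a hypothetical stand-in for a call like RestorePodVolumes.
func restoreOneVolume(name string) []error { return nil }

// launchRestore mirrors the shape of the hunk above: the counter is incremented
// before the goroutine starts, and Done is deferred so it only runs after all
// errors have been sent on errCh.
func launchRestore(name string, wg *sync.WaitGroup, errCh chan<- error) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for _, err := range restoreOneVolume(name) {
			errCh <- err
		}
	}()
}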


@@ -1,71 +0,0 @@
-/*
-Copyright 2018 the Velero contributors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package sync
-
-import "sync"
-
-// An ErrorGroup waits for a collection of goroutines that return errors to finish.
-// The main goroutine calls Go one or more times to execute a function that returns
-// an error in a goroutine. Then it calls Wait to wait for all goroutines to finish
-// and collect the results of each.
-type ErrorGroup struct {
-	wg      sync.WaitGroup
-	errChan chan error
-}
-
-// Go runs the specified function in a goroutine.
-func (eg *ErrorGroup) Go(action func() error) {
-	if eg.errChan == nil {
-		eg.errChan = make(chan error)
-	}
-
-	eg.wg.Add(1)
-	go func() {
-		eg.errChan <- action()
-		eg.wg.Done()
-	}()
-}
-
-// GoErrorSlice runs a function that returns a slice of errors
-// in a goroutine.
-func (eg *ErrorGroup) GoErrorSlice(action func() []error) {
-	if eg.errChan == nil {
-		eg.errChan = make(chan error)
-	}
-
-	eg.wg.Add(1)
-	go func() {
-		for _, err := range action() {
-			eg.errChan <- err
-		}
-		eg.wg.Done()
-	}()
-}
-
-// Wait waits for all functions run via Go to finish,
-// and returns all of their errors.
-func (eg *ErrorGroup) Wait() []error {
-	var errs []error
-
-	go func() {
-		for {
-			errs = append(errs, <-eg.errChan)
-		}
-	}()
-	eg.wg.Wait()
-	return errs
-}
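The deleted ErrorGroup is where the race lived: Wait starts a collector goroutine that appends to errs, but returns as soon as wg.Wait() does, so the caller can read the slice while the collector is still appending to it, and an error whose send has completed but whose append has not yet run is silently dropped. A condensed reproduction of that send-then-Done plus unsynchronized-collector structure, assuming nothing beyond the code above (running it under go run -race flags the data race on errs):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	errChan := make(chan error)

	// Collector: mirrors ErrorGroup.Wait -- it appends in a goroutine that is
	// never synchronized with the read of errs at the bottom of main.
	var errs []error
	go func() {
		for {
			errs = append(errs, <-errChan)
		}
	}()

	// Producers: mirror ErrorGroup.Go -- send, then Done. The send completes the
	// moment the collector receives the value, which can be before the collector
	// has executed the append.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			errChan <- fmt.Errorf("error %d", i)
			wg.Done()
		}(i)
	}

	wg.Wait()
	// Racy read: the collector may still be appending, and errors received but
	// not yet appended are lost, so this can print fewer than 100.
	fmt.Println("collected", len(errs), "of 100 errors")
}

The replacement in the first file avoids both problems by making the main goroutine the only reader and writer of the error list and by closing the channel only after the WaitGroup has drained.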