Make parallel restore configurable

Signed-off-by: Ming Qiu <mqiu@vmware.com>
This commit is contained in:
Ming Qiu
2024-03-08 07:18:50 +00:00
parent 84c1eca66c
commit 64a3f2aa3a
15 changed files with 240 additions and 19 deletions

View File

@@ -136,6 +136,9 @@ type UploaderConfigForRestore struct {
// +optional
// +nullable
WriteSparseFiles *bool `json:"writeSparseFiles,omitempty"`
// ParallelFilesDownload is the concurrency number setting for restore.
// +optional
ParallelFilesDownload int `json:"parallelFilesDownload,omitempty"`
}
// RestoreHooks contains custom behaviors that should be executed during or post restore.

View File

@@ -171,9 +171,3 @@ func (b *RestoreBuilder) ItemOperationTimeout(timeout time.Duration) *RestoreBui
b.object.Spec.ItemOperationTimeout.Duration = timeout
return b
}
// WriteSparseFiles sets the Restore's uploader write-sparse-files option.
func (b *RestoreBuilder) WriteSparseFiles(val bool) *RestoreBuilder {
	// Spec.UploaderConfig is a pointer and may still be nil when this is the
	// first uploader-config setter invoked on the builder; allocate it lazily
	// to avoid a nil-pointer dereference panic.
	if b.object.Spec.UploaderConfig == nil {
		b.object.Spec.UploaderConfig = &velerov1api.UploaderConfigForRestore{}
	}
	b.object.Spec.UploaderConfig.WriteSparseFiles = &val
	return b
}

View File

@@ -99,6 +99,7 @@ type CreateOptions struct {
ItemOperationTimeout time.Duration
ResourceModifierConfigMap string
WriteSparseFiles flag.OptionalBool
ParallelFilesDownload int
client kbclient.WithWatch
}
@@ -151,6 +152,8 @@ func (o *CreateOptions) BindFlags(flags *pflag.FlagSet) {
f = flags.VarPF(&o.WriteSparseFiles, "write-sparse-files", "", "Whether to write sparse files during restoring volumes")
f.NoOptDefVal = cmd.TRUE
flags.IntVar(&o.ParallelFilesDownload, "parallel-files-download", 0, "The number of restore operations to run in parallel. If set to 0, the default parallelism will be the number of CPUs for the node that node agent pod is running.")
}
func (o *CreateOptions) Complete(args []string, f client.Factory) error {
@@ -200,6 +203,10 @@ func (o *CreateOptions) Validate(c *cobra.Command, args []string, f client.Facto
return errors.New("existing-resource-policy has invalid value, it accepts only none, update as value")
}
if o.ParallelFilesDownload < 0 {
return errors.New("parallel-files-download cannot be negative")
}
switch {
case o.BackupName != "":
backup := new(api.Backup)
@@ -324,7 +331,8 @@ func (o *CreateOptions) Run(c *cobra.Command, f client.Factory) error {
Duration: o.ItemOperationTimeout,
},
UploaderConfig: &api.UploaderConfigForRestore{
WriteSparseFiles: o.WriteSparseFiles.Value,
WriteSparseFiles: o.WriteSparseFiles.Value,
ParallelFilesDownload: o.ParallelFilesDownload,
},
},
}

View File

@@ -85,7 +85,7 @@ func TestCreateCommand(t *testing.T) {
allowPartiallyFailed := "true"
itemOperationTimeout := "10m0s"
writeSparseFiles := "true"
parallel := 2
flags := new(pflag.FlagSet)
o := NewCreateOptions()
o.BindFlags(flags)
@@ -108,6 +108,7 @@ func TestCreateCommand(t *testing.T) {
flags.Parse([]string{"--allow-partially-failed", allowPartiallyFailed})
flags.Parse([]string{"--item-operation-timeout", itemOperationTimeout})
flags.Parse([]string{"--write-sparse-files", writeSparseFiles})
flags.Parse([]string{"--parallel-files-download", "2"})
client := velerotest.NewFakeControllerRuntimeClient(t).(kbclient.WithWatch)
f.On("Namespace").Return(mock.Anything)
@@ -144,6 +145,7 @@ func TestCreateCommand(t *testing.T) {
require.Equal(t, allowPartiallyFailed, o.AllowPartiallyFailed.String())
require.Equal(t, itemOperationTimeout, o.ItemOperationTimeout.String())
require.Equal(t, writeSparseFiles, o.WriteSparseFiles.String())
require.Equal(t, parallel, o.ParallelFilesDownload)
})
t.Run("create a restore from schedule", func(t *testing.T) {

View File

@@ -178,10 +178,7 @@ func DescribeRestore(ctx context.Context, kbClient kbclient.Client, restore *vel
d.Println()
d.Printf("Preserve Service NodePorts:\t%s\n", BoolPointerString(restore.Spec.PreserveNodePorts, "false", "true", "auto"))
if restore.Spec.UploaderConfig != nil && boolptr.IsSetToTrue(restore.Spec.UploaderConfig.WriteSparseFiles) {
d.Println()
DescribeUploaderConfigForRestore(d, restore.Spec)
}
describeUploaderConfigForRestore(d, restore.Spec)
d.Println()
describeRestoreItemOperations(ctx, kbClient, d, restore, details, insecureSkipTLSVerify, caCertFile)
@@ -199,10 +196,18 @@ func DescribeRestore(ctx context.Context, kbClient kbclient.Client, restore *vel
})
}
// DescribeUploaderConfigForRestore describes uploader config in human-readable format
func DescribeUploaderConfigForRestore(d *Describer, spec velerov1api.RestoreSpec) {
d.Printf("Uploader config:\n")
d.Printf("\tWrite Sparse Files:\t%T\n", boolptr.IsSetToTrue(spec.UploaderConfig.WriteSparseFiles))
// describeUploaderConfigForRestore renders the restore's uploader settings
// (sparse-file writing and download parallelism) in human-readable form.
// When the spec carries no uploader config, nothing is printed.
func describeUploaderConfigForRestore(d *Describer, spec velerov1api.RestoreSpec) {
	cfg := spec.UploaderConfig
	if cfg == nil {
		return
	}

	d.Println()
	d.Printf("Uploader config:\n")
	if boolptr.IsSetToTrue(cfg.WriteSparseFiles) {
		d.Printf("\tWrite Sparse Files:\t%v\n", boolptr.IsSetToTrue(cfg.WriteSparseFiles))
	}
	if cfg.ParallelFilesDownload > 0 {
		d.Printf("\tParallel Restore:\t%d\n", cfg.ParallelFilesDownload)
	}
}
func describeRestoreItemOperations(ctx context.Context, kbClient kbclient.Client, d *Describer, restore *velerov1api.Restore, details bool, insecureSkipTLSVerify bool, caCertPath string) {

View File

@@ -12,6 +12,7 @@ import (
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/itemoperation"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/results"
)
@@ -181,3 +182,58 @@ func TestDescribePodVolumeRestores(t *testing.T) {
})
}
}
// TestDescribeUploaderConfigForRestore checks the rendered uploader-config
// section for each combination of WriteSparseFiles / ParallelFilesDownload,
// including the nil-config case where the section must be omitted entirely.
func TestDescribeUploaderConfigForRestore(t *testing.T) {
	tests := []struct {
		name     string
		spec     velerov1api.RestoreSpec
		expected string
	}{
		{
			// nil UploaderConfig: nothing is printed
			name:     "UploaderConfigNil",
			spec:     velerov1api.RestoreSpec{},
			expected: "",
		},
		{
			// both settings present: both lines are printed
			name: "test",
			spec: velerov1api.RestoreSpec{
				UploaderConfig: &velerov1api.UploaderConfigForRestore{
					WriteSparseFiles:      boolptr.True(),
					ParallelFilesDownload: 4,
				},
			},
			expected: "\nUploader config:\n Write Sparse Files: true\n Parallel Restore: 4\n",
		},
		{
			// only the sparse-files flag is set
			name: "WriteSparseFiles test",
			spec: velerov1api.RestoreSpec{
				UploaderConfig: &velerov1api.UploaderConfigForRestore{
					WriteSparseFiles: boolptr.True(),
				},
			},
			expected: "\nUploader config:\n Write Sparse Files: true\n",
		},
		{
			// only the parallelism is set
			name: "ParallelFilesDownload test",
			spec: velerov1api.RestoreSpec{
				UploaderConfig: &velerov1api.UploaderConfigForRestore{
					ParallelFilesDownload: 4,
				},
			},
			expected: "\nUploader config:\n Parallel Restore: 4\n",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Build a Describer backed by an in-memory buffer so the tabwriter
			// output can be compared against the expected string.
			d := &Describer{
				Prefix: "",
				out:    &tabwriter.Writer{},
				buf:    &bytes.Buffer{},
			}
			d.out.Init(d.buf, 0, 8, 2, ' ', 0)

			describeUploaderConfigForRestore(d, tt.spec)

			d.out.Flush()
			assert.Equal(t, tt.expected, d.buf.String(), "Output should match expected")
		})
	}
}

View File

@@ -411,6 +411,8 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
IgnorePermissionErrors: true,
}
restoreConcurrency := runtime.NumCPU()
if len(uploaderCfg) > 0 {
writeSparseFiles, err := uploaderutil.GetWriteSparseFiles(uploaderCfg)
if err != nil {
@@ -419,9 +421,17 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
if writeSparseFiles {
fsOutput.WriteSparseFiles = true
}
concurrency, err := uploaderutil.GetRestoreConcurrency(uploaderCfg)
if err != nil {
return 0, 0, errors.Wrap(err, "failed to get parallel restore uploader config")
}
if concurrency > 0 {
restoreConcurrency = concurrency
}
}
log.Debugf("Restore filesystem output %v", fsOutput)
log.Debugf("Restore filesystem output %v, concurrency %d", fsOutput, restoreConcurrency)
err = fsOutput.Init(ctx)
if err != nil {
@@ -436,7 +446,7 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
}
stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{
Parallel: runtime.NumCPU(),
Parallel: restoreConcurrency,
RestoreDirEntryAtDepth: math.MaxInt32,
Cancel: cancleCh,
ProgressCallback: func(ctx context.Context, stats restore.Stats) {

View File

@@ -246,5 +246,9 @@ func (rp *resticProvider) parseRestoreExtraFlags(uploaderCfg map[string]string)
extraFlags = append(extraFlags, "--sparse")
}
if restoreConcurrency, err := uploaderutil.GetRestoreConcurrency(uploaderCfg); err == nil && restoreConcurrency > 0 {
return extraFlags, errors.New("restic does not support parallel restore")
}
return extraFlags, nil
}

View File

@@ -434,6 +434,13 @@ func TestParseUploaderConfig(t *testing.T) {
},
expectedFlags: []string{},
},
{
name: "RestoreConcorrency",
uploaderConfig: map[string]string{
"Parallel": "5",
},
expectedFlags: []string{},
},
}
for _, testCase := range testCases {

View File

@@ -27,6 +27,7 @@ import (
// Keys of the uploader config map handed from the velero server to the
// uploader implementations (kopia / restic).
const (
// ParallelFilesUpload carries UploaderConfigForBackup.ParallelFilesUpload.
ParallelFilesUpload = "ParallelFilesUpload"
// WriteSparseFiles carries the stringified WriteSparseFiles restore option.
WriteSparseFiles = "WriteSparseFiles"
// RestoreConcurrency carries UploaderConfigForRestore.ParallelFilesDownload;
// note the key string deliberately matches the API field name, not the constant name.
RestoreConcurrency = "ParallelFilesDownload"
)
func StoreBackupConfig(config *velerov1api.UploaderConfigForBackup) map[string]string {
@@ -42,6 +43,10 @@ func StoreRestoreConfig(config *velerov1api.UploaderConfigForRestore) map[string
} else {
data[WriteSparseFiles] = strconv.FormatBool(false)
}
if config.ParallelFilesDownload > 0 {
data[RestoreConcurrency] = strconv.Itoa(config.ParallelFilesDownload)
}
return data
}
@@ -68,3 +73,15 @@ func GetWriteSparseFiles(uploaderCfg map[string]string) (bool, error) {
}
return false, nil
}
// GetRestoreConcurrency reads the restore concurrency setting out of the
// uploader config map. It returns 0 with no error when the key is absent,
// and an error when the stored value is not a valid integer.
func GetRestoreConcurrency(uploaderCfg map[string]string) (int, error) {
	raw, found := uploaderCfg[RestoreConcurrency]
	if !found {
		// No setting stored; callers treat 0 as "use the default".
		return 0, nil
	}
	concurrency, err := strconv.Atoi(raw)
	if err != nil {
		return 0, errors.Wrap(err, "failed to parse RestoreConcurrency config")
	}
	return concurrency, nil
}

View File

@@ -78,6 +78,16 @@ func TestStoreRestoreConfig(t *testing.T) {
WriteSparseFiles: "false", // Assuming default value is false for nil case
},
},
{
name: "Parallel is set",
config: &velerov1api.UploaderConfigForRestore{
ParallelFilesDownload: 5,
},
expectedData: map[string]string{
RestoreConcurrency: "5",
WriteSparseFiles: "false",
},
},
}
for _, tc := range testCases {
@@ -180,3 +190,53 @@ func TestGetWriteSparseFiles(t *testing.T) {
})
}
}
// TestGetRestoreConcurrency covers the valid, missing, and malformed cases
// of the RestoreConcurrency uploader config entry.
func TestGetRestoreConcurrency(t *testing.T) {
	testCases := []struct {
		Name             string
		UploaderCfg      map[string]string
		ExpectedResult   int
		ExpectedError    bool
		ExpectedErrorMsg string
	}{
		{
			Name:           "Valid Configuration",
			UploaderCfg:    map[string]string{RestoreConcurrency: "10"},
			ExpectedResult: 10,
			ExpectedError:  false,
		},
		{
			Name:           "Missing Configuration",
			UploaderCfg:    map[string]string{},
			ExpectedResult: 0,
			ExpectedError:  false,
		},
		{
			Name:             "Invalid Configuration",
			UploaderCfg:      map[string]string{RestoreConcurrency: "not_an_integer"},
			ExpectedResult:   0,
			ExpectedError:    true,
			ExpectedErrorMsg: "failed to parse RestoreConcurrency config: strconv.Atoi: parsing \"not_an_integer\": invalid syntax",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.Name, func(t *testing.T) {
			result, err := GetRestoreConcurrency(tc.UploaderCfg)
			if tc.ExpectedError {
				// Check for nil before dereferencing: the original called
				// err.Error() unconditionally, which panics (instead of
				// failing the test) when no error is returned.
				if err == nil {
					t.Errorf("Expected error message %s, but got nil", tc.ExpectedErrorMsg)
				} else if err.Error() != tc.ExpectedErrorMsg {
					t.Errorf("Expected error message %s, but got %s", tc.ExpectedErrorMsg, err.Error())
				}
			} else if err != nil {
				t.Errorf("Expected no error, but got %v", err)
			}
			if result != tc.ExpectedResult {
				t.Errorf("Expected result %d, but got %d", tc.ExpectedResult, result)
			}
		})
	}
}