uploader flush buffer for restore

Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
Lyndon-Li
2025-12-29 17:57:57 +08:00
parent 341597f542
commit e2bbace03b
6 changed files with 133 additions and 1 deletions

View File

@@ -101,3 +101,7 @@ func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e
return nil
}
func (o *BlockOutput) Flush() error {
return flushVolume(o.targetFileName)
}

View File

@@ -40,3 +40,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error {
return fmt.Errorf("block mode is not supported for Windows")
}
func (o *BlockOutput) Flush() error {
return flushVolume(o.targetFileName)
}

View File

@@ -0,0 +1,21 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopia
import "github.com/pkg/errors"
var errFlushUnsupported = errors.New("flush is not supported")

View File

@@ -0,0 +1,47 @@
//go:build linux
// +build linux
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopia
import (
"os"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
func flushVolume(dirPath string) error {
dir, err := os.Open(dirPath)
if err != nil {
return errors.Wrapf(err, "error opening dir %v", dirPath)
}
raw, err := dir.SyscallConn()
if err != nil {
return errors.Wrapf(err, "error getting handle of dir %v", dirPath)
}
raw.Control(func(fd uintptr) {
if e := unix.Syncfs(int(fd)); e != nil {
err = e
}
})
return errors.Wrapf(err, "error syncing dir %v", dirPath)
}

View File

@@ -0,0 +1,24 @@
//go:build !linux
// +build !linux
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopia
func flushVolume(_ string) error {
return errFlushUnsupported
}

View File

@@ -375,6 +375,18 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour
return result, nil
}
type flusher interface {
Flush() error
}
type fileSystemRestoreOutput struct {
*restore.FilesystemOutput
}
func (o *fileSystemRestoreOutput) Flush() error {
return flushVolume(o.TargetPath)
}
// Restore restore specific sourcePath with given snapshotID and update progress
func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string,
log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
@@ -435,10 +447,21 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
}
var output restore.Output = fsOutput
var fls flusher
if volMode == uploader.PersistentVolumeBlock {
output = &BlockOutput{
o := &BlockOutput{
FilesystemOutput: fsOutput,
}
output = o
fls = o
} else {
o := &fileSystemRestoreOutput{
FilesystemOutput: fsOutput,
}
output = o
fls = o
}
stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{
@@ -453,5 +476,14 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
if err != nil {
return 0, 0, errors.Wrapf(err, "Failed to copy snapshot data to the target")
}
if err := fls.Flush(); err != nil {
if err == errFlushUnsupported {
log.Warnf("Skip flushing data for %v under the current OS %v", path, runtime.GOOS)
} else {
return 0, 0, errors.Wrapf(err, "Failed to flush data to target")
}
}
return stat.RestoredTotalFileSize, stat.RestoredFileCount, nil
}