From e2bbace03bec5519a62efb60b0c99cd9988cdb6e Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Mon, 29 Dec 2025 17:57:57 +0800 Subject: [PATCH] uploader flush buffer for restore Signed-off-by: Lyndon-Li --- pkg/uploader/kopia/block_restore.go | 4 ++ pkg/uploader/kopia/block_restore_windows.go | 4 ++ pkg/uploader/kopia/flush.go | 21 +++++++++ pkg/uploader/kopia/flush_volume_linux.go | 47 +++++++++++++++++++++ pkg/uploader/kopia/flush_volume_other.go | 24 +++++++++++ pkg/uploader/kopia/snapshot.go | 34 ++++++++++++++- 6 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 pkg/uploader/kopia/flush.go create mode 100644 pkg/uploader/kopia/flush_volume_linux.go create mode 100644 pkg/uploader/kopia/flush_volume_other.go diff --git a/pkg/uploader/kopia/block_restore.go b/pkg/uploader/kopia/block_restore.go index b9fba99fc..1065f3293 100644 --- a/pkg/uploader/kopia/block_restore.go +++ b/pkg/uploader/kopia/block_restore.go @@ -101,3 +101,7 @@ func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e return nil } + +func (o *BlockOutput) Flush() error { + return flushVolume(o.targetFileName) +} diff --git a/pkg/uploader/kopia/block_restore_windows.go b/pkg/uploader/kopia/block_restore_windows.go index 702e0a5e2..0110876a7 100644 --- a/pkg/uploader/kopia/block_restore_windows.go +++ b/pkg/uploader/kopia/block_restore_windows.go @@ -40,3 +40,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error { return fmt.Errorf("block mode is not supported for Windows") } + +func (o *BlockOutput) Flush() error { + return flushVolume(o.targetFileName) +} diff --git a/pkg/uploader/kopia/flush.go b/pkg/uploader/kopia/flush.go new file mode 100644 index 000000000..8e6c81aa5 --- /dev/null +++ b/pkg/uploader/kopia/flush.go @@ -0,0 +1,21 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kopia + +import "github.com/pkg/errors" + +var errFlushUnsupported = errors.New("flush is not supported") diff --git a/pkg/uploader/kopia/flush_volume_linux.go b/pkg/uploader/kopia/flush_volume_linux.go new file mode 100644 index 000000000..708d60f48 --- /dev/null +++ b/pkg/uploader/kopia/flush_volume_linux.go @@ -0,0 +1,47 @@ +//go:build linux +// +build linux + +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kopia + +import ( + "os" + + "github.com/pkg/errors" + "golang.org/x/sys/unix" +) + +func flushVolume(dirPath string) error { + dir, err := os.Open(dirPath) + if err != nil { + return errors.Wrapf(err, "error opening dir %v", dirPath) + } + + raw, err := dir.SyscallConn() + if err != nil { + return errors.Wrapf(err, "error getting handle of dir %v", dirPath) + } + + raw.Control(func(fd uintptr) { + if e := unix.Syncfs(int(fd)); e != nil { + err = e + } + }) + + return errors.Wrapf(err, "error syncing dir %v", dirPath) +} diff --git a/pkg/uploader/kopia/flush_volume_other.go b/pkg/uploader/kopia/flush_volume_other.go new file mode 100644 index 000000000..0c0959bda --- /dev/null +++ b/pkg/uploader/kopia/flush_volume_other.go @@ -0,0 +1,24 @@ +//go:build !linux +// +build !linux + +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kopia + +func flushVolume(_ string) error { + return errFlushUnsupported +} diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index fce620eb7..679440635 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -375,6 +375,18 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour return result, nil } +type flusher interface { + Flush() error +} + +type fileSystemRestoreOutput struct { + *restore.FilesystemOutput +} + +func (o *fileSystemRestoreOutput) Flush() error { + return flushVolume(o.TargetPath) +} + // Restore restore specific sourcePath with given snapshotID and update progress func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { @@ -435,10 +447,21 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, } var output restore.Output = fsOutput + var fls flusher if volMode == uploader.PersistentVolumeBlock { - output = &BlockOutput{ + o := &BlockOutput{ FilesystemOutput: fsOutput, } + + output = o + fls = o + } else { + o := &fileSystemRestoreOutput{ + FilesystemOutput: fsOutput, + } + + output = o + fls = o } stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{ @@ -453,5 +476,14 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, if err != nil { return 0, 0, errors.Wrapf(err, "Failed to copy snapshot data to the target") } + + if err := fls.Flush(); err != nil { + if err == errFlushUnsupported { + log.Warnf("Skip flushing data for %v under the current OS %v", path, runtime.GOOS) + } else { + return 0, 0, errors.Wrapf(err, "Failed to flush data to target") + } + } + return stat.RestoredTotalFileSize, stat.RestoredFileCount, nil }