From 060b3364f20e0cb1223565b69a7a235c44aa59a9 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Mon, 29 Dec 2025 17:57:57 +0800 Subject: [PATCH 1/4] uploader flush buffer for restore Signed-off-by: Lyndon-Li --- pkg/uploader/kopia/block_restore.go | 4 ++ pkg/uploader/kopia/block_restore_windows.go | 4 ++ pkg/uploader/kopia/flush.go | 21 +++++++++ pkg/uploader/kopia/flush_volume_linux.go | 47 +++++++++++++++++++++ pkg/uploader/kopia/flush_volume_other.go | 24 +++++++++++ pkg/uploader/kopia/snapshot.go | 34 ++++++++++++++- 6 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 pkg/uploader/kopia/flush.go create mode 100644 pkg/uploader/kopia/flush_volume_linux.go create mode 100644 pkg/uploader/kopia/flush_volume_other.go diff --git a/pkg/uploader/kopia/block_restore.go b/pkg/uploader/kopia/block_restore.go index b9fba99fc..1065f3293 100644 --- a/pkg/uploader/kopia/block_restore.go +++ b/pkg/uploader/kopia/block_restore.go @@ -101,3 +101,7 @@ func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e return nil } + +func (o *BlockOutput) Flush() error { + return flushVolume(o.targetFileName) +} diff --git a/pkg/uploader/kopia/block_restore_windows.go b/pkg/uploader/kopia/block_restore_windows.go index 702e0a5e2..0110876a7 100644 --- a/pkg/uploader/kopia/block_restore_windows.go +++ b/pkg/uploader/kopia/block_restore_windows.go @@ -40,3 +40,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error { return fmt.Errorf("block mode is not supported for Windows") } + +func (o *BlockOutput) Flush() error { + return flushVolume(o.targetFileName) +} diff --git a/pkg/uploader/kopia/flush.go b/pkg/uploader/kopia/flush.go new file mode 100644 index 000000000..8e6c81aa5 --- /dev/null +++ b/pkg/uploader/kopia/flush.go @@ -0,0 +1,21 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kopia + +import "github.com/pkg/errors" + +var errFlushUnsupported = errors.New("flush is not supported") diff --git a/pkg/uploader/kopia/flush_volume_linux.go b/pkg/uploader/kopia/flush_volume_linux.go new file mode 100644 index 000000000..708d60f48 --- /dev/null +++ b/pkg/uploader/kopia/flush_volume_linux.go @@ -0,0 +1,47 @@ +//go:build linux +// +build linux + +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kopia + +import ( + "os" + + "github.com/pkg/errors" + "golang.org/x/sys/unix" +) + +func flushVolume(dirPath string) error { + dir, err := os.Open(dirPath) + if err != nil { + return errors.Wrapf(err, "error opening dir %v", dirPath) + } + + raw, err := dir.SyscallConn() + if err != nil { + return errors.Wrapf(err, "error getting handle of dir %v", dirPath) + } + + raw.Control(func(fd uintptr) { + if e := unix.Syncfs(int(fd)); e != nil { + err = e + } + }) + + return errors.Wrapf(err, "error syncing dir %v", dirPath) +} diff --git a/pkg/uploader/kopia/flush_volume_other.go b/pkg/uploader/kopia/flush_volume_other.go new file mode 100644 index 000000000..0c0959bda --- /dev/null +++ b/pkg/uploader/kopia/flush_volume_other.go @@ -0,0 +1,24 @@ +//go:build !linux +// +build !linux + +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kopia + +func flushVolume(_ string) error { + return errFlushUnsupported +} diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index fce620eb7..679440635 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -375,6 +375,18 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour return result, nil } +type flusher interface { + Flush() error +} + +type fileSystemRestoreOutput struct { + *restore.FilesystemOutput +} + +func (o *fileSystemRestoreOutput) Flush() error { + return flushVolume(o.TargetPath) +} + // Restore restore specific sourcePath with given snapshotID and update progress func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { @@ -435,10 +447,21 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, } var output restore.Output = fsOutput + var fls flusher if volMode == uploader.PersistentVolumeBlock { - output = &BlockOutput{ + o := &BlockOutput{ FilesystemOutput: fsOutput, } + + output = o + fls = o + } else { + o := &fileSystemRestoreOutput{ + FilesystemOutput: fsOutput, + } + + output = o + fls = o } stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{ @@ -453,5 +476,14 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, if err != nil { return 0, 0, errors.Wrapf(err, "Failed to copy snapshot data to the target") } + + if err := fls.Flush(); err != nil { + if err == errFlushUnsupported { + log.Warnf("Skip flushing data for %v under the current OS %v", path, runtime.GOOS) + } else { + return 0, 0, errors.Wrapf(err, "Failed to flush data to target") + } + } + return stat.RestoredTotalFileSize, stat.RestoredFileCount, nil } From 89c5182c3ccd5938e251a0994dc5bc971ec62f91 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Mon, 26 Jan 2026 11:34:37 +0800 Subject: [PATCH 2/4] flush volume after restore Signed-off-by: Lyndon-Li --- pkg/uploader/kopia/flush.go | 4 ++++ pkg/uploader/kopia/flush_volume_linux.go | 2 +- pkg/uploader/kopia/snapshot.go | 19 ++++++++-------- pkg/uploader/kopia/snapshot_test.go | 29 ++++++++++++++++++++++++ 4 files changed, 43 insertions(+), 11 deletions(-) diff --git a/pkg/uploader/kopia/flush.go b/pkg/uploader/kopia/flush.go index 8e6c81aa5..04c10c7f6 100644 --- a/pkg/uploader/kopia/flush.go +++ b/pkg/uploader/kopia/flush.go @@ -19,3 +19,7 @@ package kopia import "github.com/pkg/errors" var errFlushUnsupported = errors.New("flush is not supported") + +type Flusher interface { + Flush() error +} diff --git a/pkg/uploader/kopia/flush_volume_linux.go b/pkg/uploader/kopia/flush_volume_linux.go index 708d60f48..3f4901614 100644 --- a/pkg/uploader/kopia/flush_volume_linux.go +++ b/pkg/uploader/kopia/flush_volume_linux.go @@ -43,5 +43,5 @@ func flushVolume(dirPath string) error { } }) - return errors.Wrapf(err, "error syncing dir %v", dirPath) + return errors.Wrapf(err, "error syncing fs from %v", dirPath) } diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index 679440635..ca05f7568 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -53,6 +53,7 @@ var loadSnapshotFunc = snapshot.LoadSnapshot var listSnapshotsFunc = snapshot.ListSnapshots var filesystemEntryFunc = snapshotfs.FilesystemEntryFromIDWithPath var restoreEntryFunc = restore.Entry +var flushVolumeFunc = flushVolume const UploaderConfigMultipartKey = "uploader-multipart" const MaxErrorReported = 10 @@ -375,16 +376,12 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour return result, nil } -type flusher interface { - Flush() error -} - type fileSystemRestoreOutput struct { *restore.FilesystemOutput } func (o *fileSystemRestoreOutput) Flush() error { - return flushVolume(o.TargetPath) + return flushVolumeFunc(o.TargetPath) } // Restore restore specific sourcePath with given snapshotID and update progress @@ -446,22 +443,22 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, return 0, 0, errors.Wrap(err, "error to init output") } - var output restore.Output = fsOutput - var fls flusher + var output restore.Output + var flusher Flusher if volMode == uploader.PersistentVolumeBlock { o := &BlockOutput{ FilesystemOutput: fsOutput, } output = o - fls = o + flusher = o } else { o := &fileSystemRestoreOutput{ FilesystemOutput: fsOutput, } output = o - fls = o + flusher = o } stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{ @@ -477,12 +474,14 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, return 0, 0, errors.Wrapf(err, "Failed to copy snapshot data to the target") } - if err := fls.Flush(); err != nil { + if err := flusher.Flush(); err != nil { if err == errFlushUnsupported { log.Warnf("Skip flushing data for %v under the current OS %v", path, runtime.GOOS) } else { return 0, 0, errors.Wrapf(err, "Failed to flush data to target") } + } else { + log.Warnf("Flush done for volume dir %v", path) } return stat.RestoredTotalFileSize, stat.RestoredFileCount, nil diff --git a/pkg/uploader/kopia/snapshot_test.go b/pkg/uploader/kopia/snapshot_test.go index 2423ba590..984b92af5 100644 --- a/pkg/uploader/kopia/snapshot_test.go +++ b/pkg/uploader/kopia/snapshot_test.go @@ -675,6 +675,7 @@ func TestRestore(t *testing.T) { invalidManifestType bool filesystemEntryFunc func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) restoreEntryFunc func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) + flushVolumeFunc func(string) error dest string expectedBytes int64 expectedCount int32 @@ -757,6 +758,30 @@ func TestRestore(t *testing.T) { volMode: uploader.PersistentVolumeBlock, dest: "/tmp", }, + { + name: "Flush is not supported", + filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) { + return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil + }, + restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) { + return restore.Stats{}, nil + }, + flushVolumeFunc: func(string) error { return errFlushUnsupported }, + snapshotID: "snapshot-123", + expectedError: nil, + }, + { + name: "Flush fails", + filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) { + return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil + }, + restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) { + return restore.Stats{}, nil + }, + flushVolumeFunc: func(string) error { return errors.New("fake-flush-error") }, + snapshotID: "snapshot-123", + expectedError: errors.New("fake-flush-error"), + }, } em := &manifest.EntryMetadata{ @@ -784,6 +809,10 @@ func TestRestore(t *testing.T) { restoreEntryFunc = tc.restoreEntryFunc } + if tc.flushVolumeFunc != nil { + flushVolumeFunc = tc.flushVolumeFunc + } + repoWriterMock := &repomocks.RepositoryWriter{} repoWriterMock.On("GetManifest", mock.Anything, mock.Anything, mock.Anything).Return(em, nil) repoWriterMock.On("OpenObject", mock.Anything, mock.Anything).Return(em, nil) From 9cada8fc11dacc31f050c914b4232fc7a31ea3b7 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Thu, 26 Feb 2026 13:43:58 +0800 Subject: [PATCH 3/4] issue 9460: flush buffer when uploader completes Signed-off-by: Lyndon-Li --- changelogs/unreleased/9561-Lyndon-Li‎‎ | 1 + go.mod | 2 +- pkg/uploader/kopia/block_restore.go | 21 +++++++++++++-- pkg/uploader/kopia/flush_volume_linux.go | 11 +++++--- .../kopia/{flush.go => restore_output.go} | 9 +++++-- pkg/uploader/kopia/snapshot.go | 27 ++++++++++--------- 6 files changed, 50 insertions(+), 21 deletions(-) create mode 100644 changelogs/unreleased/9561-Lyndon-Li‎‎ rename pkg/uploader/kopia/{flush.go => restore_output.go} (82%) diff --git a/changelogs/unreleased/9561-Lyndon-Li‎‎ b/changelogs/unreleased/9561-Lyndon-Li‎‎ new file mode 100644 index 000000000..e6661e44d --- /dev/null +++ b/changelogs/unreleased/9561-Lyndon-Li‎‎ @@ -0,0 +1 @@ +Fix issue #9460, flush buffer before data mover completes \ No newline at end of file diff --git a/go.mod b/go.mod index 65177f63d..8fbfbef90 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( go.uber.org/zap v1.27.1 golang.org/x/mod v0.30.0 golang.org/x/oauth2 v0.33.0 + golang.org/x/sys v0.38.0 golang.org/x/text v0.31.0 google.golang.org/api v0.256.0 google.golang.org/grpc v1.77.0 @@ -183,7 +184,6 @@ require ( golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/net v0.47.0 // indirect golang.org/x/sync v0.18.0 // indirect - golang.org/x/sys v0.38.0 // indirect golang.org/x/term v0.37.0 // indirect golang.org/x/time v0.14.0 // indirect golang.org/x/tools v0.38.0 // indirect diff --git a/pkg/uploader/kopia/block_restore.go b/pkg/uploader/kopia/block_restore.go index 1065f3293..4f28a59de 100644 --- a/pkg/uploader/kopia/block_restore.go +++ b/pkg/uploader/kopia/block_restore.go @@ -35,6 +35,7 @@ type BlockOutput struct { *restore.FilesystemOutput targetFileName string + targetFile *os.File } var _ restore.Output = &BlockOutput{} @@ -52,7 +53,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote if err != nil { return errors.Wrapf(err, "failed to open file %s", o.targetFileName) } - defer targetFile.Close() + o.targetFile = targetFile buffer := make([]byte, bufferSize) @@ -103,5 +104,21 @@ func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e } func (o *BlockOutput) Flush() error { - return flushVolume(o.targetFileName) + if o.targetFile != nil { + if err := o.targetFile.Sync(); err != nil { + return errors.Wrapf(err, "error syncing block dev %v", o.targetFileName) + } + } + + return nil +} + +func (o *BlockOutput) Terminate() error { + if o.targetFile != nil { + if err := o.targetFile.Close(); err != nil { + return errors.Wrapf(err, "error closing block dev %v", o.targetFileName) + } + } + + return nil } diff --git a/pkg/uploader/kopia/flush_volume_linux.go b/pkg/uploader/kopia/flush_volume_linux.go index 3f4901614..98234e1b9 100644 --- a/pkg/uploader/kopia/flush_volume_linux.go +++ b/pkg/uploader/kopia/flush_volume_linux.go @@ -37,11 +37,14 @@ func flushVolume(dirPath string) error { return errors.Wrapf(err, "error getting handle of dir %v", dirPath) } - raw.Control(func(fd uintptr) { + var syncErr error + if err := raw.Control(func(fd uintptr) { if e := unix.Syncfs(int(fd)); e != nil { - err = e + syncErr = e } - }) + }); err != nil { + return errors.Wrapf(err, "error calling fs sync from %v", dirPath) + } - return errors.Wrapf(err, "error syncing fs from %v", dirPath) + return errors.Wrapf(syncErr, "error syncing fs from %v", dirPath) } diff --git a/pkg/uploader/kopia/flush.go b/pkg/uploader/kopia/restore_output.go similarity index 82% rename from pkg/uploader/kopia/flush.go rename to pkg/uploader/kopia/restore_output.go index 04c10c7f6..74311d38a 100644 --- a/pkg/uploader/kopia/flush.go +++ b/pkg/uploader/kopia/restore_output.go @@ -16,10 +16,15 @@ limitations under the License. package kopia -import "github.com/pkg/errors" +import ( + "github.com/kopia/kopia/snapshot/restore" + "github.com/pkg/errors" +) var errFlushUnsupported = errors.New("flush is not supported") -type Flusher interface { +type RestoreOutput interface { + restore.Output Flush() error + Terminate() error } diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index ca05f7568..1924ed35b 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -384,6 +384,10 @@ func (o *fileSystemRestoreOutput) Flush() error { return flushVolumeFunc(o.TargetPath) } +func (o *fileSystemRestoreOutput) Terminate() error { + return nil +} + // Restore restore specific sourcePath with given snapshotID and update progress func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { @@ -443,24 +447,23 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, return 0, 0, errors.Wrap(err, "error to init output") } - var output restore.Output - var flusher Flusher + var output RestoreOutput if volMode == uploader.PersistentVolumeBlock { - o := &BlockOutput{ + output = &BlockOutput{ FilesystemOutput: fsOutput, } - - output = o - flusher = o } else { - o := &fileSystemRestoreOutput{ + output = &fileSystemRestoreOutput{ FilesystemOutput: fsOutput, } - - output = o - flusher = o } + defer func() { + if err := output.Terminate(); err != nil { + log.Warnf("error terminating restore output for %v", path) + } + }() + stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{ Parallel: restoreConcurrency, RestoreDirEntryAtDepth: math.MaxInt32, @@ -474,14 +477,14 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, return 0, 0, errors.Wrapf(err, "Failed to copy snapshot data to the target") } - if err := flusher.Flush(); err != nil { + if err := output.Flush(); err != nil { if err == errFlushUnsupported { log.Warnf("Skip flushing data for %v under the current OS %v", path, runtime.GOOS) } else { return 0, 0, errors.Wrapf(err, "Failed to flush data to target") } } else { - log.Warnf("Flush done for volume dir %v", path) + log.Infof("Flush done for volume dir %v", path) } return stat.RestoredTotalFileSize, stat.RestoredFileCount, nil From ab31b811ee3adf4f74fcca702c834d7ee7f7024c Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Mon, 2 Mar 2026 15:11:54 +0800 Subject: [PATCH 4/4] fix compile error for Windows Signed-off-by: Lyndon-Li --- pkg/uploader/kopia/block_restore_windows.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/uploader/kopia/block_restore_windows.go b/pkg/uploader/kopia/block_restore_windows.go index 0110876a7..c747c3a91 100644 --- a/pkg/uploader/kopia/block_restore_windows.go +++ b/pkg/uploader/kopia/block_restore_windows.go @@ -44,3 +44,7 @@ func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e func (o *BlockOutput) Flush() error { return flushVolume(o.targetFileName) } + +func (o *BlockOutput) Terminate() error { + return nil +}