Merge branch 'main' into add-schedule-interval-metric

This commit is contained in:
Quang
2026-03-03 01:00:32 +11:00
committed by GitHub
9 changed files with 200 additions and 3 deletions

View File

@@ -0,0 +1 @@
Fix issue #9460, flush buffer before data mover completes

2
go.mod
View File

@@ -43,6 +43,7 @@ require (
go.uber.org/zap v1.27.1
golang.org/x/mod v0.30.0
golang.org/x/oauth2 v0.33.0
golang.org/x/sys v0.38.0
golang.org/x/text v0.31.0
google.golang.org/api v0.256.0
google.golang.org/grpc v1.77.0
@@ -183,7 +184,6 @@ require (
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/sync v0.18.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/term v0.37.0 // indirect
golang.org/x/time v0.14.0 // indirect
golang.org/x/tools v0.38.0 // indirect

View File

@@ -35,6 +35,7 @@ type BlockOutput struct {
*restore.FilesystemOutput
targetFileName string
targetFile *os.File
}
var _ restore.Output = &BlockOutput{}
@@ -52,7 +53,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
if err != nil {
return errors.Wrapf(err, "failed to open file %s", o.targetFileName)
}
defer targetFile.Close()
o.targetFile = targetFile
buffer := make([]byte, bufferSize)
@@ -101,3 +102,23 @@ func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e
return nil
}
func (o *BlockOutput) Flush() error {
if o.targetFile != nil {
if err := o.targetFile.Sync(); err != nil {
return errors.Wrapf(err, "error syncing block dev %v", o.targetFileName)
}
}
return nil
}
func (o *BlockOutput) Terminate() error {
if o.targetFile != nil {
if err := o.targetFile.Close(); err != nil {
return errors.Wrapf(err, "error closing block dev %v", o.targetFileName)
}
}
return nil
}

View File

@@ -40,3 +40,11 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error {
return fmt.Errorf("block mode is not supported for Windows")
}
func (o *BlockOutput) Flush() error {
return flushVolume(o.targetFileName)
}
func (o *BlockOutput) Terminate() error {
return nil
}

View File

@@ -0,0 +1,50 @@
//go:build linux
// +build linux
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopia
import (
"os"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
func flushVolume(dirPath string) error {
dir, err := os.Open(dirPath)
if err != nil {
return errors.Wrapf(err, "error opening dir %v", dirPath)
}
raw, err := dir.SyscallConn()
if err != nil {
return errors.Wrapf(err, "error getting handle of dir %v", dirPath)
}
var syncErr error
if err := raw.Control(func(fd uintptr) {
if e := unix.Syncfs(int(fd)); e != nil {
syncErr = e
}
}); err != nil {
return errors.Wrapf(err, "error calling fs sync from %v", dirPath)
}
return errors.Wrapf(syncErr, "error syncing fs from %v", dirPath)
}

View File

@@ -0,0 +1,24 @@
//go:build !linux
// +build !linux
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopia
func flushVolume(_ string) error {
return errFlushUnsupported
}

View File

@@ -0,0 +1,30 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kopia
import (
"github.com/kopia/kopia/snapshot/restore"
"github.com/pkg/errors"
)
var errFlushUnsupported = errors.New("flush is not supported")
type RestoreOutput interface {
restore.Output
Flush() error
Terminate() error
}

View File

@@ -53,6 +53,7 @@ var loadSnapshotFunc = snapshot.LoadSnapshot
var listSnapshotsFunc = snapshot.ListSnapshots
var filesystemEntryFunc = snapshotfs.FilesystemEntryFromIDWithPath
var restoreEntryFunc = restore.Entry
var flushVolumeFunc = flushVolume
const UploaderConfigMultipartKey = "uploader-multipart"
const MaxErrorReported = 10
@@ -375,6 +376,18 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour
return result, nil
}
type fileSystemRestoreOutput struct {
*restore.FilesystemOutput
}
func (o *fileSystemRestoreOutput) Flush() error {
return flushVolumeFunc(o.TargetPath)
}
func (o *fileSystemRestoreOutput) Terminate() error {
return nil
}
// Restore restore specific sourcePath with given snapshotID and update progress
func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string,
log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
@@ -434,13 +447,23 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
return 0, 0, errors.Wrap(err, "error to init output")
}
var output restore.Output = fsOutput
var output RestoreOutput
if volMode == uploader.PersistentVolumeBlock {
output = &BlockOutput{
FilesystemOutput: fsOutput,
}
} else {
output = &fileSystemRestoreOutput{
FilesystemOutput: fsOutput,
}
}
defer func() {
if err := output.Terminate(); err != nil {
log.Warnf("error terminating restore output for %v", path)
}
}()
stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{
Parallel: restoreConcurrency,
RestoreDirEntryAtDepth: math.MaxInt32,
@@ -453,5 +476,16 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress,
if err != nil {
return 0, 0, errors.Wrapf(err, "Failed to copy snapshot data to the target")
}
if err := output.Flush(); err != nil {
if err == errFlushUnsupported {
log.Warnf("Skip flushing data for %v under the current OS %v", path, runtime.GOOS)
} else {
return 0, 0, errors.Wrapf(err, "Failed to flush data to target")
}
} else {
log.Infof("Flush done for volume dir %v", path)
}
return stat.RestoredTotalFileSize, stat.RestoredFileCount, nil
}

View File

@@ -675,6 +675,7 @@ func TestRestore(t *testing.T) {
invalidManifestType bool
filesystemEntryFunc func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error)
restoreEntryFunc func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error)
flushVolumeFunc func(string) error
dest string
expectedBytes int64
expectedCount int32
@@ -757,6 +758,30 @@ func TestRestore(t *testing.T) {
volMode: uploader.PersistentVolumeBlock,
dest: "/tmp",
},
{
name: "Flush is not supported",
filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) {
return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil
},
restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) {
return restore.Stats{}, nil
},
flushVolumeFunc: func(string) error { return errFlushUnsupported },
snapshotID: "snapshot-123",
expectedError: nil,
},
{
name: "Flush fails",
filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) {
return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil
},
restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) {
return restore.Stats{}, nil
},
flushVolumeFunc: func(string) error { return errors.New("fake-flush-error") },
snapshotID: "snapshot-123",
expectedError: errors.New("fake-flush-error"),
},
}
em := &manifest.EntryMetadata{
@@ -784,6 +809,10 @@ func TestRestore(t *testing.T) {
restoreEntryFunc = tc.restoreEntryFunc
}
if tc.flushVolumeFunc != nil {
flushVolumeFunc = tc.flushVolumeFunc
}
repoWriterMock := &repomocks.RepositoryWriter{}
repoWriterMock.On("GetManifest", mock.Anything, mock.Anything, mock.Anything).Return(em, nil)
repoWriterMock.On("OpenObject", mock.Anything, mock.Anything).Return(em, nil)