From 4f34ae17a37aa4603368822eb62f8de144a4d445 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Tue, 28 Apr 2026 18:06:46 +0800 Subject: [PATCH 1/4] add incremental aware object writer Signed-off-by: Lyndon-Li --- pkg/repository/udmrepo/kopialib/lib_repo.go | 128 +++++++++-- .../udmrepo/kopialib/lib_repo_ex_test.go | 199 ++++++++++++++++++ 2 files changed, 313 insertions(+), 14 deletions(-) create mode 100644 pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go diff --git a/pkg/repository/udmrepo/kopialib/lib_repo.go b/pkg/repository/udmrepo/kopialib/lib_repo.go index 34559baf7..7f60b99bd 100644 --- a/pkg/repository/udmrepo/kopialib/lib_repo.go +++ b/pkg/repository/udmrepo/kopialib/lib_repo.go @@ -22,11 +22,13 @@ import ( "io" "os" "strings" + "sync" "sync/atomic" "time" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/compression" + "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/content/index" "github.com/kopia/kopia/repo/maintenance" "github.com/kopia/kopia/repo/manifest" @@ -75,6 +77,19 @@ type kopiaObjectWriter struct { rawWriter object.Writer } +type kopiaObjectWriterEx struct { + ctx context.Context + rawRepoWriter repo.RepositoryWriter + parentEntries []object.IndirectObjectEntry + entries []object.IndirectObjectEntry + entryLock sync.Mutex + blockSize int64 + description string + compressor compression.Name + splitter string + logger logrus.FieldLogger +} + type openOptions struct { repoLogger io.Writer } @@ -85,6 +100,8 @@ const ( overwriteFullMaintainInterval = time.Duration(0) overwriteQuickMaintainInterval = time.Duration(0) repoBackend = "kopia" + fixedSplitter1M = "FIXED-1M" + fixedBlockSize = 1 << 20 ) var kopiaRepoOpen = repo.Open @@ -388,26 +405,86 @@ func (kr *kopiaRepository) Close(ctx context.Context) error { return nil } +func (kr *kopiaRepository) ContentInfo(ctx context.Context, contentID content.ID) (content.Info, error) { + return kr.rawRepo.ContentInfo(kopia.SetupKopiaLog(ctx, kr.logger), contentID) +} + +func (kr *kopiaRepository) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) { + directRepo, ok := kr.rawRepo.(repo.DirectRepository) + if !ok { + return nil, errors.New("invalid repo interface") + } + + return directRepo.ContentReader().GetContent(kopia.SetupKopiaLog(ctx, kr.logger), contentID) +} + +func (kr *kopiaRepository) PrefetchContents(ctx context.Context, contentIDs []content.ID, prefetchHint string) []content.ID { + return kr.rawRepo.PrefetchContents(kopia.SetupKopiaLog(ctx, kr.logger), contentIDs, prefetchHint) +} + +func (kr *kopiaRepository) getFlattenedEntries(ctx context.Context, rawID object.ID) ([]object.IndirectObjectEntry, error) { + indexObjectID, ok := rawID.IndexObjectID() + if !ok { + return nil, errors.Errorf("object is not an indirect object, %v", rawID) + } + + return object.LoadIndexObject(kopia.SetupKopiaLog(ctx, kr.logger), kr, indexObjectID) +} + func (kr *kopiaRepository) NewObjectWriter(ctx context.Context, opt udmrepo.ObjectWriteOptions) (udmrepo.ObjectWriter, error) { if kr.rawWriter == nil { return nil, errors.New("repo writer is closed or not open") } - writer := kr.rawWriter.NewObjectWriter(kopia.SetupKopiaLog(ctx, kr.logger), object.WriterOptions{ - Description: opt.Description, - Prefix: index.IDPrefix(opt.Prefix), - AsyncWrites: opt.AsyncWrites, - Compressor: getCompressorForObject(opt), - MetadataCompressor: getMetadataCompressor(), - }) + var parentEntries []object.IndirectObjectEntry + if opt.AccessMode == udmrepo.ObjectDataAccessModeBlock { + if opt.ParentObject != "" { + kr.logger.Infof("Write object %s in block mode with parent %s", opt.Description, opt.ParentObject) - if writer == nil { - return nil, errors.Errorf("error creating writer for object %s", opt.Description) + rawID, err := object.ParseID(string(opt.ParentObject)) + if err != nil { + return nil, errors.Wrapf(err, "error parsing parent object ID from %v", opt.ParentObject) + } + + parentEntries, err = kr.getFlattenedEntries(ctx, rawID) + if err != nil { + return nil, errors.Wrapf(err, "error getting parent object entries from %v", opt.ParentObject) + } + } else { + kr.logger.Infof("Write object %s in block mode without parent", opt.Description) + } + + return &kopiaObjectWriterEx{ + ctx: ctx, + rawRepoWriter: kr.rawWriter, + parentEntries: parentEntries, + description: opt.Description, + compressor: getCompressorForObject(opt), + blockSize: fixedBlockSize, + splitter: fixedSplitter1M, + logger: kr.logger, + }, nil + } else { + if opt.ParentObject != "" { + return nil, errors.Errorf("parent object is only supported for block mode") + } + + writer := kr.rawWriter.NewObjectWriter(kopia.SetupKopiaLog(ctx, kr.logger), object.WriterOptions{ + Description: opt.Description, + Prefix: index.IDPrefix(opt.Prefix), + AsyncWrites: opt.AsyncWrites, + Compressor: getCompressorForObject(opt), + MetadataCompressor: getMetadataCompressor(), + }) + + if writer == nil { + return nil, errors.Errorf("error creating writer for object %s", opt.Description) + } + + return &kopiaObjectWriter{ + rawWriter: writer, + }, nil } - - return &kopiaObjectWriter{ - rawWriter: writer, - }, nil } // TODO add implementation in following PRs @@ -571,7 +648,6 @@ func (kow *kopiaObjectWriter) Write(p []byte) (int, error) { return kow.rawWriter.Write(p) } -// TODO add implementation in following PRs func (kow *kopiaObjectWriter) WriteAt(p []byte, offset int64) (int, error) { return 0, errors.New("not supported") } @@ -617,6 +693,30 @@ func (kow *kopiaObjectWriter) Close() error { return nil } +// TODO add implementation in following PRs +func (kow *kopiaObjectWriterEx) Write(p []byte) (int, error) { + return 0, nil +} + +// TODO add implementation in following PRs +func (kow *kopiaObjectWriterEx) WriteAt(p []byte, offset int64) (int, error) { + return 0, nil +} + +func (kow *kopiaObjectWriterEx) Checkpoint() (udmrepo.ID, error) { + return udmrepo.ID(""), errors.New("not supported") +} + +// TODO add implementation in following PRs +func (kow *kopiaObjectWriterEx) Result() (udmrepo.ID, error) { + return udmrepo.ID(""), nil +} + +// TODO add implementation in following PRs +func (kow *kopiaObjectWriterEx) Close() error { + return nil +} + // getCompressorForObject returns the compressor for an object, at present, we don't support compression func getCompressorForObject(_ udmrepo.ObjectWriteOptions) compression.Name { return "" diff --git a/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go b/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go new file mode 100644 index 000000000..1be80006f --- /dev/null +++ b/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go @@ -0,0 +1,199 @@ +package kopialib + +import ( + "context" + "testing" + + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/content" + "github.com/kopia/kopia/repo/object" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/vmware-tanzu/velero/pkg/repository/udmrepo" + repomocks "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/backend/mocks" + velerotest "github.com/vmware-tanzu/velero/pkg/test" +) + +type mockDirectRepository struct { + repo.DirectRepository + mock.Mock +} + +func (m *mockDirectRepository) ContentInfo(ctx context.Context, contentID content.ID) (content.Info, error) { + args := m.Called(ctx, contentID) + return args.Get(0).(content.Info), args.Error(1) +} + +func (m *mockDirectRepository) ContentReader() content.Reader { + args := m.Called() + return args.Get(0).(content.Reader) +} + +type mockContentReader struct { + content.Reader + mock.Mock +} + +func (m *mockContentReader) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) { + args := m.Called(ctx, contentID) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]byte), args.Error(1) +} + +func TestContentInfo(t *testing.T) { + testCases := []struct { + name string + rawRepo repo.Repository + contentID content.ID + expectedErr string + }{ + { + name: "success", + rawRepo: func() repo.Repository { + m := repomocks.NewMockRepository(t) + m.On("ContentInfo", mock.Anything, mock.Anything).Return(content.Info{}, nil) + return m + }(), + }, + { + name: "error", + rawRepo: func() repo.Repository { + m := repomocks.NewMockRepository(t) + m.On("ContentInfo", mock.Anything, mock.Anything).Return(content.Info{}, assert.AnError) + return m + }(), + expectedErr: assert.AnError.Error(), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + kr := &kopiaRepository{rawRepo: tc.rawRepo, logger: velerotest.NewLogger()} + _, err := kr.ContentInfo(context.Background(), tc.contentID) + if tc.expectedErr != "" { + assert.EqualError(t, err, tc.expectedErr) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestGetContent(t *testing.T) { + testCases := []struct { + name string + rawRepo repo.Repository + contentID content.ID + expectedErr string + }{ + { + name: "invalid repo interface", + rawRepo: repomocks.NewMockRepository(t), + expectedErr: "invalid repo interface", + }, + { + name: "success", + rawRepo: func() repo.Repository { + m := &mockDirectRepository{} + cr := &mockContentReader{} + cr.On("GetContent", mock.Anything, mock.Anything).Return([]byte("test"), nil) + m.On("ContentReader").Return(cr) + return m + }(), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + kr := &kopiaRepository{rawRepo: tc.rawRepo, logger: velerotest.NewLogger()} + _, err := kr.GetContent(context.Background(), tc.contentID) + if tc.expectedErr != "" { + assert.EqualError(t, err, tc.expectedErr) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestPrefetchContents(t *testing.T) { + mockRepo := repomocks.NewMockRepository(t) + id, _ := content.ParseID("123") + mockRepo.On("PrefetchContents", mock.Anything, mock.Anything, mock.Anything).Return([]content.ID{id}) + kr := &kopiaRepository{rawRepo: mockRepo, logger: velerotest.NewLogger()} + res := kr.PrefetchContents(context.Background(), []content.ID{id}, "hint") + assert.Equal(t, []content.ID{id}, res) +} + +func TestGetFlattenedEntries(t *testing.T) { + kr := &kopiaRepository{logger: velerotest.NewLogger()} + rawID := object.ID{} + _, err := kr.getFlattenedEntries(context.Background(), rawID) + assert.Error(t, err) + assert.Contains(t, err.Error(), "object is not an indirect object") +} + +func TestNewObjectWriterEx(t *testing.T) { + testCases := []struct { + name string + opt udmrepo.ObjectWriteOptions + rawWriter *repomocks.MockRepositoryWriter + expectedErr string + }{ + { + name: "block mode success without parent", + opt: udmrepo.ObjectWriteOptions{ + AccessMode: udmrepo.ObjectDataAccessModeBlock, + }, + rawWriter: repomocks.NewMockRepositoryWriter(t), + }, + { + name: "block mode with parent, invalid parent ID", + opt: udmrepo.ObjectWriteOptions{ + AccessMode: udmrepo.ObjectDataAccessModeBlock, + ParentObject: udmrepo.ID("invalid-parent"), + }, + rawWriter: repomocks.NewMockRepositoryWriter(t), + expectedErr: "error to parse parent object ID from invalid-parent: malformed content ID: \"invalid-parent\": invalid content hash: encoding/hex: invalid byte: U+0069 'i'", + }, + { + name: "block mode with parent, valid ID but failed to load index", + opt: udmrepo.ObjectWriteOptions{ + AccessMode: udmrepo.ObjectDataAccessModeBlock, + ParentObject: udmrepo.ID("I0123456789abcdef"), + }, + rawWriter: repomocks.NewMockRepositoryWriter(t), + expectedErr: "error getting parent object entries from I0123456789abcdef: unexpected content error: invalid repo interface", + }, + { + name: "file mode with parent", + opt: udmrepo.ObjectWriteOptions{ + AccessMode: udmrepo.ObjectDataAccessModeFile, + ParentObject: udmrepo.ID("some-parent"), + }, + rawWriter: repomocks.NewMockRepositoryWriter(t), + expectedErr: "parent object is only supported for block mode", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + kr := &kopiaRepository{logger: velerotest.NewLogger()} + if tc.rawWriter != nil { + kr.rawWriter = tc.rawWriter + } + + _, err := kr.NewObjectWriter(context.Background(), tc.opt) + + if tc.expectedErr == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.expectedErr) + } + }) + } +} From 6a67f4a8a4aa88b08bf60dd1474d61bb6fe89659 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Fri, 8 May 2026 16:41:39 +0800 Subject: [PATCH 2/4] fix UT error Signed-off-by: Lyndon-Li --- pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go b/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go index 1be80006f..cd688666a 100644 --- a/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go +++ b/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go @@ -158,7 +158,7 @@ func TestNewObjectWriterEx(t *testing.T) { ParentObject: udmrepo.ID("invalid-parent"), }, rawWriter: repomocks.NewMockRepositoryWriter(t), - expectedErr: "error to parse parent object ID from invalid-parent: malformed content ID: \"invalid-parent\": invalid content hash: encoding/hex: invalid byte: U+0069 'i'", + expectedErr: "error parsing parent object ID from invalid-parent: malformed content ID: \"invalid-parent\": invalid content hash: encoding/hex: invalid byte: U+0069 'i'", }, { name: "block mode with parent, valid ID but failed to load index", From 596e774582b7c66adef78e6a7e0b6f19ee699f1d Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Fri, 22 May 2026 16:19:36 +0800 Subject: [PATCH 3/4] add incremental aware object writer Signed-off-by: Lyndon-Li --- changelogs/unreleased/9845-Lyndon-Li | 1 + pkg/repository/udmrepo/kopialib/lib_repo.go | 11 ++++------- pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go | 10 +++++++++- 3 files changed, 14 insertions(+), 8 deletions(-) create mode 100644 changelogs/unreleased/9845-Lyndon-Li diff --git a/changelogs/unreleased/9845-Lyndon-Li b/changelogs/unreleased/9845-Lyndon-Li new file mode 100644 index 000000000..6e08e098d --- /dev/null +++ b/changelogs/unreleased/9845-Lyndon-Li @@ -0,0 +1 @@ +Fix issue #9823, add incremental ware object writer for block data mover \ No newline at end of file diff --git a/pkg/repository/udmrepo/kopialib/lib_repo.go b/pkg/repository/udmrepo/kopialib/lib_repo.go index 86e708e11..79610bbbd 100644 --- a/pkg/repository/udmrepo/kopialib/lib_repo.go +++ b/pkg/repository/udmrepo/kopialib/lib_repo.go @@ -22,7 +22,6 @@ import ( "io" "os" "strings" - "sync" "sync/atomic" "time" @@ -84,8 +83,6 @@ type kopiaObjectWriterEx struct { ctx context.Context rawRepoWriter repo.RepositoryWriter parentEntries []object.IndirectObjectEntry - entries []object.IndirectObjectEntry - entryLock sync.Mutex blockSize int64 description string compressor compression.Name @@ -855,12 +852,12 @@ func (kow *kopiaObjectWriter) Close() error { // TODO add implementation in following PRs func (kow *kopiaObjectWriterEx) Write(p []byte) (int, error) { - return 0, nil + return 0, errors.New("not implemented") } // TODO add implementation in following PRs func (kow *kopiaObjectWriterEx) WriteAt(p []byte, offset int64) (int, error) { - return 0, nil + return 0, errors.New("not implemented") } func (kow *kopiaObjectWriterEx) Checkpoint() (udmrepo.ID, error) { @@ -869,12 +866,12 @@ func (kow *kopiaObjectWriterEx) Checkpoint() (udmrepo.ID, error) { // TODO add implementation in following PRs func (kow *kopiaObjectWriterEx) Result() (udmrepo.ID, error) { - return udmrepo.ID(""), nil + return udmrepo.ID(""), errors.New("not implemented") } // TODO add implementation in following PRs func (kow *kopiaObjectWriterEx) Close() error { - return nil + return errors.New("not implemented") } // getCompressorForObject returns the compressor for an object, at present, we don't support compression diff --git a/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go b/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go index cd688666a..ec45d1e81 100644 --- a/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go +++ b/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go @@ -133,7 +133,7 @@ func TestGetFlattenedEntries(t *testing.T) { kr := &kopiaRepository{logger: velerotest.NewLogger()} rawID := object.ID{} _, err := kr.getFlattenedEntries(context.Background(), rawID) - assert.Error(t, err) + require.Error(t, err) assert.Contains(t, err.Error(), "object is not an indirect object") } @@ -178,6 +178,14 @@ func TestNewObjectWriterEx(t *testing.T) { rawWriter: repomocks.NewMockRepositoryWriter(t), expectedErr: "parent object is only supported for block mode", }, + { + name: "block mode success with async writes", + opt: udmrepo.ObjectWriteOptions{ + AccessMode: udmrepo.ObjectDataAccessModeBlock, + AsyncWrites: 4, + }, + rawWriter: repomocks.NewMockRepositoryWriter(t), + }, } for _, tc := range testCases { From ac1e472d53f91de07b69f6392fdf6be28cd9d1ab Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Tue, 26 May 2026 08:57:56 +0800 Subject: [PATCH 4/4] add incremental aware object writer Signed-off-by: Lyndon-Li --- changelogs/unreleased/9845-Lyndon-Li | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelogs/unreleased/9845-Lyndon-Li b/changelogs/unreleased/9845-Lyndon-Li index 6e08e098d..8cf7af71c 100644 --- a/changelogs/unreleased/9845-Lyndon-Li +++ b/changelogs/unreleased/9845-Lyndon-Li @@ -1 +1 @@ -Fix issue #9823, add incremental ware object writer for block data mover \ No newline at end of file +Fix issue #9823, add incremental aware object writer for block data mover \ No newline at end of file