diff --git a/changelogs/unreleased/9845-Lyndon-Li b/changelogs/unreleased/9845-Lyndon-Li new file mode 100644 index 000000000..8cf7af71c --- /dev/null +++ b/changelogs/unreleased/9845-Lyndon-Li @@ -0,0 +1 @@ +Fix issue #9823, add incremental aware object writer for block data mover \ No newline at end of file diff --git a/pkg/repository/udmrepo/kopialib/lib_repo.go b/pkg/repository/udmrepo/kopialib/lib_repo.go index 2131baa5c..79610bbbd 100644 --- a/pkg/repository/udmrepo/kopialib/lib_repo.go +++ b/pkg/repository/udmrepo/kopialib/lib_repo.go @@ -28,6 +28,7 @@ import ( "github.com/kopia/kopia/fs" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/compression" + "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/content/index" "github.com/kopia/kopia/repo/maintenance" "github.com/kopia/kopia/repo/manifest" @@ -78,6 +79,17 @@ type kopiaObjectWriter struct { rawWriter object.Writer } +type kopiaObjectWriterEx struct { + ctx context.Context + rawRepoWriter repo.RepositoryWriter + parentEntries []object.IndirectObjectEntry + blockSize int64 + description string + compressor compression.Name + splitter string + logger logrus.FieldLogger +} + type openOptions struct { repoLogger io.Writer } @@ -88,6 +100,8 @@ const ( overwriteFullMaintainInterval = time.Duration(0) overwriteQuickMaintainInterval = time.Duration(0) repoBackend = "kopia" + fixedSplitter1M = "FIXED-1M" + fixedBlockSize = 1 << 20 ) var kopiaRepoOpen = repo.Open @@ -391,26 +405,86 @@ func (kr *kopiaRepository) Close(ctx context.Context) error { return nil } +func (kr *kopiaRepository) ContentInfo(ctx context.Context, contentID content.ID) (content.Info, error) { + return kr.rawRepo.ContentInfo(kopia.SetupKopiaLog(ctx, kr.logger), contentID) +} + +func (kr *kopiaRepository) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) { + directRepo, ok := kr.rawRepo.(repo.DirectRepository) + if !ok { + return nil, errors.New("invalid repo interface") + } + + return directRepo.ContentReader().GetContent(kopia.SetupKopiaLog(ctx, kr.logger), contentID) +} + +func (kr *kopiaRepository) PrefetchContents(ctx context.Context, contentIDs []content.ID, prefetchHint string) []content.ID { + return kr.rawRepo.PrefetchContents(kopia.SetupKopiaLog(ctx, kr.logger), contentIDs, prefetchHint) +} + +func (kr *kopiaRepository) getFlattenedEntries(ctx context.Context, rawID object.ID) ([]object.IndirectObjectEntry, error) { + indexObjectID, ok := rawID.IndexObjectID() + if !ok { + return nil, errors.Errorf("object is not an indirect object, %v", rawID) + } + + return object.LoadIndexObject(kopia.SetupKopiaLog(ctx, kr.logger), kr, indexObjectID) +} + func (kr *kopiaRepository) NewObjectWriter(ctx context.Context, opt udmrepo.ObjectWriteOptions) (udmrepo.ObjectWriter, error) { if kr.rawWriter == nil { return nil, errors.New("repo writer is closed or not open") } - writer := kr.rawWriter.NewObjectWriter(kopia.SetupKopiaLog(ctx, kr.logger), object.WriterOptions{ - Description: opt.Description, - Prefix: index.IDPrefix(opt.Prefix), - AsyncWrites: opt.AsyncWrites, - Compressor: getCompressorForObject(opt), - MetadataCompressor: getMetadataCompressor(), - }) + var parentEntries []object.IndirectObjectEntry + if opt.AccessMode == udmrepo.ObjectDataAccessModeBlock { + if opt.ParentObject != "" { + kr.logger.Infof("Write object %s in block mode with parent %s", opt.Description, opt.ParentObject) - if writer == nil { - return nil, errors.Errorf("error creating writer for object %s", opt.Description) + rawID, err := object.ParseID(string(opt.ParentObject)) + if err != nil { + return nil, errors.Wrapf(err, "error parsing parent object ID from %v", opt.ParentObject) + } + + parentEntries, err = kr.getFlattenedEntries(ctx, rawID) + if err != nil { + return nil, errors.Wrapf(err, "error getting parent object entries from %v", opt.ParentObject) + } + } else { + kr.logger.Infof("Write object %s in block mode without parent", opt.Description) + } + + return &kopiaObjectWriterEx{ + ctx: ctx, + rawRepoWriter: kr.rawWriter, + parentEntries: parentEntries, + description: opt.Description, + compressor: getCompressorForObject(opt), + blockSize: fixedBlockSize, + splitter: fixedSplitter1M, + logger: kr.logger, + }, nil + } else { + if opt.ParentObject != "" { + return nil, errors.Errorf("parent object is only supported for block mode") + } + + writer := kr.rawWriter.NewObjectWriter(kopia.SetupKopiaLog(ctx, kr.logger), object.WriterOptions{ + Description: opt.Description, + Prefix: index.IDPrefix(opt.Prefix), + AsyncWrites: opt.AsyncWrites, + Compressor: getCompressorForObject(opt), + MetadataCompressor: getMetadataCompressor(), + }) + + if writer == nil { + return nil, errors.Errorf("error creating writer for object %s", opt.Description) + } + + return &kopiaObjectWriter{ + rawWriter: writer, + }, nil } - - return &kopiaObjectWriter{ - rawWriter: writer, - }, nil } const kopiaDirStreamType = "kopia:directory" @@ -731,7 +805,6 @@ func (kow *kopiaObjectWriter) Write(p []byte) (int, error) { return kow.rawWriter.Write(p) } -// TODO add implementation in following PRs func (kow *kopiaObjectWriter) WriteAt(p []byte, offset int64) (int, error) { return 0, errors.New("not supported") } @@ -777,6 +850,30 @@ func (kow *kopiaObjectWriter) Close() error { return nil } +// TODO add implementation in following PRs +func (kow *kopiaObjectWriterEx) Write(p []byte) (int, error) { + return 0, errors.New("not implemented") +} + +// TODO add implementation in following PRs +func (kow *kopiaObjectWriterEx) WriteAt(p []byte, offset int64) (int, error) { + return 0, errors.New("not implemented") +} + +func (kow *kopiaObjectWriterEx) Checkpoint() (udmrepo.ID, error) { + return udmrepo.ID(""), errors.New("not supported") +} + +// TODO add implementation in following PRs +func (kow *kopiaObjectWriterEx) Result() (udmrepo.ID, error) { + return udmrepo.ID(""), errors.New("not implemented") +} + +// TODO add implementation in following PRs +func (kow *kopiaObjectWriterEx) Close() error { + return errors.New("not implemented") +} + // getCompressorForObject returns the compressor for an object, at present, we don't support compression func getCompressorForObject(_ udmrepo.ObjectWriteOptions) compression.Name { return "" diff --git a/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go b/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go new file mode 100644 index 000000000..ec45d1e81 --- /dev/null +++ b/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go @@ -0,0 +1,207 @@ +package kopialib + +import ( + "context" + "testing" + + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/content" + "github.com/kopia/kopia/repo/object" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/vmware-tanzu/velero/pkg/repository/udmrepo" + repomocks "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/backend/mocks" + velerotest "github.com/vmware-tanzu/velero/pkg/test" +) + +type mockDirectRepository struct { + repo.DirectRepository + mock.Mock +} + +func (m *mockDirectRepository) ContentInfo(ctx context.Context, contentID content.ID) (content.Info, error) { + args := m.Called(ctx, contentID) + return args.Get(0).(content.Info), args.Error(1) +} + +func (m *mockDirectRepository) ContentReader() content.Reader { + args := m.Called() + return args.Get(0).(content.Reader) +} + +type mockContentReader struct { + content.Reader + mock.Mock +} + +func (m *mockContentReader) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) { + args := m.Called(ctx, contentID) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]byte), args.Error(1) +} + +func TestContentInfo(t *testing.T) { + testCases := []struct { + name string + rawRepo repo.Repository + contentID content.ID + expectedErr string + }{ + { + name: "success", + rawRepo: func() repo.Repository { + m := repomocks.NewMockRepository(t) + m.On("ContentInfo", mock.Anything, mock.Anything).Return(content.Info{}, nil) + return m + }(), + }, + { + name: "error", + rawRepo: func() repo.Repository { + m := repomocks.NewMockRepository(t) + m.On("ContentInfo", mock.Anything, mock.Anything).Return(content.Info{}, assert.AnError) + return m + }(), + expectedErr: assert.AnError.Error(), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + kr := &kopiaRepository{rawRepo: tc.rawRepo, logger: velerotest.NewLogger()} + _, err := kr.ContentInfo(context.Background(), tc.contentID) + if tc.expectedErr != "" { + assert.EqualError(t, err, tc.expectedErr) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestGetContent(t *testing.T) { + testCases := []struct { + name string + rawRepo repo.Repository + contentID content.ID + expectedErr string + }{ + { + name: "invalid repo interface", + rawRepo: repomocks.NewMockRepository(t), + expectedErr: "invalid repo interface", + }, + { + name: "success", + rawRepo: func() repo.Repository { + m := &mockDirectRepository{} + cr := &mockContentReader{} + cr.On("GetContent", mock.Anything, mock.Anything).Return([]byte("test"), nil) + m.On("ContentReader").Return(cr) + return m + }(), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + kr := &kopiaRepository{rawRepo: tc.rawRepo, logger: velerotest.NewLogger()} + _, err := kr.GetContent(context.Background(), tc.contentID) + if tc.expectedErr != "" { + assert.EqualError(t, err, tc.expectedErr) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestPrefetchContents(t *testing.T) { + mockRepo := repomocks.NewMockRepository(t) + id, _ := content.ParseID("123") + mockRepo.On("PrefetchContents", mock.Anything, mock.Anything, mock.Anything).Return([]content.ID{id}) + kr := &kopiaRepository{rawRepo: mockRepo, logger: velerotest.NewLogger()} + res := kr.PrefetchContents(context.Background(), []content.ID{id}, "hint") + assert.Equal(t, []content.ID{id}, res) +} + +func TestGetFlattenedEntries(t *testing.T) { + kr := &kopiaRepository{logger: velerotest.NewLogger()} + rawID := object.ID{} + _, err := kr.getFlattenedEntries(context.Background(), rawID) + require.Error(t, err) + assert.Contains(t, err.Error(), "object is not an indirect object") +} + +func TestNewObjectWriterEx(t *testing.T) { + testCases := []struct { + name string + opt udmrepo.ObjectWriteOptions + rawWriter *repomocks.MockRepositoryWriter + expectedErr string + }{ + { + name: "block mode success without parent", + opt: udmrepo.ObjectWriteOptions{ + AccessMode: udmrepo.ObjectDataAccessModeBlock, + }, + rawWriter: repomocks.NewMockRepositoryWriter(t), + }, + { + name: "block mode with parent, invalid parent ID", + opt: udmrepo.ObjectWriteOptions{ + AccessMode: udmrepo.ObjectDataAccessModeBlock, + ParentObject: udmrepo.ID("invalid-parent"), + }, + rawWriter: repomocks.NewMockRepositoryWriter(t), + expectedErr: "error parsing parent object ID from invalid-parent: malformed content ID: \"invalid-parent\": invalid content hash: encoding/hex: invalid byte: U+0069 'i'", + }, + { + name: "block mode with parent, valid ID but failed to load index", + opt: udmrepo.ObjectWriteOptions{ + AccessMode: udmrepo.ObjectDataAccessModeBlock, + ParentObject: udmrepo.ID("I0123456789abcdef"), + }, + rawWriter: repomocks.NewMockRepositoryWriter(t), + expectedErr: "error getting parent object entries from I0123456789abcdef: unexpected content error: invalid repo interface", + }, + { + name: "file mode with parent", + opt: udmrepo.ObjectWriteOptions{ + AccessMode: udmrepo.ObjectDataAccessModeFile, + ParentObject: udmrepo.ID("some-parent"), + }, + rawWriter: repomocks.NewMockRepositoryWriter(t), + expectedErr: "parent object is only supported for block mode", + }, + { + name: "block mode success with async writes", + opt: udmrepo.ObjectWriteOptions{ + AccessMode: udmrepo.ObjectDataAccessModeBlock, + AsyncWrites: 4, + }, + rawWriter: repomocks.NewMockRepositoryWriter(t), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + kr := &kopiaRepository{logger: velerotest.NewLogger()} + if tc.rawWriter != nil { + kr.rawWriter = tc.rawWriter + } + + _, err := kr.NewObjectWriter(context.Background(), tc.opt) + + if tc.expectedErr == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.expectedErr) + } + }) + } +}