Merge pull request #9845 from Lyndon-Li/incremental-aware-object-writer

Incremental aware object writer
This commit is contained in:
lyndon-li
2026-05-27 13:22:39 +08:00
committed by GitHub
3 changed files with 319 additions and 14 deletions

View File

@@ -0,0 +1 @@
Fix issue #9823, add incremental aware object writer for block data mover

View File

@@ -28,6 +28,7 @@ import (
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/repo/manifest"
@@ -78,6 +79,17 @@ type kopiaObjectWriter struct {
rawWriter object.Writer
}
type kopiaObjectWriterEx struct {
ctx context.Context
rawRepoWriter repo.RepositoryWriter
parentEntries []object.IndirectObjectEntry
blockSize int64
description string
compressor compression.Name
splitter string
logger logrus.FieldLogger
}
type openOptions struct {
repoLogger io.Writer
}
@@ -88,6 +100,8 @@ const (
overwriteFullMaintainInterval = time.Duration(0)
overwriteQuickMaintainInterval = time.Duration(0)
repoBackend = "kopia"
fixedSplitter1M = "FIXED-1M"
fixedBlockSize = 1 << 20
)
var kopiaRepoOpen = repo.Open
@@ -391,26 +405,86 @@ func (kr *kopiaRepository) Close(ctx context.Context) error {
return nil
}
func (kr *kopiaRepository) ContentInfo(ctx context.Context, contentID content.ID) (content.Info, error) {
return kr.rawRepo.ContentInfo(kopia.SetupKopiaLog(ctx, kr.logger), contentID)
}
func (kr *kopiaRepository) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) {
directRepo, ok := kr.rawRepo.(repo.DirectRepository)
if !ok {
return nil, errors.New("invalid repo interface")
}
return directRepo.ContentReader().GetContent(kopia.SetupKopiaLog(ctx, kr.logger), contentID)
}
func (kr *kopiaRepository) PrefetchContents(ctx context.Context, contentIDs []content.ID, prefetchHint string) []content.ID {
return kr.rawRepo.PrefetchContents(kopia.SetupKopiaLog(ctx, kr.logger), contentIDs, prefetchHint)
}
func (kr *kopiaRepository) getFlattenedEntries(ctx context.Context, rawID object.ID) ([]object.IndirectObjectEntry, error) {
indexObjectID, ok := rawID.IndexObjectID()
if !ok {
return nil, errors.Errorf("object is not an indirect object, %v", rawID)
}
return object.LoadIndexObject(kopia.SetupKopiaLog(ctx, kr.logger), kr, indexObjectID)
}
func (kr *kopiaRepository) NewObjectWriter(ctx context.Context, opt udmrepo.ObjectWriteOptions) (udmrepo.ObjectWriter, error) {
if kr.rawWriter == nil {
return nil, errors.New("repo writer is closed or not open")
}
writer := kr.rawWriter.NewObjectWriter(kopia.SetupKopiaLog(ctx, kr.logger), object.WriterOptions{
Description: opt.Description,
Prefix: index.IDPrefix(opt.Prefix),
AsyncWrites: opt.AsyncWrites,
Compressor: getCompressorForObject(opt),
MetadataCompressor: getMetadataCompressor(),
})
var parentEntries []object.IndirectObjectEntry
if opt.AccessMode == udmrepo.ObjectDataAccessModeBlock {
if opt.ParentObject != "" {
kr.logger.Infof("Write object %s in block mode with parent %s", opt.Description, opt.ParentObject)
if writer == nil {
return nil, errors.Errorf("error creating writer for object %s", opt.Description)
rawID, err := object.ParseID(string(opt.ParentObject))
if err != nil {
return nil, errors.Wrapf(err, "error parsing parent object ID from %v", opt.ParentObject)
}
parentEntries, err = kr.getFlattenedEntries(ctx, rawID)
if err != nil {
return nil, errors.Wrapf(err, "error getting parent object entries from %v", opt.ParentObject)
}
} else {
kr.logger.Infof("Write object %s in block mode without parent", opt.Description)
}
return &kopiaObjectWriterEx{
ctx: ctx,
rawRepoWriter: kr.rawWriter,
parentEntries: parentEntries,
description: opt.Description,
compressor: getCompressorForObject(opt),
blockSize: fixedBlockSize,
splitter: fixedSplitter1M,
logger: kr.logger,
}, nil
} else {
if opt.ParentObject != "" {
return nil, errors.Errorf("parent object is only supported for block mode")
}
writer := kr.rawWriter.NewObjectWriter(kopia.SetupKopiaLog(ctx, kr.logger), object.WriterOptions{
Description: opt.Description,
Prefix: index.IDPrefix(opt.Prefix),
AsyncWrites: opt.AsyncWrites,
Compressor: getCompressorForObject(opt),
MetadataCompressor: getMetadataCompressor(),
})
if writer == nil {
return nil, errors.Errorf("error creating writer for object %s", opt.Description)
}
return &kopiaObjectWriter{
rawWriter: writer,
}, nil
}
return &kopiaObjectWriter{
rawWriter: writer,
}, nil
}
const kopiaDirStreamType = "kopia:directory"
@@ -731,7 +805,6 @@ func (kow *kopiaObjectWriter) Write(p []byte) (int, error) {
return kow.rawWriter.Write(p)
}
// TODO add implementation in following PRs
func (kow *kopiaObjectWriter) WriteAt(p []byte, offset int64) (int, error) {
return 0, errors.New("not supported")
}
@@ -777,6 +850,30 @@ func (kow *kopiaObjectWriter) Close() error {
return nil
}
// TODO add implementation in following PRs
func (kow *kopiaObjectWriterEx) Write(p []byte) (int, error) {
return 0, errors.New("not implemented")
}
// TODO add implementation in following PRs
func (kow *kopiaObjectWriterEx) WriteAt(p []byte, offset int64) (int, error) {
return 0, errors.New("not implemented")
}
func (kow *kopiaObjectWriterEx) Checkpoint() (udmrepo.ID, error) {
return udmrepo.ID(""), errors.New("not supported")
}
// TODO add implementation in following PRs
func (kow *kopiaObjectWriterEx) Result() (udmrepo.ID, error) {
return udmrepo.ID(""), errors.New("not implemented")
}
// TODO add implementation in following PRs
func (kow *kopiaObjectWriterEx) Close() error {
return errors.New("not implemented")
}
// getCompressorForObject returns the compressor for an object, at present, we don't support compression
func getCompressorForObject(_ udmrepo.ObjectWriteOptions) compression.Name {
return ""

View File

@@ -0,0 +1,207 @@
package kopialib
import (
"context"
"testing"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/object"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
repomocks "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/backend/mocks"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)
type mockDirectRepository struct {
repo.DirectRepository
mock.Mock
}
func (m *mockDirectRepository) ContentInfo(ctx context.Context, contentID content.ID) (content.Info, error) {
args := m.Called(ctx, contentID)
return args.Get(0).(content.Info), args.Error(1)
}
func (m *mockDirectRepository) ContentReader() content.Reader {
args := m.Called()
return args.Get(0).(content.Reader)
}
type mockContentReader struct {
content.Reader
mock.Mock
}
func (m *mockContentReader) GetContent(ctx context.Context, contentID content.ID) ([]byte, error) {
args := m.Called(ctx, contentID)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).([]byte), args.Error(1)
}
func TestContentInfo(t *testing.T) {
testCases := []struct {
name string
rawRepo repo.Repository
contentID content.ID
expectedErr string
}{
{
name: "success",
rawRepo: func() repo.Repository {
m := repomocks.NewMockRepository(t)
m.On("ContentInfo", mock.Anything, mock.Anything).Return(content.Info{}, nil)
return m
}(),
},
{
name: "error",
rawRepo: func() repo.Repository {
m := repomocks.NewMockRepository(t)
m.On("ContentInfo", mock.Anything, mock.Anything).Return(content.Info{}, assert.AnError)
return m
}(),
expectedErr: assert.AnError.Error(),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
kr := &kopiaRepository{rawRepo: tc.rawRepo, logger: velerotest.NewLogger()}
_, err := kr.ContentInfo(context.Background(), tc.contentID)
if tc.expectedErr != "" {
assert.EqualError(t, err, tc.expectedErr)
} else {
assert.NoError(t, err)
}
})
}
}
func TestGetContent(t *testing.T) {
testCases := []struct {
name string
rawRepo repo.Repository
contentID content.ID
expectedErr string
}{
{
name: "invalid repo interface",
rawRepo: repomocks.NewMockRepository(t),
expectedErr: "invalid repo interface",
},
{
name: "success",
rawRepo: func() repo.Repository {
m := &mockDirectRepository{}
cr := &mockContentReader{}
cr.On("GetContent", mock.Anything, mock.Anything).Return([]byte("test"), nil)
m.On("ContentReader").Return(cr)
return m
}(),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
kr := &kopiaRepository{rawRepo: tc.rawRepo, logger: velerotest.NewLogger()}
_, err := kr.GetContent(context.Background(), tc.contentID)
if tc.expectedErr != "" {
assert.EqualError(t, err, tc.expectedErr)
} else {
assert.NoError(t, err)
}
})
}
}
func TestPrefetchContents(t *testing.T) {
mockRepo := repomocks.NewMockRepository(t)
id, _ := content.ParseID("123")
mockRepo.On("PrefetchContents", mock.Anything, mock.Anything, mock.Anything).Return([]content.ID{id})
kr := &kopiaRepository{rawRepo: mockRepo, logger: velerotest.NewLogger()}
res := kr.PrefetchContents(context.Background(), []content.ID{id}, "hint")
assert.Equal(t, []content.ID{id}, res)
}
func TestGetFlattenedEntries(t *testing.T) {
kr := &kopiaRepository{logger: velerotest.NewLogger()}
rawID := object.ID{}
_, err := kr.getFlattenedEntries(context.Background(), rawID)
require.Error(t, err)
assert.Contains(t, err.Error(), "object is not an indirect object")
}
func TestNewObjectWriterEx(t *testing.T) {
testCases := []struct {
name string
opt udmrepo.ObjectWriteOptions
rawWriter *repomocks.MockRepositoryWriter
expectedErr string
}{
{
name: "block mode success without parent",
opt: udmrepo.ObjectWriteOptions{
AccessMode: udmrepo.ObjectDataAccessModeBlock,
},
rawWriter: repomocks.NewMockRepositoryWriter(t),
},
{
name: "block mode with parent, invalid parent ID",
opt: udmrepo.ObjectWriteOptions{
AccessMode: udmrepo.ObjectDataAccessModeBlock,
ParentObject: udmrepo.ID("invalid-parent"),
},
rawWriter: repomocks.NewMockRepositoryWriter(t),
expectedErr: "error parsing parent object ID from invalid-parent: malformed content ID: \"invalid-parent\": invalid content hash: encoding/hex: invalid byte: U+0069 'i'",
},
{
name: "block mode with parent, valid ID but failed to load index",
opt: udmrepo.ObjectWriteOptions{
AccessMode: udmrepo.ObjectDataAccessModeBlock,
ParentObject: udmrepo.ID("I0123456789abcdef"),
},
rawWriter: repomocks.NewMockRepositoryWriter(t),
expectedErr: "error getting parent object entries from I0123456789abcdef: unexpected content error: invalid repo interface",
},
{
name: "file mode with parent",
opt: udmrepo.ObjectWriteOptions{
AccessMode: udmrepo.ObjectDataAccessModeFile,
ParentObject: udmrepo.ID("some-parent"),
},
rawWriter: repomocks.NewMockRepositoryWriter(t),
expectedErr: "parent object is only supported for block mode",
},
{
name: "block mode success with async writes",
opt: udmrepo.ObjectWriteOptions{
AccessMode: udmrepo.ObjectDataAccessModeBlock,
AsyncWrites: 4,
},
rawWriter: repomocks.NewMockRepositoryWriter(t),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
kr := &kopiaRepository{logger: velerotest.NewLogger()}
if tc.rawWriter != nil {
kr.rawWriter = tc.rawWriter
}
_, err := kr.NewObjectWriter(context.Background(), tc.opt)
if tc.expectedErr == "" {
require.NoError(t, err)
} else {
require.EqualError(t, err, tc.expectedErr)
}
})
}
}