From 44eaea8faf9f429a7cec8b3abb2464b6e812449a Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Fri, 8 May 2026 17:15:03 +0800 Subject: [PATCH] incremental aware object writer - write Signed-off-by: Lyndon-Li --- changelogs/unreleased/9853-Lyndon-Li | 1 + .../udmrepo/kopialib/freelist/freelist.go | 67 ++++ .../kopialib/freelist/freelist_test.go | 108 ++++++ pkg/repository/udmrepo/kopialib/lib_repo.go | 239 ++++++++++-- .../udmrepo/kopialib/lib_repo_ex_test.go | 343 ++++++++++++++++++ 5 files changed, 735 insertions(+), 23 deletions(-) create mode 100644 changelogs/unreleased/9853-Lyndon-Li create mode 100644 pkg/repository/udmrepo/kopialib/freelist/freelist.go create mode 100644 pkg/repository/udmrepo/kopialib/freelist/freelist_test.go diff --git a/changelogs/unreleased/9853-Lyndon-Li b/changelogs/unreleased/9853-Lyndon-Li new file mode 100644 index 000000000..e30f4502a --- /dev/null +++ b/changelogs/unreleased/9853-Lyndon-Li @@ -0,0 +1 @@ +Add the Write implementation for incremental aware object writer \ No newline at end of file diff --git a/pkg/repository/udmrepo/kopialib/freelist/freelist.go b/pkg/repository/udmrepo/kopialib/freelist/freelist.go new file mode 100644 index 000000000..aaabb4087 --- /dev/null +++ b/pkg/repository/udmrepo/kopialib/freelist/freelist.go @@ -0,0 +1,67 @@ +/* +Copyright the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package freelist + +import ( + "fmt" +) + +type FreeList struct { + chunks chan []byte + memory []byte + chunkSize int +} + +func New(size, chunkSize int) *FreeList { + memory := make([]byte, size) + numChunks := size / chunkSize + chunks := make(chan []byte, numChunks) + + for i := range numChunks { + start := i * chunkSize + end := start + chunkSize + + chunks <- memory[start:end:end] + } + + return &FreeList{ + chunks: chunks, + memory: memory, + chunkSize: chunkSize, + } +} + +func (f *FreeList) Chunks() <-chan []byte { + return f.chunks +} + +func (f *FreeList) Get() []byte { + return <-f.chunks +} + +func (f *FreeList) Return(chunk []byte) { + if cap(chunk) != f.chunkSize { + panic(fmt.Sprintf("chunk (%v) is not allocated by me", cap(chunk))) + } + + chunk = chunk[:cap(chunk)] + f.chunks <- chunk +} + +func (f *FreeList) Capacity() int { + return len(f.chunks) +} diff --git a/pkg/repository/udmrepo/kopialib/freelist/freelist_test.go b/pkg/repository/udmrepo/kopialib/freelist/freelist_test.go new file mode 100644 index 000000000..3d46c5d15 --- /dev/null +++ b/pkg/repository/udmrepo/kopialib/freelist/freelist_test.go @@ -0,0 +1,108 @@ +/* +Copyright the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package freelist + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNew(t *testing.T) { + size := 1024 + chunkSize := 256 + numChunks := size / chunkSize + + fl := New(size, chunkSize) + + assert.NotNil(t, fl) + assert.Equal(t, chunkSize, fl.chunkSize) + assert.Len(t, fl.memory, size) + assert.Equal(t, numChunks, cap(fl.chunks)) + assert.Len(t, fl.chunks, numChunks) + assert.Equal(t, numChunks, fl.Capacity()) +} + +func TestGetAndReturn(t *testing.T) { + size := 1024 + chunkSize := 256 + + fl := New(size, chunkSize) + + chunk := fl.Get() + assert.Equal(t, chunkSize, cap(chunk)) + assert.Len(t, chunk, chunkSize) + assert.Equal(t, 3, fl.Capacity()) + + fl.Return(chunk) + assert.Equal(t, 4, fl.Capacity()) +} + +func TestReturnPanic(t *testing.T) { + fl := New(1024, 256) + + invalidChunk := make([]byte, 128) + assert.PanicsWithValue(t, "chunk (128) is not allocated by me", func() { + fl.Return(invalidChunk) + }) +} + +func TestChunks(t *testing.T) { + fl := New(1024, 256) + + chunks := fl.Chunks() + assert.Len(t, chunks, 4) + assert.Equal(t, cap(fl.chunks), cap(chunks)) +} + +func TestCapacity(t *testing.T) { + fl := New(1024, 256) + + assert.Equal(t, 4, fl.Capacity()) + + fl.Get() + assert.Equal(t, 3, fl.Capacity()) +} + +func TestConcurrentAccess(t *testing.T) { + size := 1024 * 10 + chunkSize := 256 + numChunks := size / chunkSize + + fl := New(size, chunkSize) + + var wg sync.WaitGroup + for i := 0; i < numChunks; i++ { + wg.Add(1) + go func() { + defer wg.Done() + chunk := fl.Get() + assert.Equal(t, chunkSize, cap(chunk)) + assert.Len(t, chunk, chunkSize) + + for j := 0; j < len(chunk); j++ { + chunk[j] = byte(j % 256) + } + + fl.Return(chunk) + }() + } + + wg.Wait() + assert.Equal(t, numChunks, fl.Capacity()) +} diff --git a/pkg/repository/udmrepo/kopialib/lib_repo.go b/pkg/repository/udmrepo/kopialib/lib_repo.go index 79610bbbd..4fe7d6f35 100644 --- a/pkg/repository/udmrepo/kopialib/lib_repo.go +++ b/pkg/repository/udmrepo/kopialib/lib_repo.go @@ -19,9 +19,11 @@ package kopialib import ( "context" "encoding/json" + "fmt" "io" "os" "strings" + "sync" "sync/atomic" "time" @@ -42,6 +44,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/kopia" "github.com/vmware-tanzu/velero/pkg/repository/udmrepo" "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/backend" + "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/freelist" ) type kopiaRepoService struct { @@ -80,14 +83,21 @@ type kopiaObjectWriter struct { } type kopiaObjectWriterEx struct { - ctx context.Context - rawRepoWriter repo.RepositoryWriter - parentEntries []object.IndirectObjectEntry - blockSize int64 - description string - compressor compression.Name - splitter string - logger logrus.FieldLogger + ctx context.Context + rawRepoWriter repo.RepositoryWriter + parentEntries []object.IndirectObjectEntry + entries []object.IndirectObjectEntry + entryLock sync.Mutex + blockSize int64 + description string + compressor compression.Name + splitter string + writeLock sync.Mutex + asyncWritesSem chan struct{} + asyncWritesGroup sync.WaitGroup + asyncBuffer *freelist.FreeList + writeError atomic.Value + logger logrus.FieldLogger } type openOptions struct { @@ -101,6 +111,7 @@ const ( overwriteQuickMaintainInterval = time.Duration(0) repoBackend = "kopia" fixedSplitter1M = "FIXED-1M" + fixedSplitter128K = "FIXED-128K" fixedBlockSize = 1 << 20 ) @@ -454,15 +465,24 @@ func (kr *kopiaRepository) NewObjectWriter(ctx context.Context, opt udmrepo.Obje kr.logger.Infof("Write object %s in block mode without parent", opt.Description) } + var asyncWritesSem chan struct{} + var asyncBuffer *freelist.FreeList + if opt.AsyncWrites > 0 { + asyncWritesSem = make(chan struct{}, opt.AsyncWrites) + asyncBuffer = freelist.New(opt.AsyncWrites*fixedBlockSize, fixedBlockSize) + } + return &kopiaObjectWriterEx{ - ctx: ctx, - rawRepoWriter: kr.rawWriter, - parentEntries: parentEntries, - description: opt.Description, - compressor: getCompressorForObject(opt), - blockSize: fixedBlockSize, - splitter: fixedSplitter1M, - logger: kr.logger, + ctx: ctx, + rawRepoWriter: kr.rawWriter, + parentEntries: parentEntries, + description: opt.Description, + compressor: getCompressorForObject(opt), + blockSize: fixedBlockSize, + splitter: fixedSplitter1M, + asyncWritesSem: asyncWritesSem, + asyncBuffer: asyncBuffer, + logger: kr.logger, }, nil } else { if opt.ParentObject != "" { @@ -850,9 +870,111 @@ func (kow *kopiaObjectWriter) Close() error { return nil } -// TODO add implementation in following PRs func (kow *kopiaObjectWriterEx) Write(p []byte) (int, error) { - return 0, errors.New("not implemented") + kow.writeLock.Lock() + defer kow.writeLock.Unlock() + + if kow.rawRepoWriter == nil { + return 0, errors.New("object writer is closed or not open") + } + + if err := kow.getWriteError(); err != nil { + return 0, errors.Wrapf(err, "error happened during writing object") + } + + length := len(p) + if int64(length)%kow.blockSize != 0 { + return 0, errors.Errorf("invalid length %v", length) + } + + kow.entryLock.Lock() + curPos := int64(len(kow.entries)) * kow.blockSize + kow.entryLock.Unlock() + + offset := curPos + entryID := 0 + for curPos < offset+int64(length) { + kow.entryLock.Lock() + entryID = len(kow.entries) + kow.entries = append(kow.entries, object.IndirectObjectEntry{ + Start: curPos, + Length: kow.blockSize, + }) + kow.entryLock.Unlock() + + buffOffset := curPos - offset + objName := fmt.Sprintf("%s-b%v", kow.description, entryID) + kow.writeObjectAsync(objName, entryID, p[buffOffset:buffOffset+kow.blockSize]) + + curPos += kow.blockSize + } + + return length, nil +} + +func (kow *kopiaObjectWriterEx) writeObject(objName string, p []byte) (object.ID, error) { + writer := kow.rawRepoWriter.NewObjectWriter(kopia.SetupKopiaLog(kow.ctx, kow.logger), object.WriterOptions{ + Description: objName, + Compressor: kow.compressor, + Splitter: kow.splitter, + }) + + if writer == nil { + return object.EmptyID, errors.Errorf("error opening writer for %s", objName) + } + + defer writer.Close() + + written, err := writer.Write(p) + if err != nil { + return object.EmptyID, errors.Wrapf(err, "error writing for %s", objName) + } + + if written != len(p) { + return object.EmptyID, errors.Errorf("short write for %s", objName) + } + + objID, err := writer.Result() + if err != nil { + return object.EmptyID, errors.Wrapf(err, "error flushing data for %s", objName) + } + + return objID, nil +} + +func (kow *kopiaObjectWriterEx) writeObjectSync(objName string, entry int, p []byte) error { + objID, err := kow.writeObject(objName, p) + if err != nil { + return err + } + + kow.entryLock.Lock() + kow.entries[entry].Object = objID + kow.entryLock.Unlock() + + return nil +} + +func (kow *kopiaObjectWriterEx) writeObjectAsync(objName string, entryID int, p []byte) { + if kow.asyncWritesSem == nil { + if err := kow.writeObjectSync(objName, entryID, p); err != nil { + kow.saveWriteError(errors.Wrapf(err, "error writing object for %s", objName)) + } + } else { + kow.asyncWritesSem <- struct{}{} + + buffer := kow.asyncBuffer.Get() + copy(buffer, p) + + kow.asyncWritesGroup.Go(func() { + if err := kow.writeObjectSync(objName, entryID, buffer); err != nil { + kow.saveWriteError(errors.Wrapf(err, "error writing object for %s", objName)) + } + + kow.asyncBuffer.Return(buffer) + <-kow.asyncWritesSem + }) + } } // TODO add implementation in following PRs @@ -864,14 +986,85 @@ func (kow *kopiaObjectWriterEx) Checkpoint() (udmrepo.ID, error) { return udmrepo.ID(""), errors.New("not supported") } -// TODO add implementation in following PRs -func (kow *kopiaObjectWriterEx) Result() (udmrepo.ID, error) { - return udmrepo.ID(""), errors.New("not implemented") +type indirectObject struct { + StreamID string `json:"stream"` + Entries []object.IndirectObjectEntry `json:"entries"` +} + +const kopiaIndirectStreamType = "kopia:indirect" + +func (kow *kopiaObjectWriterEx) writeIndirectObject() (object.ID, error) { + if kow.rawRepoWriter == nil { + return object.EmptyID, errors.New("object writer is closed or not open") + } + + writer := kow.rawRepoWriter.NewObjectWriter(kopia.SetupKopiaLog(kow.ctx, kow.logger), object.WriterOptions{ + Description: "LIST(" + kow.description + ")", + Prefix: "x", + Compressor: getMetadataCompressor(), + Splitter: fixedSplitter128K, + }) + if writer == nil { + return object.EmptyID, errors.New("unable to create writer for indirect object") + } + + defer writer.Close() + + ind := indirectObject{ + StreamID: kopiaIndirectStreamType, + Entries: kow.entries, + } + + if err := json.NewEncoder(writer).Encode(ind); err != nil { + return object.EmptyID, errors.Wrap(err, "unable to write indirect object index") + } + + return writer.Result() +} + +func (kow *kopiaObjectWriterEx) saveWriteError(err error) { + if err != nil { + kow.writeError.Store(err) + } +} + +func (kow *kopiaObjectWriterEx) getWriteError() error { + if v := kow.writeError.Load(); v != nil { + return v.(error) + } + + return nil +} + +func (kow *kopiaObjectWriterEx) Result() (udmrepo.ID, error) { + kow.writeLock.Lock() + defer kow.writeLock.Unlock() + + kow.asyncWritesGroup.Wait() + + if err := kow.getWriteError(); err != nil { + return udmrepo.ID(""), errors.Wrap(err, "error happened during writing object") + } + + id, err := kow.writeIndirectObject() + if err != nil { + return udmrepo.ID(""), errors.Wrap(err, "error to write indirect object") + } + + objectID := "I" + udmrepo.ID(id.String()) + + return objectID, nil } -// TODO add implementation in following PRs func (kow *kopiaObjectWriterEx) Close() error { - return errors.New("not implemented") + kow.writeLock.Lock() + defer kow.writeLock.Unlock() + + kow.asyncWritesGroup.Wait() + + kow.rawRepoWriter = nil + + return nil } // getCompressorForObject returns the compressor for an object, at present, we don't support compression diff --git a/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go b/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go index ec45d1e81..6d9c5fc98 100644 --- a/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go +++ b/pkg/repository/udmrepo/kopialib/lib_repo_ex_test.go @@ -2,6 +2,8 @@ package kopialib import ( "context" + "errors" + "sync" "testing" "github.com/kopia/kopia/repo" @@ -13,6 +15,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/repository/udmrepo" repomocks "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/backend/mocks" + "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/freelist" velerotest "github.com/vmware-tanzu/velero/pkg/test" ) @@ -205,3 +208,343 @@ func TestNewObjectWriterEx(t *testing.T) { }) } } + +func TestKopiaObjectWriterEx_Write(t *testing.T) { + testCases := []struct { + name string + setupWriter func(t *testing.T) *kopiaObjectWriterEx + inputData []byte + expectedErr string + expectedLen int + }{ + { + name: "writer is closed", + setupWriter: func(t *testing.T) *kopiaObjectWriterEx { + t.Helper() + return &kopiaObjectWriterEx{ + rawRepoWriter: nil, + } + }, + inputData: make([]byte, 1024), + expectedErr: "object writer is closed or not open", + }, + { + name: "write error exists", + setupWriter: func(t *testing.T) *kopiaObjectWriterEx { + t.Helper() + kow := &kopiaObjectWriterEx{ + rawRepoWriter: repomocks.NewMockRepositoryWriter(t), + blockSize: 1024, + } + kow.saveWriteError(errors.New("previous error")) + return kow + }, + inputData: make([]byte, 1024), + expectedErr: "error happened during writing object: previous error", + }, + { + name: "invalid length", + setupWriter: func(t *testing.T) *kopiaObjectWriterEx { + t.Helper() + return &kopiaObjectWriterEx{ + rawRepoWriter: repomocks.NewMockRepositoryWriter(t), + blockSize: 1024, + } + }, + inputData: make([]byte, 1023), + expectedErr: "invalid length 1023", + }, + { + name: "success sync write", + setupWriter: func(t *testing.T) *kopiaObjectWriterEx { + t.Helper() + mockRepoWriter := repomocks.NewMockRepositoryWriter(t) + mockWriter := repomocks.NewWriter(t) + + mockWriter.On("Write", mock.Anything).Return(1024, nil) + mockWriter.On("Close").Return(nil) + + id, _ := object.ParseID("I12345") + mockWriter.On("Result").Return(id, nil) + + mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter) + + return &kopiaObjectWriterEx{ + ctx: context.Background(), + rawRepoWriter: mockRepoWriter, + blockSize: 1024, + logger: velerotest.NewLogger(), + } + }, + inputData: make([]byte, 1024), + expectedLen: 1024, + }, + { + name: "success async write", + setupWriter: func(t *testing.T) *kopiaObjectWriterEx { + t.Helper() + mockRepoWriter := repomocks.NewMockRepositoryWriter(t) + mockWriter := repomocks.NewWriter(t) + + mockWriter.On("Write", mock.Anything).Return(1024, nil) + mockWriter.On("Close").Return(nil) + + id, _ := object.ParseID("I12345") + mockWriter.On("Result").Return(id, nil) + + mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter) + + sem := make(chan struct{}, 1) + buf := freelist.New(1024, 1024) + + return &kopiaObjectWriterEx{ + ctx: context.Background(), + rawRepoWriter: mockRepoWriter, + blockSize: 1024, + asyncWritesSem: sem, + asyncBuffer: buf, + logger: velerotest.NewLogger(), + } + }, + inputData: make([]byte, 1024), + expectedLen: 1024, + }, + { + name: "success multiple blocks in one write", + setupWriter: func(t *testing.T) *kopiaObjectWriterEx { + t.Helper() + mockRepoWriter := repomocks.NewMockRepositoryWriter(t) + mockWriter := repomocks.NewWriter(t) + + mockWriter.On("Write", mock.Anything).Return(1024, nil) + mockWriter.On("Close").Return(nil) + + id, _ := object.ParseID("I12345") + mockWriter.On("Result").Return(id, nil) + + mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter) + + return &kopiaObjectWriterEx{ + ctx: context.Background(), + rawRepoWriter: mockRepoWriter, + blockSize: 1024, + logger: velerotest.NewLogger(), + } + }, + inputData: make([]byte, 2048), + expectedLen: 2048, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + kow := tc.setupWriter(t) + l, err := kow.Write(tc.inputData) + + if kow.asyncWritesSem != nil { + kow.asyncWritesGroup.Wait() + } + + if tc.expectedErr != "" { + assert.EqualError(t, err, tc.expectedErr) + } else { + require.NoError(t, err) + assert.Equal(t, tc.expectedLen, l) + } + }) + } +} + +func TestKopiaObjectWriterEx_Result(t *testing.T) { + testCases := []struct { + name string + setupWriter func(t *testing.T) *kopiaObjectWriterEx + expectedErr string + expectedID udmrepo.ID + }{ + { + name: "write error exists", + setupWriter: func(t *testing.T) *kopiaObjectWriterEx { + t.Helper() + kow := &kopiaObjectWriterEx{} + kow.saveWriteError(errors.New("async write failed")) + return kow + }, + expectedErr: "error happened during writing object: async write failed", + }, + { + name: "writer closed", + setupWriter: func(t *testing.T) *kopiaObjectWriterEx { + t.Helper() + kow := &kopiaObjectWriterEx{ + rawRepoWriter: nil, + } + return kow + }, + expectedErr: "error to write indirect object: object writer is closed or not open", + }, + { + name: "success", + setupWriter: func(t *testing.T) *kopiaObjectWriterEx { + t.Helper() + mockRepoWriter := repomocks.NewMockRepositoryWriter(t) + mockWriter := repomocks.NewWriter(t) + + mockWriter.On("Write", mock.Anything).Return(100, nil) + mockWriter.On("Close").Return(nil) + + id, _ := object.ParseID("Iabcdef") + mockWriter.On("Result").Return(id, nil) + + mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter) + + return &kopiaObjectWriterEx{ + ctx: context.Background(), + rawRepoWriter: mockRepoWriter, + logger: velerotest.NewLogger(), + } + }, + expectedID: udmrepo.ID("IIabcdef"), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + kow := tc.setupWriter(t) + id, err := kow.Result() + + if tc.expectedErr != "" { + assert.EqualError(t, err, tc.expectedErr) + } else { + require.NoError(t, err) + assert.Equal(t, tc.expectedID, id) + } + }) + } +} + +func TestKopiaObjectWriterEx_Close(t *testing.T) { + kow := &kopiaObjectWriterEx{} + err := kow.Close() + assert.NoError(t, err) +} + +func TestKopiaObjectWriterEx_ConcurrentWrite(t *testing.T) { + mockRepoWriter := repomocks.NewMockRepositoryWriter(t) + mockWriter := repomocks.NewWriter(t) + + mockWriter.On("Write", mock.Anything).Return(1024, nil) + mockWriter.On("Close").Return(nil) + + id, _ := object.ParseID("I12345") + mockWriter.On("Result").Return(id, nil) + + mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter) + + kow := &kopiaObjectWriterEx{ + ctx: context.Background(), + rawRepoWriter: mockRepoWriter, + blockSize: 1024, + logger: velerotest.NewLogger(), + } + + numGoroutines := 10 + var wg sync.WaitGroup + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + data := make([]byte, 1024) + l, err := kow.Write(data) + assert.NoError(t, err) + assert.Equal(t, 1024, l) + }() + } + + wg.Wait() + + assert.Len(t, kow.entries, numGoroutines) +} + +func TestKopiaObjectWriterEx_ConcurrentAsyncWrite(t *testing.T) { + mockRepoWriter := repomocks.NewMockRepositoryWriter(t) + mockWriter := repomocks.NewWriter(t) + + mockWriter.On("Write", mock.Anything).Return(1024, nil) + mockWriter.On("Close").Return(nil) + + id, _ := object.ParseID("I12345") + mockWriter.On("Result").Return(id, nil) + + mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter) + + sem := make(chan struct{}, 5) + buf := freelist.New(5*1024, 1024) + + kow := &kopiaObjectWriterEx{ + ctx: context.Background(), + rawRepoWriter: mockRepoWriter, + blockSize: 1024, + asyncWritesSem: sem, + asyncBuffer: buf, + logger: velerotest.NewLogger(), + } + + numGoroutines := 10 + var wg sync.WaitGroup + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + data := make([]byte, 1024) + l, err := kow.Write(data) + assert.NoError(t, err) + assert.Equal(t, 1024, l) + }() + } + + wg.Wait() + + kow.asyncWritesGroup.Wait() + + assert.Len(t, kow.entries, numGoroutines) +} + +func TestKopiaObjectWriterEx_MultipleWrites(t *testing.T) { + mockRepoWriter := repomocks.NewMockRepositoryWriter(t) + mockWriter := repomocks.NewWriter(t) + + // Since we are writing 3 blocks, Write should be called 3 times and Close 3 times + mockWriter.On("Write", mock.Anything).Return(1024, nil) + mockWriter.On("Close").Return(nil) + + id, _ := object.ParseID("I12345") + mockWriter.On("Result").Return(id, nil) + + mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter) + + kow := &kopiaObjectWriterEx{ + ctx: context.Background(), + rawRepoWriter: mockRepoWriter, + blockSize: 1024, + logger: velerotest.NewLogger(), + } + + // Write 1st block + l, err := kow.Write(make([]byte, 1024)) + require.NoError(t, err) + assert.Equal(t, 1024, l) + + // Write 2nd and 3rd block + l, err = kow.Write(make([]byte, 2048)) + require.NoError(t, err) + assert.Equal(t, 2048, l) + + // In the end we expect 3 blocks to be tracked in `kow.entries` + assert.Len(t, kow.entries, 3) + assert.Equal(t, int64(0), kow.entries[0].Start) + assert.Equal(t, int64(1024), kow.entries[1].Start) + assert.Equal(t, int64(2048), kow.entries[2].Start) +}