mirror of
https://github.com/vmware-tanzu/velero.git
synced 2026-05-30 02:50:25 +00:00
Merge pull request #9853 from Lyndon-Li/incremental-aware-object-write
Some checks failed
Run the E2E test on kind / get-go-version (push) Failing after 58s
Run the E2E test on kind / build (push) Has been skipped
Run the E2E test on kind / setup-test-matrix (push) Successful in 3s
Run the E2E test on kind / run-e2e-test (push) Has been skipped
Main CI / get-go-version (push) Successful in 12s
Main CI / Build (push) Failing after 29s
Close stale issues and PRs / stale (push) Successful in 11s
Trivy Nightly Scan / Trivy nightly scan (velero, main) (push) Failing after 1m51s
Trivy Nightly Scan / Trivy nightly scan (velero-plugin-for-aws, main) (push) Failing after 1m12s
Trivy Nightly Scan / Trivy nightly scan (velero-plugin-for-gcp, main) (push) Failing after 1m13s
Trivy Nightly Scan / Trivy nightly scan (velero-plugin-for-microsoft-azure, main) (push) Failing after 1m16s
Some checks failed
Run the E2E test on kind / get-go-version (push) Failing after 58s
Run the E2E test on kind / build (push) Has been skipped
Run the E2E test on kind / setup-test-matrix (push) Successful in 3s
Run the E2E test on kind / run-e2e-test (push) Has been skipped
Main CI / get-go-version (push) Successful in 12s
Main CI / Build (push) Failing after 29s
Close stale issues and PRs / stale (push) Successful in 11s
Trivy Nightly Scan / Trivy nightly scan (velero, main) (push) Failing after 1m51s
Trivy Nightly Scan / Trivy nightly scan (velero-plugin-for-aws, main) (push) Failing after 1m12s
Trivy Nightly Scan / Trivy nightly scan (velero-plugin-for-gcp, main) (push) Failing after 1m13s
Trivy Nightly Scan / Trivy nightly scan (velero-plugin-for-microsoft-azure, main) (push) Failing after 1m16s
Incremental aware object write
This commit is contained in:
1
changelogs/unreleased/9853-Lyndon-Li
Normal file
1
changelogs/unreleased/9853-Lyndon-Li
Normal file
@@ -0,0 +1 @@
|
||||
Add the Write implementation for incremental aware object writer
|
||||
67
pkg/repository/udmrepo/kopialib/freelist/freelist.go
Normal file
67
pkg/repository/udmrepo/kopialib/freelist/freelist.go
Normal file
@@ -0,0 +1,67 @@
|
||||
/*
|
||||
Copyright the Velero contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package freelist
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type FreeList struct {
|
||||
chunks chan []byte
|
||||
memory []byte
|
||||
chunkSize int
|
||||
}
|
||||
|
||||
func New(size, chunkSize int) *FreeList {
|
||||
memory := make([]byte, size)
|
||||
numChunks := size / chunkSize
|
||||
chunks := make(chan []byte, numChunks)
|
||||
|
||||
for i := range numChunks {
|
||||
start := i * chunkSize
|
||||
end := start + chunkSize
|
||||
|
||||
chunks <- memory[start:end:end]
|
||||
}
|
||||
|
||||
return &FreeList{
|
||||
chunks: chunks,
|
||||
memory: memory,
|
||||
chunkSize: chunkSize,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *FreeList) Chunks() <-chan []byte {
|
||||
return f.chunks
|
||||
}
|
||||
|
||||
func (f *FreeList) Get() []byte {
|
||||
return <-f.chunks
|
||||
}
|
||||
|
||||
func (f *FreeList) Return(chunk []byte) {
|
||||
if cap(chunk) != f.chunkSize {
|
||||
panic(fmt.Sprintf("chunk (%v) is not allocated by me", cap(chunk)))
|
||||
}
|
||||
|
||||
chunk = chunk[:cap(chunk)]
|
||||
f.chunks <- chunk
|
||||
}
|
||||
|
||||
func (f *FreeList) Capacity() int {
|
||||
return len(f.chunks)
|
||||
}
|
||||
108
pkg/repository/udmrepo/kopialib/freelist/freelist_test.go
Normal file
108
pkg/repository/udmrepo/kopialib/freelist/freelist_test.go
Normal file
@@ -0,0 +1,108 @@
|
||||
/*
|
||||
Copyright the Velero contributors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package freelist
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
size := 1024
|
||||
chunkSize := 256
|
||||
numChunks := size / chunkSize
|
||||
|
||||
fl := New(size, chunkSize)
|
||||
|
||||
assert.NotNil(t, fl)
|
||||
assert.Equal(t, chunkSize, fl.chunkSize)
|
||||
assert.Len(t, fl.memory, size)
|
||||
assert.Equal(t, numChunks, cap(fl.chunks))
|
||||
assert.Len(t, fl.chunks, numChunks)
|
||||
assert.Equal(t, numChunks, fl.Capacity())
|
||||
}
|
||||
|
||||
func TestGetAndReturn(t *testing.T) {
|
||||
size := 1024
|
||||
chunkSize := 256
|
||||
|
||||
fl := New(size, chunkSize)
|
||||
|
||||
chunk := fl.Get()
|
||||
assert.Equal(t, chunkSize, cap(chunk))
|
||||
assert.Len(t, chunk, chunkSize)
|
||||
assert.Equal(t, 3, fl.Capacity())
|
||||
|
||||
fl.Return(chunk)
|
||||
assert.Equal(t, 4, fl.Capacity())
|
||||
}
|
||||
|
||||
func TestReturnPanic(t *testing.T) {
|
||||
fl := New(1024, 256)
|
||||
|
||||
invalidChunk := make([]byte, 128)
|
||||
assert.PanicsWithValue(t, "chunk (128) is not allocated by me", func() {
|
||||
fl.Return(invalidChunk)
|
||||
})
|
||||
}
|
||||
|
||||
func TestChunks(t *testing.T) {
|
||||
fl := New(1024, 256)
|
||||
|
||||
chunks := fl.Chunks()
|
||||
assert.Len(t, chunks, 4)
|
||||
assert.Equal(t, cap(fl.chunks), cap(chunks))
|
||||
}
|
||||
|
||||
func TestCapacity(t *testing.T) {
|
||||
fl := New(1024, 256)
|
||||
|
||||
assert.Equal(t, 4, fl.Capacity())
|
||||
|
||||
fl.Get()
|
||||
assert.Equal(t, 3, fl.Capacity())
|
||||
}
|
||||
|
||||
func TestConcurrentAccess(t *testing.T) {
|
||||
size := 1024 * 10
|
||||
chunkSize := 256
|
||||
numChunks := size / chunkSize
|
||||
|
||||
fl := New(size, chunkSize)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < numChunks; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
chunk := fl.Get()
|
||||
assert.Equal(t, chunkSize, cap(chunk))
|
||||
assert.Len(t, chunk, chunkSize)
|
||||
|
||||
for j := 0; j < len(chunk); j++ {
|
||||
chunk[j] = byte(j % 256)
|
||||
}
|
||||
|
||||
fl.Return(chunk)
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
assert.Equal(t, numChunks, fl.Capacity())
|
||||
}
|
||||
@@ -19,9 +19,11 @@ package kopialib
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -42,6 +44,7 @@ import (
|
||||
"github.com/vmware-tanzu/velero/pkg/kopia"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/backend"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/freelist"
|
||||
)
|
||||
|
||||
type kopiaRepoService struct {
|
||||
@@ -80,14 +83,21 @@ type kopiaObjectWriter struct {
|
||||
}
|
||||
|
||||
type kopiaObjectWriterEx struct {
|
||||
ctx context.Context
|
||||
rawRepoWriter repo.RepositoryWriter
|
||||
parentEntries []object.IndirectObjectEntry
|
||||
blockSize int64
|
||||
description string
|
||||
compressor compression.Name
|
||||
splitter string
|
||||
logger logrus.FieldLogger
|
||||
ctx context.Context
|
||||
rawRepoWriter repo.RepositoryWriter
|
||||
parentEntries []object.IndirectObjectEntry
|
||||
entries []object.IndirectObjectEntry
|
||||
entryLock sync.Mutex
|
||||
blockSize int64
|
||||
description string
|
||||
compressor compression.Name
|
||||
splitter string
|
||||
writeLock sync.Mutex
|
||||
asyncWritesSem chan struct{}
|
||||
asyncWritesGroup sync.WaitGroup
|
||||
asyncBuffer *freelist.FreeList
|
||||
writeError atomic.Value
|
||||
logger logrus.FieldLogger
|
||||
}
|
||||
|
||||
type openOptions struct {
|
||||
@@ -101,6 +111,7 @@ const (
|
||||
overwriteQuickMaintainInterval = time.Duration(0)
|
||||
repoBackend = "kopia"
|
||||
fixedSplitter1M = "FIXED-1M"
|
||||
fixedSplitter128K = "FIXED-128K"
|
||||
fixedBlockSize = 1 << 20
|
||||
)
|
||||
|
||||
@@ -454,15 +465,24 @@ func (kr *kopiaRepository) NewObjectWriter(ctx context.Context, opt udmrepo.Obje
|
||||
kr.logger.Infof("Write object %s in block mode without parent", opt.Description)
|
||||
}
|
||||
|
||||
var asyncWritesSem chan struct{}
|
||||
var asyncBuffer *freelist.FreeList
|
||||
if opt.AsyncWrites > 0 {
|
||||
asyncWritesSem = make(chan struct{}, opt.AsyncWrites)
|
||||
asyncBuffer = freelist.New(opt.AsyncWrites*fixedBlockSize, fixedBlockSize)
|
||||
}
|
||||
|
||||
return &kopiaObjectWriterEx{
|
||||
ctx: ctx,
|
||||
rawRepoWriter: kr.rawWriter,
|
||||
parentEntries: parentEntries,
|
||||
description: opt.Description,
|
||||
compressor: getCompressorForObject(opt),
|
||||
blockSize: fixedBlockSize,
|
||||
splitter: fixedSplitter1M,
|
||||
logger: kr.logger,
|
||||
ctx: ctx,
|
||||
rawRepoWriter: kr.rawWriter,
|
||||
parentEntries: parentEntries,
|
||||
description: opt.Description,
|
||||
compressor: getCompressorForObject(opt),
|
||||
blockSize: fixedBlockSize,
|
||||
splitter: fixedSplitter1M,
|
||||
asyncWritesSem: asyncWritesSem,
|
||||
asyncBuffer: asyncBuffer,
|
||||
logger: kr.logger,
|
||||
}, nil
|
||||
} else {
|
||||
if opt.ParentObject != "" {
|
||||
@@ -850,9 +870,111 @@ func (kow *kopiaObjectWriter) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO add implementation in following PRs
|
||||
func (kow *kopiaObjectWriterEx) Write(p []byte) (int, error) {
|
||||
return 0, errors.New("not implemented")
|
||||
kow.writeLock.Lock()
|
||||
defer kow.writeLock.Unlock()
|
||||
|
||||
if kow.rawRepoWriter == nil {
|
||||
return 0, errors.New("object writer is closed or not open")
|
||||
}
|
||||
|
||||
if err := kow.getWriteError(); err != nil {
|
||||
return 0, errors.Wrapf(err, "error happened during writing object")
|
||||
}
|
||||
|
||||
length := len(p)
|
||||
if int64(length)%kow.blockSize != 0 {
|
||||
return 0, errors.Errorf("invalid length %v", length)
|
||||
}
|
||||
|
||||
kow.entryLock.Lock()
|
||||
curPos := int64(len(kow.entries)) * kow.blockSize
|
||||
kow.entryLock.Unlock()
|
||||
|
||||
offset := curPos
|
||||
entryID := 0
|
||||
for curPos < offset+int64(length) {
|
||||
kow.entryLock.Lock()
|
||||
entryID = len(kow.entries)
|
||||
kow.entries = append(kow.entries, object.IndirectObjectEntry{
|
||||
Start: curPos,
|
||||
Length: kow.blockSize,
|
||||
})
|
||||
kow.entryLock.Unlock()
|
||||
|
||||
buffOffset := curPos - offset
|
||||
objName := fmt.Sprintf("%s-b%v", kow.description, entryID)
|
||||
kow.writeObjectAsync(objName, entryID, p[buffOffset:buffOffset+kow.blockSize])
|
||||
|
||||
curPos += kow.blockSize
|
||||
}
|
||||
|
||||
return length, nil
|
||||
}
|
||||
|
||||
func (kow *kopiaObjectWriterEx) writeObject(objName string, p []byte) (object.ID, error) {
|
||||
writer := kow.rawRepoWriter.NewObjectWriter(kopia.SetupKopiaLog(kow.ctx, kow.logger), object.WriterOptions{
|
||||
Description: objName,
|
||||
Compressor: kow.compressor,
|
||||
Splitter: kow.splitter,
|
||||
})
|
||||
|
||||
if writer == nil {
|
||||
return object.EmptyID, errors.Errorf("error opening writer for %s", objName)
|
||||
}
|
||||
|
||||
defer writer.Close()
|
||||
|
||||
written, err := writer.Write(p)
|
||||
if err != nil {
|
||||
return object.EmptyID, errors.Wrapf(err, "error writing for %s", objName)
|
||||
}
|
||||
|
||||
if written != len(p) {
|
||||
return object.EmptyID, errors.Errorf("short write for %s", objName)
|
||||
}
|
||||
|
||||
objID, err := writer.Result()
|
||||
if err != nil {
|
||||
return object.EmptyID, errors.Wrapf(err, "error flushing data for %s", objName)
|
||||
}
|
||||
|
||||
return objID, nil
|
||||
}
|
||||
|
||||
func (kow *kopiaObjectWriterEx) writeObjectSync(objName string, entry int, p []byte) error {
|
||||
objID, err := kow.writeObject(objName, p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
kow.entryLock.Lock()
|
||||
kow.entries[entry].Object = objID
|
||||
kow.entryLock.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kow *kopiaObjectWriterEx) writeObjectAsync(objName string, entryID int, p []byte) {
|
||||
if kow.asyncWritesSem == nil {
|
||||
if err := kow.writeObjectSync(objName, entryID, p); err != nil {
|
||||
kow.saveWriteError(errors.Wrapf(err, "error writing object for %s", objName))
|
||||
}
|
||||
} else {
|
||||
kow.asyncWritesSem <- struct{}{}
|
||||
|
||||
buffer := kow.asyncBuffer.Get()
|
||||
copy(buffer, p)
|
||||
|
||||
kow.asyncWritesGroup.Go(func() {
|
||||
if err := kow.writeObjectSync(objName, entryID, buffer); err != nil {
|
||||
kow.saveWriteError(errors.Wrapf(err, "error writing object for %s", objName))
|
||||
}
|
||||
|
||||
kow.asyncBuffer.Return(buffer)
|
||||
<-kow.asyncWritesSem
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TODO add implementation in following PRs
|
||||
@@ -864,14 +986,85 @@ func (kow *kopiaObjectWriterEx) Checkpoint() (udmrepo.ID, error) {
|
||||
return udmrepo.ID(""), errors.New("not supported")
|
||||
}
|
||||
|
||||
// TODO add implementation in following PRs
|
||||
func (kow *kopiaObjectWriterEx) Result() (udmrepo.ID, error) {
|
||||
return udmrepo.ID(""), errors.New("not implemented")
|
||||
type indirectObject struct {
|
||||
StreamID string `json:"stream"`
|
||||
Entries []object.IndirectObjectEntry `json:"entries"`
|
||||
}
|
||||
|
||||
const kopiaIndirectStreamType = "kopia:indirect"
|
||||
|
||||
func (kow *kopiaObjectWriterEx) writeIndirectObject() (object.ID, error) {
|
||||
if kow.rawRepoWriter == nil {
|
||||
return object.EmptyID, errors.New("object writer is closed or not open")
|
||||
}
|
||||
|
||||
writer := kow.rawRepoWriter.NewObjectWriter(kopia.SetupKopiaLog(kow.ctx, kow.logger), object.WriterOptions{
|
||||
Description: "LIST(" + kow.description + ")",
|
||||
Prefix: "x",
|
||||
Compressor: getMetadataCompressor(),
|
||||
Splitter: fixedSplitter128K,
|
||||
})
|
||||
if writer == nil {
|
||||
return object.EmptyID, errors.New("unable to create writer for indirect object")
|
||||
}
|
||||
|
||||
defer writer.Close()
|
||||
|
||||
ind := indirectObject{
|
||||
StreamID: kopiaIndirectStreamType,
|
||||
Entries: kow.entries,
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(writer).Encode(ind); err != nil {
|
||||
return object.EmptyID, errors.Wrap(err, "unable to write indirect object index")
|
||||
}
|
||||
|
||||
return writer.Result()
|
||||
}
|
||||
|
||||
func (kow *kopiaObjectWriterEx) saveWriteError(err error) {
|
||||
if err != nil {
|
||||
kow.writeError.Store(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (kow *kopiaObjectWriterEx) getWriteError() error {
|
||||
if v := kow.writeError.Load(); v != nil {
|
||||
return v.(error)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kow *kopiaObjectWriterEx) Result() (udmrepo.ID, error) {
|
||||
kow.writeLock.Lock()
|
||||
defer kow.writeLock.Unlock()
|
||||
|
||||
kow.asyncWritesGroup.Wait()
|
||||
|
||||
if err := kow.getWriteError(); err != nil {
|
||||
return udmrepo.ID(""), errors.Wrap(err, "error happened during writing object")
|
||||
}
|
||||
|
||||
id, err := kow.writeIndirectObject()
|
||||
if err != nil {
|
||||
return udmrepo.ID(""), errors.Wrap(err, "error to write indirect object")
|
||||
}
|
||||
|
||||
objectID := "I" + udmrepo.ID(id.String())
|
||||
|
||||
return objectID, nil
|
||||
}
|
||||
|
||||
// TODO add implementation in following PRs
|
||||
func (kow *kopiaObjectWriterEx) Close() error {
|
||||
return errors.New("not implemented")
|
||||
kow.writeLock.Lock()
|
||||
defer kow.writeLock.Unlock()
|
||||
|
||||
kow.asyncWritesGroup.Wait()
|
||||
|
||||
kow.rawRepoWriter = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getCompressorForObject returns the compressor for an object, at present, we don't support compression
|
||||
|
||||
@@ -2,6 +2,8 @@ package kopialib
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/kopia/kopia/repo"
|
||||
@@ -13,6 +15,7 @@ import (
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
|
||||
repomocks "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/backend/mocks"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/freelist"
|
||||
velerotest "github.com/vmware-tanzu/velero/pkg/test"
|
||||
)
|
||||
|
||||
@@ -205,3 +208,343 @@ func TestNewObjectWriterEx(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestKopiaObjectWriterEx_Write(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
setupWriter func(t *testing.T) *kopiaObjectWriterEx
|
||||
inputData []byte
|
||||
expectedErr string
|
||||
expectedLen int
|
||||
}{
|
||||
{
|
||||
name: "writer is closed",
|
||||
setupWriter: func(t *testing.T) *kopiaObjectWriterEx {
|
||||
t.Helper()
|
||||
return &kopiaObjectWriterEx{
|
||||
rawRepoWriter: nil,
|
||||
}
|
||||
},
|
||||
inputData: make([]byte, 1024),
|
||||
expectedErr: "object writer is closed or not open",
|
||||
},
|
||||
{
|
||||
name: "write error exists",
|
||||
setupWriter: func(t *testing.T) *kopiaObjectWriterEx {
|
||||
t.Helper()
|
||||
kow := &kopiaObjectWriterEx{
|
||||
rawRepoWriter: repomocks.NewMockRepositoryWriter(t),
|
||||
blockSize: 1024,
|
||||
}
|
||||
kow.saveWriteError(errors.New("previous error"))
|
||||
return kow
|
||||
},
|
||||
inputData: make([]byte, 1024),
|
||||
expectedErr: "error happened during writing object: previous error",
|
||||
},
|
||||
{
|
||||
name: "invalid length",
|
||||
setupWriter: func(t *testing.T) *kopiaObjectWriterEx {
|
||||
t.Helper()
|
||||
return &kopiaObjectWriterEx{
|
||||
rawRepoWriter: repomocks.NewMockRepositoryWriter(t),
|
||||
blockSize: 1024,
|
||||
}
|
||||
},
|
||||
inputData: make([]byte, 1023),
|
||||
expectedErr: "invalid length 1023",
|
||||
},
|
||||
{
|
||||
name: "success sync write",
|
||||
setupWriter: func(t *testing.T) *kopiaObjectWriterEx {
|
||||
t.Helper()
|
||||
mockRepoWriter := repomocks.NewMockRepositoryWriter(t)
|
||||
mockWriter := repomocks.NewWriter(t)
|
||||
|
||||
mockWriter.On("Write", mock.Anything).Return(1024, nil)
|
||||
mockWriter.On("Close").Return(nil)
|
||||
|
||||
id, _ := object.ParseID("I12345")
|
||||
mockWriter.On("Result").Return(id, nil)
|
||||
|
||||
mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter)
|
||||
|
||||
return &kopiaObjectWriterEx{
|
||||
ctx: context.Background(),
|
||||
rawRepoWriter: mockRepoWriter,
|
||||
blockSize: 1024,
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
},
|
||||
inputData: make([]byte, 1024),
|
||||
expectedLen: 1024,
|
||||
},
|
||||
{
|
||||
name: "success async write",
|
||||
setupWriter: func(t *testing.T) *kopiaObjectWriterEx {
|
||||
t.Helper()
|
||||
mockRepoWriter := repomocks.NewMockRepositoryWriter(t)
|
||||
mockWriter := repomocks.NewWriter(t)
|
||||
|
||||
mockWriter.On("Write", mock.Anything).Return(1024, nil)
|
||||
mockWriter.On("Close").Return(nil)
|
||||
|
||||
id, _ := object.ParseID("I12345")
|
||||
mockWriter.On("Result").Return(id, nil)
|
||||
|
||||
mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter)
|
||||
|
||||
sem := make(chan struct{}, 1)
|
||||
buf := freelist.New(1024, 1024)
|
||||
|
||||
return &kopiaObjectWriterEx{
|
||||
ctx: context.Background(),
|
||||
rawRepoWriter: mockRepoWriter,
|
||||
blockSize: 1024,
|
||||
asyncWritesSem: sem,
|
||||
asyncBuffer: buf,
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
},
|
||||
inputData: make([]byte, 1024),
|
||||
expectedLen: 1024,
|
||||
},
|
||||
{
|
||||
name: "success multiple blocks in one write",
|
||||
setupWriter: func(t *testing.T) *kopiaObjectWriterEx {
|
||||
t.Helper()
|
||||
mockRepoWriter := repomocks.NewMockRepositoryWriter(t)
|
||||
mockWriter := repomocks.NewWriter(t)
|
||||
|
||||
mockWriter.On("Write", mock.Anything).Return(1024, nil)
|
||||
mockWriter.On("Close").Return(nil)
|
||||
|
||||
id, _ := object.ParseID("I12345")
|
||||
mockWriter.On("Result").Return(id, nil)
|
||||
|
||||
mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter)
|
||||
|
||||
return &kopiaObjectWriterEx{
|
||||
ctx: context.Background(),
|
||||
rawRepoWriter: mockRepoWriter,
|
||||
blockSize: 1024,
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
},
|
||||
inputData: make([]byte, 2048),
|
||||
expectedLen: 2048,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
kow := tc.setupWriter(t)
|
||||
l, err := kow.Write(tc.inputData)
|
||||
|
||||
if kow.asyncWritesSem != nil {
|
||||
kow.asyncWritesGroup.Wait()
|
||||
}
|
||||
|
||||
if tc.expectedErr != "" {
|
||||
assert.EqualError(t, err, tc.expectedErr)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expectedLen, l)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestKopiaObjectWriterEx_Result(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
setupWriter func(t *testing.T) *kopiaObjectWriterEx
|
||||
expectedErr string
|
||||
expectedID udmrepo.ID
|
||||
}{
|
||||
{
|
||||
name: "write error exists",
|
||||
setupWriter: func(t *testing.T) *kopiaObjectWriterEx {
|
||||
t.Helper()
|
||||
kow := &kopiaObjectWriterEx{}
|
||||
kow.saveWriteError(errors.New("async write failed"))
|
||||
return kow
|
||||
},
|
||||
expectedErr: "error happened during writing object: async write failed",
|
||||
},
|
||||
{
|
||||
name: "writer closed",
|
||||
setupWriter: func(t *testing.T) *kopiaObjectWriterEx {
|
||||
t.Helper()
|
||||
kow := &kopiaObjectWriterEx{
|
||||
rawRepoWriter: nil,
|
||||
}
|
||||
return kow
|
||||
},
|
||||
expectedErr: "error to write indirect object: object writer is closed or not open",
|
||||
},
|
||||
{
|
||||
name: "success",
|
||||
setupWriter: func(t *testing.T) *kopiaObjectWriterEx {
|
||||
t.Helper()
|
||||
mockRepoWriter := repomocks.NewMockRepositoryWriter(t)
|
||||
mockWriter := repomocks.NewWriter(t)
|
||||
|
||||
mockWriter.On("Write", mock.Anything).Return(100, nil)
|
||||
mockWriter.On("Close").Return(nil)
|
||||
|
||||
id, _ := object.ParseID("Iabcdef")
|
||||
mockWriter.On("Result").Return(id, nil)
|
||||
|
||||
mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter)
|
||||
|
||||
return &kopiaObjectWriterEx{
|
||||
ctx: context.Background(),
|
||||
rawRepoWriter: mockRepoWriter,
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
},
|
||||
expectedID: udmrepo.ID("IIabcdef"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
kow := tc.setupWriter(t)
|
||||
id, err := kow.Result()
|
||||
|
||||
if tc.expectedErr != "" {
|
||||
assert.EqualError(t, err, tc.expectedErr)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expectedID, id)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestKopiaObjectWriterEx_Close(t *testing.T) {
|
||||
kow := &kopiaObjectWriterEx{}
|
||||
err := kow.Close()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestKopiaObjectWriterEx_ConcurrentWrite(t *testing.T) {
|
||||
mockRepoWriter := repomocks.NewMockRepositoryWriter(t)
|
||||
mockWriter := repomocks.NewWriter(t)
|
||||
|
||||
mockWriter.On("Write", mock.Anything).Return(1024, nil)
|
||||
mockWriter.On("Close").Return(nil)
|
||||
|
||||
id, _ := object.ParseID("I12345")
|
||||
mockWriter.On("Result").Return(id, nil)
|
||||
|
||||
mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter)
|
||||
|
||||
kow := &kopiaObjectWriterEx{
|
||||
ctx: context.Background(),
|
||||
rawRepoWriter: mockRepoWriter,
|
||||
blockSize: 1024,
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
numGoroutines := 10
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
data := make([]byte, 1024)
|
||||
l, err := kow.Write(data)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1024, l)
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
assert.Len(t, kow.entries, numGoroutines)
|
||||
}
|
||||
|
||||
func TestKopiaObjectWriterEx_ConcurrentAsyncWrite(t *testing.T) {
|
||||
mockRepoWriter := repomocks.NewMockRepositoryWriter(t)
|
||||
mockWriter := repomocks.NewWriter(t)
|
||||
|
||||
mockWriter.On("Write", mock.Anything).Return(1024, nil)
|
||||
mockWriter.On("Close").Return(nil)
|
||||
|
||||
id, _ := object.ParseID("I12345")
|
||||
mockWriter.On("Result").Return(id, nil)
|
||||
|
||||
mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter)
|
||||
|
||||
sem := make(chan struct{}, 5)
|
||||
buf := freelist.New(5*1024, 1024)
|
||||
|
||||
kow := &kopiaObjectWriterEx{
|
||||
ctx: context.Background(),
|
||||
rawRepoWriter: mockRepoWriter,
|
||||
blockSize: 1024,
|
||||
asyncWritesSem: sem,
|
||||
asyncBuffer: buf,
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
numGoroutines := 10
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
data := make([]byte, 1024)
|
||||
l, err := kow.Write(data)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1024, l)
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
kow.asyncWritesGroup.Wait()
|
||||
|
||||
assert.Len(t, kow.entries, numGoroutines)
|
||||
}
|
||||
|
||||
func TestKopiaObjectWriterEx_MultipleWrites(t *testing.T) {
|
||||
mockRepoWriter := repomocks.NewMockRepositoryWriter(t)
|
||||
mockWriter := repomocks.NewWriter(t)
|
||||
|
||||
// Since we are writing 3 blocks, Write should be called 3 times and Close 3 times
|
||||
mockWriter.On("Write", mock.Anything).Return(1024, nil)
|
||||
mockWriter.On("Close").Return(nil)
|
||||
|
||||
id, _ := object.ParseID("I12345")
|
||||
mockWriter.On("Result").Return(id, nil)
|
||||
|
||||
mockRepoWriter.On("NewObjectWriter", mock.Anything, mock.Anything).Return(mockWriter)
|
||||
|
||||
kow := &kopiaObjectWriterEx{
|
||||
ctx: context.Background(),
|
||||
rawRepoWriter: mockRepoWriter,
|
||||
blockSize: 1024,
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
// Write 1st block
|
||||
l, err := kow.Write(make([]byte, 1024))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1024, l)
|
||||
|
||||
// Write 2nd and 3rd block
|
||||
l, err = kow.Write(make([]byte, 2048))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 2048, l)
|
||||
|
||||
// In the end we expect 3 blocks to be tracked in `kow.entries`
|
||||
assert.Len(t, kow.entries, 3)
|
||||
assert.Equal(t, int64(0), kow.entries[0].Start)
|
||||
assert.Equal(t, int64(1024), kow.entries[1].Start)
|
||||
assert.Equal(t, int64(2048), kow.entries[2].Start)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user