diff --git a/pkg/repository/provider/unified_repo.go b/pkg/repository/provider/unified_repo.go index 251845dac..8ec5c454c 100644 --- a/pkg/repository/provider/unified_repo.go +++ b/pkg/repository/provider/unified_repo.go @@ -20,6 +20,7 @@ import ( "context" "encoding/base64" "fmt" + "maps" "net/url" "path" "strconv" @@ -81,11 +82,16 @@ func NewUnifiedRepoProvider( log: log, } - repo.repoService = createRepoService(log) + repo.repoService = createRepoService(repoBackend, log) return &repo } +func GetUnifiedRepoClientSideCacheLimit(repoOption map[string]string, repoBackend string, log logrus.FieldLogger) int64 { + repoService := createRepoService(repoBackend, log) + return repoService.ClientSideCacheLimit(repoOption) +} + func (urp *unifiedRepoProvider) InitRepo(ctx context.Context, param RepoParam) error { log := urp.log.WithFields(logrus.Fields{ "BSL name": param.BackupLocation.Name, @@ -416,12 +422,11 @@ func (urp *unifiedRepoProvider) GetStoreOptions(param any) (map[string]string, e } storeOptions := make(map[string]string) - for k, v := range storeVar { - storeOptions[k] = v - } + maps.Copy(storeOptions, storeVar) + maps.Copy(storeOptions, storeCred) - for k, v := range storeCred { - storeOptions[k] = v + if repoParam.CacheDir != "" { + storeOptions[udmrepo.StoreOptionCacheDir] = repoParam.CacheDir } return storeOptions, nil @@ -597,6 +602,6 @@ func getStorageVariables(backupLocation *velerov1api.BackupStorageLocation, repo return result, nil } -func createRepoService(log logrus.FieldLogger) udmrepo.BackupRepoService { - return reposervice.Create(log) +func createRepoService(repoBackend string, log logrus.FieldLogger) udmrepo.BackupRepoService { + return reposervice.Create(repoBackend, log) } diff --git a/pkg/repository/udmrepo/kopialib/backend/common.go b/pkg/repository/udmrepo/kopialib/backend/common.go index 646811da9..a54923003 100644 --- a/pkg/repository/udmrepo/kopialib/backend/common.go +++ b/pkg/repository/udmrepo/kopialib/backend/common.go @@ -33,7 +33,7 @@ import ( ) const ( - defaultCacheLimitMB = 5000 + DefaultCacheLimitMB = 5000 maxCacheDurationSecond = 30 ) @@ -66,7 +66,8 @@ func SetupNewRepositoryOptions(ctx context.Context, flags map[string]string) rep // SetupConnectOptions setups the options when connecting to an existing Kopia repository func SetupConnectOptions(ctx context.Context, repoOptions udmrepo.RepoOptions) repo.ConnectOptions { - cacheLimit := optionalHaveIntWithDefault(ctx, udmrepo.StoreOptionCacheLimit, repoOptions.StorageOptions, defaultCacheLimitMB) << 20 + cacheLimit := optionalHaveIntWithDefault(ctx, udmrepo.StoreOptionCacheLimit, repoOptions.StorageOptions, DefaultCacheLimitMB) << 20 + cacheDir := optionalHaveString(udmrepo.StoreOptionCacheDir, repoOptions.StorageOptions) // 80% for data cache and 20% for metadata cache and align to KB dataCacheLimit := (cacheLimit / 5 * 4) >> 10 @@ -74,6 +75,7 @@ func SetupConnectOptions(ctx context.Context, repoOptions udmrepo.RepoOptions) r return repo.ConnectOptions{ CachingOptions: content.CachingOptions{ + CacheDirectory: cacheDir, // softLimit 80% ContentCacheSizeBytes: (dataCacheLimit / 5 * 4) << 10, MetadataCacheSizeBytes: (metadataCacheLimit / 5 * 4) << 10, diff --git a/pkg/repository/udmrepo/kopialib/lib_repo.go b/pkg/repository/udmrepo/kopialib/lib_repo.go index d1eeb88d0..2bfa8901f 100644 --- a/pkg/repository/udmrepo/kopialib/lib_repo.go +++ b/pkg/repository/udmrepo/kopialib/lib_repo.go @@ -18,6 +18,7 @@ package kopialib import ( "context" + "encoding/json" "os" "strings" "sync/atomic" @@ -35,6 +36,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/kopia" "github.com/vmware-tanzu/velero/pkg/repository/udmrepo" + "github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/backend" ) type kopiaRepoService struct { @@ -79,6 +81,7 @@ const ( defaultMaintainCheckPeriod = time.Hour overwriteFullMaintainInterval = time.Duration(0) overwriteQuickMaintainInterval = time.Duration(0) + repoBackend = "kopia" ) var kopiaRepoOpen = repo.Open @@ -211,6 +214,34 @@ func (ks *kopiaRepoService) DefaultMaintenanceFrequency() time.Duration { return defaultMaintainCheckPeriod } +func (ks *kopiaRepoService) ClientSideCacheLimit(repoOption map[string]string) int64 { + defaultLimit := int64(backend.DefaultCacheLimitMB << 20) + if repoOption == nil { + return defaultLimit + } + + if v, found := repoOption[repoBackend]; found { + var configs map[string]any + if err := json.Unmarshal([]byte(v), &configs); err != nil { + ks.logger.WithError(err).Warnf("error unmarshalling config data from data %v", v) + return defaultLimit + } + + limit := defaultLimit + if v, found := configs[udmrepo.StoreOptionCacheLimit]; found { + if iv, ok := v.(float64); ok { + limit = int64(iv) << 20 + } else { + ks.logger.Warnf("ignore cache limit from data %v", v) + } + } + + return limit + } + + return defaultLimit +} + func (km *kopiaMaintenance) runMaintenance(ctx context.Context, rep repo.DirectRepositoryWriter) error { err := snapshotmaintenance.Run(kopia.SetupKopiaLog(ctx, km.logger), rep, km.mode, false, maintenance.SafetyFull) if err != nil { diff --git a/pkg/repository/udmrepo/kopialib/lib_repo_test.go b/pkg/repository/udmrepo/kopialib/lib_repo_test.go index ba7aaa93c..a363ef807 100644 --- a/pkg/repository/udmrepo/kopialib/lib_repo_test.go +++ b/pkg/repository/udmrepo/kopialib/lib_repo_test.go @@ -1345,3 +1345,63 @@ func TestMaintainProgress(t *testing.T) { }) } } + +func TestClientSideCacheLimit(t *testing.T) { + testCases := []struct { + name string + repoOption map[string]string + expected int64 + }{ + { + name: "nil option", + expected: 5000 << 20, + }, + { + name: "no option", + repoOption: map[string]string{ + "other-repo": "\"enableCompression\": true", + }, + expected: 5000 << 20, + }, + { + name: "unmarshall fails", + repoOption: map[string]string{ + "kopia": "wrong-json", + }, + expected: 5000 << 20, + }, + { + name: "no cache limit", + repoOption: map[string]string{ + "kopia": "{\"enableCompression\": true}", + }, + expected: 5000 << 20, + }, + { + name: "wrong cache value type", + repoOption: map[string]string{ + "kopia": "{\"cacheLimitMB\": \"abcd\",\"enableCompression\": true}", + }, + expected: 5000 << 20, + }, + { + name: "succeed", + repoOption: map[string]string{ + "kopia": "{\"cacheLimitMB\": 1,\"enableCompression\": true}", + }, + expected: 1048576, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ks := &kopiaRepoService{ + logger: velerotest.NewLogger(), + } + + limit := ks.ClientSideCacheLimit(tc.repoOption) + + assert.Equal(t, tc.expected, limit) + }) + } +} diff --git a/pkg/repository/udmrepo/mocks/BackupRepoService.go b/pkg/repository/udmrepo/mocks/BackupRepoService.go index 2acf94816..346901d38 100644 --- a/pkg/repository/udmrepo/mocks/BackupRepoService.go +++ b/pkg/repository/udmrepo/mocks/BackupRepoService.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.39.1. DO NOT EDIT. +// Code generated by mockery v2.53.2. DO NOT EDIT. package mocks @@ -16,7 +16,25 @@ type BackupRepoService struct { mock.Mock } -// DefaultMaintenanceFrequency provides a mock function with given fields: +// ClientSideCacheLimit provides a mock function with given fields: repoOption +func (_m *BackupRepoService) ClientSideCacheLimit(repoOption map[string]string) int64 { + ret := _m.Called(repoOption) + + if len(ret) == 0 { + panic("no return value specified for ClientSideCacheLimit") + } + + var r0 int64 + if rf, ok := ret.Get(0).(func(map[string]string) int64); ok { + r0 = rf(repoOption) + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// DefaultMaintenanceFrequency provides a mock function with no fields func (_m *BackupRepoService) DefaultMaintenanceFrequency() time.Duration { ret := _m.Called() diff --git a/pkg/repository/udmrepo/repo.go b/pkg/repository/udmrepo/repo.go index 0adb39751..8ba2fb071 100644 --- a/pkg/repository/udmrepo/repo.go +++ b/pkg/repository/udmrepo/repo.go @@ -93,6 +93,9 @@ type BackupRepoService interface { // DefaultMaintenanceFrequency returns the defgault frequency of maintenance, callers refer this // frequency to maintain the backup repository to get the best maintenance performance DefaultMaintenanceFrequency() time.Duration + + // ClientSideCacheLimit returns the max cache size required on client side + ClientSideCacheLimit(repoOption map[string]string) int64 } // BackupRepo provides the access to the backup repository diff --git a/pkg/repository/udmrepo/repo_options.go b/pkg/repository/udmrepo/repo_options.go index efddfdcd1..a8371b7c1 100644 --- a/pkg/repository/udmrepo/repo_options.go +++ b/pkg/repository/udmrepo/repo_options.go @@ -17,6 +17,7 @@ limitations under the License. package udmrepo import ( + "maps" "os" "path/filepath" "strings" @@ -65,6 +66,7 @@ const ( StoreOptionGenReadOnly = "readOnly" StoreOptionCacheLimit = "cacheLimitMB" + StoreOptionCacheDir = "cacheDir" ThrottleOptionReadOps = "readOPS" ThrottleOptionWriteOps = "writeOPS" @@ -184,9 +186,7 @@ func WithStoreOptions(getter StoreOptionsGetter, param any) func(*RepoOptions) e options.StorageType = storeType - for k, v := range storeOptions { - options.StorageOptions[k] = v - } + maps.Copy(options.StorageOptions, storeOptions) return nil } diff --git a/pkg/repository/udmrepo/service/service.go b/pkg/repository/udmrepo/service/service.go index c2f0a9b0e..195b67b11 100644 --- a/pkg/repository/udmrepo/service/service.go +++ b/pkg/repository/udmrepo/service/service.go @@ -24,6 +24,6 @@ import ( ) // Create creates an instance of BackupRepoService -func Create(logger logrus.FieldLogger) udmrepo.BackupRepoService { +func Create(repoBackend string, logger logrus.FieldLogger) udmrepo.BackupRepoService { return kopialib.NewKopiaRepoService(logger) } diff --git a/pkg/uploader/provider/kopia.go b/pkg/uploader/provider/kopia.go index b09b8a571..7cde36911 100644 --- a/pkg/uploader/provider/kopia.go +++ b/pkg/uploader/provider/kopia.go @@ -74,7 +74,7 @@ func NewKopiaUploaderProvider( return nil, errors.Wrapf(err, "error to get repo options") } - repoSvc := BackupRepoServiceCreateFunc(log) + repoSvc := BackupRepoServiceCreateFunc(backupRepo.Spec.RepositoryType, log) log.WithField("repoUID", repoUID).Info("Opening backup repo") kp.bkRepo, err = repoSvc.Open(ctx, *repoOpt) diff --git a/pkg/uploader/provider/kopia_test.go b/pkg/uploader/provider/kopia_test.go index 6231d5d62..728c26da2 100644 --- a/pkg/uploader/provider/kopia_test.go +++ b/pkg/uploader/provider/kopia_test.go @@ -373,7 +373,7 @@ func TestNewKopiaUploaderProvider(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { credGetter := &credentials.CredentialGetter{FromSecret: tc.mockCredGetter} - BackupRepoServiceCreateFunc = func(logger logrus.FieldLogger) udmrepo.BackupRepoService { + BackupRepoServiceCreateFunc = func(string, logrus.FieldLogger) udmrepo.BackupRepoService { return tc.mockBackupRepoService } // Call the function being tested.