mirror of
https://github.com/vmware-tanzu/velero.git
synced 2025-12-23 06:15:21 +00:00
udmrepo support cache dir
Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
This commit is contained in:
@@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"maps"
|
||||
"net/url"
|
||||
"path"
|
||||
"strconv"
|
||||
@@ -81,11 +82,16 @@ func NewUnifiedRepoProvider(
|
||||
log: log,
|
||||
}
|
||||
|
||||
repo.repoService = createRepoService(log)
|
||||
repo.repoService = createRepoService(repoBackend, log)
|
||||
|
||||
return &repo
|
||||
}
|
||||
|
||||
func GetUnifiedRepoClientSideCacheLimit(repoOption map[string]string, repoBackend string, log logrus.FieldLogger) int64 {
|
||||
repoService := createRepoService(repoBackend, log)
|
||||
return repoService.ClientSideCacheLimit(repoOption)
|
||||
}
|
||||
|
||||
func (urp *unifiedRepoProvider) InitRepo(ctx context.Context, param RepoParam) error {
|
||||
log := urp.log.WithFields(logrus.Fields{
|
||||
"BSL name": param.BackupLocation.Name,
|
||||
@@ -416,12 +422,11 @@ func (urp *unifiedRepoProvider) GetStoreOptions(param any) (map[string]string, e
|
||||
}
|
||||
|
||||
storeOptions := make(map[string]string)
|
||||
for k, v := range storeVar {
|
||||
storeOptions[k] = v
|
||||
}
|
||||
maps.Copy(storeOptions, storeVar)
|
||||
maps.Copy(storeOptions, storeCred)
|
||||
|
||||
for k, v := range storeCred {
|
||||
storeOptions[k] = v
|
||||
if repoParam.CacheDir != "" {
|
||||
storeOptions[udmrepo.StoreOptionCacheDir] = repoParam.CacheDir
|
||||
}
|
||||
|
||||
return storeOptions, nil
|
||||
@@ -597,6 +602,6 @@ func getStorageVariables(backupLocation *velerov1api.BackupStorageLocation, repo
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func createRepoService(log logrus.FieldLogger) udmrepo.BackupRepoService {
|
||||
return reposervice.Create(log)
|
||||
func createRepoService(repoBackend string, log logrus.FieldLogger) udmrepo.BackupRepoService {
|
||||
return reposervice.Create(repoBackend, log)
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
defaultCacheLimitMB = 5000
|
||||
DefaultCacheLimitMB = 5000
|
||||
maxCacheDurationSecond = 30
|
||||
)
|
||||
|
||||
@@ -66,7 +66,8 @@ func SetupNewRepositoryOptions(ctx context.Context, flags map[string]string) rep
|
||||
|
||||
// SetupConnectOptions setups the options when connecting to an existing Kopia repository
|
||||
func SetupConnectOptions(ctx context.Context, repoOptions udmrepo.RepoOptions) repo.ConnectOptions {
|
||||
cacheLimit := optionalHaveIntWithDefault(ctx, udmrepo.StoreOptionCacheLimit, repoOptions.StorageOptions, defaultCacheLimitMB) << 20
|
||||
cacheLimit := optionalHaveIntWithDefault(ctx, udmrepo.StoreOptionCacheLimit, repoOptions.StorageOptions, DefaultCacheLimitMB) << 20
|
||||
cacheDir := optionalHaveString(udmrepo.StoreOptionCacheDir, repoOptions.StorageOptions)
|
||||
|
||||
// 80% for data cache and 20% for metadata cache and align to KB
|
||||
dataCacheLimit := (cacheLimit / 5 * 4) >> 10
|
||||
@@ -74,6 +75,7 @@ func SetupConnectOptions(ctx context.Context, repoOptions udmrepo.RepoOptions) r
|
||||
|
||||
return repo.ConnectOptions{
|
||||
CachingOptions: content.CachingOptions{
|
||||
CacheDirectory: cacheDir,
|
||||
// softLimit 80%
|
||||
ContentCacheSizeBytes: (dataCacheLimit / 5 * 4) << 10,
|
||||
MetadataCacheSizeBytes: (metadataCacheLimit / 5 * 4) << 10,
|
||||
|
||||
@@ -18,6 +18,7 @@ package kopialib
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
@@ -35,6 +36,7 @@ import (
|
||||
|
||||
"github.com/vmware-tanzu/velero/pkg/kopia"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo"
|
||||
"github.com/vmware-tanzu/velero/pkg/repository/udmrepo/kopialib/backend"
|
||||
)
|
||||
|
||||
type kopiaRepoService struct {
|
||||
@@ -79,6 +81,7 @@ const (
|
||||
defaultMaintainCheckPeriod = time.Hour
|
||||
overwriteFullMaintainInterval = time.Duration(0)
|
||||
overwriteQuickMaintainInterval = time.Duration(0)
|
||||
repoBackend = "kopia"
|
||||
)
|
||||
|
||||
var kopiaRepoOpen = repo.Open
|
||||
@@ -211,6 +214,34 @@ func (ks *kopiaRepoService) DefaultMaintenanceFrequency() time.Duration {
|
||||
return defaultMaintainCheckPeriod
|
||||
}
|
||||
|
||||
func (ks *kopiaRepoService) ClientSideCacheLimit(repoOption map[string]string) int64 {
|
||||
defaultLimit := int64(backend.DefaultCacheLimitMB << 20)
|
||||
if repoOption == nil {
|
||||
return defaultLimit
|
||||
}
|
||||
|
||||
if v, found := repoOption[repoBackend]; found {
|
||||
var configs map[string]any
|
||||
if err := json.Unmarshal([]byte(v), &configs); err != nil {
|
||||
ks.logger.WithError(err).Warnf("error unmarshalling config data from data %v", v)
|
||||
return defaultLimit
|
||||
}
|
||||
|
||||
limit := defaultLimit
|
||||
if v, found := configs[udmrepo.StoreOptionCacheLimit]; found {
|
||||
if iv, ok := v.(float64); ok {
|
||||
limit = int64(iv) << 20
|
||||
} else {
|
||||
ks.logger.Warnf("ignore cache limit from data %v", v)
|
||||
}
|
||||
}
|
||||
|
||||
return limit
|
||||
}
|
||||
|
||||
return defaultLimit
|
||||
}
|
||||
|
||||
func (km *kopiaMaintenance) runMaintenance(ctx context.Context, rep repo.DirectRepositoryWriter) error {
|
||||
err := snapshotmaintenance.Run(kopia.SetupKopiaLog(ctx, km.logger), rep, km.mode, false, maintenance.SafetyFull)
|
||||
if err != nil {
|
||||
|
||||
@@ -1345,3 +1345,63 @@ func TestMaintainProgress(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientSideCacheLimit(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
repoOption map[string]string
|
||||
expected int64
|
||||
}{
|
||||
{
|
||||
name: "nil option",
|
||||
expected: 5000 << 20,
|
||||
},
|
||||
{
|
||||
name: "no option",
|
||||
repoOption: map[string]string{
|
||||
"other-repo": "\"enableCompression\": true",
|
||||
},
|
||||
expected: 5000 << 20,
|
||||
},
|
||||
{
|
||||
name: "unmarshall fails",
|
||||
repoOption: map[string]string{
|
||||
"kopia": "wrong-json",
|
||||
},
|
||||
expected: 5000 << 20,
|
||||
},
|
||||
{
|
||||
name: "no cache limit",
|
||||
repoOption: map[string]string{
|
||||
"kopia": "{\"enableCompression\": true}",
|
||||
},
|
||||
expected: 5000 << 20,
|
||||
},
|
||||
{
|
||||
name: "wrong cache value type",
|
||||
repoOption: map[string]string{
|
||||
"kopia": "{\"cacheLimitMB\": \"abcd\",\"enableCompression\": true}",
|
||||
},
|
||||
expected: 5000 << 20,
|
||||
},
|
||||
{
|
||||
name: "succeed",
|
||||
repoOption: map[string]string{
|
||||
"kopia": "{\"cacheLimitMB\": 1,\"enableCompression\": true}",
|
||||
},
|
||||
expected: 1048576,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ks := &kopiaRepoService{
|
||||
logger: velerotest.NewLogger(),
|
||||
}
|
||||
|
||||
limit := ks.ClientSideCacheLimit(tc.repoOption)
|
||||
|
||||
assert.Equal(t, tc.expected, limit)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Code generated by mockery v2.39.1. DO NOT EDIT.
|
||||
// Code generated by mockery v2.53.2. DO NOT EDIT.
|
||||
|
||||
package mocks
|
||||
|
||||
@@ -16,7 +16,25 @@ type BackupRepoService struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// DefaultMaintenanceFrequency provides a mock function with given fields:
|
||||
// ClientSideCacheLimit provides a mock function with given fields: repoOption
|
||||
func (_m *BackupRepoService) ClientSideCacheLimit(repoOption map[string]string) int64 {
|
||||
ret := _m.Called(repoOption)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for ClientSideCacheLimit")
|
||||
}
|
||||
|
||||
var r0 int64
|
||||
if rf, ok := ret.Get(0).(func(map[string]string) int64); ok {
|
||||
r0 = rf(repoOption)
|
||||
} else {
|
||||
r0 = ret.Get(0).(int64)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// DefaultMaintenanceFrequency provides a mock function with no fields
|
||||
func (_m *BackupRepoService) DefaultMaintenanceFrequency() time.Duration {
|
||||
ret := _m.Called()
|
||||
|
||||
|
||||
@@ -93,6 +93,9 @@ type BackupRepoService interface {
|
||||
// DefaultMaintenanceFrequency returns the defgault frequency of maintenance, callers refer this
|
||||
// frequency to maintain the backup repository to get the best maintenance performance
|
||||
DefaultMaintenanceFrequency() time.Duration
|
||||
|
||||
// ClientSideCacheLimit returns the max cache size required on client side
|
||||
ClientSideCacheLimit(repoOption map[string]string) int64
|
||||
}
|
||||
|
||||
// BackupRepo provides the access to the backup repository
|
||||
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package udmrepo
|
||||
|
||||
import (
|
||||
"maps"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
@@ -65,6 +66,7 @@ const (
|
||||
StoreOptionGenReadOnly = "readOnly"
|
||||
|
||||
StoreOptionCacheLimit = "cacheLimitMB"
|
||||
StoreOptionCacheDir = "cacheDir"
|
||||
|
||||
ThrottleOptionReadOps = "readOPS"
|
||||
ThrottleOptionWriteOps = "writeOPS"
|
||||
@@ -184,9 +186,7 @@ func WithStoreOptions(getter StoreOptionsGetter, param any) func(*RepoOptions) e
|
||||
|
||||
options.StorageType = storeType
|
||||
|
||||
for k, v := range storeOptions {
|
||||
options.StorageOptions[k] = v
|
||||
}
|
||||
maps.Copy(options.StorageOptions, storeOptions)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -24,6 +24,6 @@ import (
|
||||
)
|
||||
|
||||
// Create creates an instance of BackupRepoService
|
||||
func Create(logger logrus.FieldLogger) udmrepo.BackupRepoService {
|
||||
func Create(repoBackend string, logger logrus.FieldLogger) udmrepo.BackupRepoService {
|
||||
return kopialib.NewKopiaRepoService(logger)
|
||||
}
|
||||
|
||||
@@ -74,7 +74,7 @@ func NewKopiaUploaderProvider(
|
||||
return nil, errors.Wrapf(err, "error to get repo options")
|
||||
}
|
||||
|
||||
repoSvc := BackupRepoServiceCreateFunc(log)
|
||||
repoSvc := BackupRepoServiceCreateFunc(backupRepo.Spec.RepositoryType, log)
|
||||
log.WithField("repoUID", repoUID).Info("Opening backup repo")
|
||||
|
||||
kp.bkRepo, err = repoSvc.Open(ctx, *repoOpt)
|
||||
|
||||
@@ -373,7 +373,7 @@ func TestNewKopiaUploaderProvider(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
credGetter := &credentials.CredentialGetter{FromSecret: tc.mockCredGetter}
|
||||
BackupRepoServiceCreateFunc = func(logger logrus.FieldLogger) udmrepo.BackupRepoService {
|
||||
BackupRepoServiceCreateFunc = func(string, logrus.FieldLogger) udmrepo.BackupRepoService {
|
||||
return tc.mockBackupRepoService
|
||||
}
|
||||
// Call the function being tested.
|
||||
|
||||
Reference in New Issue
Block a user