From cc0c41d2169002949f36063d54091de921b9e3a8 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 15 Aug 2024 08:41:03 -0700 Subject: [PATCH] remove region locks and make them simpler (#20268) - single flight approach is now optional, instead of default. - parallelize the loaders upto 32 items per assets (more room for improvement possible) --- cmd/erasure-server-pool.go | 2 +- cmd/iam-object-store.go | 229 +++++++++++++++++++++++++++++++------ cmd/iam-store.go | 188 +++++++++++++++++++----------- cmd/iam.go | 25 ++-- 4 files changed, 333 insertions(+), 111 deletions(-) diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index e4287a958..0e6b01155 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -2122,7 +2122,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re disks, infos, _ := set.getOnlineDisksWithHealingAndInfo(true) if len(disks) == 0 { xioutil.SafeClose(results) - err := fmt.Errorf("Walk: no online disks found in pool %d, set %d", setIdx, poolIdx) + err := fmt.Errorf("Walk: no online disks found in (set:%d pool:%d) %w", setIdx, poolIdx, errErasureReadQuorum) cancelCause(err) return err } diff --git a/cmd/iam-object-store.go b/cmd/iam-object-store.go index 93a488ab2..e45449f35 100644 --- a/cmd/iam-object-store.go +++ b/cmd/iam-object-store.go @@ -34,6 +34,7 @@ import ( xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" + "github.com/minio/pkg/v3/sync/errgroup" "github.com/puzpuzpuz/xsync/v3" ) @@ -182,19 +183,20 @@ func (iamOS *IAMObjectStore) loadPolicyDocWithRetry(ctx context.Context, policy } } -func (iamOS *IAMObjectStore) loadPolicyDoc(ctx context.Context, policy string, m map[string]PolicyDoc) error { +func (iamOS *IAMObjectStore) loadPolicy(ctx context.Context, policy string) (PolicyDoc, error) { + var p PolicyDoc + data, objInfo, err := iamOS.loadIAMConfigBytesWithMetadata(ctx, getPolicyDocPath(policy)) if err != nil { if err == errConfigNotFound { - return errNoSuchPolicy + return p, errNoSuchPolicy } - return err + return p, err } - var p PolicyDoc err = p.parseJSON(data) if err != nil { - return err + return p, err } if p.Version == 0 { @@ -205,6 +207,14 @@ func (iamOS *IAMObjectStore) loadPolicyDoc(ctx context.Context, policy string, m p.UpdateDate = objInfo.ModTime } + return p, nil +} + +func (iamOS *IAMObjectStore) loadPolicyDoc(ctx context.Context, policy string, m map[string]PolicyDoc) error { + p, err := iamOS.loadPolicy(ctx, policy) + if err != nil { + return err + } m[policy] = p return nil } @@ -237,21 +247,21 @@ func (iamOS *IAMObjectStore) loadSecretKey(ctx context.Context, user string, use return u.Credentials.SecretKey, nil } -func (iamOS *IAMObjectStore) loadUser(ctx context.Context, user string, userType IAMUserType, m map[string]UserIdentity) error { +func (iamOS *IAMObjectStore) loadUserIdentity(ctx context.Context, user string, userType IAMUserType) (UserIdentity, error) { var u UserIdentity err := iamOS.loadIAMConfig(ctx, &u, getUserIdentityPath(user, userType)) if err != nil { if err == errConfigNotFound { - return errNoSuchUser + return u, errNoSuchUser } - return err + return u, err } if u.Credentials.IsExpired() { // Delete expired identity - ignoring errors here. iamOS.deleteIAMConfig(ctx, getUserIdentityPath(user, userType)) iamOS.deleteIAMConfig(ctx, getMappedPolicyPath(user, userType, false)) - return nil + return u, errNoSuchUser } if u.Credentials.AccessKey == "" { @@ -267,7 +277,7 @@ func (iamOS *IAMObjectStore) loadUser(ctx context.Context, user string, userType iamOS.deleteIAMConfig(ctx, getUserIdentityPath(user, userType)) iamOS.deleteIAMConfig(ctx, getMappedPolicyPath(user, userType, false)) } - return nil + return u, errNoSuchUser } u.Credentials.Claims = jwtClaims.Map() @@ -277,6 +287,35 @@ func (iamOS *IAMObjectStore) loadUser(ctx context.Context, user string, userType u.Credentials.Description = u.Credentials.Comment } + return u, nil +} + +func (iamOS *IAMObjectStore) loadUserConcurrent(ctx context.Context, userType IAMUserType, users ...string) ([]UserIdentity, error) { + userIdentities := make([]UserIdentity, len(users)) + g := errgroup.WithNErrs(len(users)) + + for index := range users { + index := index + g.Go(func() error { + userName := path.Dir(users[index]) + user, err := iamOS.loadUserIdentity(ctx, userName, userType) + if err != nil && !errors.Is(err, errNoSuchUser) { + return fmt.Errorf("unable to load the user `%s`: %w", userName, err) + } + userIdentities[index] = user + return nil + }, index) + } + + err := errors.Join(g.Wait()...) + return userIdentities, err +} + +func (iamOS *IAMObjectStore) loadUser(ctx context.Context, user string, userType IAMUserType, m map[string]UserIdentity) error { + u, err := iamOS.loadUserIdentity(ctx, user, userType) + if err != nil { + return err + } m[user] = u return nil } @@ -358,16 +397,44 @@ func (iamOS *IAMObjectStore) loadMappedPolicyWithRetry(ctx context.Context, name } } -func (iamOS *IAMObjectStore) loadMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool, m *xsync.MapOf[string, MappedPolicy]) error { +func (iamOS *IAMObjectStore) loadMappedPolicyInternal(ctx context.Context, name string, userType IAMUserType, isGroup bool) (MappedPolicy, error) { var p MappedPolicy err := iamOS.loadIAMConfig(ctx, &p, getMappedPolicyPath(name, userType, isGroup)) if err != nil { if err == errConfigNotFound { - return errNoSuchPolicy + return p, errNoSuchPolicy } - return err + return p, err + } + return p, nil +} + +func (iamOS *IAMObjectStore) loadMappedPolicyConcurrent(ctx context.Context, userType IAMUserType, isGroup bool, users ...string) ([]MappedPolicy, error) { + mappedPolicies := make([]MappedPolicy, len(users)) + g := errgroup.WithNErrs(len(users)) + + for index := range users { + index := index + g.Go(func() error { + userName := strings.TrimSuffix(users[index], ".json") + userMP, err := iamOS.loadMappedPolicyInternal(ctx, userName, userType, isGroup) + if err != nil && !errors.Is(err, errNoSuchPolicy) { + return fmt.Errorf("unable to load the user policy map `%s`: %w", userName, err) + } + mappedPolicies[index] = userMP + return nil + }, index) } + err := errors.Join(g.Wait()...) + return mappedPolicies, err +} + +func (iamOS *IAMObjectStore) loadMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool, m *xsync.MapOf[string, MappedPolicy]) error { + p, err := iamOS.loadMappedPolicyInternal(ctx, name, userType, isGroup) + if err != nil { + return err + } m.Store(name, p) return nil } @@ -455,6 +522,27 @@ const ( maxIAMLoadOpTime = 5 * time.Second ) +func (iamOS *IAMObjectStore) loadPolicyDocConcurrent(ctx context.Context, policies ...string) ([]PolicyDoc, error) { + policyDocs := make([]PolicyDoc, len(policies)) + g := errgroup.WithNErrs(len(policies)) + + for index := range policies { + index := index + g.Go(func() error { + policyName := path.Dir(policies[index]) + policyDoc, err := iamOS.loadPolicy(ctx, policyName) + if err != nil && !errors.Is(err, errNoSuchPolicy) { + return fmt.Errorf("unable to load the policy doc `%s`: %w", policyName, err) + } + policyDocs[index] = policyDoc + return nil + }, index) + } + + err := errors.Join(g.Wait()...) + return policyDocs, err +} + // Assumes cache is locked by caller. func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iamCache, firstTime bool) error { bootstrapTraceMsgFirstTime := func(s string) { @@ -490,12 +578,37 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam policyLoadStartTime := UTCNow() policiesList := listedConfigItems[policiesListKey] - for _, item := range policiesList { - policyName := path.Dir(item) - if err := iamOS.loadPolicyDoc(ctx, policyName, cache.iamPolicyDocsMap); err != nil && !errors.Is(err, errNoSuchPolicy) { - return fmt.Errorf("unable to load the policy doc `%s`: %w", policyName, err) + count := 32 // number of parallel IAM loaders + for { + if len(policiesList) < count { + policyDocs, err := iamOS.loadPolicyDocConcurrent(ctx, policiesList...) + if err != nil { + return err + } + for index := range policiesList { + if policyDocs[index].Policy.Version != "" { + policyName := path.Dir(policiesList[index]) + cache.iamPolicyDocsMap[policyName] = policyDocs[index] + } + } + break } + + policyDocs, err := iamOS.loadPolicyDocConcurrent(ctx, policiesList[:count]...) + if err != nil { + return err + } + + for index := range policiesList[:count] { + if policyDocs[index].Policy.Version != "" { + policyName := path.Dir(policiesList[index]) + cache.iamPolicyDocsMap[policyName] = policyDocs[index] + } + } + + policiesList = policiesList[count:] } + if took := time.Since(policyLoadStartTime); took > maxIAMLoadOpTime { logger.Info("Policy docs load took %.2fs (for %d items)", took.Seconds(), len(policiesList)) } @@ -504,12 +617,37 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam bootstrapTraceMsgFirstTime("loading regular IAM users") regUsersLoadStartTime := UTCNow() regUsersList := listedConfigItems[usersListKey] - for _, item := range regUsersList { - userName := path.Dir(item) - if err := iamOS.loadUser(ctx, userName, regUser, cache.iamUsersMap); err != nil && err != errNoSuchUser { - return fmt.Errorf("unable to load the user: %w", err) + + for { + if len(regUsersList) < count { + users, err := iamOS.loadUserConcurrent(ctx, regUser, regUsersList...) + if err != nil { + return err + } + for index := range regUsersList { + if users[index].Credentials.AccessKey != "" { + userName := path.Dir(regUsersList[index]) + cache.iamUsersMap[userName] = users[index] + } + } + break } + + users, err := iamOS.loadUserConcurrent(ctx, regUser, regUsersList[:count]...) + if err != nil { + return err + } + + for index := range regUsersList[:count] { + if users[index].Credentials.AccessKey != "" { + userName := path.Dir(regUsersList[index]) + cache.iamUsersMap[userName] = users[index] + } + } + + regUsersList = regUsersList[count:] } + if took := time.Since(regUsersLoadStartTime); took > maxIAMLoadOpTime { actualLoaded := len(cache.iamUsersMap) logger.Info("Reg. users load took %.2fs (for %d items with %d expired items)", took.Seconds(), @@ -533,12 +671,38 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam bootstrapTraceMsgFirstTime("loading user policy mapping") userPolicyMappingLoadStartTime := UTCNow() userPolicyMappingsList := listedConfigItems[policyDBUsersListKey] - for _, item := range userPolicyMappingsList { - userName := strings.TrimSuffix(item, ".json") - if err := iamOS.loadMappedPolicy(ctx, userName, regUser, false, cache.iamUserPolicyMap); err != nil && !errors.Is(err, errNoSuchPolicy) { - return fmt.Errorf("unable to load the policy mapping for the user: %w", err) + for { + if len(userPolicyMappingsList) < count { + mappedPolicies, err := iamOS.loadMappedPolicyConcurrent(ctx, regUser, false, userPolicyMappingsList...) + if err != nil { + return err + } + + for index := range userPolicyMappingsList { + if mappedPolicies[index].Policies != "" { + userName := strings.TrimSuffix(userPolicyMappingsList[index], ".json") + cache.iamUserPolicyMap.Store(userName, mappedPolicies[index]) + } + } + + break } + + mappedPolicies, err := iamOS.loadMappedPolicyConcurrent(ctx, regUser, false, userPolicyMappingsList[:count]...) + if err != nil { + return err + } + + for index := range userPolicyMappingsList[:count] { + if mappedPolicies[index].Policies != "" { + userName := strings.TrimSuffix(userPolicyMappingsList[index], ".json") + cache.iamUserPolicyMap.Store(userName, mappedPolicies[index]) + } + } + + userPolicyMappingsList = userPolicyMappingsList[count:] } + if took := time.Since(userPolicyMappingLoadStartTime); took > maxIAMLoadOpTime { logger.Info("User policy mappings load took %.2fs (for %d items)", took.Seconds(), len(userPolicyMappingsList)) } @@ -617,21 +781,16 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam for _, item := range listedConfigItems[stsListKey] { userName := path.Dir(item) // loadUser() will delete expired user during the load. - err := iamOS.loadUser(ctx, userName, stsUser, stsAccountsFromStore) - if err != nil && !errors.Is(err, errNoSuchUser) { - return fmt.Errorf("unable to load user during STS purge: %w (%s)", err, item) - } - + iamLogIf(ctx, iamOS.loadUser(ctx, userName, stsUser, stsAccountsFromStore)) + // No need to return errors for failed expiration of STS users } + // Loading the STS policy mappings from disk ensures that stale entries // (removed during loadUser() in the loop above) are removed from memory. for _, item := range listedConfigItems[policyDBSTSUsersListKey] { stsName := strings.TrimSuffix(item, ".json") - err := iamOS.loadMappedPolicy(ctx, stsName, stsUser, false, stsAccPoliciesFromStore) - if err != nil && !errors.Is(err, errNoSuchPolicy) { - return fmt.Errorf("unable to load policies during STS purge: %w (%s)", err, item) - } - + iamLogIf(ctx, iamOS.loadMappedPolicy(ctx, stsName, stsUser, false, stsAccPoliciesFromStore)) + // No need to return errors for failed expiration of STS users } took := time.Since(purgeStart).Seconds() diff --git a/cmd/iam-store.go b/cmd/iam-store.go index 30add72ab..2c0e24b89 100644 --- a/cmd/iam-store.go +++ b/cmd/iam-store.go @@ -31,9 +31,10 @@ import ( "github.com/minio/madmin-go/v3" "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio/internal/auth" + "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/config/identity/openid" "github.com/minio/minio/internal/jwt" - "github.com/minio/pkg/v3/console" + "github.com/minio/pkg/v3/env" "github.com/minio/pkg/v3/policy" "github.com/puzpuzpuz/xsync/v3" "golang.org/x/sync/singleflight" @@ -642,7 +643,7 @@ func (store *IAMStoreSys) LoadIAMCache(ctx context.Context, firstTime bool) erro // An in-memory cache must be replaced only if we know for sure that the // values loaded from disk are not stale. They might be stale if the // cached.updatedAt is more recent than the refresh cycle began. - if cache.updatedAt.Before(loadedAt) { + if cache.updatedAt.Before(loadedAt) || firstTime { // No one has updated anything since the config was loaded, // so we just replace whatever is on the disk into memory. cache.iamGroupPolicyMap = newCache.iamGroupPolicyMap @@ -1863,8 +1864,12 @@ func (store *IAMStoreSys) DeleteUser(ctx context.Context, accessKey string, user delete(cache.iamUsersMap, u.AccessKey) case u.IsTemp(): _ = store.deleteUserIdentity(ctx, u.AccessKey, stsUser) + delete(cache.iamSTSAccountsMap, u.AccessKey) delete(cache.iamUsersMap, u.AccessKey) } + if store.group != nil { + store.group.Forget(u.AccessKey) + } } } } @@ -1878,8 +1883,13 @@ func (store *IAMStoreSys) DeleteUser(ctx context.Context, accessKey string, user // ignore if user is already deleted. err = nil } + if userType == stsUser { + delete(cache.iamSTSAccountsMap, accessKey) + } delete(cache.iamUsersMap, accessKey) - store.group.Forget(accessKey) + if store.group != nil { + store.group.Forget(accessKey) + } cache.updatedAt = time.Now() @@ -1954,8 +1964,13 @@ func (store *IAMStoreSys) DeleteUsers(ctx context.Context, users []string) error // we are only logging errors, not handling them. err := store.deleteUserIdentity(ctx, user, userType) iamLogIf(GlobalContext, err) + if userType == stsUser { + delete(cache.iamSTSAccountsMap, user) + } delete(cache.iamUsersMap, user) - store.group.Forget(user) + if store.group != nil { + store.group.Forget(user) + } deleted = true } @@ -2726,99 +2741,138 @@ func (store *IAMStoreSys) UpdateUserIdentity(ctx context.Context, cred auth.Cred // LoadUser - attempts to load user info from storage and updates cache. func (store *IAMStoreSys) LoadUser(ctx context.Context, accessKey string) error { - // We use singleflight to de-duplicate requests when server - // is coming up and loading accessKey and its associated assets - val, err, shared := store.group.Do(accessKey, func() (val interface{}, err error) { - cache := store.lock() - defer func() { - cache.updatedAt = time.Now() - store.unlock() - }() + groupLoad := env.Get("_MINIO_IAM_GROUP_REFRESH", config.EnableOff) == config.EnableOn - _, found := cache.iamUsersMap[accessKey] + newCachePopulate := func() (val interface{}, err error) { + newCache := newIamCache() - // Check for regular user access key - if !found { - err = store.loadUser(ctx, accessKey, regUser, cache.iamUsersMap) - if _, found = cache.iamUsersMap[accessKey]; found { - // load mapped policies - err = store.loadMappedPolicyWithRetry(ctx, accessKey, regUser, false, cache.iamUserPolicyMap, 3) + // Check for service account first + store.loadUser(ctx, accessKey, svcUser, newCache.iamUsersMap) + + svc, found := newCache.iamUsersMap[accessKey] + if found { + // Load parent user and mapped policies. + if store.getUsersSysType() == MinIOUsersSysType { + err = store.loadUser(ctx, svc.Credentials.ParentUser, regUser, newCache.iamUsersMap) + // NOTE: we are not worried about loading errors from policies. + store.loadMappedPolicyWithRetry(ctx, svc.Credentials.ParentUser, regUser, false, newCache.iamUserPolicyMap, 3) + } else { + // In case of LDAP the parent user's policy mapping needs to be loaded into sts map + // NOTE: we are not worried about loading errors from policies. + store.loadMappedPolicyWithRetry(ctx, svc.Credentials.ParentUser, stsUser, false, newCache.iamSTSPolicyMap, 3) } } - // Check for service account if !found { - err = store.loadUser(ctx, accessKey, svcUser, cache.iamUsersMap) - var svc UserIdentity - svc, found = cache.iamUsersMap[accessKey] - if found { - // Load parent user and mapped policies. - if store.getUsersSysType() == MinIOUsersSysType { - err = store.loadUser(ctx, svc.Credentials.ParentUser, regUser, cache.iamUsersMap) - if err == nil { - err = store.loadMappedPolicyWithRetry(ctx, svc.Credentials.ParentUser, regUser, false, cache.iamUserPolicyMap, 3) - } - } else { - // In case of LDAP the parent user's policy mapping needs to be - // loaded into sts map - err = store.loadMappedPolicyWithRetry(ctx, svc.Credentials.ParentUser, stsUser, false, cache.iamSTSPolicyMap, 3) - } + err = store.loadUser(ctx, accessKey, regUser, newCache.iamUsersMap) + if _, found = newCache.iamUsersMap[accessKey]; found { + // NOTE: we are not worried about loading errors from policies. + store.loadMappedPolicyWithRetry(ctx, accessKey, regUser, false, newCache.iamUserPolicyMap, 3) } } // Check for STS account - stsAccountFound := false var stsUserCred UserIdentity if !found { - err = store.loadUser(ctx, accessKey, stsUser, cache.iamSTSAccountsMap) - if stsUserCred, found = cache.iamSTSAccountsMap[accessKey]; found { + err = store.loadUser(ctx, accessKey, stsUser, newCache.iamSTSAccountsMap) + if stsUserCred, found = newCache.iamSTSAccountsMap[accessKey]; found { // Load mapped policy - err = store.loadMappedPolicyWithRetry(ctx, stsUserCred.Credentials.ParentUser, stsUser, false, cache.iamSTSPolicyMap, 3) - stsAccountFound = true + // NOTE: we are not worried about loading errors from policies. + store.loadMappedPolicyWithRetry(ctx, stsUserCred.Credentials.ParentUser, stsUser, false, newCache.iamSTSPolicyMap, 3) } } // Load any associated policy definitions - if !stsAccountFound { - pols, _ := cache.iamUserPolicyMap.Load(accessKey) - for _, policy := range pols.toSlice() { - if _, found = cache.iamPolicyDocsMap[policy]; !found { - err = store.loadPolicyDocWithRetry(ctx, policy, cache.iamPolicyDocsMap, 3) - } - } - } else { - pols, _ := cache.iamSTSPolicyMap.Load(stsUserCred.Credentials.AccessKey) - for _, policy := range pols.toSlice() { - if _, found = cache.iamPolicyDocsMap[policy]; !found { - err = store.loadPolicyDocWithRetry(ctx, policy, cache.iamPolicyDocsMap, 3) - } + pols, _ := newCache.iamUserPolicyMap.Load(accessKey) + for _, policy := range pols.toSlice() { + if _, found = newCache.iamPolicyDocsMap[policy]; !found { + // NOTE: we are not worried about loading errors from policies. + store.loadPolicyDocWithRetry(ctx, policy, newCache.iamPolicyDocsMap, 3) } } - load := len(cache.iamGroupsMap) == 0 - if store.getUsersSysType() == LDAPUsersSysType && cache.iamGroupPolicyMap.Size() == 0 { - load = true - } - if load { - if _, err = store.updateGroups(ctx, cache); err != nil { - return "done", err + pols, _ = newCache.iamSTSPolicyMap.Load(stsUserCred.Credentials.AccessKey) + for _, policy := range pols.toSlice() { + if _, found = newCache.iamPolicyDocsMap[policy]; !found { + // NOTE: we are not worried about loading errors from policies. + store.loadPolicyDocWithRetry(ctx, policy, newCache.iamPolicyDocsMap, 3) } } - cache.buildUserGroupMemberships() + if groupLoad { + load := len(newCache.iamGroupsMap) == 0 + if store.getUsersSysType() == LDAPUsersSysType && newCache.iamGroupPolicyMap.Size() == 0 { + load = true + } + if load { + // NOTE: we are not worried about loading errors from groups. + store.updateGroups(ctx, newCache) + } + newCache.buildUserGroupMemberships() + } - return "done", err + return newCache, err + } + + var ( + val interface{} + err error + ) + if store.group != nil { + val, err, _ = store.group.Do(accessKey, newCachePopulate) + } else { + val, err = newCachePopulate() + } + + // Return error right away if any. + if err != nil { + if errors.Is(err, errNoSuchUser) || errors.Is(err, errConfigNotFound) { + return nil + } + return err + } + + newCache := val.(*iamCache) + + cache := store.lock() + defer store.unlock() + + // We need to merge the new cache with the existing cache because the + // periodic IAM reload is partial. The periodic load here is to account. + newCache.iamGroupPolicyMap.Range(func(k string, v MappedPolicy) bool { + cache.iamGroupPolicyMap.Store(k, v) + return true }) - if serverDebugLog { - console.Debugln("loadUser: loading shared", val, err, shared) + for k, v := range newCache.iamGroupsMap { + cache.iamGroupsMap[k] = v } - if IsErr(err, errNoSuchUser, errNoSuchPolicy, errNoSuchGroup) { - return nil + for k, v := range newCache.iamPolicyDocsMap { + cache.iamPolicyDocsMap[k] = v } - return err + for k, v := range newCache.iamUserGroupMemberships { + cache.iamUserGroupMemberships[k] = v + } + + newCache.iamUserPolicyMap.Range(func(k string, v MappedPolicy) bool { + cache.iamUserPolicyMap.Store(k, v) + return true + }) + + for k, v := range newCache.iamUsersMap { + cache.iamUsersMap[k] = v + } + + newCache.iamSTSPolicyMap.Range(func(k string, v MappedPolicy) bool { + cache.iamSTSPolicyMap.Store(k, v) + return true + }) + + cache.updatedAt = time.Now() + + return nil } func extractJWTClaims(u UserIdentity) (jwtClaims *jwt.MapClaims, err error) { diff --git a/cmd/iam.go b/cmd/iam.go index 8de0904d9..f9b78f8b4 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -46,9 +46,9 @@ import ( "github.com/minio/minio/internal/config/policy/opa" polplugin "github.com/minio/minio/internal/config/policy/plugin" xhttp "github.com/minio/minio/internal/http" - xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/jwt" "github.com/minio/minio/internal/logger" + "github.com/minio/pkg/v3/env" "github.com/minio/pkg/v3/ldap" "github.com/minio/pkg/v3/policy" etcd "go.etcd.io/etcd/client/v3" @@ -178,9 +178,16 @@ func (sys *IAMSys) initStore(objAPI ObjectLayer, etcdClient *etcd.Client) { } if etcdClient == nil { - sys.store = &IAMStoreSys{newIAMObjectStore(objAPI, sys.usersSysType), &singleflight.Group{}} + var group *singleflight.Group + if env.Get("_MINIO_IAM_SINGLE_FLIGHT", config.EnableOff) == config.EnableOn { + group = &singleflight.Group{} + } + sys.store = &IAMStoreSys{ + IAMStorageAPI: newIAMObjectStore(objAPI, sys.usersSysType), + group: group, + } } else { - sys.store = &IAMStoreSys{newIAMEtcdStore(etcdClient, sys.usersSysType), &singleflight.Group{}} + sys.store = &IAMStoreSys{IAMStorageAPI: newIAMEtcdStore(etcdClient, sys.usersSysType)} } } @@ -225,7 +232,7 @@ func (sys *IAMSys) Load(ctx context.Context, firstTime bool) error { select { case <-sys.configLoaded: default: - xioutil.SafeClose(sys.configLoaded) + close(sys.configLoaded) } return nil } @@ -307,8 +314,9 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etc // Migrate IAM configuration, if necessary. if err := saveIAMFormat(retryCtx, sys.store); err != nil { if configRetriableErrors(err) { - logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err) - time.Sleep(time.Duration(r.Float64() * float64(time.Second))) + retryInterval := time.Duration(r.Float64() * float64(time.Second)) + logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v) (retrying in %s)", err, retryInterval) + time.Sleep(retryInterval) continue } iamLogIf(ctx, fmt.Errorf("IAM sub-system is partially initialized, unable to write the IAM format: %w", err), logger.WarningKind) @@ -340,8 +348,9 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etc for { if err := sys.Load(retryCtx, true); err != nil { if configRetriableErrors(err) { - logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err) - time.Sleep(time.Duration(r.Float64() * float64(time.Second))) + retryInterval := time.Duration(r.Float64() * float64(time.Second)) + logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v) (retrying in %s)", err, retryInterval) + time.Sleep(retryInterval) continue } if err != nil {