diff --git a/cmd/iam-dummy-store.go b/cmd/iam-dummy-store.go index 02bb2cc45..df69058f3 100644 --- a/cmd/iam-dummy-store.go +++ b/cmd/iam-dummy-store.go @@ -81,10 +81,6 @@ func (ids *iamDummyStore) loadMappedPolicies(ctx context.Context, userType IAMUs return nil } -func (ids *iamDummyStore) loadAll(ctx context.Context, sys *IAMSys) error { - return sys.Load(ctx, ids) -} - func (ids *iamDummyStore) saveIAMConfig(ctx context.Context, item interface{}, path string, opts ...options) error { return nil } @@ -128,6 +124,3 @@ func (ids *iamDummyStore) deleteUserIdentity(ctx context.Context, name string, u func (ids *iamDummyStore) deleteGroupInfo(ctx context.Context, name string) error { return nil } - -func (ids *iamDummyStore) watch(context.Context, *IAMSys) { -} diff --git a/cmd/iam-etcd-store.go b/cmd/iam-etcd-store.go index 3bfe176a6..77908df94 100644 --- a/cmd/iam-etcd-store.go +++ b/cmd/iam-etcd-store.go @@ -457,10 +457,6 @@ func (ies *IAMEtcdStore) loadMappedPolicies(ctx context.Context, userType IAMUse } -func (ies *IAMEtcdStore) loadAll(ctx context.Context, sys *IAMSys) error { - return sys.Load(ctx, ies) -} - func (ies *IAMEtcdStore) savePolicyDoc(ctx context.Context, policyName string, p iampolicy.Policy) error { return ies.saveIAMConfig(ctx, &p, getPolicyDocPath(policyName)) } @@ -509,153 +505,58 @@ func (ies *IAMEtcdStore) deleteGroupInfo(ctx context.Context, name string) error return err } -func (ies *IAMEtcdStore) watch(ctx context.Context, sys *IAMSys) { - for { - outerLoop: - // Refresh IAMSys with etcd watch. - watchCh := ies.client.Watch(ctx, - iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly()) +func (ies *IAMEtcdStore) watch(ctx context.Context, keyPath string) <-chan iamWatchEvent { + ch := make(chan iamWatchEvent) + // go routine to read events from the etcd watch channel and send them + // down `ch` + go func() { for { - select { - case <-ctx.Done(): - return - case watchResp, ok := <-watchCh: - if !ok { - time.Sleep(1 * time.Second) - // Upon an error on watch channel - // re-init the watch channel. - goto outerLoop - } - if err := watchResp.Err(); err != nil { - logger.LogIf(ctx, err) - // log and retry. - time.Sleep(1 * time.Second) - // Upon an error on watch channel - // re-init the watch channel. - goto outerLoop - } - for _, event := range watchResp.Events { - ies.lock() - ies.reloadFromEvent(sys, event) - ies.unlock() - } - } - } - } -} + outerLoop: + watchCh := ies.client.Watch(ctx, + keyPath, etcd.WithPrefix(), etcd.WithKeysOnly()) -// sys.RLock is held by caller. -func (ies *IAMEtcdStore) reloadFromEvent(sys *IAMSys, event *etcd.Event) { - eventCreate := event.IsModify() || event.IsCreate() - eventDelete := event.Type == etcd.EventTypeDelete - usersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigUsersPrefix) - groupsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigGroupsPrefix) - stsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigSTSPrefix) - svcPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigServiceAccountsPrefix) - policyPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPoliciesPrefix) - policyDBUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBUsersPrefix) - policyDBSTSUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBSTSUsersPrefix) - policyDBGroupsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBGroupsPrefix) + for { + select { + case <-ctx.Done(): + return + case watchResp, ok := <-watchCh: + if !ok { + time.Sleep(1 * time.Second) + // Upon an error on watch channel + // re-init the watch channel. + goto outerLoop + } + if err := watchResp.Err(); err != nil { + logger.LogIf(ctx, err) + // log and retry. + time.Sleep(1 * time.Second) + // Upon an error on watch channel + // re-init the watch channel. + goto outerLoop + } + for _, event := range watchResp.Events { + isCreateEvent := event.IsModify() || event.IsCreate() + isDeleteEvent := event.Type == etcd.EventTypeDelete - ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout) - defer cancel() + switch { + case isCreateEvent: + ch <- iamWatchEvent{ + isCreated: true, + keyPath: string(event.Kv.Key), + } + case isDeleteEvent: + ch <- iamWatchEvent{ + isCreated: false, + keyPath: string(event.Kv.Key), + } + } - switch { - case eventCreate: - switch { - case usersPrefix: - accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key), - iamConfigUsersPrefix)) - ies.loadUser(ctx, accessKey, regUser, sys.iamUsersMap) - case stsPrefix: - accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key), - iamConfigSTSPrefix)) - // Not using ies.loadUser due to the custom loading of an STS account - var u UserIdentity - if err := ies.loadIAMConfig(ctx, &u, getUserIdentityPath(accessKey, stsUser)); err == nil { - ies.addUser(ctx, accessKey, stsUser, u, sys.iamUsersMap) - // We are on purpose not persisting the policy map for parent - // user, although this is a hack, it is a good enough hack - // at this point in time - we need to overhaul our OIDC - // usage with service accounts with a more cleaner implementation - // - // This mapping is necessary to ensure that valid credentials - // have necessary ParentUser present - this is mainly for only - // webIdentity based STS tokens. - parentAccessKey := u.Credentials.ParentUser - if parentAccessKey != "" && parentAccessKey != globalActiveCred.AccessKey { - if _, ok := sys.iamUserPolicyMap[parentAccessKey]; !ok { - sys.iamUserPolicyMap[parentAccessKey] = sys.iamUserPolicyMap[accessKey] } } } - case svcPrefix: - accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key), - iamConfigServiceAccountsPrefix)) - ies.loadUser(ctx, accessKey, svcUser, sys.iamUsersMap) - case groupsPrefix: - group := path.Dir(strings.TrimPrefix(string(event.Kv.Key), - iamConfigGroupsPrefix)) - ies.loadGroup(ctx, group, sys.iamGroupsMap) - gi := sys.iamGroupsMap[group] - sys.removeGroupFromMembershipsMap(group) - sys.updateGroupMembershipsMap(group, &gi) - case policyPrefix: - policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key), - iamConfigPoliciesPrefix)) - ies.loadPolicyDoc(ctx, policyName, sys.iamPolicyDocsMap) - case policyDBUsersPrefix: - policyMapFile := strings.TrimPrefix(string(event.Kv.Key), - iamConfigPolicyDBUsersPrefix) - user := strings.TrimSuffix(policyMapFile, ".json") - ies.loadMappedPolicy(ctx, user, regUser, false, sys.iamUserPolicyMap) - case policyDBSTSUsersPrefix: - policyMapFile := strings.TrimPrefix(string(event.Kv.Key), - iamConfigPolicyDBSTSUsersPrefix) - user := strings.TrimSuffix(policyMapFile, ".json") - ies.loadMappedPolicy(ctx, user, stsUser, false, sys.iamUserPolicyMap) - case policyDBGroupsPrefix: - policyMapFile := strings.TrimPrefix(string(event.Kv.Key), - iamConfigPolicyDBGroupsPrefix) - user := strings.TrimSuffix(policyMapFile, ".json") - ies.loadMappedPolicy(ctx, user, regUser, true, sys.iamGroupPolicyMap) } - case eventDelete: - switch { - case usersPrefix: - accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key), - iamConfigUsersPrefix)) - delete(sys.iamUsersMap, accessKey) - case stsPrefix: - accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key), - iamConfigSTSPrefix)) - delete(sys.iamUsersMap, accessKey) - case groupsPrefix: - group := path.Dir(strings.TrimPrefix(string(event.Kv.Key), - iamConfigGroupsPrefix)) - sys.removeGroupFromMembershipsMap(group) - delete(sys.iamGroupsMap, group) - delete(sys.iamGroupPolicyMap, group) - case policyPrefix: - policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key), - iamConfigPoliciesPrefix)) - delete(sys.iamPolicyDocsMap, policyName) - case policyDBUsersPrefix: - policyMapFile := strings.TrimPrefix(string(event.Kv.Key), - iamConfigPolicyDBUsersPrefix) - user := strings.TrimSuffix(policyMapFile, ".json") - delete(sys.iamUserPolicyMap, user) - case policyDBSTSUsersPrefix: - policyMapFile := strings.TrimPrefix(string(event.Kv.Key), - iamConfigPolicyDBSTSUsersPrefix) - user := strings.TrimSuffix(policyMapFile, ".json") - delete(sys.iamUserPolicyMap, user) - case policyDBGroupsPrefix: - policyMapFile := strings.TrimPrefix(string(event.Kv.Key), - iamConfigPolicyDBGroupsPrefix) - user := strings.TrimSuffix(policyMapFile, ".json") - delete(sys.iamGroupPolicyMap, user) - } - } + }() + return ch + } diff --git a/cmd/iam-object-store.go b/cmd/iam-object-store.go index 338b1b42b..1143e3d08 100644 --- a/cmd/iam-object-store.go +++ b/cmd/iam-object-store.go @@ -22,7 +22,6 @@ import ( "path" "strings" "sync" - "time" "unicode/utf8" jsoniter "github.com/json-iterator/go" @@ -370,11 +369,6 @@ func (iamOS *IAMObjectStore) loadMappedPolicies(ctx context.Context, userType IA return nil } -// Refresh IAMSys. If an object layer is passed in use that, otherwise load from global. -func (iamOS *IAMObjectStore) loadAll(ctx context.Context, sys *IAMSys) error { - return sys.Load(ctx, iamOS) -} - func (iamOS *IAMObjectStore) savePolicyDoc(ctx context.Context, policyName string, p iampolicy.Policy) error { return iamOS.saveIAMConfig(ctx, &p, getPolicyDocPath(policyName)) } @@ -463,13 +457,3 @@ func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix stri return ch } - -func (iamOS *IAMObjectStore) watch(ctx context.Context, sys *IAMSys) { - // Refresh IAMSys. - for { - time.Sleep(globalRefreshIAMInterval) - if err := iamOS.loadAll(ctx, sys); err != nil { - logger.LogIf(ctx, err) - } - } -} diff --git a/cmd/iam.go b/cmd/iam.go index c267ba615..50576a4f8 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -25,6 +25,7 @@ import ( "errors" "fmt" "math/rand" + "path" "strings" "sync" "time" @@ -240,6 +241,11 @@ type options struct { ttl int64 //expiry in seconds } +type iamWatchEvent struct { + isCreated bool // !isCreated implies a delete event. + keyPath string +} + // IAMStorageAPI defines an interface for the IAM persistence layer type IAMStorageAPI interface { lock() @@ -262,8 +268,6 @@ type IAMStorageAPI interface { loadMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool, m map[string]MappedPolicy) error loadMappedPolicies(ctx context.Context, userType IAMUserType, isGroup bool, m map[string]MappedPolicy) error - loadAll(context.Context, *IAMSys) error - saveIAMConfig(ctx context.Context, item interface{}, path string, opts ...options) error loadIAMConfig(ctx context.Context, item interface{}, path string) error deleteIAMConfig(ctx context.Context, path string) error @@ -277,8 +281,12 @@ type IAMStorageAPI interface { deleteMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool) error deleteUserIdentity(ctx context.Context, name string, userType IAMUserType) error deleteGroupInfo(ctx context.Context, name string) error +} - watch(context.Context, *IAMSys) +// iamStorageWatcher is implemented by `IAMStorageAPI` implementers that +// additionally support watching storage for changes. +type iamStorageWatcher interface { + watch(ctx context.Context, keyPath string) <-chan iamWatchEvent } // LoadGroup - loads a specific group from storage, and updates the @@ -642,7 +650,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) { } for { - if err := sys.store.loadAll(retryCtx, sys); err != nil { + if err := sys.Load(retryCtx, sys.store); err != nil { if configRetriableErrors(err) { logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err) time.Sleep(time.Duration(r.Float64() * float64(5*time.Second))) @@ -674,7 +682,146 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) { }() } - go sys.store.watch(ctx, sys) + go sys.watch(ctx) +} + +func (sys *IAMSys) watch(ctx context.Context) { + watcher, ok := sys.store.(iamStorageWatcher) + if ok { + ch := watcher.watch(ctx, iamConfigPrefix) + for event := range ch { + // we simply log errors + err := sys.loadWatchedEvent(ctx, event) + logger.LogIf(ctx, err) + } + + } else { + // Fall back to loading all items + for { + time.Sleep(globalRefreshIAMInterval) + if err := sys.Load(ctx, sys.store); err != nil { + logger.LogIf(ctx, err) + } + } + } +} + +func (sys *IAMSys) loadWatchedEvent(outerCtx context.Context, event iamWatchEvent) (err error) { + usersPrefix := strings.HasPrefix(event.keyPath, iamConfigUsersPrefix) + groupsPrefix := strings.HasPrefix(event.keyPath, iamConfigGroupsPrefix) + stsPrefix := strings.HasPrefix(event.keyPath, iamConfigSTSPrefix) + svcPrefix := strings.HasPrefix(event.keyPath, iamConfigServiceAccountsPrefix) + policyPrefix := strings.HasPrefix(event.keyPath, iamConfigPoliciesPrefix) + policyDBUsersPrefix := strings.HasPrefix(event.keyPath, iamConfigPolicyDBUsersPrefix) + policyDBSTSUsersPrefix := strings.HasPrefix(event.keyPath, iamConfigPolicyDBSTSUsersPrefix) + policyDBGroupsPrefix := strings.HasPrefix(event.keyPath, iamConfigPolicyDBGroupsPrefix) + + ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout) + defer cancel() + + // We need to read from storage and write to in-memory map, so we need + // only a read lock on storage, however in some cases we modify storage + // too (e.g. when credentials from storage are expired, we delete them), + // so we take write locks for both. + sys.Lock() + defer sys.Unlock() + + sys.store.lock() + defer sys.store.unlock() + + if event.isCreated { + switch { + case usersPrefix: + accessKey := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigUsersPrefix)) + err = sys.store.loadUser(ctx, accessKey, regUser, sys.iamUsersMap) + case stsPrefix: + accessKey := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigSTSPrefix)) + err = sys.store.loadUser(ctx, accessKey, stsUser, sys.iamUsersMap) + if err == nil { + // We need to update the policy map for the + // parent below, so we retrieve the credentials + // just added. + creds, ok := sys.iamUsersMap[accessKey] + if !ok { + // This could happen, if the credential + // being loaded has expired. + break + } + + // We are on purpose not persisting the policy map for parent + // user, although this is a hack, it is a good enough hack + // at this point in time - we need to overhaul our OIDC + // usage with service accounts with a more cleaner implementation + // + // This mapping is necessary to ensure that valid credentials + // have necessary ParentUser present - this is mainly for only + // webIdentity based STS tokens. + parentAccessKey := creds.ParentUser + if parentAccessKey != "" && parentAccessKey != globalActiveCred.AccessKey { + if _, ok := sys.iamUserPolicyMap[parentAccessKey]; !ok { + sys.iamUserPolicyMap[parentAccessKey] = sys.iamUserPolicyMap[accessKey] + } + } + } + case svcPrefix: + accessKey := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigServiceAccountsPrefix)) + err = sys.store.loadUser(ctx, accessKey, svcUser, sys.iamUsersMap) + case groupsPrefix: + group := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigGroupsPrefix)) + err = sys.store.loadGroup(ctx, group, sys.iamGroupsMap) + if err == nil { + gi := sys.iamGroupsMap[group] + sys.removeGroupFromMembershipsMap(group) + sys.updateGroupMembershipsMap(group, &gi) + } + case policyPrefix: + policyName := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigPoliciesPrefix)) + err = sys.store.loadPolicyDoc(ctx, policyName, sys.iamPolicyDocsMap) + case policyDBUsersPrefix: + policyMapFile := strings.TrimPrefix(event.keyPath, iamConfigPolicyDBUsersPrefix) + user := strings.TrimSuffix(policyMapFile, ".json") + err = sys.store.loadMappedPolicy(ctx, user, regUser, false, sys.iamUserPolicyMap) + case policyDBSTSUsersPrefix: + policyMapFile := strings.TrimPrefix(event.keyPath, iamConfigPolicyDBSTSUsersPrefix) + user := strings.TrimSuffix(policyMapFile, ".json") + err = sys.store.loadMappedPolicy(ctx, user, stsUser, false, sys.iamUserPolicyMap) + case policyDBGroupsPrefix: + policyMapFile := strings.TrimPrefix(event.keyPath, iamConfigPolicyDBGroupsPrefix) + user := strings.TrimSuffix(policyMapFile, ".json") + err = sys.store.loadMappedPolicy(ctx, user, regUser, true, sys.iamGroupPolicyMap) + } + } else { + // delete event + switch { + case usersPrefix: + accessKey := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigUsersPrefix)) + delete(sys.iamUsersMap, accessKey) + case stsPrefix: + accessKey := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigSTSPrefix)) + delete(sys.iamUsersMap, accessKey) + case groupsPrefix: + group := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigGroupsPrefix)) + sys.removeGroupFromMembershipsMap(group) + delete(sys.iamGroupsMap, group) + delete(sys.iamGroupPolicyMap, group) + case policyPrefix: + policyName := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigPoliciesPrefix)) + delete(sys.iamPolicyDocsMap, policyName) + case policyDBUsersPrefix: + policyMapFile := strings.TrimPrefix(event.keyPath, iamConfigPolicyDBUsersPrefix) + user := strings.TrimSuffix(policyMapFile, ".json") + delete(sys.iamUserPolicyMap, user) + case policyDBSTSUsersPrefix: + policyMapFile := strings.TrimPrefix(event.keyPath, iamConfigPolicyDBSTSUsersPrefix) + user := strings.TrimSuffix(policyMapFile, ".json") + delete(sys.iamUserPolicyMap, user) + case policyDBGroupsPrefix: + policyMapFile := strings.TrimPrefix(event.keyPath, iamConfigPolicyDBGroupsPrefix) + user := strings.TrimSuffix(policyMapFile, ".json") + delete(sys.iamGroupPolicyMap, user) + } + } + return err } // DeletePolicy - deletes a canned policy from backend or etcd.