mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
cache IAM policy engines
This commit is contained in:
@@ -2,6 +2,7 @@ package s3api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -49,12 +50,14 @@ type IdentityAccessManagement struct {
|
||||
accessKeyIdent map[string]*Identity
|
||||
nameToIdentity map[string]*Identity // O(1) lookup by identity name
|
||||
policies map[string]*iam_pb.Policy
|
||||
iamPolicyCache map[iamPolicyCacheKey]*policy_engine.PolicyEngine
|
||||
accounts map[string]*Account
|
||||
emailAccount map[string]*Account
|
||||
hashes map[string]*sync.Pool
|
||||
hashCounters map[string]*int32
|
||||
identityAnonymous *Identity
|
||||
hashMu sync.RWMutex
|
||||
policyCacheMu sync.RWMutex
|
||||
domain string
|
||||
externalHost string // pre-computed host for S3 signature verification (from ExternalUrl)
|
||||
isAuthEnabled bool
|
||||
@@ -132,6 +135,11 @@ type Credential struct {
|
||||
Expiration int64 // Unix timestamp when credential expires (0 = no expiration)
|
||||
}
|
||||
|
||||
type iamPolicyCacheKey struct {
|
||||
name string
|
||||
contentHash [32]byte
|
||||
}
|
||||
|
||||
// isCredentialExpired checks if a credential has expired
|
||||
func (c *Credential) isCredentialExpired() bool {
|
||||
return c.Expiration > 0 && c.Expiration < time.Now().Unix()
|
||||
@@ -206,11 +214,12 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, filerClient
|
||||
}
|
||||
|
||||
iam := &IdentityAccessManagement{
|
||||
domain: option.DomainName,
|
||||
externalHost: externalHost,
|
||||
hashes: make(map[string]*sync.Pool),
|
||||
hashCounters: make(map[string]*int32),
|
||||
filerClient: filerClient,
|
||||
domain: option.DomainName,
|
||||
externalHost: externalHost,
|
||||
hashes: make(map[string]*sync.Pool),
|
||||
hashCounters: make(map[string]*int32),
|
||||
iamPolicyCache: make(map[iamPolicyCacheKey]*policy_engine.PolicyEngine),
|
||||
filerClient: filerClient,
|
||||
}
|
||||
|
||||
// Always initialize credential manager with fallback to defaults
|
||||
@@ -684,6 +693,7 @@ func (iam *IdentityAccessManagement) ReplaceS3ApiConfiguration(config *iam_pb.S3
|
||||
// Once enabled, keep it enabled (one-way toggle)
|
||||
authJustEnabled := iam.updateAuthenticationState(len(iam.identities))
|
||||
iam.m.Unlock()
|
||||
iam.clearIAMPolicyCache()
|
||||
|
||||
if authJustEnabled {
|
||||
glog.V(1).Infof("S3 authentication enabled - credentials were added dynamically")
|
||||
@@ -919,6 +929,7 @@ func (iam *IdentityAccessManagement) MergeS3ApiConfiguration(config *iam_pb.S3Ap
|
||||
// Once enabled, keep it enabled (one-way toggle)
|
||||
authJustEnabled := iam.updateAuthenticationState(len(identities))
|
||||
iam.m.Unlock()
|
||||
iam.clearIAMPolicyCache()
|
||||
|
||||
if authJustEnabled {
|
||||
glog.V(1).Infof("S3 authentication enabled because credentials were added dynamically")
|
||||
@@ -1659,6 +1670,58 @@ func determineIAMAuthPath(sessionToken, principal, principalArn string) iamAuthP
|
||||
return iamAuthPathNone
|
||||
}
|
||||
|
||||
func newIAMPolicyCacheKey(name, content string) iamPolicyCacheKey {
|
||||
return iamPolicyCacheKey{
|
||||
name: name,
|
||||
contentHash: sha256.Sum256([]byte(content)),
|
||||
}
|
||||
}
|
||||
|
||||
func (iam *IdentityAccessManagement) getOrCreateIAMPolicyEngine(policyName, content string) (*policy_engine.PolicyEngine, error) {
|
||||
cacheKey := newIAMPolicyCacheKey(policyName, content)
|
||||
|
||||
iam.policyCacheMu.RLock()
|
||||
if engine, ok := iam.iamPolicyCache[cacheKey]; ok {
|
||||
iam.policyCacheMu.RUnlock()
|
||||
return engine, nil
|
||||
}
|
||||
iam.policyCacheMu.RUnlock()
|
||||
|
||||
engine := policy_engine.NewPolicyEngine()
|
||||
if err := engine.SetBucketPolicy(policyName, content); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
iam.policyCacheMu.Lock()
|
||||
defer iam.policyCacheMu.Unlock()
|
||||
|
||||
if engine, ok := iam.iamPolicyCache[cacheKey]; ok {
|
||||
return engine, nil
|
||||
}
|
||||
if iam.iamPolicyCache == nil {
|
||||
iam.iamPolicyCache = make(map[iamPolicyCacheKey]*policy_engine.PolicyEngine)
|
||||
}
|
||||
iam.iamPolicyCache[cacheKey] = engine
|
||||
return engine, nil
|
||||
}
|
||||
|
||||
func (iam *IdentityAccessManagement) invalidateIAMPolicyCache(policyName string) {
|
||||
iam.policyCacheMu.Lock()
|
||||
defer iam.policyCacheMu.Unlock()
|
||||
|
||||
for cacheKey := range iam.iamPolicyCache {
|
||||
if cacheKey.name == policyName {
|
||||
delete(iam.iamPolicyCache, cacheKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (iam *IdentityAccessManagement) clearIAMPolicyCache() {
|
||||
iam.policyCacheMu.Lock()
|
||||
defer iam.policyCacheMu.Unlock()
|
||||
iam.iamPolicyCache = make(map[iamPolicyCacheKey]*policy_engine.PolicyEngine)
|
||||
}
|
||||
|
||||
// evaluateIAMPolicies evaluates attached IAM policies for a user identity.
|
||||
// Returns true if any matching statement explicitly allows the action.
|
||||
func (iam *IdentityAccessManagement) evaluateIAMPolicies(r *http.Request, identity *Identity, action Action, bucket, object string) bool {
|
||||
@@ -1681,8 +1744,8 @@ func (iam *IdentityAccessManagement) evaluateIAMPolicies(r *http.Request, identi
|
||||
continue
|
||||
}
|
||||
|
||||
engine := policy_engine.NewPolicyEngine()
|
||||
if err := engine.SetBucketPolicy(policyName, policy.Content); err != nil {
|
||||
engine, err := iam.getOrCreateIAMPolicyEngine(policyName, policy.Content)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -1803,11 +1866,13 @@ func (iam *IdentityAccessManagement) authorizeWithIAM(r *http.Request, identity
|
||||
// PutPolicy adds or updates a policy
|
||||
func (iam *IdentityAccessManagement) PutPolicy(name string, content string) error {
|
||||
iam.m.Lock()
|
||||
defer iam.m.Unlock()
|
||||
if iam.policies == nil {
|
||||
iam.policies = make(map[string]*iam_pb.Policy)
|
||||
}
|
||||
iam.policies[name] = &iam_pb.Policy{Name: name, Content: content}
|
||||
iam.m.Unlock()
|
||||
|
||||
iam.invalidateIAMPolicyCache(name)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1824,8 +1889,10 @@ func (iam *IdentityAccessManagement) GetPolicy(name string) (*iam_pb.Policy, err
|
||||
// DeletePolicy removes a policy
|
||||
func (iam *IdentityAccessManagement) DeletePolicy(name string) error {
|
||||
iam.m.Lock()
|
||||
defer iam.m.Unlock()
|
||||
delete(iam.policies, name)
|
||||
iam.m.Unlock()
|
||||
|
||||
iam.invalidateIAMPolicyCache(name)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -387,6 +387,136 @@ func TestVerifyActionPermissionPolicyFallback(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestVerifyActionPermissionPolicyFallbackCachesParsedPolicies(t *testing.T) {
|
||||
buildRequest := func(t *testing.T, method string) *http.Request {
|
||||
t.Helper()
|
||||
req, err := http.NewRequest(method, "http://s3.amazonaws.com/test-bucket/test-object", nil)
|
||||
assert.NoError(t, err)
|
||||
return req
|
||||
}
|
||||
|
||||
iam := &IdentityAccessManagement{}
|
||||
content := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"arn:aws:s3:::test-bucket/*"}]}`
|
||||
assert.NoError(t, iam.PutPolicy("allowGet", content))
|
||||
|
||||
identity := &Identity{
|
||||
Name: "policy-user",
|
||||
Account: &AccountAdmin,
|
||||
PolicyNames: []string{"allowGet"},
|
||||
}
|
||||
|
||||
errCode := iam.VerifyActionPermission(buildRequest(t, http.MethodGet), identity, Action(ACTION_READ), "test-bucket", "test-object")
|
||||
assert.Equal(t, s3err.ErrNone, errCode)
|
||||
|
||||
cacheKey := newIAMPolicyCacheKey("allowGet", content)
|
||||
iam.policyCacheMu.RLock()
|
||||
firstEngine := iam.iamPolicyCache[cacheKey]
|
||||
firstCacheSize := len(iam.iamPolicyCache)
|
||||
iam.policyCacheMu.RUnlock()
|
||||
|
||||
assert.NotNil(t, firstEngine)
|
||||
assert.Equal(t, 1, firstCacheSize)
|
||||
|
||||
errCode = iam.VerifyActionPermission(buildRequest(t, http.MethodGet), identity, Action(ACTION_READ), "test-bucket", "test-object")
|
||||
assert.Equal(t, s3err.ErrNone, errCode)
|
||||
|
||||
iam.policyCacheMu.RLock()
|
||||
secondEngine := iam.iamPolicyCache[cacheKey]
|
||||
secondCacheSize := len(iam.iamPolicyCache)
|
||||
iam.policyCacheMu.RUnlock()
|
||||
|
||||
assert.Same(t, firstEngine, secondEngine)
|
||||
assert.Equal(t, 1, secondCacheSize)
|
||||
}
|
||||
|
||||
func TestPutPolicyInvalidatesCachedPolicyEngine(t *testing.T) {
|
||||
buildRequest := func(t *testing.T, method string) *http.Request {
|
||||
t.Helper()
|
||||
req, err := http.NewRequest(method, "http://s3.amazonaws.com/test-bucket/test-object", nil)
|
||||
assert.NoError(t, err)
|
||||
return req
|
||||
}
|
||||
|
||||
iam := &IdentityAccessManagement{}
|
||||
initialContent := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"arn:aws:s3:::test-bucket/*"}]}`
|
||||
assert.NoError(t, iam.PutPolicy("allowGet", initialContent))
|
||||
|
||||
identity := &Identity{
|
||||
Name: "policy-user",
|
||||
Account: &AccountAdmin,
|
||||
PolicyNames: []string{"allowGet"},
|
||||
}
|
||||
|
||||
errCode := iam.VerifyActionPermission(buildRequest(t, http.MethodGet), identity, Action(ACTION_READ), "test-bucket", "test-object")
|
||||
assert.Equal(t, s3err.ErrNone, errCode)
|
||||
|
||||
initialCacheKey := newIAMPolicyCacheKey("allowGet", initialContent)
|
||||
iam.policyCacheMu.RLock()
|
||||
initialEngine := iam.iamPolicyCache[initialCacheKey]
|
||||
initialCacheSize := len(iam.iamPolicyCache)
|
||||
iam.policyCacheMu.RUnlock()
|
||||
|
||||
assert.NotNil(t, initialEngine)
|
||||
assert.Equal(t, 1, initialCacheSize)
|
||||
|
||||
updatedContent := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"arn:aws:s3:::other-bucket/*"}]}`
|
||||
assert.NoError(t, iam.PutPolicy("allowGet", updatedContent))
|
||||
|
||||
iam.policyCacheMu.RLock()
|
||||
_, oldEntryStillCached := iam.iamPolicyCache[initialCacheKey]
|
||||
cacheSizeAfterUpdate := len(iam.iamPolicyCache)
|
||||
iam.policyCacheMu.RUnlock()
|
||||
|
||||
assert.False(t, oldEntryStillCached)
|
||||
assert.Equal(t, 0, cacheSizeAfterUpdate)
|
||||
|
||||
errCode = iam.VerifyActionPermission(buildRequest(t, http.MethodGet), identity, Action(ACTION_READ), "test-bucket", "test-object")
|
||||
assert.Equal(t, s3err.ErrAccessDenied, errCode)
|
||||
|
||||
updatedCacheKey := newIAMPolicyCacheKey("allowGet", updatedContent)
|
||||
iam.policyCacheMu.RLock()
|
||||
updatedEngine := iam.iamPolicyCache[updatedCacheKey]
|
||||
updatedCacheSize := len(iam.iamPolicyCache)
|
||||
iam.policyCacheMu.RUnlock()
|
||||
|
||||
assert.NotNil(t, updatedEngine)
|
||||
assert.NotSame(t, initialEngine, updatedEngine)
|
||||
assert.Equal(t, 1, updatedCacheSize)
|
||||
}
|
||||
|
||||
func TestReplaceS3ApiConfigurationClearsCachedPolicyEngines(t *testing.T) {
|
||||
t.Setenv("AWS_ACCESS_KEY_ID", "")
|
||||
t.Setenv("AWS_SECRET_ACCESS_KEY", "")
|
||||
|
||||
iam := &IdentityAccessManagement{}
|
||||
content := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"arn:aws:s3:::test-bucket/*"}]}`
|
||||
_, err := iam.getOrCreateIAMPolicyEngine("allowGet", content)
|
||||
assert.NoError(t, err)
|
||||
|
||||
cacheKey := newIAMPolicyCacheKey("allowGet", content)
|
||||
iam.policyCacheMu.RLock()
|
||||
_, existsBeforeReload := iam.iamPolicyCache[cacheKey]
|
||||
iam.policyCacheMu.RUnlock()
|
||||
assert.True(t, existsBeforeReload)
|
||||
|
||||
config := &iam_pb.S3ApiConfiguration{
|
||||
Policies: []*iam_pb.Policy{
|
||||
{
|
||||
Name: "replacement",
|
||||
Content: `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:ListBucket","Resource":"arn:aws:s3:::test-bucket"}]}`,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
assert.NoError(t, iam.ReplaceS3ApiConfiguration(config))
|
||||
|
||||
iam.policyCacheMu.RLock()
|
||||
cacheSizeAfterReload := len(iam.iamPolicyCache)
|
||||
iam.policyCacheMu.RUnlock()
|
||||
|
||||
assert.Equal(t, 0, cacheSizeAfterReload)
|
||||
}
|
||||
|
||||
type LoadS3ApiConfigurationTestCase struct {
|
||||
pbAccount *iam_pb.Account
|
||||
pbIdent *iam_pb.Identity
|
||||
|
||||
Reference in New Issue
Block a user