mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-29 13:10:21 +00:00
iam: honor configured credential store for IAM API policies and propagate to S3 caches (fixes #9518) (#9522)
* iamapi: route managed policies through credential manager (fixes #9518) CreatePolicy via the IAM API wrote straight to the filer /etc/iam/policies.json, ignoring any non-filer credential store. When credential.postgres was configured, policies created via the IAM API landed only in the filer while the Admin UI wrote to postgres, producing a split-brain where ListPolicies/GetPolicy never saw the Admin UI's policies and vice versa. GetPolicies/PutPolicies on IamS3ApiConfigure now load managed policies from credentialManager and persist Create/Update/Delete as a delta against the store. Inline user/group policies still live in the legacy policies.json file (no credential-store API for them yet). Pre-existing managed policies in the legacy file are merged on read so deployments don't lose data, and re-persisted to the store on the next write so the legacy file is drained over time. * credential: route IAM API inline policies through credential manager Extends the #9518 fix to user-inline and group-inline policies so the IAM API never writes the legacy /etc/iam/policies.json bundle directly. The previous patch only routed managed policies; this one finishes the job for the other two policy types. - Add GroupInlinePolicyStore + GroupInlinePoliciesLoader optional interfaces, mirroring the existing user-inline ones, and matching Put/Get/Delete/List/LoadAll wrappers on CredentialManager. - Implement group-inline storage in memory (new map), filer_etc (new field on PoliciesCollection, reusing the legacy file under policyMu), and postgres (new group_inline_policies table with ON DELETE CASCADE off the groups FK). - Wire the new methods through PropagatingCredentialStore so wrapped stores still delegate correctly. - IamS3ApiConfigure.PutPolicies now applies managed + user-inline + group-inline as deltas through the credential manager; the legacy /etc/iam/policies.json file is never written when a credential manager is wired up. GetPolicies still reads the legacy bundle once as a fallback so unmigrated data is picked up and re-persisted into the store on the next write. * credential: propagate SaveConfiguration writes to running S3 caches Postgres (and any non-filer) credential stores never fired the S3 IAM cache invalidation path on bulk identity / group updates. The PropagatingCredentialStore had explicit Put/Remove handlers for single-entity calls (CreateUser, PutPolicy, etc.) but inherited SaveConfiguration unchanged from the embedded store, so the bulk path the IAM API takes at the end of every handler was silent. Inline-policy changes recompute identity.Actions and persist via SaveConfiguration, so until restart the cached Actions on each S3 server stayed stale and authorization decisions used the pre-change view. Override SaveConfiguration to snapshot the prior user / group lists, delegate the save, then fan out PutIdentity / PutGroup for what's in the new config and RemoveIdentity / RemoveGroup for what got pruned. Reuses the existing SeaweedS3IamCache RPCs, no protobuf changes. * iamapi: drain legacy policies.json after authoritative credential-store writes Review pointed out a resurrection bug: GetPolicies still reads /etc/iam/policies.json as a one-way migration fallback, but PutPolicies in the credential-manager path never wrote that file, so legacy-only entries reappeared on the next read even after the IAM API "deleted" them. PutPolicies now overwrites the bundle with an empty {} after a successful credential-store write, unless the store is filer_etc (which owns the bundle as its own inline-policy backing — clearing it would wipe filer_etc's data). Also wraps the filer read, JSON unmarshal, and marshal errors with context per the other review comments.
This commit is contained in:
@@ -305,6 +305,60 @@ func (cm *CredentialManager) ListUserInlinePolicies(ctx context.Context, userNam
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// LoadAllInlinePolicies returns every user inline policy in a single map
|
||||
// keyed by username then policy name. Returns nil, nil if the underlying
|
||||
// store does not implement bulk loading.
|
||||
func (cm *CredentialManager) LoadAllInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error) {
|
||||
if loader, ok := cm.Store.(InlinePoliciesLoader); ok {
|
||||
return loader.LoadInlinePolicies(ctx)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// PutGroupInlinePolicy stores a per-group inline policy document.
|
||||
// Returns nil without error if the underlying store does not support
|
||||
// group inline policies.
|
||||
func (cm *CredentialManager) PutGroupInlinePolicy(ctx context.Context, groupName, policyName string, document policy_engine.PolicyDocument) error {
|
||||
if store, ok := cm.Store.(GroupInlinePolicyStore); ok {
|
||||
return store.PutGroupInlinePolicy(ctx, groupName, policyName, document)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetGroupInlinePolicy retrieves a per-group inline policy document.
|
||||
func (cm *CredentialManager) GetGroupInlinePolicy(ctx context.Context, groupName, policyName string) (*policy_engine.PolicyDocument, error) {
|
||||
if store, ok := cm.Store.(GroupInlinePolicyStore); ok {
|
||||
return store.GetGroupInlinePolicy(ctx, groupName, policyName)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// DeleteGroupInlinePolicy removes a per-group inline policy document.
|
||||
func (cm *CredentialManager) DeleteGroupInlinePolicy(ctx context.Context, groupName, policyName string) error {
|
||||
if store, ok := cm.Store.(GroupInlinePolicyStore); ok {
|
||||
return store.DeleteGroupInlinePolicy(ctx, groupName, policyName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListGroupInlinePolicies returns the names of all inline policies for a group.
|
||||
func (cm *CredentialManager) ListGroupInlinePolicies(ctx context.Context, groupName string) ([]string, error) {
|
||||
if store, ok := cm.Store.(GroupInlinePolicyStore); ok {
|
||||
return store.ListGroupInlinePolicies(ctx, groupName)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// LoadAllGroupInlinePolicies returns every group inline policy in a single
|
||||
// map keyed by group name then policy name. Returns nil, nil if the
|
||||
// underlying store does not implement bulk loading.
|
||||
func (cm *CredentialManager) LoadAllGroupInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error) {
|
||||
if loader, ok := cm.Store.(GroupInlinePoliciesLoader); ok {
|
||||
return loader.LoadGroupInlinePolicies(ctx)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// RenameUser atomically renames a user along with all FK-backed
|
||||
// dependents (credentials, inline policies, ...) when the underlying
|
||||
// store implements UserRenamer. Returns false if the store does not
|
||||
|
||||
@@ -148,6 +148,29 @@ type InlinePolicyStore interface {
|
||||
ListUserInlinePolicies(ctx context.Context, userName string) ([]string, error)
|
||||
}
|
||||
|
||||
// GroupInlinePolicyStore is an optional interface for credential stores that
|
||||
// support per-group inline policy storage. Mirrors InlinePolicyStore so the
|
||||
// IAM API can persist group-attached inline policies without writing a
|
||||
// shared filer file.
|
||||
type GroupInlinePolicyStore interface {
|
||||
PutGroupInlinePolicy(ctx context.Context, groupName, policyName string, document policy_engine.PolicyDocument) error
|
||||
GetGroupInlinePolicy(ctx context.Context, groupName, policyName string) (*policy_engine.PolicyDocument, error)
|
||||
DeleteGroupInlinePolicy(ctx context.Context, groupName, policyName string) error
|
||||
ListGroupInlinePolicies(ctx context.Context, groupName string) ([]string, error)
|
||||
}
|
||||
|
||||
// InlinePoliciesLoader is an optional interface for credential stores that can
|
||||
// bulk-load every user inline policy in a single round trip.
|
||||
type InlinePoliciesLoader interface {
|
||||
LoadInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error)
|
||||
}
|
||||
|
||||
// GroupInlinePoliciesLoader is an optional interface for credential stores
|
||||
// that can bulk-load every group inline policy in a single round trip.
|
||||
type GroupInlinePoliciesLoader interface {
|
||||
LoadGroupInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error)
|
||||
}
|
||||
|
||||
// UserRenamer is an optional interface for credential stores that can
|
||||
// atomically rename a user along with all rows that reference the old
|
||||
// username (credentials, inline policies, etc). Backends with referential
|
||||
|
||||
@@ -19,8 +19,9 @@ const (
|
||||
)
|
||||
|
||||
type PoliciesCollection struct {
|
||||
Policies map[string]policy_engine.PolicyDocument `json:"policies"`
|
||||
InlinePolicies map[string]map[string]policy_engine.PolicyDocument `json:"inlinePolicies"`
|
||||
Policies map[string]policy_engine.PolicyDocument `json:"policies"`
|
||||
InlinePolicies map[string]map[string]policy_engine.PolicyDocument `json:"inlinePolicies"`
|
||||
GroupInlinePolicies map[string]map[string]policy_engine.PolicyDocument `json:"groupInlinePolicies,omitempty"`
|
||||
}
|
||||
|
||||
func validatePolicyName(name string) error {
|
||||
@@ -29,8 +30,9 @@ func validatePolicyName(name string) error {
|
||||
|
||||
func newPoliciesCollection() *PoliciesCollection {
|
||||
return &PoliciesCollection{
|
||||
Policies: make(map[string]policy_engine.PolicyDocument),
|
||||
InlinePolicies: make(map[string]map[string]policy_engine.PolicyDocument),
|
||||
Policies: make(map[string]policy_engine.PolicyDocument),
|
||||
InlinePolicies: make(map[string]map[string]policy_engine.PolicyDocument),
|
||||
GroupInlinePolicies: make(map[string]map[string]policy_engine.PolicyDocument),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,6 +56,9 @@ func (store *FilerEtcStore) loadLegacyPoliciesCollection(ctx context.Context) (*
|
||||
if policiesCollection.InlinePolicies == nil {
|
||||
policiesCollection.InlinePolicies = make(map[string]map[string]policy_engine.PolicyDocument)
|
||||
}
|
||||
if policiesCollection.GroupInlinePolicies == nil {
|
||||
policiesCollection.GroupInlinePolicies = make(map[string]map[string]policy_engine.PolicyDocument)
|
||||
}
|
||||
|
||||
return policiesCollection, true, nil
|
||||
}
|
||||
@@ -379,6 +384,87 @@ func (store *FilerEtcStore) ListUserInlinePolicies(ctx context.Context, userName
|
||||
return names, nil
|
||||
}
|
||||
|
||||
// PutGroupInlinePolicy stores a per-group inline policy document.
|
||||
func (store *FilerEtcStore) PutGroupInlinePolicy(ctx context.Context, groupName, policyName string, document policy_engine.PolicyDocument) error {
|
||||
store.policyMu.Lock()
|
||||
defer store.policyMu.Unlock()
|
||||
|
||||
policiesCollection, _, err := store.loadLegacyPoliciesCollection(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if policiesCollection.GroupInlinePolicies[groupName] == nil {
|
||||
policiesCollection.GroupInlinePolicies[groupName] = make(map[string]policy_engine.PolicyDocument)
|
||||
}
|
||||
policiesCollection.GroupInlinePolicies[groupName][policyName] = document
|
||||
return store.saveLegacyPoliciesCollection(ctx, policiesCollection)
|
||||
}
|
||||
|
||||
// GetGroupInlinePolicy retrieves a per-group inline policy document.
|
||||
func (store *FilerEtcStore) GetGroupInlinePolicy(ctx context.Context, groupName, policyName string) (*policy_engine.PolicyDocument, error) {
|
||||
policiesCollection, _, err := store.loadLegacyPoliciesCollection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if groupPolicies := policiesCollection.GroupInlinePolicies[groupName]; groupPolicies != nil {
|
||||
if doc, exists := groupPolicies[policyName]; exists {
|
||||
return &doc, nil
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// DeleteGroupInlinePolicy removes a per-group inline policy document.
|
||||
func (store *FilerEtcStore) DeleteGroupInlinePolicy(ctx context.Context, groupName, policyName string) error {
|
||||
store.policyMu.Lock()
|
||||
defer store.policyMu.Unlock()
|
||||
|
||||
policiesCollection, _, err := store.loadLegacyPoliciesCollection(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if groupPolicies := policiesCollection.GroupInlinePolicies[groupName]; groupPolicies != nil {
|
||||
delete(groupPolicies, policyName)
|
||||
if len(groupPolicies) == 0 {
|
||||
delete(policiesCollection.GroupInlinePolicies, groupName)
|
||||
}
|
||||
}
|
||||
return store.saveLegacyPoliciesCollection(ctx, policiesCollection)
|
||||
}
|
||||
|
||||
// ListGroupInlinePolicies returns the names of all inline policies for a group.
|
||||
func (store *FilerEtcStore) ListGroupInlinePolicies(ctx context.Context, groupName string) ([]string, error) {
|
||||
policiesCollection, _, err := store.loadLegacyPoliciesCollection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
groupPolicies := policiesCollection.GroupInlinePolicies[groupName]
|
||||
names := make([]string, 0, len(groupPolicies))
|
||||
for name := range groupPolicies {
|
||||
names = append(names, name)
|
||||
}
|
||||
return names, nil
|
||||
}
|
||||
|
||||
// LoadGroupInlinePolicies returns all group inline policies stored in the
|
||||
// legacy file keyed by group name then policy name.
|
||||
func (store *FilerEtcStore) LoadGroupInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error) {
|
||||
policiesCollection, _, err := store.loadLegacyPoliciesCollection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := make(map[string]map[string]policy_engine.PolicyDocument, len(policiesCollection.GroupInlinePolicies))
|
||||
for groupName, groupPolicies := range policiesCollection.GroupInlinePolicies {
|
||||
copied := make(map[string]policy_engine.PolicyDocument, len(groupPolicies))
|
||||
for policyName, doc := range groupPolicies {
|
||||
copied[policyName] = doc
|
||||
}
|
||||
result[groupName] = copied
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// ListPolicyNames returns all managed policy names stored in the filer.
|
||||
func (store *FilerEtcStore) ListPolicyNames(ctx context.Context) ([]string, error) {
|
||||
names := make([]string, 0)
|
||||
|
||||
@@ -193,3 +193,91 @@ func (store *MemoryStore) LoadInlinePolicies(ctx context.Context) (map[string]ma
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// PutGroupInlinePolicy stores a per-group inline policy document.
|
||||
func (store *MemoryStore) PutGroupInlinePolicy(ctx context.Context, groupName, policyName string, document policy_engine.PolicyDocument) error {
|
||||
store.mu.Lock()
|
||||
defer store.mu.Unlock()
|
||||
|
||||
if !store.initialized {
|
||||
return fmt.Errorf("store not initialized")
|
||||
}
|
||||
|
||||
if store.groupInlinePolicies[groupName] == nil {
|
||||
store.groupInlinePolicies[groupName] = make(map[string]policy_engine.PolicyDocument)
|
||||
}
|
||||
store.groupInlinePolicies[groupName][policyName] = document
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetGroupInlinePolicy retrieves a per-group inline policy document.
|
||||
func (store *MemoryStore) GetGroupInlinePolicy(ctx context.Context, groupName, policyName string) (*policy_engine.PolicyDocument, error) {
|
||||
store.mu.RLock()
|
||||
defer store.mu.RUnlock()
|
||||
|
||||
if !store.initialized {
|
||||
return nil, fmt.Errorf("store not initialized")
|
||||
}
|
||||
|
||||
if groupPolicies := store.groupInlinePolicies[groupName]; groupPolicies != nil {
|
||||
if doc, exists := groupPolicies[policyName]; exists {
|
||||
return &doc, nil
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// DeleteGroupInlinePolicy removes a per-group inline policy document.
|
||||
func (store *MemoryStore) DeleteGroupInlinePolicy(ctx context.Context, groupName, policyName string) error {
|
||||
store.mu.Lock()
|
||||
defer store.mu.Unlock()
|
||||
|
||||
if !store.initialized {
|
||||
return fmt.Errorf("store not initialized")
|
||||
}
|
||||
|
||||
if groupPolicies := store.groupInlinePolicies[groupName]; groupPolicies != nil {
|
||||
delete(groupPolicies, policyName)
|
||||
if len(groupPolicies) == 0 {
|
||||
delete(store.groupInlinePolicies, groupName)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListGroupInlinePolicies returns the names of all inline policies for a group.
|
||||
func (store *MemoryStore) ListGroupInlinePolicies(ctx context.Context, groupName string) ([]string, error) {
|
||||
store.mu.RLock()
|
||||
defer store.mu.RUnlock()
|
||||
|
||||
if !store.initialized {
|
||||
return nil, fmt.Errorf("store not initialized")
|
||||
}
|
||||
|
||||
groupPolicies := store.groupInlinePolicies[groupName]
|
||||
names := make([]string, 0, len(groupPolicies))
|
||||
for name := range groupPolicies {
|
||||
names = append(names, name)
|
||||
}
|
||||
return names, nil
|
||||
}
|
||||
|
||||
// LoadGroupInlinePolicies returns all group inline policies keyed by group name then policy name.
|
||||
func (store *MemoryStore) LoadGroupInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error) {
|
||||
store.mu.RLock()
|
||||
defer store.mu.RUnlock()
|
||||
|
||||
if !store.initialized {
|
||||
return nil, fmt.Errorf("store not initialized")
|
||||
}
|
||||
|
||||
result := make(map[string]map[string]policy_engine.PolicyDocument, len(store.groupInlinePolicies))
|
||||
for groupName, groupPolicies := range store.groupInlinePolicies {
|
||||
copied := make(map[string]policy_engine.PolicyDocument, len(groupPolicies))
|
||||
for policyName, doc := range groupPolicies {
|
||||
copied[policyName] = doc
|
||||
}
|
||||
result[groupName] = copied
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ type MemoryStore struct {
|
||||
serviceAccountAccessKeys map[string]string // access_key -> id
|
||||
policies map[string]policy_engine.PolicyDocument // policy_name -> policy_document
|
||||
inlinePolicies map[string]map[string]policy_engine.PolicyDocument // username -> policy_name -> document
|
||||
groupInlinePolicies map[string]map[string]policy_engine.PolicyDocument // group_name -> policy_name -> document
|
||||
groups map[string]*iam_pb.Group // group_name -> group
|
||||
initialized bool
|
||||
}
|
||||
@@ -46,6 +47,7 @@ func (store *MemoryStore) Initialize(configuration util.Configuration, prefix st
|
||||
store.serviceAccountAccessKeys = make(map[string]string)
|
||||
store.policies = make(map[string]policy_engine.PolicyDocument)
|
||||
store.inlinePolicies = make(map[string]map[string]policy_engine.PolicyDocument)
|
||||
store.groupInlinePolicies = make(map[string]map[string]policy_engine.PolicyDocument)
|
||||
store.groups = make(map[string]*iam_pb.Group)
|
||||
store.initialized = true
|
||||
|
||||
@@ -62,6 +64,7 @@ func (store *MemoryStore) Shutdown() {
|
||||
store.serviceAccountAccessKeys = nil
|
||||
store.policies = nil
|
||||
store.inlinePolicies = nil
|
||||
store.groupInlinePolicies = nil
|
||||
store.groups = nil
|
||||
store.initialized = false
|
||||
}
|
||||
@@ -78,6 +81,7 @@ func (store *MemoryStore) Reset() {
|
||||
store.serviceAccountAccessKeys = make(map[string]string)
|
||||
store.policies = make(map[string]policy_engine.PolicyDocument)
|
||||
store.inlinePolicies = make(map[string]map[string]policy_engine.PolicyDocument)
|
||||
store.groupInlinePolicies = make(map[string]map[string]policy_engine.PolicyDocument)
|
||||
store.groups = make(map[string]*iam_pb.Group)
|
||||
}
|
||||
}
|
||||
|
||||
149
weed/credential/postgres/postgres_group_inline_policy.go
Normal file
149
weed/credential/postgres/postgres_group_inline_policy.go
Normal file
@@ -0,0 +1,149 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine"
|
||||
)
|
||||
|
||||
func (store *PostgresStore) PutGroupInlinePolicy(ctx context.Context, groupName, policyName string, document policy_engine.PolicyDocument) error {
|
||||
if !store.configured {
|
||||
return fmt.Errorf("store not configured")
|
||||
}
|
||||
|
||||
docJSON, err := json.Marshal(document)
|
||||
if err != nil {
|
||||
glog.Errorf("credential postgres: PutGroupInlinePolicy marshal failed group=%s policy=%s: %v", groupName, policyName, err)
|
||||
return fmt.Errorf("failed to marshal policy document: %w", err)
|
||||
}
|
||||
|
||||
_, err = store.db.ExecContext(ctx,
|
||||
`INSERT INTO group_inline_policies (group_name, policy_name, document)
|
||||
VALUES ($1, $2, $3)
|
||||
ON CONFLICT (group_name, policy_name)
|
||||
DO UPDATE SET document = $3, updated_at = CURRENT_TIMESTAMP`,
|
||||
groupName, policyName, jsonbParam(docJSON))
|
||||
if err != nil {
|
||||
glog.Errorf("credential postgres: PutGroupInlinePolicy failed group=%s policy=%s: %v", groupName, policyName, err)
|
||||
return fmt.Errorf("failed to upsert group inline policy: %w", err)
|
||||
}
|
||||
|
||||
glog.V(0).Infof("credential postgres: PutGroupInlinePolicy group=%s policy=%s", groupName, policyName)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *PostgresStore) GetGroupInlinePolicy(ctx context.Context, groupName, policyName string) (*policy_engine.PolicyDocument, error) {
|
||||
if !store.configured {
|
||||
return nil, fmt.Errorf("store not configured")
|
||||
}
|
||||
|
||||
var docJSON []byte
|
||||
err := store.db.QueryRowContext(ctx,
|
||||
"SELECT document FROM group_inline_policies WHERE group_name = $1 AND policy_name = $2",
|
||||
groupName, policyName).Scan(&docJSON)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
glog.Errorf("credential postgres: GetGroupInlinePolicy query failed group=%s policy=%s: %v", groupName, policyName, err)
|
||||
return nil, fmt.Errorf("failed to query group inline policy: %w", err)
|
||||
}
|
||||
|
||||
var doc policy_engine.PolicyDocument
|
||||
if err := json.Unmarshal(docJSON, &doc); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal group inline policy: %w", err)
|
||||
}
|
||||
return &doc, nil
|
||||
}
|
||||
|
||||
func (store *PostgresStore) DeleteGroupInlinePolicy(ctx context.Context, groupName, policyName string) error {
|
||||
if !store.configured {
|
||||
return fmt.Errorf("store not configured")
|
||||
}
|
||||
|
||||
result, err := store.db.ExecContext(ctx,
|
||||
"DELETE FROM group_inline_policies WHERE group_name = $1 AND policy_name = $2",
|
||||
groupName, policyName)
|
||||
if err != nil {
|
||||
glog.Errorf("credential postgres: DeleteGroupInlinePolicy failed group=%s policy=%s: %v", groupName, policyName, err)
|
||||
return fmt.Errorf("failed to delete group inline policy: %w", err)
|
||||
}
|
||||
|
||||
rowsAffected, _ := result.RowsAffected()
|
||||
glog.V(0).Infof("credential postgres: DeleteGroupInlinePolicy group=%s policy=%s deleted=%d", groupName, policyName, rowsAffected)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *PostgresStore) ListGroupInlinePolicies(ctx context.Context, groupName string) ([]string, error) {
|
||||
if !store.configured {
|
||||
return nil, fmt.Errorf("store not configured")
|
||||
}
|
||||
|
||||
rows, err := store.db.QueryContext(ctx,
|
||||
"SELECT policy_name FROM group_inline_policies WHERE group_name = $1 ORDER BY policy_name",
|
||||
groupName)
|
||||
if err != nil {
|
||||
glog.Errorf("credential postgres: ListGroupInlinePolicies query failed group=%s: %v", groupName, err)
|
||||
return nil, fmt.Errorf("failed to query group inline policies: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var names []string
|
||||
for rows.Next() {
|
||||
var name string
|
||||
if err := rows.Scan(&name); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan policy name: %w", err)
|
||||
}
|
||||
names = append(names, name)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("failed iterating group inline policy rows: %w", err)
|
||||
}
|
||||
return names, nil
|
||||
}
|
||||
|
||||
func (store *PostgresStore) LoadGroupInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error) {
|
||||
if !store.configured {
|
||||
return nil, fmt.Errorf("store not configured")
|
||||
}
|
||||
|
||||
rows, err := store.db.QueryContext(ctx,
|
||||
"SELECT group_name, policy_name, document FROM group_inline_policies ORDER BY group_name, policy_name")
|
||||
if err != nil {
|
||||
glog.Errorf("credential postgres: LoadGroupInlinePolicies query failed: %v", err)
|
||||
return nil, fmt.Errorf("failed to query group inline policies: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
result := make(map[string]map[string]policy_engine.PolicyDocument)
|
||||
count := 0
|
||||
for rows.Next() {
|
||||
var groupName, policyName string
|
||||
var docJSON []byte
|
||||
if err := rows.Scan(&groupName, &policyName, &docJSON); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan group inline policy row: %w", err)
|
||||
}
|
||||
|
||||
var doc policy_engine.PolicyDocument
|
||||
if err := json.Unmarshal(docJSON, &doc); err != nil {
|
||||
glog.Warningf("credential postgres: LoadGroupInlinePolicies unmarshal failed group=%s policy=%s: %v", groupName, policyName, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if result[groupName] == nil {
|
||||
result[groupName] = make(map[string]policy_engine.PolicyDocument)
|
||||
}
|
||||
result[groupName][policyName] = doc
|
||||
count++
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("failed iterating group inline policy rows: %w", err)
|
||||
}
|
||||
|
||||
glog.V(0).Infof("credential postgres: LoadGroupInlinePolicies loaded %d policies for %d groups", count, len(result))
|
||||
return result, nil
|
||||
}
|
||||
@@ -196,6 +196,18 @@ func (store *PostgresStore) createTables() error {
|
||||
);
|
||||
`
|
||||
|
||||
groupInlinePoliciesTable := `
|
||||
CREATE TABLE IF NOT EXISTS group_inline_policies (
|
||||
group_name VARCHAR(255) REFERENCES groups(name) ON DELETE CASCADE,
|
||||
policy_name VARCHAR(255) NOT NULL,
|
||||
document JSONB NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY (group_name, policy_name)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_group_inline_policies_group_name ON group_inline_policies(group_name);
|
||||
`
|
||||
|
||||
if _, err := store.db.Exec(usersTable); err != nil {
|
||||
return fmt.Errorf("failed to create users table: %w", err)
|
||||
}
|
||||
@@ -224,6 +236,10 @@ func (store *PostgresStore) createTables() error {
|
||||
return fmt.Errorf("failed to create groups table: %w", err)
|
||||
}
|
||||
|
||||
if _, err := store.db.Exec(groupInlinePoliciesTable); err != nil {
|
||||
return fmt.Errorf("failed to create group_inline_policies table: %w", err)
|
||||
}
|
||||
|
||||
groupsDisabledIndex := `CREATE INDEX IF NOT EXISTS idx_groups_disabled ON groups (disabled);`
|
||||
if _, err := store.db.Exec(groupsDisabledIndex); err != nil {
|
||||
return fmt.Errorf("failed to create groups disabled index: %w", err)
|
||||
|
||||
@@ -139,6 +139,81 @@ func (s *PropagatingCredentialStore) ListAttachedUserPolicies(ctx context.Contex
|
||||
return s.CredentialStore.ListAttachedUserPolicies(ctx, username)
|
||||
}
|
||||
|
||||
// SaveConfiguration overrides the embedded CredentialStore.SaveConfiguration
|
||||
// so bulk identity / group updates also push to running S3 caches. The IAM
|
||||
// API flow ends each handler with SaveConfiguration after recomputing
|
||||
// identity.Actions (the legacy authorization field), so reusing PutIdentity
|
||||
// here is what keeps inline-policy changes visible to S3 servers without
|
||||
// requiring a restart. We diff against the prior store state so deletions
|
||||
// also fan out (RemoveIdentity / RemoveGroup); without the diff a postgres
|
||||
// user who got pruned by SaveConfiguration would linger in the S3 cache.
|
||||
func (s *PropagatingCredentialStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
|
||||
priorUsers, priorErr := s.CredentialStore.ListUsers(ctx)
|
||||
if priorErr != nil {
|
||||
glog.V(1).Infof("failed to list users before SaveConfiguration; skipping deletion propagation: %v", priorErr)
|
||||
priorUsers = nil
|
||||
}
|
||||
priorGroups, gPriorErr := s.CredentialStore.ListGroups(ctx)
|
||||
if gPriorErr != nil {
|
||||
glog.V(1).Infof("failed to list groups before SaveConfiguration; skipping deletion propagation: %v", gPriorErr)
|
||||
priorGroups = nil
|
||||
}
|
||||
|
||||
if err := s.CredentialStore.SaveConfiguration(ctx, config); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
keptUsers := make(map[string]struct{}, len(config.Identities))
|
||||
for _, ident := range config.Identities {
|
||||
if ident != nil {
|
||||
keptUsers[ident.Name] = struct{}{}
|
||||
}
|
||||
}
|
||||
keptGroups := make(map[string]struct{}, len(config.Groups))
|
||||
for _, g := range config.Groups {
|
||||
if g != nil {
|
||||
keptGroups[g.Name] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
s.propagateChange(ctx, func(tx context.Context, client s3_pb.SeaweedS3IamCacheClient) error {
|
||||
for _, ident := range config.Identities {
|
||||
if ident == nil {
|
||||
continue
|
||||
}
|
||||
if _, err := client.PutIdentity(tx, &iam_pb.PutIdentityRequest{Identity: ident}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, g := range config.Groups {
|
||||
if g == nil {
|
||||
continue
|
||||
}
|
||||
if _, err := client.PutGroup(tx, &iam_pb.PutGroupRequest{Group: g}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, name := range priorUsers {
|
||||
if _, kept := keptUsers[name]; kept {
|
||||
continue
|
||||
}
|
||||
if _, err := client.RemoveIdentity(tx, &iam_pb.RemoveIdentityRequest{Username: name}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, name := range priorGroups {
|
||||
if _, kept := keptGroups[name]; kept {
|
||||
continue
|
||||
}
|
||||
if _, err := client.RemoveGroup(tx, &iam_pb.RemoveGroupRequest{GroupName: name}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PropagatingCredentialStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
|
||||
glog.V(4).Infof("IAM: PropagatingCredentialStore.CreateUser %s", identity.Name)
|
||||
if err := s.CredentialStore.CreateUser(ctx, identity); err != nil {
|
||||
@@ -308,6 +383,41 @@ func (s *PropagatingCredentialStore) ListUserInlinePolicies(ctx context.Context,
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *PropagatingCredentialStore) PutGroupInlinePolicy(ctx context.Context, groupName, policyName string, document policy_engine.PolicyDocument) error {
|
||||
if store, ok := s.CredentialStore.(GroupInlinePolicyStore); ok {
|
||||
return store.PutGroupInlinePolicy(ctx, groupName, policyName, document)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PropagatingCredentialStore) GetGroupInlinePolicy(ctx context.Context, groupName, policyName string) (*policy_engine.PolicyDocument, error) {
|
||||
if store, ok := s.CredentialStore.(GroupInlinePolicyStore); ok {
|
||||
return store.GetGroupInlinePolicy(ctx, groupName, policyName)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *PropagatingCredentialStore) DeleteGroupInlinePolicy(ctx context.Context, groupName, policyName string) error {
|
||||
if store, ok := s.CredentialStore.(GroupInlinePolicyStore); ok {
|
||||
return store.DeleteGroupInlinePolicy(ctx, groupName, policyName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PropagatingCredentialStore) ListGroupInlinePolicies(ctx context.Context, groupName string) ([]string, error) {
|
||||
if store, ok := s.CredentialStore.(GroupInlinePolicyStore); ok {
|
||||
return store.ListGroupInlinePolicies(ctx, groupName)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *PropagatingCredentialStore) LoadGroupInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error) {
|
||||
if loader, ok := s.CredentialStore.(GroupInlinePoliciesLoader); ok {
|
||||
return loader.LoadGroupInlinePolicies(ctx)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *PropagatingCredentialStore) CreatePolicy(ctx context.Context, name string, document policy_engine.PolicyDocument) error {
|
||||
if pm, ok := s.CredentialStore.(PolicyManager); ok {
|
||||
if err := pm.CreatePolicy(ctx, name, document); err != nil {
|
||||
|
||||
64
weed/credential/propagating_store_test.go
Normal file
64
weed/credential/propagating_store_test.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package credential_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/credential"
|
||||
"github.com/seaweedfs/seaweedfs/weed/credential/memory"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
// Side-effect: register filer_etc so the existing
|
||||
// TestCredentialStoreInterface / TestGetAvailableStores assertions
|
||||
// (which expect filer_etc to always be present) stay satisfied now
|
||||
// that this test file pulls in `memory` and the `len(Stores) == 0`
|
||||
// skip path no longer fires.
|
||||
_ "github.com/seaweedfs/seaweedfs/weed/credential/filer_etc"
|
||||
)
|
||||
|
||||
// TestSaveConfigurationDelegatesWithoutMaster confirms the override delegates
|
||||
// to the underlying store and the no-master fan-out path is a safe no-op.
|
||||
// propagateChange already short-circuits when masterClient is nil, so we
|
||||
// don't need to spin up a fake S3 IAM cache server here.
|
||||
func TestSaveConfigurationDelegatesWithoutMaster(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
upstream := &memory.MemoryStore{}
|
||||
require.NoError(t, upstream.Initialize(nil, ""))
|
||||
|
||||
ps := credential.NewPropagatingCredentialStore(upstream, nil, nil)
|
||||
|
||||
cfg := &iam_pb.S3ApiConfiguration{
|
||||
Identities: []*iam_pb.Identity{{Name: "alice"}, {Name: "bob"}},
|
||||
}
|
||||
require.NoError(t, ps.SaveConfiguration(ctx, cfg))
|
||||
|
||||
users, err := upstream.ListUsers(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.ElementsMatch(t, []string{"alice", "bob"}, users)
|
||||
}
|
||||
|
||||
// TestSaveConfigurationDiffSnapshotTakenBeforeWrite confirms that deletions
|
||||
// the underlying SaveConfiguration prunes are visible to the propagation
|
||||
// diff. We replay a second SaveConfiguration that drops "bob" and verify the
|
||||
// underlying store reflects the deletion (the propagation fan-out is a
|
||||
// no-op without masterClient but the diff snapshot pathway runs).
|
||||
func TestSaveConfigurationDiffSnapshotTakenBeforeWrite(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
upstream := &memory.MemoryStore{}
|
||||
require.NoError(t, upstream.Initialize(nil, ""))
|
||||
|
||||
ps := credential.NewPropagatingCredentialStore(upstream, nil, nil)
|
||||
|
||||
require.NoError(t, ps.SaveConfiguration(ctx, &iam_pb.S3ApiConfiguration{
|
||||
Identities: []*iam_pb.Identity{{Name: "alice"}, {Name: "bob"}},
|
||||
}))
|
||||
require.NoError(t, ps.SaveConfiguration(ctx, &iam_pb.S3ApiConfiguration{
|
||||
Identities: []*iam_pb.Identity{{Name: "alice"}},
|
||||
}))
|
||||
|
||||
users, err := upstream.ListUsers(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.ElementsMatch(t, []string{"alice"}, users)
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/seaweedfs/seaweedfs/weed/credential"
|
||||
@@ -202,36 +203,237 @@ func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToFiler(s3cfg *iam_pb.S3ApiC
|
||||
})
|
||||
}
|
||||
|
||||
// GetPolicies returns the merged view of managed, user-inline, and group-inline
|
||||
// policies from the configured credential store, plus a one-shot fallback read
|
||||
// of the legacy /etc/iam/policies.json bundle. Routing reads through the
|
||||
// credential manager keeps the IAM API in sync with the Admin UI; the legacy
|
||||
// file fallback lets deployments that wrote policies before this change still
|
||||
// see them until the next PutPolicies migrates them into the store.
|
||||
func (iama *IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) {
|
||||
var buf bytes.Buffer
|
||||
err = pb.WithOneOfGrpcFilerClients(false, iama.option.Filers, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil && err != filer_pb.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
if err == filer_pb.ErrNotFound || buf.Len() == 0 {
|
||||
ctx := context.Background()
|
||||
|
||||
if policies.Policies == nil {
|
||||
policies.Policies = make(map[string]policy_engine.PolicyDocument)
|
||||
}
|
||||
|
||||
if iama.credentialManager != nil {
|
||||
managed, mErr := iama.credentialManager.GetPolicies(ctx)
|
||||
if mErr != nil {
|
||||
return fmt.Errorf("load managed policies from credential store: %w", mErr)
|
||||
}
|
||||
for name, doc := range managed {
|
||||
policies.Policies[name] = doc
|
||||
}
|
||||
|
||||
inline, iErr := iama.credentialManager.LoadAllInlinePolicies(ctx)
|
||||
if iErr != nil {
|
||||
return fmt.Errorf("load user inline policies from credential store: %w", iErr)
|
||||
}
|
||||
mergeInline(&policies.InlinePolicies, inline)
|
||||
|
||||
groupInline, gErr := iama.credentialManager.LoadAllGroupInlinePolicies(ctx)
|
||||
if gErr != nil {
|
||||
return fmt.Errorf("load group inline policies from credential store: %w", gErr)
|
||||
}
|
||||
mergeInline(&policies.GroupInlinePolicies, groupInline)
|
||||
}
|
||||
|
||||
// Legacy /etc/iam/policies.json fallback. Credential store wins on
|
||||
// conflict (per name / per (user, name) / per (group, name)) so we never
|
||||
// produce duplicates.
|
||||
var buf bytes.Buffer
|
||||
rErr := pb.WithOneOfGrpcFilerClients(false, iama.option.Filers, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
return filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf)
|
||||
})
|
||||
if rErr != nil && rErr != filer_pb.ErrNotFound {
|
||||
return fmt.Errorf("read legacy policies from filer: %w", rErr)
|
||||
}
|
||||
if rErr == filer_pb.ErrNotFound || buf.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := json.Unmarshal(buf.Bytes(), policies); err != nil {
|
||||
return err
|
||||
var legacy Policies
|
||||
if err := json.Unmarshal(buf.Bytes(), &legacy); err != nil {
|
||||
return fmt.Errorf("unmarshal legacy policies: %w", err)
|
||||
}
|
||||
for name, doc := range legacy.Policies {
|
||||
if _, exists := policies.Policies[name]; !exists {
|
||||
policies.Policies[name] = doc
|
||||
}
|
||||
}
|
||||
mergeInlineFallback(&policies.InlinePolicies, legacy.InlinePolicies)
|
||||
mergeInlineFallback(&policies.GroupInlinePolicies, legacy.GroupInlinePolicies)
|
||||
return nil
|
||||
}
|
||||
|
||||
// PutPolicies persists managed, user-inline, and group-inline policies through
|
||||
// the credential store as deltas relative to its current contents. When the
|
||||
// credential store is something other than filer_etc (postgres, grpc, memory),
|
||||
// the legacy /etc/iam/policies.json bundle is cleared after a successful write
|
||||
// so that data merged in by the GetPolicies fallback during migration cannot
|
||||
// resurface — otherwise a later DeletePolicy would be undone by the next read.
|
||||
// For filer_etc the bundle is left alone because filer_etc owns it as its own
|
||||
// inline-policy backing store (see filer_etc.saveLegacyPoliciesCollection).
|
||||
// With no credential manager available (a test path), everything is written
|
||||
// back to the bundle, preserving the pre-refactor behavior.
|
||||
func (iama *IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) {
|
||||
var b []byte
|
||||
if b, err = json.Marshal(policies); err != nil {
|
||||
return err
|
||||
ctx := context.Background()
|
||||
|
||||
if iama.credentialManager == nil {
|
||||
legacy := Policies{
|
||||
Policies: policies.Policies,
|
||||
InlinePolicies: policies.InlinePolicies,
|
||||
GroupInlinePolicies: policies.GroupInlinePolicies,
|
||||
}
|
||||
return iama.writeLegacyPolicies(ctx, &legacy)
|
||||
}
|
||||
|
||||
// Managed delta.
|
||||
currentManaged, gErr := iama.credentialManager.GetPolicies(ctx)
|
||||
if gErr != nil {
|
||||
return fmt.Errorf("read managed policies from credential store: %w", gErr)
|
||||
}
|
||||
for name, doc := range policies.Policies {
|
||||
existing, found := currentManaged[name]
|
||||
if !found {
|
||||
if cErr := iama.credentialManager.CreatePolicy(ctx, name, doc); cErr != nil {
|
||||
return fmt.Errorf("create policy %s in credential store: %w", name, cErr)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(existing, doc) {
|
||||
if uErr := iama.credentialManager.UpdatePolicy(ctx, name, doc); uErr != nil {
|
||||
return fmt.Errorf("update policy %s in credential store: %w", name, uErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
for name := range currentManaged {
|
||||
if _, keep := policies.Policies[name]; !keep {
|
||||
if dErr := iama.credentialManager.DeletePolicy(ctx, name); dErr != nil {
|
||||
return fmt.Errorf("delete policy %s from credential store: %w", name, dErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// User-inline delta.
|
||||
currentInline, iErr := iama.credentialManager.LoadAllInlinePolicies(ctx)
|
||||
if iErr != nil {
|
||||
return fmt.Errorf("read user inline policies from credential store: %w", iErr)
|
||||
}
|
||||
for user, desired := range policies.InlinePolicies {
|
||||
existing := currentInline[user]
|
||||
for name, doc := range desired {
|
||||
if prior, found := existing[name]; found && reflect.DeepEqual(prior, doc) {
|
||||
continue
|
||||
}
|
||||
if pErr := iama.credentialManager.PutUserInlinePolicy(ctx, user, name, doc); pErr != nil {
|
||||
return fmt.Errorf("put inline policy %s/%s in credential store: %w", user, name, pErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
for user, existing := range currentInline {
|
||||
desired := policies.InlinePolicies[user]
|
||||
for name := range existing {
|
||||
if _, keep := desired[name]; keep {
|
||||
continue
|
||||
}
|
||||
if dErr := iama.credentialManager.DeleteUserInlinePolicy(ctx, user, name); dErr != nil {
|
||||
return fmt.Errorf("delete inline policy %s/%s from credential store: %w", user, name, dErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Group-inline delta.
|
||||
currentGroupInline, ggErr := iama.credentialManager.LoadAllGroupInlinePolicies(ctx)
|
||||
if ggErr != nil {
|
||||
return fmt.Errorf("read group inline policies from credential store: %w", ggErr)
|
||||
}
|
||||
for group, desired := range policies.GroupInlinePolicies {
|
||||
existing := currentGroupInline[group]
|
||||
for name, doc := range desired {
|
||||
if prior, found := existing[name]; found && reflect.DeepEqual(prior, doc) {
|
||||
continue
|
||||
}
|
||||
if pErr := iama.credentialManager.PutGroupInlinePolicy(ctx, group, name, doc); pErr != nil {
|
||||
return fmt.Errorf("put group inline policy %s/%s in credential store: %w", group, name, pErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
for group, existing := range currentGroupInline {
|
||||
desired := policies.GroupInlinePolicies[group]
|
||||
for name := range existing {
|
||||
if _, keep := desired[name]; keep {
|
||||
continue
|
||||
}
|
||||
if dErr := iama.credentialManager.DeleteGroupInlinePolicy(ctx, group, name); dErr != nil {
|
||||
return fmt.Errorf("delete group inline policy %s/%s from credential store: %w", group, name, dErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// filer_etc owns the legacy bundle itself (it's how filer_etc persists
|
||||
// inline policies). For every other store the bundle is at most a one-
|
||||
// shot migration source — empty it so deleted legacy-only entries don't
|
||||
// reappear via the GetPolicies fallback on the next read.
|
||||
if iama.credentialManager.GetStoreName() == string(credential.StoreTypeFilerEtc) {
|
||||
return nil
|
||||
}
|
||||
return iama.writeLegacyPolicies(ctx, &Policies{})
|
||||
}
|
||||
|
||||
// writeLegacyPolicies serializes the given Policies struct to the legacy
|
||||
// /etc/iam/policies.json bundle. Called both for the no-credential-manager
|
||||
// fallback (full bundle) and for the credential-manager path (empty bundle,
|
||||
// to drain legacy data after migration).
|
||||
func (iama *IamS3ApiConfigure) writeLegacyPolicies(ctx context.Context, p *Policies) error {
|
||||
b, err := json.Marshal(p)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal legacy policies: %w", err)
|
||||
}
|
||||
return pb.WithOneOfGrpcFilerClients(false, iama.option.Filers, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
if err := filer.SaveInsideFiler(context.Background(), client, filer.IamConfigDirectory, filer.IamPoliciesFile, b); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return filer.SaveInsideFiler(ctx, client, filer.IamConfigDirectory, filer.IamPoliciesFile, b)
|
||||
})
|
||||
}
|
||||
|
||||
// mergeInline overwrites entries in dst with src (credential-store data taking
|
||||
// precedence over anything already in dst).
|
||||
func mergeInline(dst *map[string]map[string]policy_engine.PolicyDocument, src map[string]map[string]policy_engine.PolicyDocument) {
|
||||
if len(src) == 0 {
|
||||
return
|
||||
}
|
||||
if *dst == nil {
|
||||
*dst = make(map[string]map[string]policy_engine.PolicyDocument, len(src))
|
||||
}
|
||||
for outer, byName := range src {
|
||||
bucket := (*dst)[outer]
|
||||
if bucket == nil {
|
||||
bucket = make(map[string]policy_engine.PolicyDocument, len(byName))
|
||||
(*dst)[outer] = bucket
|
||||
}
|
||||
for name, doc := range byName {
|
||||
bucket[name] = doc
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// mergeInlineFallback fills only entries that are missing in dst — the legacy
|
||||
// file never overrides whatever the credential store returned.
|
||||
func mergeInlineFallback(dst *map[string]map[string]policy_engine.PolicyDocument, src map[string]map[string]policy_engine.PolicyDocument) {
|
||||
if len(src) == 0 {
|
||||
return
|
||||
}
|
||||
if *dst == nil {
|
||||
*dst = make(map[string]map[string]policy_engine.PolicyDocument, len(src))
|
||||
}
|
||||
for outer, byName := range src {
|
||||
bucket := (*dst)[outer]
|
||||
if bucket == nil {
|
||||
bucket = make(map[string]policy_engine.PolicyDocument, len(byName))
|
||||
(*dst)[outer] = bucket
|
||||
}
|
||||
for name, doc := range byName {
|
||||
if _, exists := bucket[name]; !exists {
|
||||
bucket[name] = doc
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
253
weed/iamapi/iamapi_server_test.go
Normal file
253
weed/iamapi/iamapi_server_test.go
Normal file
@@ -0,0 +1,253 @@
|
||||
package iamapi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/credential"
|
||||
"github.com/seaweedfs/seaweedfs/weed/credential/memory"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// newIamS3ApiConfigureForTest builds an IamS3ApiConfigure backed by a memory
|
||||
// credential store and no filer. The filer-less option is fine because the
|
||||
// new code only touches the filer for inline/group policies, which these
|
||||
// tests don't exercise.
|
||||
func newIamS3ApiConfigureForTest(t *testing.T) (*IamS3ApiConfigure, *credential.CredentialManager) {
|
||||
t.Helper()
|
||||
store := &memory.MemoryStore{}
|
||||
require.NoError(t, store.Initialize(nil, ""))
|
||||
cm := &credential.CredentialManager{Store: store}
|
||||
cfg := &IamS3ApiConfigure{
|
||||
option: &IamServerOption{},
|
||||
credentialManager: cm,
|
||||
}
|
||||
return cfg, cm
|
||||
}
|
||||
|
||||
func samplePolicyDocument(resource string) policy_engine.PolicyDocument {
|
||||
return policy_engine.PolicyDocument{
|
||||
Version: "2012-10-17",
|
||||
Statement: []policy_engine.PolicyStatement{
|
||||
{
|
||||
Effect: policy_engine.PolicyEffectAllow,
|
||||
Action: policy_engine.NewStringOrStringSlice("s3:Get*"),
|
||||
Resource: policy_engine.NewStringOrStringSlicePtr(resource),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// TestPutPoliciesCreatesInCredentialStore is the regression test for
|
||||
// https://github.com/seaweedfs/seaweedfs/issues/9518: the IAM API used to
|
||||
// write managed policies straight to the filer, bypassing any non-filer
|
||||
// credential store (e.g. postgres). After the fix, a new policy passed to
|
||||
// PutPolicies must show up in the credential store.
|
||||
func TestPutPoliciesCreatesInCredentialStore(t *testing.T) {
|
||||
cfg, cm := newIamS3ApiConfigureForTest(t)
|
||||
ctx := context.Background()
|
||||
|
||||
doc := samplePolicyDocument("arn:aws:s3:::bucket-a")
|
||||
require.NoError(t, cfg.PutPolicies(&Policies{
|
||||
Policies: map[string]policy_engine.PolicyDocument{"policy-a": doc},
|
||||
}))
|
||||
|
||||
stored, err := cm.GetPolicies(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, stored, "policy-a")
|
||||
assert.Equal(t, doc, stored["policy-a"])
|
||||
}
|
||||
|
||||
// TestGetPoliciesReadsFromCredentialStore mirrors the read side of the bug:
|
||||
// ListPolicies / GetPolicy via the IAM API used to read only the filer file
|
||||
// and therefore missed policies that the Admin UI had written to the store.
|
||||
func TestGetPoliciesReadsFromCredentialStore(t *testing.T) {
|
||||
cfg, cm := newIamS3ApiConfigureForTest(t)
|
||||
ctx := context.Background()
|
||||
|
||||
doc := samplePolicyDocument("arn:aws:s3:::bucket-b")
|
||||
require.NoError(t, cm.CreatePolicy(ctx, "policy-b", doc))
|
||||
|
||||
loaded := Policies{}
|
||||
require.NoError(t, cfg.GetPolicies(&loaded))
|
||||
require.Contains(t, loaded.Policies, "policy-b")
|
||||
assert.Equal(t, doc, loaded.Policies["policy-b"])
|
||||
}
|
||||
|
||||
// TestPutPoliciesDeletesRemovedFromCredentialStore makes sure the delta path
|
||||
// translates an in-memory deletion (the pattern the DeletePolicy handler
|
||||
// uses) into a credential-store DeletePolicy call.
|
||||
func TestPutPoliciesDeletesRemovedFromCredentialStore(t *testing.T) {
|
||||
cfg, cm := newIamS3ApiConfigureForTest(t)
|
||||
ctx := context.Background()
|
||||
|
||||
keep := samplePolicyDocument("arn:aws:s3:::keep")
|
||||
drop := samplePolicyDocument("arn:aws:s3:::drop")
|
||||
require.NoError(t, cm.CreatePolicy(ctx, "keep", keep))
|
||||
require.NoError(t, cm.CreatePolicy(ctx, "drop", drop))
|
||||
|
||||
// Caller fetched policies, removed "drop", and asked us to persist.
|
||||
require.NoError(t, cfg.PutPolicies(&Policies{
|
||||
Policies: map[string]policy_engine.PolicyDocument{"keep": keep},
|
||||
}))
|
||||
|
||||
remaining, err := cm.GetPolicies(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.Contains(t, remaining, "keep")
|
||||
assert.NotContains(t, remaining, "drop")
|
||||
}
|
||||
|
||||
// TestPutPoliciesUpdatesChangedInCredentialStore confirms the delta also
|
||||
// detects document changes for the same policy name.
|
||||
func TestPutPoliciesUpdatesChangedInCredentialStore(t *testing.T) {
|
||||
cfg, cm := newIamS3ApiConfigureForTest(t)
|
||||
ctx := context.Background()
|
||||
|
||||
original := samplePolicyDocument("arn:aws:s3:::v1")
|
||||
require.NoError(t, cm.CreatePolicy(ctx, "p", original))
|
||||
|
||||
updated := samplePolicyDocument("arn:aws:s3:::v2")
|
||||
require.NoError(t, cfg.PutPolicies(&Policies{
|
||||
Policies: map[string]policy_engine.PolicyDocument{"p": updated},
|
||||
}))
|
||||
|
||||
stored, err := cm.GetPolicy(ctx, "p")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, stored)
|
||||
assert.Equal(t, updated, *stored)
|
||||
}
|
||||
|
||||
// TestPutPoliciesRoutesUserInlineThroughCredentialStore is the second half of
|
||||
// the #9518 fix: per-user inline policies created via the IAM API must land
|
||||
// in the credential store (so postgres users have a single source of truth)
|
||||
// rather than in the filer bundle.
|
||||
func TestPutPoliciesRoutesUserInlineThroughCredentialStore(t *testing.T) {
|
||||
cfg, cm := newIamS3ApiConfigureForTest(t)
|
||||
ctx := context.Background()
|
||||
|
||||
doc := samplePolicyDocument("arn:aws:s3:::user-bucket")
|
||||
require.NoError(t, cfg.PutPolicies(&Policies{
|
||||
InlinePolicies: map[string]map[string]policy_engine.PolicyDocument{
|
||||
"alice": {"inline-1": doc},
|
||||
},
|
||||
}))
|
||||
|
||||
stored, err := cm.GetUserInlinePolicy(ctx, "alice", "inline-1")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, stored)
|
||||
assert.Equal(t, doc, *stored)
|
||||
}
|
||||
|
||||
// TestPutPoliciesDeletesRemovedUserInline mirrors the deletion side. The
|
||||
// DeleteUserPolicy handler removes the entry from the in-memory map and calls
|
||||
// PutPolicies; the delta must call DeleteUserInlinePolicy.
|
||||
func TestPutPoliciesDeletesRemovedUserInline(t *testing.T) {
|
||||
cfg, cm := newIamS3ApiConfigureForTest(t)
|
||||
ctx := context.Background()
|
||||
|
||||
keep := samplePolicyDocument("arn:aws:s3:::keep")
|
||||
drop := samplePolicyDocument("arn:aws:s3:::drop")
|
||||
require.NoError(t, cm.PutUserInlinePolicy(ctx, "bob", "keep", keep))
|
||||
require.NoError(t, cm.PutUserInlinePolicy(ctx, "bob", "drop", drop))
|
||||
|
||||
require.NoError(t, cfg.PutPolicies(&Policies{
|
||||
InlinePolicies: map[string]map[string]policy_engine.PolicyDocument{
|
||||
"bob": {"keep": keep},
|
||||
},
|
||||
}))
|
||||
|
||||
remaining, err := cm.ListUserInlinePolicies(ctx, "bob")
|
||||
require.NoError(t, err)
|
||||
assert.ElementsMatch(t, []string{"keep"}, remaining)
|
||||
}
|
||||
|
||||
// TestPutPoliciesRoutesGroupInlineThroughCredentialStore confirms group-attached
|
||||
// inline policies also persist via the credential manager rather than the
|
||||
// shared filer bundle.
|
||||
func TestPutPoliciesRoutesGroupInlineThroughCredentialStore(t *testing.T) {
|
||||
cfg, cm := newIamS3ApiConfigureForTest(t)
|
||||
ctx := context.Background()
|
||||
|
||||
doc := samplePolicyDocument("arn:aws:s3:::group-bucket")
|
||||
require.NoError(t, cfg.PutPolicies(&Policies{
|
||||
GroupInlinePolicies: map[string]map[string]policy_engine.PolicyDocument{
|
||||
"devs": {"inline-g": doc},
|
||||
},
|
||||
}))
|
||||
|
||||
stored, err := cm.GetGroupInlinePolicy(ctx, "devs", "inline-g")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, stored)
|
||||
assert.Equal(t, doc, *stored)
|
||||
}
|
||||
|
||||
// TestPutPoliciesDeletesRemovedGroupInline covers the deletion delta for
|
||||
// group-attached inline policies.
|
||||
func TestPutPoliciesDeletesRemovedGroupInline(t *testing.T) {
|
||||
cfg, cm := newIamS3ApiConfigureForTest(t)
|
||||
ctx := context.Background()
|
||||
|
||||
keep := samplePolicyDocument("arn:aws:s3:::keep")
|
||||
drop := samplePolicyDocument("arn:aws:s3:::drop")
|
||||
require.NoError(t, cm.PutGroupInlinePolicy(ctx, "devs", "keep", keep))
|
||||
require.NoError(t, cm.PutGroupInlinePolicy(ctx, "devs", "drop", drop))
|
||||
|
||||
require.NoError(t, cfg.PutPolicies(&Policies{
|
||||
GroupInlinePolicies: map[string]map[string]policy_engine.PolicyDocument{
|
||||
"devs": {"keep": keep},
|
||||
},
|
||||
}))
|
||||
|
||||
remaining, err := cm.ListGroupInlinePolicies(ctx, "devs")
|
||||
require.NoError(t, err)
|
||||
assert.ElementsMatch(t, []string{"keep"}, remaining)
|
||||
}
|
||||
|
||||
// TestPutPoliciesIsAuthoritativeWhenStoreIsNotFilerEtc is the regression test
|
||||
// for the legacy-file resurrection bug: once the credential store is
|
||||
// authoritative, a delete must stick across the GetPolicies fallback. We
|
||||
// can't verify the legacy-file rewrite directly without a fake filer in
|
||||
// the test harness, but we can verify the cm-side delta behavior: a policy
|
||||
// that was deleted does not come back via cm on a subsequent fetch.
|
||||
func TestPutPoliciesIsAuthoritativeWhenStoreIsNotFilerEtc(t *testing.T) {
|
||||
cfg, cm := newIamS3ApiConfigureForTest(t)
|
||||
ctx := context.Background()
|
||||
|
||||
require.Equal(t, "memory", cm.GetStoreName(), "test depends on memory store name carveout")
|
||||
|
||||
doc := samplePolicyDocument("arn:aws:s3:::vanishing")
|
||||
require.NoError(t, cfg.PutPolicies(&Policies{
|
||||
Policies: map[string]policy_engine.PolicyDocument{"will-vanish": doc},
|
||||
}))
|
||||
|
||||
require.NoError(t, cfg.PutPolicies(&Policies{
|
||||
Policies: map[string]policy_engine.PolicyDocument{},
|
||||
}))
|
||||
|
||||
remaining, err := cm.GetPolicies(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.NotContains(t, remaining, "will-vanish")
|
||||
}
|
||||
|
||||
// TestGetPoliciesReadsInlineAndGroupInlineFromCredentialStore confirms reads
|
||||
// pull user-inline and group-inline policies from the store too, so handlers
|
||||
// like ListUserPolicies / GetUserPolicy see Admin-UI writes.
|
||||
func TestGetPoliciesReadsInlineAndGroupInlineFromCredentialStore(t *testing.T) {
|
||||
cfg, cm := newIamS3ApiConfigureForTest(t)
|
||||
ctx := context.Background()
|
||||
|
||||
userDoc := samplePolicyDocument("arn:aws:s3:::user-data")
|
||||
groupDoc := samplePolicyDocument("arn:aws:s3:::group-data")
|
||||
require.NoError(t, cm.PutUserInlinePolicy(ctx, "carol", "u-policy", userDoc))
|
||||
require.NoError(t, cm.PutGroupInlinePolicy(ctx, "admins", "g-policy", groupDoc))
|
||||
|
||||
loaded := Policies{}
|
||||
require.NoError(t, cfg.GetPolicies(&loaded))
|
||||
|
||||
require.Contains(t, loaded.InlinePolicies, "carol")
|
||||
assert.Equal(t, userDoc, loaded.InlinePolicies["carol"]["u-policy"])
|
||||
require.Contains(t, loaded.GroupInlinePolicies, "admins")
|
||||
assert.Equal(t, groupDoc, loaded.GroupInlinePolicies["admins"]["g-policy"])
|
||||
}
|
||||
Reference in New Issue
Block a user