Mirror of https://github.com/seaweedfs/seaweedfs.git (synced 2026-05-14 05:41:29 +00:00)
* refactor(util): extract pgx OpenDB + DSN builder into shared pgxutil

The postgres filer store had OpenPGXDB plus duplicated key=value DSN
assembly across postgres/ and postgres2/. Move the connection helper to
weed/util/pgxutil and add BuildDSN so the credential postgres store can
land on the same code path.

filer/postgres/pgx_conn.go keeps OpenPGXDB as a thin alias so postgres2
keeps building unchanged.

* refactor(credential/postgres): use shared pgxutil for connection setup

Replace the bespoke fmt.Sprintf DSN + sql.Open("pgx", ...) path with
pgxutil.BuildDSN + pgxutil.OpenDB so the credential store mirrors the
postgres filer store. This also drops the leaky RegisterConnConfig-style
init in favor of stdlib.OpenDB(*config), which doesn't accumulate
entries in the global pgx config map.

Adds parity knobs the filer store already exposes: sslcrl, and
configurable connection_max_idle / connection_max_open /
connection_max_lifetime_seconds (with the previous hardcoded 25/5/5min
as defaults). Also moves the jsonbParam helper here so other store
files can reuse it. (Helper is also referenced by postgres_identity.go,
which is migrated to it in the next commit.)

* refactor(credential/postgres): use jsonbParam helper across all writers

Consolidate JSONB write handling on the new pgxutil-adjacent helper
jsonbParam(b []byte) interface{}, which returns nil (driver writes SQL
NULL) when the marshaled JSON is empty and string(b) otherwise.

postgres_identity.go: replace the inline 'var fooParam any' /
'fooParam = string(b)' pattern with the helper. Same in CreateUser
and UpdateUser.

postgres_inline_policy.go, postgres_policy.go, postgres_service_account.go,
postgres_group.go: every JSONB writer was still passing []byte. Under
pgx simple_protocol (pgbouncer_compatible=true), []byte is encoded as
bytea and Postgres rejects that against a JSONB column with "invalid
input syntax for type json". Route them through jsonbParam too.

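The helper's body is not shown on this page, so the following is only a minimal sketch reconstructed from the description above (nil for empty input, string(b) otherwise). The `main` function is purely illustrative.

```go
package main

import "fmt"

// jsonbParam turns marshaled JSON into a query parameter for a JSONB column.
// Assumption based on the commit text: empty input maps to nil (which the
// driver sends as SQL NULL), anything else is passed as a string so that the
// simple protocol does not encode it as bytea.
func jsonbParam(b []byte) interface{} {
	if len(b) == 0 {
		return nil
	}
	return string(b)
}

func main() {
	fmt.Printf("%#v\n", jsonbParam(nil))
	fmt.Printf("%#v\n", jsonbParam([]byte(`{"a":1}`)))
}
```

Returning a string rather than the raw byte slice is the point of the helper: the value then reaches Postgres as text that can be cast to JSONB.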
* fix(credential/postgres): rework SaveConfiguration to handle rename + UNIQUE access keys

The IAM rename path (s3api UpdateUser) renames an identity in place
and keeps its access keys. The previous flow was: upsert each user,
then per-user delete-and-insert credentials, then prune absent users.
Under that order the renamed user's access keys were still owned by
the old row when the INSERT for the new name ran, tripping
credentials.access_key's global UNIQUE constraint and failing every
rename of a user with credentials.

Reorder the SaveConfiguration body so the prune step runs BEFORE the
credential replace. CASCADE on the old user releases its access keys
in the same transaction, and the new name can then claim them.

While here:

- Replace the per-user loop DELETE FROM users WHERE username = $1 with
  a single DELETE ... WHERE username = ANY($1), one round trip instead
  of N inside the transaction.
- Surface inline-policy CASCADE losses: count user_inline_policies for
  the prune set and emit a Warningf when the count is non-zero so
  rename-driven drops are visible in operator logs (the structural
  fix for renames lives at the IAM layer in a follow-up commit).
- Two-pass credential replace: clear credentials for every user we are
  about to rewrite first, then insert, so an access key can be moved
  between two users in the same SaveConfiguration call.
- Capture credErr := credRows.Err() before credRows.Close() in
  LoadConfiguration. Err() is documented as safe after Close, but
  the leading-capture pattern matches the rest of the file.

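To make the two-pass reasoning concrete, here is a small in-memory sketch. It does not touch Postgres: `keyTable`, `identity`, `singlePass`, and `twoPass` are hypothetical names that only imitate a table with a global UNIQUE constraint on the access key.

```go
package main

import "fmt"

// keyTable imitates credentials.access_key under a global UNIQUE constraint:
// every access key maps to exactly one owning username.
type keyTable map[string]string

// insert fails when the key already has an owner, like a UNIQUE violation.
func (t keyTable) insert(user, key string) error {
	if owner, taken := t[key]; taken {
		return fmt.Errorf("access key %q already owned by %q", key, owner)
	}
	t[key] = user
	return nil
}

// clear removes every key owned by user.
func (t keyTable) clear(user string) {
	for k, owner := range t {
		if owner == user {
			delete(t, k)
		}
	}
}

type identity struct {
	name string
	keys []string
}

// singlePass deletes and re-inserts credentials one user at a time.
func singlePass(t keyTable, ids []identity) error {
	for _, id := range ids {
		t.clear(id.name)
		for _, k := range id.keys {
			if err := t.insert(id.name, k); err != nil {
				return err
			}
		}
	}
	return nil
}

// twoPass clears every incoming user first and only then inserts.
func twoPass(t keyTable, ids []identity) error {
	for _, id := range ids {
		t.clear(id.name)
	}
	for _, id := range ids {
		for _, k := range id.keys {
			if err := t.insert(id.name, k); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	// AK1 moves from alice to bob in one save, and bob is processed first.
	incoming := []identity{{"bob", []string{"AK1"}}, {"alice", nil}}
	fmt.Println("single pass:", singlePass(keyTable{"AK1": "alice"}, incoming))
	fmt.Println("two pass:   ", twoPass(keyTable{"AK1": "alice"}, incoming))
}
```

In the single-pass variant the insert for bob runs while alice still owns the key, so it fails. Clearing all incoming users up front removes that ordering dependency.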
* fix(s3api/iam): preserve inline policies when renaming a user

EmbeddedIamApi.UpdateUser renames an identity in place and the caller
persists via SaveConfiguration, which prunes the old username and
CASCADE-drops its rows from user_inline_policies. GetUserPolicy and
ListUserPolicies then return nothing under the new name even though
the API reported success, which is silent data loss.

Before flipping sourceIdent.Name, list the user's stored inline
policies and re-attach each one under the new name. The subsequent
SaveConfiguration prune still CASCADE-removes the old-name rows; only
the duplicates we just wrote under the new name survive. Adds a
regression test that puts a policy on the old name, renames, and
asserts the policy is readable under the new name.

* perf(credential/postgres): batch the credential clear in SaveConfiguration

The two-pass credential replace was clearing each incoming user's
credentials with its own DELETE statement, which meant N round-trips
inside the transaction. Match the pattern already used for the user
prune and issue a single DELETE FROM credentials WHERE username = ANY($1)
instead.

* refactor(s3api/iam): plumb context through UpdateUser

UpdateUser was synthesizing a fresh context.Background() inside the
inline-policy migration block, which discards the request deadline,
cancellation, and tracing carried by the caller. Add ctx as the first
parameter and pass r.Context() in via the ExecuteAction dispatcher,
mirroring the signature already used by CreatePolicy /
AttachUserPolicy / DetachUserPolicy.

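The effect of synthesizing context.Background() can be seen in a few lines of standard-library Go. This is only an illustration of the general pattern. `migrateDetached`, `migrateWithCtx`, and `demo` are hypothetical stand-ins, not the actual IAM handlers.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// hasDeadline reports whether ctx carries a deadline.
func hasDeadline(ctx context.Context) bool {
	_, ok := ctx.Deadline()
	return ok
}

// migrateDetached imitates the old shape: it ignores the caller and builds
// a fresh context.Background(), so the request deadline is not visible.
func migrateDetached() bool {
	return hasDeadline(context.Background())
}

// migrateWithCtx imitates the new shape: ctx is the first parameter and is
// used as-is, so deadline and cancellation propagate.
func migrateWithCtx(ctx context.Context) bool {
	return hasDeadline(ctx)
}

// demo runs both variants under a request-like context with a 5s timeout.
func demo() (detached, plumbed bool) {
	reqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return migrateDetached(), migrateWithCtx(reqCtx)
}

func main() {
	detached, plumbed := demo()
	fmt.Println("detached sees deadline:", detached)
	fmt.Println("plumbed sees deadline: ", plumbed)
}
```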
* fix(util/pgxutil): quote DSN values per libpq rules

BuildDSN was concatenating values directly, so any password / cert path
/ database name with a space, single quote, or backslash produced a
malformed connection string and pgx.ParseConfig either errored or
mis-parsed the remainder. Critical now that the helper is shared with
the credential store: mTLS deployments routinely sourcing passwords or
secret-mounted cert paths from a vault are exactly the case where
spaces and quotes show up.

Add quoteDSNValue: empty values and values containing whitespace, `'`,
or `\` are wrapped in single quotes with `'` and `\` escaped per
PostgreSQL libpq rules; plain alphanumeric values pass through
unchanged. Apply it to every variable field in BuildDSN.

Adds a test that round-trips a password containing spaces, quotes and
backslashes through pgx.ParseConfig and confirms the parsed Config
matches the input.

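The quoting rule described above can be sketched as follows. The real quoteDSNValue lives in weed/util/pgxutil and is not shown on this page, so this body is an assumption that follows the stated rule. In particular, the exact set of characters treated as whitespace is a guess.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteDSNValue quotes a value for a key=value connection string.
// Assumed rule, taken from the commit text: values that are empty or contain
// whitespace, a single quote, or a backslash are wrapped in single quotes,
// with ' and \ each escaped by a backslash. Other values pass through.
func quoteDSNValue(v string) string {
	if v != "" && !strings.ContainsAny(v, " \t\r\n\v\f'\\") {
		return v
	}
	var b strings.Builder
	b.WriteByte('\'')
	for _, r := range v {
		if r == '\'' || r == '\\' {
			b.WriteByte('\\')
		}
		b.WriteRune(r)
	}
	b.WriteByte('\'')
	return b.String()
}

func main() {
	for _, v := range []string{"secret", "", "p w", `it's`, `a\b`} {
		fmt.Printf("password=%s\n", quoteDSNValue(v))
	}
}
```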
* fix(credential,s3api/iam): atomic UserRenamer to avoid FK violation on rename

The previous IAM rename path called PutUserInlinePolicy(newName, ...)
before SaveConfiguration created the new users row. user_inline_policies
has a non-deferrable FOREIGN KEY (username) REFERENCES users(username),
which Postgres validates at statement time, so every rename of a user
that owned at least one inline policy failed with an FK violation. The
existing memory-store regression test missed it because the memory
backend has no FK enforcement.

Add an optional credential.UserRenamer interface plus a
CredentialManager.RenameUser thin shim that returns (supported, err).
Implement it on PostgresStore as an atomic in-transaction migration:
INSERT the new users row by SELECT-copying from the old, UPDATE
credentials.username and user_inline_policies.username to the new
name (FK satisfied because both rows now exist), then DELETE the old
row. ErrUserNotFound / ErrUserAlreadyExists are surfaced cleanly.

Implement it on MemoryStore by re-binding store.users / store.accessKeys
/ store.inlinePolicies under the new name. Also fixes a small leak in
DeleteUser, which was forgetting to drop the user's inline-policy
bucket.

EmbeddedIamApi.UpdateUser now calls RenameUser first; if the store
implements the interface, that's the whole migration. If it doesn't
(stores without FK enforcement), fall back to the previous
list / get / put copy.

Adds a focused test for MemoryStore.RenameUser that asserts the
identity, the access-key index, and the inline policies all land
under the new name.

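The optional-interface pattern with a (supported, err) shim can be sketched like this. Only the RenameUser method signature is taken from the source file below. `userRenamer`, `memStore`, `plainStore`, `renameUser`, and the error variables are hypothetical simplifications, not the actual credential package types.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var (
	errUserNotFound      = errors.New("user not found")
	errUserAlreadyExists = errors.New("user already exists")
)

// userRenamer is the optional capability a store may implement.
type userRenamer interface {
	RenameUser(ctx context.Context, oldName, newName string) error
}

// memStore is a toy store: username -> inline policy names.
type memStore struct {
	users map[string][]string
}

// RenameUser re-binds the user's data under the new name.
func (s *memStore) RenameUser(ctx context.Context, oldName, newName string) error {
	if oldName == newName {
		return nil
	}
	policies, ok := s.users[oldName]
	if !ok {
		return errUserNotFound
	}
	if _, exists := s.users[newName]; exists {
		return errUserAlreadyExists
	}
	s.users[newName] = policies
	delete(s.users, oldName)
	return nil
}

// plainStore does not implement userRenamer.
type plainStore struct{}

// renameUser is the shim: supported is false when the store lacks the
// capability, which tells the caller to fall back to a manual copy.
func renameUser(ctx context.Context, store interface{}, oldName, newName string) (supported bool, err error) {
	r, ok := store.(userRenamer)
	if !ok {
		return false, nil
	}
	return true, r.RenameUser(ctx, oldName, newName)
}

// demo exercises both store kinds and a rename of a missing user.
func demo() (memSupported, plainSupported bool, moved []string, missingErr error) {
	ctx := context.Background()
	s := &memStore{users: map[string][]string{"alice": {"read-only"}}}
	memSupported, _ = renameUser(ctx, s, "alice", "alicia")
	plainSupported, _ = renameUser(ctx, plainStore{}, "alice", "alicia")
	_, missingErr = renameUser(ctx, s, "ghost", "spirit")
	return memSupported, plainSupported, s.users["alicia"], missingErr
}

func main() {
	memSupported, plainSupported, moved, missingErr := demo()
	fmt.Println(memSupported, plainSupported, moved, missingErr)
}
```

Returning a separate supported flag keeps "this store cannot rename" distinct from "the rename failed", so the caller only falls back when the capability is genuinely absent.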
728 lines
24 KiB
Go
package postgres

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/credential"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
)

func (store *PostgresStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
	if !store.configured {
		return nil, fmt.Errorf("store not configured")
	}

	config := &iam_pb.S3ApiConfiguration{}

	rows, err := store.db.QueryContext(ctx, "SELECT username, email, account_data, actions, policy_names FROM users")
	if err != nil {
		glog.Errorf("credential postgres: LoadConfiguration query failed: %v", err)
		return nil, fmt.Errorf("failed to query users: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		var username, email string
		var accountDataJSON, actionsJSON, policyNamesJSON []byte

		if err := rows.Scan(&username, &email, &accountDataJSON, &actionsJSON, &policyNamesJSON); err != nil {
			glog.Errorf("credential postgres: LoadConfiguration scan failed: %v", err)
			return nil, fmt.Errorf("failed to scan user row: %w", err)
		}

		identity := &iam_pb.Identity{
			Name: username,
		}

		if len(accountDataJSON) > 0 {
			if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil {
				return nil, fmt.Errorf("failed to unmarshal account data for user %s: %v", username, err)
			}
		}

		if len(actionsJSON) > 0 {
			if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil {
				return nil, fmt.Errorf("failed to unmarshal actions for user %s: %v", username, err)
			}
		}

		if len(policyNamesJSON) > 0 {
			if err := json.Unmarshal(policyNamesJSON, &identity.PolicyNames); err != nil {
				return nil, fmt.Errorf("failed to unmarshal policy names for user %s: %v", username, err)
			}
		}

		credRows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username)
		if err != nil {
			return nil, fmt.Errorf("failed to query credentials for user %s: %v", username, err)
		}

		for credRows.Next() {
			var accessKey, secretKey string
			if err := credRows.Scan(&accessKey, &secretKey); err != nil {
				credRows.Close()
				return nil, fmt.Errorf("failed to scan credential row for user %s: %v", username, err)
			}

			identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
				AccessKey: accessKey,
				SecretKey: secretKey,
			})
		}
		credErr := credRows.Err()
		credRows.Close()
		if credErr != nil {
			return nil, fmt.Errorf("failed iterating credential rows for user %s: %w", username, credErr)
		}
		config.Identities = append(config.Identities, identity)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed iterating user rows: %w", err)
	}

	glog.V(0).Infof("credential postgres: LoadConfiguration loaded %d identities", len(config.Identities))
	return config, nil
}

func (store *PostgresStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
	if !store.configured {
		return fmt.Errorf("store not configured")
	}

	tx, err := store.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	defer tx.Rollback()

	// Track which usernames are in the incoming config for pruning
	configUsernames := make(map[string]bool, len(config.Identities))
	for _, identity := range config.Identities {
		configUsernames[identity.Name] = true
	}

	// Upsert each user's row.
	for _, identity := range config.Identities {
		var accountDataJSON []byte
		if identity.Account != nil {
			accountDataJSON, err = json.Marshal(identity.Account)
			if err != nil {
				return fmt.Errorf("failed to marshal account data for user %s: %v", identity.Name, err)
			}
		}

		var actionsJSON []byte
		if identity.Actions != nil {
			actionsJSON, err = json.Marshal(identity.Actions)
			if err != nil {
				return fmt.Errorf("failed to marshal actions for user %s: %v", identity.Name, err)
			}
		}

		var policyNamesJSON []byte
		if identity.PolicyNames != nil {
			policyNamesJSON, err = json.Marshal(identity.PolicyNames)
			if err != nil {
				return fmt.Errorf("failed to marshal policy names for user %s: %v", identity.Name, err)
			}
		}

		// Upsert user; this preserves the row (and its CASCADE dependents) if it already exists
		_, err = tx.ExecContext(ctx,
			`INSERT INTO users (username, email, account_data, actions, policy_names)
			VALUES ($1, $2, $3, $4, $5)
			ON CONFLICT (username) DO UPDATE SET
			email = EXCLUDED.email,
			account_data = EXCLUDED.account_data,
			actions = EXCLUDED.actions,
			policy_names = EXCLUDED.policy_names,
			updated_at = CURRENT_TIMESTAMP`,
			identity.Name, "", jsonbParam(accountDataJSON), jsonbParam(actionsJSON), jsonbParam(policyNamesJSON))
		if err != nil {
			return fmt.Errorf("failed to upsert user %s: %v", identity.Name, err)
		}
	}

	// Prune users no longer in config BEFORE replacing credentials. The
	// IAM rename path (s3api UpdateUser) renames an identity in place and
	// keeps its access keys: the renamed user shows up in the incoming
	// config, the old name shows up in the pruned set. If we inserted the
	// new credentials first, the renamed user's access keys would still be
	// owned by the old row in this transaction and the INSERT would
	// violate the global UNIQUE constraint on credentials.access_key.
	// CASCADE on the old row releases those keys before the insert.
	rows, err := tx.QueryContext(ctx, "SELECT username FROM users")
	if err != nil {
		return fmt.Errorf("failed to list existing users for pruning: %w", err)
	}
	var toDelete []string
	for rows.Next() {
		var username string
		if err := rows.Scan(&username); err != nil {
			rows.Close()
			return fmt.Errorf("failed to scan username for pruning: %w", err)
		}
		if !configUsernames[username] {
			toDelete = append(toDelete, username)
		}
	}
	scanErr := rows.Err()
	rows.Close()
	if scanErr != nil {
		return fmt.Errorf("failed iterating user rows for pruning: %w", scanErr)
	}

	if len(toDelete) > 0 {
		var inlineCount int
		if err := tx.QueryRowContext(ctx,
			"SELECT COUNT(*) FROM user_inline_policies WHERE username = ANY($1)",
			toDelete).Scan(&inlineCount); err != nil {
			glog.Warningf("credential postgres: SaveConfiguration failed to count inline policies for prune candidates: %v", err)
		} else if inlineCount > 0 {
			glog.Warningf("credential postgres: SaveConfiguration pruning %d users will CASCADE-remove %d inline policies; if this was a rename, re-create them under the new name", len(toDelete), inlineCount)
		}
		if _, err := tx.ExecContext(ctx, "DELETE FROM users WHERE username = ANY($1)", toDelete); err != nil {
			return fmt.Errorf("failed to prune users: %w", err)
		}
	}

	// Two-pass credential replace: first clear every user we are about to
	// rewrite in a single round-trip, then insert. Doing the per-user delete
	// + insert in a single pass would violate the global UNIQUE constraint
	// on credentials.access_key when an access key gets reassigned from one
	// user to another within the same SaveConfiguration call.
	usernames := make([]string, 0, len(configUsernames))
	for name := range configUsernames {
		usernames = append(usernames, name)
	}
	if _, err := tx.ExecContext(ctx, "DELETE FROM credentials WHERE username = ANY($1)", usernames); err != nil {
		return fmt.Errorf("failed to clear credentials for incoming users: %w", err)
	}
	for _, identity := range config.Identities {
		for _, cred := range identity.Credentials {
			if _, err := tx.ExecContext(ctx,
				"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
				identity.Name, cred.AccessKey, cred.SecretKey); err != nil {
				return fmt.Errorf("failed to insert credential for user %s: %v", identity.Name, err)
			}
		}
	}

	if err := tx.Commit(); err != nil {
		glog.Errorf("credential postgres: SaveConfiguration commit failed: %v", err)
		return fmt.Errorf("failed to commit SaveConfiguration: %w", err)
	}

	if len(toDelete) > 0 {
		glog.Warningf("credential postgres: SaveConfiguration saved %d identities, pruned %d", len(config.Identities), len(toDelete))
	} else {
		glog.V(0).Infof("credential postgres: SaveConfiguration saved %d identities", len(config.Identities))
	}
	return nil
}

func (store *PostgresStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
	if !store.configured {
		return fmt.Errorf("store not configured")
	}

	var count int
	err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", identity.Name).Scan(&count)
	if err != nil {
		glog.Errorf("credential postgres: CreateUser check failed user=%s: %v", identity.Name, err)
		return fmt.Errorf("failed to check user existence: %w", err)
	}
	if count > 0 {
		glog.V(1).Infof("credential postgres: CreateUser user=%s already exists", identity.Name)
		return credential.ErrUserAlreadyExists
	}

	tx, err := store.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	defer tx.Rollback()

	var accountDataJSON []byte
	if identity.Account != nil {
		accountDataJSON, err = json.Marshal(identity.Account)
		if err != nil {
			return fmt.Errorf("failed to marshal account data: %w", err)
		}
	}

	var actionsJSON []byte
	if identity.Actions != nil {
		actionsJSON, err = json.Marshal(identity.Actions)
		if err != nil {
			return fmt.Errorf("failed to marshal actions: %w", err)
		}
	}

	var policyNamesJSON []byte
	if identity.PolicyNames != nil {
		policyNamesJSON, err = json.Marshal(identity.PolicyNames)
		if err != nil {
			return fmt.Errorf("failed to marshal policy names: %w", err)
		}
	}

	_, err = tx.ExecContext(ctx,
		"INSERT INTO users (username, email, account_data, actions, policy_names) VALUES ($1, $2, $3, $4, $5)",
		identity.Name, "", jsonbParam(accountDataJSON), jsonbParam(actionsJSON), jsonbParam(policyNamesJSON))
	if err != nil {
		glog.Errorf("credential postgres: CreateUser insert failed user=%s: %v", identity.Name, err)
		return fmt.Errorf("failed to insert user: %w", err)
	}

	for _, cred := range identity.Credentials {
		_, err = tx.ExecContext(ctx,
			"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
			identity.Name, cred.AccessKey, cred.SecretKey)
		if err != nil {
			glog.Errorf("credential postgres: CreateUser insert credential failed user=%s accessKey=%s: %v", identity.Name, cred.AccessKey, err)
			return fmt.Errorf("failed to insert credential: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		glog.Errorf("credential postgres: CreateUser commit failed user=%s: %v", identity.Name, err)
		return fmt.Errorf("failed to commit create user %s: %w", identity.Name, err)
	}

	glog.V(0).Infof("credential postgres: CreateUser user=%s credentials=%d actions=%d", identity.Name, len(identity.Credentials), len(identity.Actions))
	return nil
}

func (store *PostgresStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
	if !store.configured {
		return nil, fmt.Errorf("store not configured")
	}

	var email string
	var accountDataJSON, actionsJSON, policyNamesJSON []byte

	err := store.db.QueryRowContext(ctx,
		"SELECT email, account_data, actions, policy_names FROM users WHERE username = $1",
		username).Scan(&email, &accountDataJSON, &actionsJSON, &policyNamesJSON)
	if err != nil {
		if err == sql.ErrNoRows {
			glog.V(2).Infof("credential postgres: GetUser user=%s not found", username)
			return nil, credential.ErrUserNotFound
		}
		glog.Errorf("credential postgres: GetUser query failed user=%s: %v", username, err)
		return nil, fmt.Errorf("failed to query user: %w", err)
	}

	identity := &iam_pb.Identity{
		Name: username,
	}

	if len(accountDataJSON) > 0 {
		if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil {
			return nil, fmt.Errorf("failed to unmarshal account data: %w", err)
		}
	}

	if len(actionsJSON) > 0 {
		if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil {
			return nil, fmt.Errorf("failed to unmarshal actions: %w", err)
		}
	}

	if len(policyNamesJSON) > 0 {
		if err := json.Unmarshal(policyNamesJSON, &identity.PolicyNames); err != nil {
			return nil, fmt.Errorf("failed to unmarshal policy names: %w", err)
		}
	}

	rows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username)
	if err != nil {
		return nil, fmt.Errorf("failed to query credentials: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		var accessKey, secretKey string
		if err := rows.Scan(&accessKey, &secretKey); err != nil {
			return nil, fmt.Errorf("failed to scan credential: %w", err)
		}
		identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
			AccessKey: accessKey,
			SecretKey: secretKey,
		})
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed iterating credential rows: %w", err)
	}

	glog.V(2).Infof("credential postgres: GetUser user=%s credentials=%d actions=%d", username, len(identity.Credentials), len(identity.Actions))
	return identity, nil
}

func (store *PostgresStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
	if !store.configured {
		return fmt.Errorf("store not configured")
	}

	tx, err := store.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	defer tx.Rollback()

	var count int
	err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
	if err != nil {
		return fmt.Errorf("failed to check user existence: %w", err)
	}
	if count == 0 {
		return credential.ErrUserNotFound
	}

	var accountDataJSON []byte
	if identity.Account != nil {
		accountDataJSON, err = json.Marshal(identity.Account)
		if err != nil {
			return fmt.Errorf("failed to marshal account data: %w", err)
		}
	}

	var actionsJSON []byte
	if identity.Actions != nil {
		actionsJSON, err = json.Marshal(identity.Actions)
		if err != nil {
			return fmt.Errorf("failed to marshal actions: %w", err)
		}
	}

	var policyNamesJSON []byte
	if identity.PolicyNames != nil {
		policyNamesJSON, err = json.Marshal(identity.PolicyNames)
		if err != nil {
			return fmt.Errorf("failed to marshal policy names: %w", err)
		}
	}

	_, err = tx.ExecContext(ctx,
		"UPDATE users SET email = $2, account_data = $3, actions = $4, policy_names = $5, updated_at = CURRENT_TIMESTAMP WHERE username = $1",
		username, "", jsonbParam(accountDataJSON), jsonbParam(actionsJSON), jsonbParam(policyNamesJSON))
	if err != nil {
		glog.Errorf("credential postgres: UpdateUser failed user=%s: %v", username, err)
		return fmt.Errorf("failed to update user: %w", err)
	}

	_, err = tx.ExecContext(ctx, "DELETE FROM credentials WHERE username = $1", username)
	if err != nil {
		return fmt.Errorf("failed to delete existing credentials: %w", err)
	}

	for _, cred := range identity.Credentials {
		_, err = tx.ExecContext(ctx,
			"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
			username, cred.AccessKey, cred.SecretKey)
		if err != nil {
			return fmt.Errorf("failed to insert credential: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		glog.Errorf("credential postgres: UpdateUser commit failed user=%s: %v", username, err)
		return fmt.Errorf("failed to commit update user %s: %w", username, err)
	}

	glog.V(0).Infof("credential postgres: UpdateUser user=%s credentials=%d", username, len(identity.Credentials))
	return nil
}

func (store *PostgresStore) DeleteUser(ctx context.Context, username string) error {
	if !store.configured {
		return fmt.Errorf("store not configured")
	}

	result, err := store.db.ExecContext(ctx, "DELETE FROM users WHERE username = $1", username)
	if err != nil {
		glog.Errorf("credential postgres: DeleteUser failed user=%s: %v", username, err)
		return fmt.Errorf("failed to delete user: %w", err)
	}

	rowsAffected, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to get rows affected: %w", err)
	}

	if rowsAffected == 0 {
		glog.V(1).Infof("credential postgres: DeleteUser user=%s not found", username)
		return credential.ErrUserNotFound
	}

	glog.V(0).Infof("credential postgres: DeleteUser user=%s", username)
	return nil
}

// RenameUser atomically renames oldName to newName, re-pointing every
// FK-backed dependent row in a single transaction. The schema's foreign
// keys (credentials.username, user_inline_policies.username) reference
// users.username with ON DELETE CASCADE and the default ON UPDATE
// NO ACTION, which means a plain UPDATE users SET username = newName
// would fail because the children still reference the old name. Instead:
//  1. Insert the new users row by copying every non-key column from old.
//  2. Re-point the dependent tables (credentials, user_inline_policies)
//     to the new username; both rows now exist so the FK is satisfied.
//  3. Delete the old users row, which has no remaining dependents.
//
// Implements credential.UserRenamer.
func (store *PostgresStore) RenameUser(ctx context.Context, oldName, newName string) error {
	if !store.configured {
		return fmt.Errorf("store not configured")
	}
	if oldName == newName {
		return nil
	}

	tx, err := store.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	defer tx.Rollback()

	// Refuse if oldName is missing or newName already exists, so the
	// caller surfaces a recognisable error instead of a constraint
	// violation midway through.
	var oldExists, newExists bool
	if err := tx.QueryRowContext(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE username = $1)", oldName).Scan(&oldExists); err != nil {
		return fmt.Errorf("failed to check old user %s: %w", oldName, err)
	}
	if !oldExists {
		return credential.ErrUserNotFound
	}
	if err := tx.QueryRowContext(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE username = $1)", newName).Scan(&newExists); err != nil {
		return fmt.Errorf("failed to check new user %s: %w", newName, err)
	}
	if newExists {
		return credential.ErrUserAlreadyExists
	}

	if _, err := tx.ExecContext(ctx,
		`INSERT INTO users (username, email, account_data, actions, policy_names, created_at, updated_at)
		SELECT $1, email, account_data, actions, policy_names, created_at, CURRENT_TIMESTAMP
		FROM users WHERE username = $2`,
		newName, oldName); err != nil {
		return fmt.Errorf("failed to insert renamed user %s: %w", newName, err)
	}

	if _, err := tx.ExecContext(ctx,
		"UPDATE credentials SET username = $1 WHERE username = $2", newName, oldName); err != nil {
		return fmt.Errorf("failed to re-point credentials to %s: %w", newName, err)
	}

	if _, err := tx.ExecContext(ctx,
		"UPDATE user_inline_policies SET username = $1 WHERE username = $2", newName, oldName); err != nil {
		return fmt.Errorf("failed to re-point inline policies to %s: %w", newName, err)
	}

	if _, err := tx.ExecContext(ctx,
		"DELETE FROM users WHERE username = $1", oldName); err != nil {
		return fmt.Errorf("failed to drop old user %s: %w", oldName, err)
	}

	if err := tx.Commit(); err != nil {
		glog.Errorf("credential postgres: RenameUser commit failed %s -> %s: %v", oldName, newName, err)
		return fmt.Errorf("failed to commit rename %s -> %s: %w", oldName, newName, err)
	}

	glog.V(0).Infof("credential postgres: RenameUser %s -> %s", oldName, newName)
	return nil
}

func (store *PostgresStore) ListUsers(ctx context.Context) ([]string, error) {
	if !store.configured {
		return nil, fmt.Errorf("store not configured")
	}

	rows, err := store.db.QueryContext(ctx, "SELECT username FROM users ORDER BY username")
	if err != nil {
		glog.Errorf("credential postgres: ListUsers query failed: %v", err)
		return nil, fmt.Errorf("failed to query users: %w", err)
	}
	defer rows.Close()

	var usernames []string
	for rows.Next() {
		var username string
		if err := rows.Scan(&username); err != nil {
			return nil, fmt.Errorf("failed to scan username: %w", err)
		}
		usernames = append(usernames, username)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed iterating user rows: %w", err)
	}

	glog.V(1).Infof("credential postgres: ListUsers count=%d", len(usernames))
	return usernames, nil
}

func (store *PostgresStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
	if !store.configured {
		return nil, fmt.Errorf("store not configured")
	}

	var username string
	err := store.db.QueryRowContext(ctx, "SELECT username FROM credentials WHERE access_key = $1", accessKey).Scan(&username)
	if err != nil {
		if err == sql.ErrNoRows {
			glog.V(2).Infof("credential postgres: GetUserByAccessKey accessKey=%s not found", accessKey)
			return nil, credential.ErrAccessKeyNotFound
		}
		glog.Errorf("credential postgres: GetUserByAccessKey query failed accessKey=%s: %v", accessKey, err)
		return nil, fmt.Errorf("failed to query access key: %w", err)
	}

	glog.V(2).Infof("credential postgres: GetUserByAccessKey accessKey=%s resolved to user=%s", accessKey, username)
	return store.GetUser(ctx, username)
}

func (store *PostgresStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error {
	if !store.configured {
		return fmt.Errorf("store not configured")
	}

	var count int
	err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
	if err != nil {
		return fmt.Errorf("failed to check user existence: %w", err)
	}
	if count == 0 {
		return credential.ErrUserNotFound
	}

	_, err = store.db.ExecContext(ctx,
		"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
		username, cred.AccessKey, cred.SecretKey)
	if err != nil {
		glog.Errorf("credential postgres: CreateAccessKey failed user=%s accessKey=%s: %v", username, cred.AccessKey, err)
		return fmt.Errorf("failed to insert credential: %w", err)
	}

	glog.V(0).Infof("credential postgres: CreateAccessKey user=%s accessKey=%s", username, cred.AccessKey)
	return nil
}

func (store *PostgresStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
	if !store.configured {
		return fmt.Errorf("store not configured")
	}

	result, err := store.db.ExecContext(ctx,
		"DELETE FROM credentials WHERE username = $1 AND access_key = $2",
		username, accessKey)
	if err != nil {
		glog.Errorf("credential postgres: DeleteAccessKey failed user=%s accessKey=%s: %v", username, accessKey, err)
		return fmt.Errorf("failed to delete access key: %w", err)
	}

	rowsAffected, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to get rows affected: %w", err)
	}

	if rowsAffected == 0 {
		var count int
		err = store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
		if err != nil {
			return fmt.Errorf("failed to check user existence: %w", err)
		}
		if count == 0 {
			return credential.ErrUserNotFound
		}
		return credential.ErrAccessKeyNotFound
	}

	glog.V(0).Infof("credential postgres: DeleteAccessKey user=%s accessKey=%s", username, accessKey)
	return nil
}

func (store *PostgresStore) AttachUserPolicy(ctx context.Context, username string, policyName string) error {
	if !store.configured {
		return fmt.Errorf("store not configured")
	}

	identity, err := store.GetUser(ctx, username)
	if err != nil {
		return err
	}

	policy, err := store.GetPolicy(ctx, policyName)
	if err != nil {
		return err
	}
	if policy == nil {
		return credential.ErrPolicyNotFound
	}

	for _, p := range identity.PolicyNames {
		if p == policyName {
			return credential.ErrPolicyAlreadyAttached
		}
	}

	identity.PolicyNames = append(identity.PolicyNames, policyName)
	if err := store.UpdateUser(ctx, username, identity); err != nil {
		return err
	}

	glog.V(1).Infof("credential postgres: AttachUserPolicy user=%s policy=%s", username, policyName)
	return nil
}

func (store *PostgresStore) DetachUserPolicy(ctx context.Context, username string, policyName string) error {
	if !store.configured {
		return fmt.Errorf("store not configured")
	}

	identity, err := store.GetUser(ctx, username)
	if err != nil {
		return err
	}

	found := false
	var newPolicyNames []string
	for _, p := range identity.PolicyNames {
		if p == policyName {
			found = true
		} else {
			newPolicyNames = append(newPolicyNames, p)
		}
	}

	if !found {
		return credential.ErrPolicyNotAttached
	}

	identity.PolicyNames = newPolicyNames
	if err := store.UpdateUser(ctx, username, identity); err != nil {
		return err
	}

	glog.V(1).Infof("credential postgres: DetachUserPolicy user=%s policy=%s", username, policyName)
	return nil
}

func (store *PostgresStore) ListAttachedUserPolicies(ctx context.Context, username string) ([]string, error) {
	if !store.configured {
		return nil, fmt.Errorf("store not configured")
	}

	identity, err := store.GetUser(ctx, username)
	if err != nil {
		return nil, err
	}

	return identity.PolicyNames, nil
}