mirror of
https://github.com/versity/versitygw.git
synced 2026-01-06 11:46:20 +00:00
fix: remove caching in local iam service
The caching will be implements a layer up so that the individual IAM services don't need ot care about caching rules.
This commit is contained in:
@@ -18,11 +18,10 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -31,20 +30,9 @@ const (
|
||||
iamBackupFile = "users.json.backup"
|
||||
)
|
||||
|
||||
var (
|
||||
cacheDuration = 5 * time.Minute
|
||||
)
|
||||
|
||||
// IAMServiceInternal manages the internal IAM service
|
||||
type IAMServiceInternal struct {
|
||||
path string
|
||||
|
||||
mu sync.RWMutex
|
||||
accts iAMConfig
|
||||
serial uint32
|
||||
iamcache []byte
|
||||
iamvalid bool
|
||||
iamexpire time.Time
|
||||
dir string
|
||||
}
|
||||
|
||||
// UpdateAcctFunc accepts the current data and returns the new data to be stored
|
||||
@@ -58,9 +46,9 @@ type iAMConfig struct {
|
||||
var _ IAMService = &IAMServiceInternal{}
|
||||
|
||||
// NewInternal creates a new instance for the Internal IAM service
|
||||
func NewInternal(path string) (*IAMServiceInternal, error) {
|
||||
func NewInternal(dir string) (*IAMServiceInternal, error) {
|
||||
i := &IAMServiceInternal{
|
||||
path: path,
|
||||
dir: dir,
|
||||
}
|
||||
|
||||
err := i.initIAM()
|
||||
@@ -68,29 +56,16 @@ func NewInternal(path string) (*IAMServiceInternal, error) {
|
||||
return nil, fmt.Errorf("init iam: %w", err)
|
||||
}
|
||||
|
||||
err = i.updateCache()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("refresh iam cache: %w", err)
|
||||
}
|
||||
|
||||
return i, nil
|
||||
}
|
||||
|
||||
// CreateAccount creates a new IAM account. Returns an error if the account
|
||||
// already exists.
|
||||
func (s *IAMServiceInternal) CreateAccount(account Account) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.storeIAM(func(data []byte) ([]byte, error) {
|
||||
var conf iAMConfig
|
||||
|
||||
if len(data) > 0 {
|
||||
if err := json.Unmarshal(data, &conf); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse iam: %w", err)
|
||||
}
|
||||
} else {
|
||||
conf = iAMConfig{AccessAccounts: map[string]Account{}}
|
||||
conf, err := parseIAM(data)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get iam data: %w", err)
|
||||
}
|
||||
|
||||
_, ok := conf.AccessAccounts[account.Access]
|
||||
@@ -103,7 +78,6 @@ func (s *IAMServiceInternal) CreateAccount(account Account) error {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to serialize iam: %w", err)
|
||||
}
|
||||
s.accts = conf
|
||||
|
||||
return b, nil
|
||||
})
|
||||
@@ -112,25 +86,12 @@ func (s *IAMServiceInternal) CreateAccount(account Account) error {
|
||||
// GetUserAccount retrieves account info for the requested user. Returns
|
||||
// ErrNoSuchUser if the account does not exist.
|
||||
func (s *IAMServiceInternal) GetUserAccount(access string) (Account, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
data, err := s.getIAM()
|
||||
conf, err := s.getIAM()
|
||||
if err != nil {
|
||||
return Account{}, fmt.Errorf("get iam data: %w", err)
|
||||
}
|
||||
|
||||
serial := crc32.ChecksumIEEE(data)
|
||||
if serial != s.serial {
|
||||
s.mu.RUnlock()
|
||||
err := s.updateCache()
|
||||
s.mu.RLock()
|
||||
if err != nil {
|
||||
return Account{}, fmt.Errorf("refresh iam cache: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
acct, ok := s.accts.AccessAccounts[access]
|
||||
acct, ok := conf.AccessAccounts[access]
|
||||
if !ok {
|
||||
return Account{}, ErrNoSuchUser
|
||||
}
|
||||
@@ -138,47 +99,13 @@ func (s *IAMServiceInternal) GetUserAccount(access string) (Account, error) {
|
||||
return acct, nil
|
||||
}
|
||||
|
||||
// updateCache must be called with no locks held
|
||||
func (s *IAMServiceInternal) updateCache() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
data, err := s.getIAM()
|
||||
if err != nil {
|
||||
return fmt.Errorf("get iam data: %w", err)
|
||||
}
|
||||
|
||||
serial := crc32.ChecksumIEEE(data)
|
||||
|
||||
if len(data) > 0 {
|
||||
if err := json.Unmarshal(data, &s.accts); err != nil {
|
||||
return fmt.Errorf("failed to parse the config file: %w", err)
|
||||
}
|
||||
} else {
|
||||
s.accts.AccessAccounts = make(map[string]Account)
|
||||
}
|
||||
|
||||
s.serial = serial
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteUserAccount deletes the specified user account. Does not check if
|
||||
// account exists.
|
||||
func (s *IAMServiceInternal) DeleteUserAccount(access string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.storeIAM(func(data []byte) ([]byte, error) {
|
||||
if len(data) == 0 {
|
||||
// empty config, do nothing
|
||||
return data, nil
|
||||
}
|
||||
|
||||
var conf iAMConfig
|
||||
|
||||
if err := json.Unmarshal(data, &conf); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse iam: %w", err)
|
||||
conf, err := parseIAM(data)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get iam data: %w", err)
|
||||
}
|
||||
|
||||
delete(conf.AccessAccounts, access)
|
||||
@@ -188,37 +115,29 @@ func (s *IAMServiceInternal) DeleteUserAccount(access string) error {
|
||||
return nil, fmt.Errorf("failed to serialize iam: %w", err)
|
||||
}
|
||||
|
||||
s.accts = conf
|
||||
|
||||
return b, nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListUserAccounts lists all the user accounts stored.
|
||||
func (s *IAMServiceInternal) ListUserAccounts() (accs []Account, err error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
data, err := s.getIAM()
|
||||
func (s *IAMServiceInternal) ListUserAccounts() ([]Account, error) {
|
||||
conf, err := s.getIAM()
|
||||
if err != nil {
|
||||
return []Account{}, fmt.Errorf("get iam data: %w", err)
|
||||
}
|
||||
|
||||
serial := crc32.ChecksumIEEE(data)
|
||||
if serial != s.serial {
|
||||
s.mu.RUnlock()
|
||||
err := s.updateCache()
|
||||
s.mu.RLock()
|
||||
if err != nil {
|
||||
return []Account{}, fmt.Errorf("refresh iam cache: %w", err)
|
||||
}
|
||||
keys := make([]string, 0, len(conf.AccessAccounts))
|
||||
for k := range conf.AccessAccounts {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
|
||||
for access, usr := range s.accts.AccessAccounts {
|
||||
var accs []Account
|
||||
for _, k := range keys {
|
||||
accs = append(accs, Account{
|
||||
Access: access,
|
||||
Secret: usr.Secret,
|
||||
Role: usr.Role,
|
||||
Access: k,
|
||||
Secret: conf.AccessAccounts[k].Secret,
|
||||
Role: conf.AccessAccounts[k].Role,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -230,7 +149,7 @@ const (
|
||||
)
|
||||
|
||||
func (s *IAMServiceInternal) initIAM() error {
|
||||
fname := filepath.Join(s.path, iamFile)
|
||||
fname := filepath.Join(s.dir, iamFile)
|
||||
|
||||
_, err := os.ReadFile(fname)
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
@@ -247,15 +166,22 @@ func (s *IAMServiceInternal) initIAM() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *IAMServiceInternal) getIAM() ([]byte, error) {
|
||||
if !s.iamvalid || !s.iamexpire.After(time.Now()) {
|
||||
err := s.refreshIAM()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
func (s *IAMServiceInternal) getIAM() (iAMConfig, error) {
|
||||
b, err := s.readIAMData()
|
||||
if err != nil {
|
||||
return iAMConfig{}, err
|
||||
}
|
||||
|
||||
return s.iamcache, nil
|
||||
return parseIAM(b)
|
||||
}
|
||||
|
||||
func parseIAM(b []byte) (iAMConfig, error) {
|
||||
var conf iAMConfig
|
||||
if err := json.Unmarshal(b, &conf); err != nil {
|
||||
return iAMConfig{}, fmt.Errorf("failed to parse the config file: %w", err)
|
||||
}
|
||||
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -263,7 +189,7 @@ const (
|
||||
maxretry = 300
|
||||
)
|
||||
|
||||
func (s *IAMServiceInternal) refreshIAM() error {
|
||||
func (s *IAMServiceInternal) readIAMData() ([]byte, error) {
|
||||
// We are going to be racing with other running gateways without any
|
||||
// coordination. So we might find the file does not exist at times.
|
||||
// For this case we need to retry for a while assuming the other gateway
|
||||
@@ -273,7 +199,7 @@ func (s *IAMServiceInternal) refreshIAM() error {
|
||||
retries := 0
|
||||
|
||||
for {
|
||||
b, err := os.ReadFile(filepath.Join(s.path, iamFile))
|
||||
b, err := os.ReadFile(filepath.Join(s.dir, iamFile))
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
// racing with someone else updating
|
||||
// keep retrying after backoff
|
||||
@@ -282,19 +208,14 @@ func (s *IAMServiceInternal) refreshIAM() error {
|
||||
time.Sleep(backoff)
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("read iam file: %w", err)
|
||||
return nil, fmt.Errorf("read iam file: %w", err)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.iamcache = b
|
||||
s.iamvalid = true
|
||||
s.iamexpire = time.Now().Add(cacheDuration)
|
||||
break
|
||||
return b, nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *IAMServiceInternal) storeIAM(update UpdateAcctFunc) error {
|
||||
@@ -314,7 +235,7 @@ func (s *IAMServiceInternal) storeIAM(update UpdateAcctFunc) error {
|
||||
// write the file.
|
||||
|
||||
retries := 0
|
||||
fname := filepath.Join(s.path, iamFile)
|
||||
fname := filepath.Join(s.dir, iamFile)
|
||||
|
||||
for {
|
||||
b, err := os.ReadFile(fname)
|
||||
@@ -361,7 +282,7 @@ func (s *IAMServiceInternal) storeIAM(update UpdateAcctFunc) error {
|
||||
// can go wrong, but the remove should barrier other gateways
|
||||
// from trying to write backup at the same time. Only one
|
||||
// gateway will successfully remove the file.
|
||||
os.WriteFile(filepath.Join(s.path, iamBackupFile), b, iamMode)
|
||||
os.WriteFile(filepath.Join(s.dir, iamBackupFile), b, iamMode)
|
||||
|
||||
b, err = update(b)
|
||||
if err != nil {
|
||||
@@ -377,9 +298,6 @@ func (s *IAMServiceInternal) storeIAM(update UpdateAcctFunc) error {
|
||||
return err
|
||||
}
|
||||
|
||||
s.iamcache = b
|
||||
s.iamvalid = true
|
||||
s.iamexpire = time.Now().Add(cacheDuration)
|
||||
break
|
||||
}
|
||||
|
||||
@@ -387,9 +305,9 @@ func (s *IAMServiceInternal) storeIAM(update UpdateAcctFunc) error {
|
||||
}
|
||||
|
||||
func (s *IAMServiceInternal) writeTempFile(b []byte) error {
|
||||
fname := filepath.Join(s.path, iamFile)
|
||||
fname := filepath.Join(s.dir, iamFile)
|
||||
|
||||
f, err := os.CreateTemp(s.path, iamFile)
|
||||
f, err := os.CreateTemp(s.dir, iamFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create temp file: %w", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user