scoutfs: fix inode lock inversions

We lock multiple inodes by order of their inode number.  This fixes
the directory entry paths that hold parent dir and target inode locks.

Link and unlink are easy because they just acquire the existing parent
dir and target inode locks.

Lookup is a little squirrely because we don't want to try and order
the parent dir lock with locks down in iget.  It turns out that it's
safe to drop the dir lock before calling iget as long as iget handles
racing the inode cache instantiation with inode deletion.

Creation is the remaining pattern and it's a little weird because we
want to lock the newly created inode before we create it and the items
that store it.  We add a function that correctly orders the locks,
transaction, and inode cache instantiation.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2017-08-04 14:53:32 -07:00
parent d2ea247ab9
commit 8735d319a3
3 changed files with 104 additions and 76 deletions

View File

@@ -231,6 +231,17 @@ static struct scoutfs_key_buf *alloc_link_backref_key(struct super_block *sb,
return key;
}
/*
* Because of rename, locks are ordered by inode number. To hold the
* dir lock while calling iget, we might have to already hold a lesser
* inode's lock while telling iget whether or not to lock. Instead of
* adding all those moving pieces we drop the dir lock before calling
* iget. We don't reuse inode numbers so we don't have to worry about
* the target of the link changing. We will only follow the entry as it
* existed before or after whatever modification is happening under the
* dir lock and that can already legally race before or after our
* lookup.
*/
static struct dentry *scoutfs_lookup(struct inode *dir, struct dentry *dentry,
unsigned int flags)
{
@@ -267,6 +278,7 @@ static struct dentry *scoutfs_lookup(struct inode *dir, struct dentry *dentry,
ret = scoutfs_item_lookup_exact(sb, key, val, sizeof(dent),
dir_lock->end);
scoutfs_unlock(sb, dir_lock, DLM_LOCK_PR);
if (ret == -ENOENT) {
ino = 0;
ret = 0;
@@ -282,8 +294,6 @@ out:
else
inode = scoutfs_iget(sb, ino);
scoutfs_unlock(sb, dir_lock, DLM_LOCK_PR);
scoutfs_key_free(sb, key);
return d_splice_alias(inode, dentry);
@@ -516,13 +526,82 @@ out:
return ret;
}
/*
* Inode creation needs to hold dir and inode locks which can be greater
* or less than each other. It seems easiest to keep the dual locking
* here like it is for all the other dual locking of established inodes.
* Except we don't have the inode struct yet when we're getting locks,
* so we roll our own comparion between the two instead of pushing
* complexity down the locking paths that acquire existing inodes in
* order.
*/
static struct inode *lock_hold_create(struct inode *dir, struct dentry *dentry,
umode_t mode, dev_t rdev,
struct scoutfs_item_count *cnt,
struct scoutfs_lock **dir_lock,
struct scoutfs_lock **inode_lock)
{
struct super_block *sb = dir->i_sb;
struct inode *inode;
int ret = 0;
u64 ino;
ret = alloc_dentry_info(dentry);
if (ret)
return ERR_PTR(ret);
ret = scoutfs_alloc_ino(sb, &ino);
if (ret)
return ERR_PTR(ret);
if (ino < scoutfs_ino(dir)) {
ret = scoutfs_lock_ino(sb, DLM_LOCK_EX, 0, ino, inode_lock) ?:
scoutfs_lock_inode(sb, DLM_LOCK_EX,
SCOUTFS_LKF_REFRESH_INODE, dir,
dir_lock);
} else {
ret = scoutfs_lock_inode(sb, DLM_LOCK_EX,
SCOUTFS_LKF_REFRESH_INODE, dir,
dir_lock) ?:
scoutfs_lock_ino(sb, DLM_LOCK_EX, 0, ino, inode_lock);
}
if (ret)
goto out_unlock;
ret = scoutfs_hold_trans(sb, cnt);
if (ret)
goto out_unlock;
inode = scoutfs_new_inode(sb, dir, mode, rdev, ino, *inode_lock);
if (IS_ERR(inode)) {
ret = PTR_ERR(inode);
goto out;
}
ret = scoutfs_dirty_inode_item(dir, (*dir_lock)->end);
out:
if (ret)
scoutfs_release_trans(sb);
out_unlock:
if (ret) {
scoutfs_unlock(sb, *dir_lock, DLM_LOCK_EX);
scoutfs_unlock(sb, *inode_lock, DLM_LOCK_EX);
*dir_lock = NULL;
*inode_lock = NULL;
inode = ERR_PTR(ret);
}
return inode;
}
static int scoutfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode,
dev_t rdev)
{
struct super_block *sb = dir->i_sb;
DECLARE_ITEM_COUNT(cnt);
struct inode *inode = NULL;
struct scoutfs_lock *dir_lock;
struct scoutfs_lock *dir_lock = NULL;
struct scoutfs_lock *inode_lock = NULL;
u64 pos;
int ret;
@@ -530,34 +609,12 @@ static int scoutfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode,
if (dentry->d_name.len > SCOUTFS_NAME_LEN)
return -ENAMETOOLONG;
ret = alloc_dentry_info(dentry);
if (ret)
return ret;
ret = scoutfs_lock_inode(sb, DLM_LOCK_EX, SCOUTFS_LKF_REFRESH_INODE,
dir, &dir_lock);
if (ret)
return ret;
scoutfs_count_mknod(&cnt, dentry->d_name.len);
ret = scoutfs_hold_trans(sb, &cnt);
if (ret)
goto out_unlock;
ret = scoutfs_dirty_inode_item(dir, dir_lock->end);
if (ret)
goto out;
inode = scoutfs_new_inode(sb, dir, mode, rdev, dir_lock);
if (IS_ERR(inode)) {
ret = PTR_ERR(inode);
goto out;
}
/* Now that we have ino from scoutfs_new_inode, allocate a lock */
ret = scoutfs_lock_inode(sb, DLM_LOCK_EX, 0, inode, &inode_lock);
if (ret)
goto out;
inode = lock_hold_create(dir, dentry, mode, rdev, &cnt,
&dir_lock, &inode_lock);
if (IS_ERR(inode))
return PTR_ERR(inode);
pos = SCOUTFS_I(dir)->next_readdir_pos++;
@@ -585,9 +642,9 @@ static int scoutfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode,
d_instantiate(dentry, inode);
out:
scoutfs_release_trans(sb);
out_unlock:
scoutfs_unlock(sb, dir_lock, DLM_LOCK_EX);
scoutfs_unlock(sb, inode_lock, DLM_LOCK_EX);
/* XXX delete the inode item here */
if (ret && !IS_ERR_OR_NULL(inode))
iput(inode);
@@ -620,16 +677,12 @@ static int scoutfs_link(struct dentry *old_dentry,
if (dentry->d_name.len > SCOUTFS_NAME_LEN)
return -ENAMETOOLONG;
ret = scoutfs_lock_inode(sb, DLM_LOCK_EX, SCOUTFS_LKF_REFRESH_INODE,
dir, &dir_lock);
ret = scoutfs_lock_inodes(sb, DLM_LOCK_EX, SCOUTFS_LKF_REFRESH_INODE,
dir, &dir_lock, inode, &inode_lock,
NULL, NULL, NULL, NULL);
if (ret)
return ret;
ret = scoutfs_lock_inode(sb, DLM_LOCK_EX, SCOUTFS_LKF_REFRESH_INODE,
inode, &inode_lock);
if (ret)
goto out_unlock;
if (inode->i_nlink >= SCOUTFS_LINK_MAX) {
ret = -EMLINK;
goto out_unlock;
@@ -700,16 +753,12 @@ static int scoutfs_unlink(struct inode *dir, struct dentry *dentry)
DECLARE_ITEM_COUNT(cnt);
int ret = 0;
ret = scoutfs_lock_inode(sb, DLM_LOCK_EX, SCOUTFS_LKF_REFRESH_INODE,
dir, &dir_lock);
ret = scoutfs_lock_inodes(sb, DLM_LOCK_EX, SCOUTFS_LKF_REFRESH_INODE,
dir, &dir_lock, inode, &inode_lock,
NULL, NULL, NULL, NULL);
if (ret)
return ret;
ret = scoutfs_lock_inode(sb, DLM_LOCK_EX, SCOUTFS_LKF_REFRESH_INODE,
inode, &inode_lock);
if (ret)
goto unlock;
if (S_ISDIR(inode->i_mode) && i_size_read(inode)) {
ret = -ENOTEMPTY;
goto unlock;
@@ -915,7 +964,7 @@ static int scoutfs_symlink(struct inode *dir, struct dentry *dentry,
struct super_block *sb = dir->i_sb;
const int name_len = strlen(symname) + 1;
struct inode *inode = NULL;
struct scoutfs_lock *dir_lock;
struct scoutfs_lock *dir_lock = NULL;
struct scoutfs_lock *inode_lock = NULL;
DECLARE_ITEM_COUNT(cnt);
u64 pos;
@@ -930,29 +979,11 @@ static int scoutfs_symlink(struct inode *dir, struct dentry *dentry,
if (ret)
return ret;
ret = scoutfs_lock_inode(sb, DLM_LOCK_EX, SCOUTFS_LKF_REFRESH_INODE,
dir, &dir_lock);
if (ret)
return ret;
scoutfs_count_symlink(&cnt, dentry->d_name.len, name_len);
ret = scoutfs_hold_trans(sb, &cnt);
if (ret)
goto out_unlock;
ret = scoutfs_dirty_inode_item(dir, dir_lock->end);
if (ret)
goto out;
inode = scoutfs_new_inode(sb, dir, S_IFLNK|S_IRWXUGO, 0, dir_lock);
if (IS_ERR(inode)) {
ret = PTR_ERR(inode);
goto out;
}
ret = scoutfs_lock_inode(sb, DLM_LOCK_EX, 0, inode, &inode_lock);
if (ret)
goto out;
inode = lock_hold_create(dir, dentry, S_IFLNK|S_IRWXUGO, 0, &cnt,
&dir_lock, &inode_lock);
if (IS_ERR(inode))
return PTR_ERR(inode);
ret = symlink_item_ops(sb, SYM_CREATE, scoutfs_ino(inode), inode_lock,
symname, name_len);
@@ -983,6 +1014,7 @@ static int scoutfs_symlink(struct inode *dir, struct dentry *dentry,
d_instantiate(dentry, inode);
out:
if (ret < 0) {
/* XXX remove inode items */
if (!IS_ERR_OR_NULL(inode))
iput(inode);
@@ -991,9 +1023,9 @@ out:
}
scoutfs_release_trans(sb);
out_unlock:
scoutfs_unlock(sb, dir_lock, DLM_LOCK_EX);
scoutfs_unlock(sb, inode_lock, DLM_LOCK_EX);
return ret;
}

View File

@@ -745,7 +745,7 @@ static bool pool_in_flight(struct free_ino_pool *pool)
* net layer calls us when it gets a reply. If there's no more inodes
* we'll get ino == ~0 and nr == 0.
*/
static int alloc_ino(struct super_block *sb, u64 *ino)
int scoutfs_alloc_ino(struct super_block *sb, u64 *ino)
{
struct free_ino_pool *pool = &SCOUTFS_SB(sb)->inode_sb_info->pool;
bool request;
@@ -807,7 +807,7 @@ out:
* creating links to it and updating it. @dir can be null.
*/
struct inode *scoutfs_new_inode(struct super_block *sb, struct inode *dir,
umode_t mode, dev_t rdev,
umode_t mode, dev_t rdev, u64 ino,
struct scoutfs_lock *lock)
{
struct scoutfs_inode_info *ci;
@@ -816,13 +816,8 @@ struct inode *scoutfs_new_inode(struct super_block *sb, struct inode *dir,
struct scoutfs_inode sinode;
SCOUTFS_DECLARE_KVEC(val);
struct inode *inode;
u64 ino;
int ret;
ret = alloc_ino(sb, &ino);
if (ret)
return ERR_PTR(ret);
inode = new_inode(sb);
if (!inode)
return ERR_PTR(-ENOMEM);

View File

@@ -64,8 +64,9 @@ struct inode *scoutfs_ilookup(struct super_block *sb, u64 ino);
int scoutfs_dirty_inode_item(struct inode *inode, struct scoutfs_key_buf *end);
void scoutfs_update_inode_item(struct inode *inode);
void scoutfs_inode_fill_pool(struct super_block *sb, u64 ino, u64 nr);
int scoutfs_alloc_ino(struct super_block *sb, u64 *ino);
struct inode *scoutfs_new_inode(struct super_block *sb, struct inode *dir,
umode_t mode, dev_t rdev,
umode_t mode, dev_t rdev, u64 ino,
struct scoutfs_lock *lock);
void scoutfs_inode_set_meta_seq(struct inode *inode);
void scoutfs_inode_set_data_seq(struct inode *inode);