scoutfs: add lock coverage for inode index items

Add lock coverage for inode index items.

Sadly, this isn't trivial. We have to predict the value of the indexed
fields before the operation to lock those items.  One value in
particular we can't reliably predict: the sequence of the transaction we
enter after locking.  Also operations can create an absolute ton of
index item updates -- rename can modify nr_inodes * items_per_inode * 2
items, so maybe 24 today.  And these items can be arbitrarily positioned
in the key space.

So to handle all this we add functions to gather predicted item values
we'll need to lock sort and lock them all, then pass appropriate locks
down to the item functions during inode updates.

The trickiest bit of the index locking code is having to retry if the
sequence number changes.  Preparing locks has to guess the sequence
number of its upcoming trans and then makes item update decisions based
on that.  If we enter and have a different sequence number then we need
to back off and retry with the correct sequence number (we may find that
we'll need to update the indexed meta seq and need to have it locked).

The use of the functions is straight forward.  Sites figure out the
predicted sizes, lock, pass the locks to inode updates, and unlock.

While we're at it we replace the individual item field tracking
variables in the inode info with an array of indexed values.  The code
ends up a bit nicer.  It also gets rid of the indexed time fields that
were left behind and were unused.

It's worth noting that we're getting exclusive locks on the index
updates.  Locking the meta/data seq updates results in complete global
serialization of all changes.  We'll need concurrent writer locks to get
concurrency back.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2017-09-29 10:28:50 -07:00
committed by Mark Fasheh
parent 960bc4d53c
commit 950436461a
9 changed files with 702 additions and 159 deletions

View File

@@ -1133,6 +1133,12 @@ static int scoutfs_writepages(struct address_space *mapping,
return mpage_writepages(mapping, wbc, scoutfs_get_block);
}
/* fsdata allocated in write_begin and freed in write_end */
struct write_begin_data {
struct list_head ind_locks;
struct scoutfs_lock *lock;
};
static int scoutfs_write_begin(struct file *file,
struct address_space *mapping, loff_t pos,
unsigned len, unsigned flags,
@@ -1141,30 +1147,60 @@ static int scoutfs_write_begin(struct file *file,
struct inode *inode = mapping->host;
struct scoutfs_inode_info *si = SCOUTFS_I(inode);
struct super_block *sb = inode->i_sb;
struct scoutfs_lock *lock;
struct write_begin_data *wbd;
u64 new_size;
u64 ind_seq;
int ret;
trace_scoutfs_write_begin(sb, scoutfs_ino(inode), (__u64)pos, len);
lock = scoutfs_per_task_get(&si->pt_data_lock);
if (WARN_ON_ONCE(!lock))
return -EINVAL;
wbd = kmalloc(sizeof(struct write_begin_data), GFP_NOFS);
if (!wbd)
return -ENOMEM;
ret = scoutfs_hold_trans(sb, SIC_WRITE_BEGIN());
if (ret)
INIT_LIST_HEAD(&wbd->ind_locks);
*fsdata = wbd;
wbd->lock = scoutfs_per_task_get(&si->pt_data_lock);
if (WARN_ON_ONCE(!wbd->lock)) {
ret = -EINVAL;
goto out;
}
/*
* Lock a size update item assuming we perform the full write.
* If If the write is inside i_size then we don't lock and
* nothing will be updated. Lock granularity is larger than
* pages so any size update in this call will be covered by the
* lock. If there's an error and we don't change i_size then
* the item update won't happen and the lock will be unused.
*/
new_size = max(pos + len, i_size_read(inode));
do {
ret = scoutfs_inode_index_start(sb, &ind_seq) ?:
scoutfs_inode_index_prepare(sb, &wbd->ind_locks, inode,
new_size, true) ?:
scoutfs_inode_index_lock_hold(sb, &wbd->ind_locks,
ind_seq, SIC_WRITE_BEGIN());
} while (ret > 0);
if (ret < 0)
goto out;
/* can't re-enter fs, have trans */
flags |= AOP_FLAG_NOFS;
/* generic write_end updates i_size and calls dirty_inode */
ret = scoutfs_dirty_inode_item(inode, lock);
ret = scoutfs_dirty_inode_item(inode, wbd->lock);
if (ret == 0)
ret = block_write_begin(mapping, pos, len, flags, pagep,
scoutfs_get_block);
if (ret)
scoutfs_release_trans(sb);
out:
if (ret) {
scoutfs_inode_index_unlock(sb, &wbd->ind_locks);
kfree(wbd);
}
return ret;
}
@@ -1175,26 +1211,25 @@ static int scoutfs_write_end(struct file *file, struct address_space *mapping,
struct inode *inode = mapping->host;
struct scoutfs_inode_info *si = SCOUTFS_I(inode);
struct super_block *sb = inode->i_sb;
struct scoutfs_lock *lock;
struct write_begin_data *wbd = fsdata;
int ret;
trace_scoutfs_write_end(sb, scoutfs_ino(inode), page->index, (u64)pos,
len, copied);
/* always call write_end, update_inode will bark if there's no lock */
lock = scoutfs_per_task_get(&si->pt_data_lock);
ret = generic_write_end(file, mapping, pos, len, copied, page, fsdata);
if (ret > 0) {
if (!si->staging) {
scoutfs_inode_set_data_seq(inode);
scoutfs_inode_inc_data_version(inode);
}
/* XXX kind of a big hammer, inode life cycle needs work */
scoutfs_update_inode_item(inode, lock);
scoutfs_update_inode_item(inode, wbd->lock, &wbd->ind_locks);
scoutfs_inode_queue_writeback(inode);
}
scoutfs_release_trans(sb);
scoutfs_inode_index_unlock(sb, &wbd->ind_locks);
kfree(wbd);
return ret;
}

View File

@@ -538,11 +538,14 @@ out:
static struct inode *lock_hold_create(struct inode *dir, struct dentry *dentry,
umode_t mode, dev_t rdev,
const struct scoutfs_item_count cnt,
u64 dir_size, u64 inode_size,
struct scoutfs_lock **dir_lock,
struct scoutfs_lock **inode_lock)
struct scoutfs_lock **inode_lock,
struct list_head *ind_locks)
{
struct super_block *sb = dir->i_sb;
struct inode *inode;
u64 ind_seq;
int ret = 0;
u64 ino;
@@ -568,7 +571,14 @@ static struct inode *lock_hold_create(struct inode *dir, struct dentry *dentry,
if (ret)
goto out_unlock;
ret = scoutfs_hold_trans(sb, cnt);
retry:
ret = scoutfs_inode_index_start(sb, &ind_seq) ?:
scoutfs_inode_index_prepare(sb, ind_locks, dir, dir_size, true) ?:
scoutfs_inode_index_prepare_ino(sb, ind_locks, ino, mode,
inode_size) ?:
scoutfs_inode_index_lock_hold(sb, ind_locks, ind_seq, cnt);
if (ret > 0)
goto retry;
if (ret)
goto out_unlock;
@@ -584,6 +594,7 @@ out:
scoutfs_release_trans(sb);
out_unlock:
if (ret) {
scoutfs_inode_index_unlock(sb, ind_locks);
scoutfs_unlock(sb, *dir_lock, DLM_LOCK_EX);
scoutfs_unlock(sb, *inode_lock, DLM_LOCK_EX);
*dir_lock = NULL;
@@ -602,16 +613,18 @@ static int scoutfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode,
struct inode *inode = NULL;
struct scoutfs_lock *dir_lock = NULL;
struct scoutfs_lock *inode_lock = NULL;
LIST_HEAD(ind_locks);
u64 dir_size;
u64 pos;
int ret;
if (dentry->d_name.len > SCOUTFS_NAME_LEN)
return -ENAMETOOLONG;
dir_size = i_size_read(dir) + dentry->d_name.len;
inode = lock_hold_create(dir, dentry, mode, rdev,
SIC_MKNOD(dentry->d_name.len),
&dir_lock, &inode_lock);
SIC_MKNOD(dentry->d_name.len), dir_size, 0,
&dir_lock, &inode_lock, &ind_locks);
if (IS_ERR(inode))
return PTR_ERR(inode);
@@ -625,7 +638,7 @@ static int scoutfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode,
update_dentry_info(dentry, pos);
i_size_write(dir, i_size_read(dir) + dentry->d_name.len);
i_size_write(dir, dir_size);
dir->i_mtime = dir->i_ctime = CURRENT_TIME;
inode->i_mtime = inode->i_atime = inode->i_ctime = dir->i_mtime;
@@ -634,13 +647,15 @@ static int scoutfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode,
inc_nlink(dir);
}
scoutfs_update_inode_item(inode, inode_lock);
scoutfs_update_inode_item(dir, dir_lock);
scoutfs_update_inode_item(inode, inode_lock, &ind_locks);
scoutfs_update_inode_item(dir, dir_lock, &ind_locks);
scoutfs_inode_index_unlock(sb, &ind_locks);
insert_inode_hash(inode);
d_instantiate(dentry, inode);
out:
scoutfs_release_trans(sb);
scoutfs_inode_index_unlock(sb, &ind_locks);
scoutfs_unlock(sb, dir_lock, DLM_LOCK_EX);
scoutfs_unlock(sb, inode_lock, DLM_LOCK_EX);
@@ -669,6 +684,9 @@ static int scoutfs_link(struct dentry *old_dentry,
struct super_block *sb = dir->i_sb;
struct scoutfs_lock *dir_lock;
struct scoutfs_lock *inode_lock = NULL;
LIST_HEAD(ind_locks);
u64 dir_size;
u64 ind_seq;
u64 pos;
int ret;
@@ -690,7 +708,17 @@ static int scoutfs_link(struct dentry *old_dentry,
if (ret)
goto out_unlock;
ret = scoutfs_hold_trans(sb, SIC_LINK(dentry->d_name.len));
dir_size = i_size_read(dir) + dentry->d_name.len;
retry:
ret = scoutfs_inode_index_start(sb, &ind_seq) ?:
scoutfs_inode_index_prepare(sb, &ind_locks, dir,
dir_size, false) ?:
scoutfs_inode_index_prepare(sb, &ind_locks, inode,
i_size_read(inode), false) ?:
scoutfs_inode_index_lock_hold(sb, &ind_locks, ind_seq,
SIC_LINK(dentry->d_name.len));
if (ret > 0)
goto retry;
if (ret)
goto out_unlock;
@@ -707,19 +735,20 @@ static int scoutfs_link(struct dentry *old_dentry,
goto out;
update_dentry_info(dentry, pos);
i_size_write(dir, i_size_read(dir) + dentry->d_name.len);
i_size_write(dir, dir_size);
dir->i_mtime = dir->i_ctime = CURRENT_TIME;
inode->i_ctime = dir->i_mtime;
inc_nlink(inode);
scoutfs_update_inode_item(inode, inode_lock);
scoutfs_update_inode_item(dir, dir_lock);
scoutfs_update_inode_item(inode, inode_lock, &ind_locks);
scoutfs_update_inode_item(dir, dir_lock, &ind_locks);
atomic_inc(&inode->i_count);
d_instantiate(dentry, inode);
out:
scoutfs_release_trans(sb);
out_unlock:
scoutfs_inode_index_unlock(sb, &ind_locks);
scoutfs_unlock(sb, dir_lock, DLM_LOCK_EX);
scoutfs_unlock(sb, inode_lock, DLM_LOCK_EX);
return ret;
@@ -747,6 +776,9 @@ static int scoutfs_unlink(struct inode *dir, struct dentry *dentry)
struct timespec ts = current_kernel_time();
struct scoutfs_lock *inode_lock = NULL;
struct scoutfs_lock *dir_lock = NULL;
LIST_HEAD(ind_locks);
u64 dir_size;
u64 ind_seq;
int ret = 0;
ret = scoutfs_lock_inodes(sb, DLM_LOCK_EX, SCOUTFS_LKF_REFRESH_INODE,
@@ -760,7 +792,17 @@ static int scoutfs_unlink(struct inode *dir, struct dentry *dentry)
goto unlock;
}
ret = scoutfs_hold_trans(sb, SIC_UNLINK(dentry->d_name.len));
dir_size = i_size_read(dir) - dentry->d_name.len;
retry:
ret = scoutfs_inode_index_start(sb, &ind_seq) ?:
scoutfs_inode_index_prepare(sb, &ind_locks, dir,
dir_size, false) ?:
scoutfs_inode_index_prepare(sb, &ind_locks, inode,
i_size_read(inode), false) ?:
scoutfs_inode_index_lock_hold(sb, &ind_locks, ind_seq,
SIC_UNLINK(dentry->d_name.len));
if (ret > 0)
goto retry;
if (ret)
goto unlock;
@@ -784,7 +826,7 @@ static int scoutfs_unlink(struct inode *dir, struct dentry *dentry)
dir->i_ctime = ts;
dir->i_mtime = ts;
i_size_write(dir, i_size_read(dir) - dentry->d_name.len);
i_size_write(dir, dir_size);
inode->i_ctime = ts;
drop_nlink(inode);
@@ -792,12 +834,13 @@ static int scoutfs_unlink(struct inode *dir, struct dentry *dentry)
drop_nlink(dir);
drop_nlink(inode);
}
scoutfs_update_inode_item(inode, inode_lock);
scoutfs_update_inode_item(dir, dir_lock);
scoutfs_update_inode_item(inode, inode_lock, &ind_locks);
scoutfs_update_inode_item(dir, dir_lock, &ind_locks);
out:
scoutfs_release_trans(sb);
unlock:
scoutfs_inode_index_unlock(sb, &ind_locks);
scoutfs_unlock(sb, dir_lock, DLM_LOCK_EX);
scoutfs_unlock(sb, inode_lock, DLM_LOCK_EX);
@@ -961,6 +1004,8 @@ static int scoutfs_symlink(struct inode *dir, struct dentry *dentry,
struct inode *inode = NULL;
struct scoutfs_lock *dir_lock = NULL;
struct scoutfs_lock *inode_lock = NULL;
LIST_HEAD(ind_locks);
u64 dir_size;
u64 pos;
int ret;
@@ -973,9 +1018,11 @@ static int scoutfs_symlink(struct inode *dir, struct dentry *dentry,
if (ret)
return ret;
dir_size = i_size_read(dir) + dentry->d_name.len;
inode = lock_hold_create(dir, dentry, S_IFLNK|S_IRWXUGO, 0,
SIC_SYMLINK(dentry->d_name.len, name_len),
&dir_lock, &inode_lock);
dir_size, name_len,
&dir_lock, &inode_lock, &ind_locks);
if (IS_ERR(inode))
return PTR_ERR(inode);
@@ -994,14 +1041,14 @@ static int scoutfs_symlink(struct inode *dir, struct dentry *dentry,
update_dentry_info(dentry, pos);
i_size_write(dir, i_size_read(dir) + dentry->d_name.len);
i_size_write(dir, dir_size);
dir->i_mtime = dir->i_ctime = CURRENT_TIME;
inode->i_ctime = dir->i_mtime;
i_size_write(inode, name_len);
scoutfs_update_inode_item(inode, inode_lock);
scoutfs_update_inode_item(dir, dir_lock);
scoutfs_update_inode_item(inode, inode_lock, &ind_locks);
scoutfs_update_inode_item(dir, dir_lock, &ind_locks);
insert_inode_hash(inode);
/* XXX need to set i_op/fop before here for sec callbacks */
@@ -1017,6 +1064,7 @@ out:
}
scoutfs_release_trans(sb);
scoutfs_inode_index_unlock(sb, &ind_locks);
scoutfs_unlock(sb, dir_lock, DLM_LOCK_EX);
scoutfs_unlock(sb, inode_lock, DLM_LOCK_EX);
@@ -1339,6 +1387,10 @@ static int scoutfs_rename(struct inode *old_dir, struct dentry *old_dentry,
bool ins_new = false;
bool del_new = false;
bool ins_old = false;
LIST_HEAD(ind_locks);
u64 old_size;
u64 uninitialized_var(new_size);
u64 ind_seq;
u64 new_pos;
int ret;
int err;
@@ -1388,8 +1440,35 @@ static int scoutfs_rename(struct inode *old_dir, struct dentry *old_dentry,
if (ret)
goto out_unlock;
ret = scoutfs_hold_trans(sb, SIC_RENAME(old_dentry->d_name.len,
new_dentry->d_name.len));
old_size = i_size_read(old_dir) - old_dentry->d_name.len;
if (!new_inode) {
if (old_dir != new_dir)
new_size = i_size_read(new_dir) +
new_dentry->d_name.len;
else
old_size += new_dentry->d_name.len;
} else {
if (old_dir != new_dir)
new_size = i_size_read(new_dir);
}
retry:
ret = scoutfs_inode_index_start(sb, &ind_seq) ?:
scoutfs_inode_index_prepare(sb, &ind_locks, old_dir,
old_size, false) ?:
scoutfs_inode_index_prepare(sb, &ind_locks, old_inode,
i_size_read(old_inode), false) ?:
(new_dir == old_dir ? 0 :
scoutfs_inode_index_prepare(sb, &ind_locks, new_dir,
new_size, false)) ?:
(new_inode == NULL ? 0 :
scoutfs_inode_index_prepare(sb, &ind_locks, new_inode,
i_size_read(new_inode), false)) ?:
scoutfs_inode_index_lock_hold(sb, &ind_locks, ind_seq,
SIC_RENAME(old_dentry->d_name.len,
new_dentry->d_name.len));
if (ret > 0)
goto retry;
if (ret)
goto out_unlock;
@@ -1450,10 +1529,9 @@ static int scoutfs_rename(struct inode *old_dir, struct dentry *old_dentry,
/* the caller will use d_move to move the old_dentry into place */
update_dentry_info(old_dentry, new_pos);
i_size_write(old_dir, i_size_read(old_dir) - old_dentry->d_name.len);
if (!new_inode)
i_size_write(new_dir, i_size_read(new_dir) +
new_dentry->d_name.len);
i_size_write(old_dir, old_size);
if (old_dir != new_dir)
i_size_write(new_dir, new_size);
if (new_inode) {
drop_nlink(new_inode);
@@ -1477,12 +1555,13 @@ static int scoutfs_rename(struct inode *old_dir, struct dentry *old_dentry,
if (new_inode)
old_inode->i_ctime = now;
scoutfs_update_inode_item(old_dir, old_dir_lock);
scoutfs_update_inode_item(old_inode, old_inode_lock);
scoutfs_update_inode_item(old_dir, old_dir_lock, &ind_locks);
scoutfs_update_inode_item(old_inode, old_inode_lock, &ind_locks);
if (new_dir != old_dir)
scoutfs_update_inode_item(new_dir, new_dir_lock);
scoutfs_update_inode_item(new_dir, new_dir_lock, &ind_locks);
if (new_inode)
scoutfs_update_inode_item(new_inode, new_inode_lock);
scoutfs_update_inode_item(new_inode, new_inode_lock,
&ind_locks);
ret = 0;
out:
@@ -1532,6 +1611,7 @@ out:
scoutfs_release_trans(sb);
out_unlock:
scoutfs_inode_index_unlock(sb, &ind_locks);
scoutfs_unlock(sb, old_inode_lock, DLM_LOCK_EX);
scoutfs_unlock(sb, new_inode_lock, DLM_LOCK_EX);
scoutfs_unlock(sb, old_dir_lock, DLM_LOCK_EX);

View File

@@ -239,12 +239,10 @@ struct scoutfs_segment_block {
#define SCOUTFS_MAX_ZONE 4 /* power of 2 is efficient */
/* inode index zone */
#define SCOUTFS_INODE_INDEX_SIZE_TYPE 3
#define SCOUTFS_INODE_INDEX_META_SEQ_TYPE 4
#define SCOUTFS_INODE_INDEX_DATA_SEQ_TYPE 5
#define SCOUTFS_INODE_INDEX_NR \
(SCOUTFS_INODE_INDEX_DATA_SEQ_TYPE - SCOUTFS_INODE_INDEX_SIZE_TYPE + 1)
#define SCOUTFS_INODE_INDEX_SIZE_TYPE 1
#define SCOUTFS_INODE_INDEX_META_SEQ_TYPE 2
#define SCOUTFS_INODE_INDEX_DATA_SEQ_TYPE 3
#define SCOUTFS_INODE_INDEX_NR 4 /* don't forget to update */
/* node zone */
#define SCOUTFS_FREE_BITS_SEGNO_TYPE 1
@@ -549,6 +547,8 @@ struct scoutfs_lock_name {
#define SCOUTFS_LOCK_INODE_GROUP_MASK (SCOUTFS_LOCK_INODE_GROUP_NR - 1)
#define SCOUTFS_LOCK_INODE_GROUP_OFFSET (~0ULL)
#define SCOUTFS_LOCK_SEQ_GROUP_MASK ((1ULL << 10) - 1)
/*
* messages over the wire.
*/

View File

@@ -18,6 +18,7 @@
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/sched.h>
#include <linux/list_sort.h>
#include "format.h"
#include "super.h"
@@ -32,6 +33,7 @@
#include "kvec.h"
#include "item.h"
#include "client.h"
#include "cmp.h"
/*
* XXX
@@ -39,6 +41,12 @@
* - use inode item value lengths for forward/back compat
*/
/*
* XXX before committing:
* - describe all this better
* - describe data locking size problems
*/
struct free_ino_pool {
wait_queue_head_t waitq;
spinlock_t lock;
@@ -201,14 +209,16 @@ static void set_item_info(struct scoutfs_inode_info *si,
{
BUG_ON(!mutex_is_locked(&si->item_mutex));
memset(si->item_majors, 0, sizeof(si->item_majors));
memset(si->item_minors, 0, sizeof(si->item_minors));
si->have_item = true;
si->item_size = le64_to_cpu(sinode->size);
si->item_ctime.tv_sec = le64_to_cpu(sinode->ctime.sec);
si->item_ctime.tv_nsec = le32_to_cpu(sinode->ctime.nsec);
si->item_mtime.tv_sec = le64_to_cpu(sinode->mtime.sec);
si->item_mtime.tv_nsec = le32_to_cpu(sinode->mtime.nsec);
si->item_meta_seq = le64_to_cpu(sinode->meta_seq);
si->item_data_seq = le64_to_cpu(sinode->data_seq);
si->item_majors[SCOUTFS_INODE_INDEX_SIZE_TYPE] =
le64_to_cpu(sinode->size);
si->item_majors[SCOUTFS_INODE_INDEX_META_SEQ_TYPE] =
le64_to_cpu(sinode->meta_seq);
si->item_majors[SCOUTFS_INODE_INDEX_DATA_SEQ_TYPE] =
le64_to_cpu(sinode->data_seq);
}
static void load_inode(struct inode *inode, struct scoutfs_inode *cinode)
@@ -517,87 +527,190 @@ int scoutfs_dirty_inode_item(struct inode *inode, struct scoutfs_lock *lock)
return ret;
}
struct index_lock {
struct list_head head;
struct scoutfs_lock *lock;
u8 type;
u64 major;
u32 minor;
u64 ino;
};
static bool will_del_index(struct scoutfs_inode_info *si,
u8 type, u64 major, u32 minor)
{
return si && si->have_item &&
(si->item_majors[type] != major ||
si->item_minors[type] != minor);
}
static bool will_ins_index(struct scoutfs_inode_info *si,
u8 type, u64 major, u32 minor)
{
return !si || !si->have_item ||
(si->item_majors[type] != major ||
si->item_minors[type] != minor);
}
static bool inode_has_index(umode_t mode, u8 type)
{
switch(type) {
case SCOUTFS_INODE_INDEX_SIZE_TYPE:
case SCOUTFS_INODE_INDEX_META_SEQ_TYPE:
return true;
case SCOUTFS_INODE_INDEX_DATA_SEQ_TYPE:
return S_ISREG(mode);
default:
return WARN_ON_ONCE(false);
}
}
static int cmp_index_lock(void *priv, struct list_head *A, struct list_head *B)
{
struct index_lock *a = list_entry(A, struct index_lock, head);
struct index_lock *b = list_entry(B, struct index_lock, head);
return ((int)a->type - (int)b->type) ?:
scoutfs_cmp_u64s(a->major, b->major) ?:
scoutfs_cmp_u64s(a->minor, b->minor) ?:
scoutfs_cmp_u64s(a->ino, b->ino);
}
/*
* Make sure inode index items are kept in sync with the fields that are
* set in the inode items. This must be called any time the contents of
* the inode items are updated.
*
* This is effectively a RMW on the inode fields so the caller needs to
* lock the inode so that it's the only one working with the index items
* for a given set of fields in the inode.
*
* But it doesn't need to lock the index item keys. By locking the
* inode we've ensured that we can safely log deletion and insertion
* items in our log. The indexes are eventually consistent so we don't
* need to wrap them locks.
*
* XXX this needs more supporting work from the rest of the
* infrastructure:
*
* - Deleting and creating the items needs to forcefully set those dirty
* items in the cache without first trying to read them from segments.
* - the reading ioctl needs to forcefully invalidate the index items
* as it walks.
* - maybe the reading ioctl needs to verify fields with inodes?
* - final inode deletion needs to invalidate the index items for
* each inode as it deletes items based on the locked inode fields.
* - make sure deletion items safely vanish w/o finding existing item
* - ... error handling :(
* Find the lock that covers the given index item. Returns NULL if
* there isn't a lock that covers the item. We know that the list is
* sorted at this point so we can stop once our search value is less
* than a list entry.
*/
static int update_index(struct super_block *sb, struct scoutfs_inode_info *si,
u64 ino, u8 type, u64 now_major, u32 now_minor,
u64 then_major, u32 then_minor)
static struct scoutfs_lock *find_index_lock(struct list_head *lock_list,
u8 type, u64 major, u32 minor,
u64 ino)
{
struct index_lock *ind_lock;
struct index_lock needle;
int cmp;
scoutfs_lock_clamp_inode_index(type, &major, &minor, &ino);
needle.type = type;
needle.major = major;
needle.minor = minor;
needle.ino = ino;
list_for_each_entry(ind_lock, lock_list, head) {
cmp = cmp_index_lock(NULL, &needle.head, &ind_lock->head);
if (cmp == 0)
return ind_lock->lock;
if (cmp < 0)
break;
}
return NULL;
}
/*
* The inode info reflects the current inode index items. Create or delete
* index items to bring the index in line with the caller's item. The list
* should contain locks that cover any item modifications that are made.
*/
static int update_index_items(struct super_block *sb,
struct scoutfs_inode_info *si, u64 ino, u8 type,
u64 major, u32 minor,
struct list_head *lock_list)
{
struct scoutfs_inode_index_key ins_ikey;
struct scoutfs_inode_index_key del_ikey;
struct scoutfs_lock *ins_lock;
struct scoutfs_lock *del_lock;
struct scoutfs_key_buf ins;
struct scoutfs_key_buf del;
int ret;
int err;
trace_scoutfs_inode_update_index(sb, ino, si->have_item, now_major,
now_minor, then_major, then_minor);
if (si->have_item && now_major == then_major && now_minor == then_minor)
if (!will_ins_index(si, type, major, minor))
return 0;
trace_scoutfs_create_index_item(sb, type, major, minor, ino);
ins_ikey.zone = SCOUTFS_INODE_INDEX_ZONE;
ins_ikey.type = type;
ins_ikey.major = cpu_to_be64(now_major);
ins_ikey.minor = cpu_to_be32(now_minor);
ins_ikey.major = cpu_to_be64(major);
ins_ikey.minor = cpu_to_be32(minor);
ins_ikey.ino = cpu_to_be64(ino);
scoutfs_key_init(&ins, &ins_ikey, sizeof(ins_ikey));
ins_lock = find_index_lock(lock_list, type, major, minor, ino);
ret = scoutfs_item_create(sb, &ins, NULL);
if (ret || !si->have_item)
if (ret || !will_del_index(si, type, major, minor))
return ret;
trace_scoutfs_delete_index_item(sb, type, major, minor, ino);
del_ikey.zone = SCOUTFS_INODE_INDEX_ZONE;
del_ikey.type = type;
del_ikey.major = cpu_to_be64(then_major);
del_ikey.minor = cpu_to_be32(then_minor);
del_ikey.major = cpu_to_be64(si->item_majors[type]);
del_ikey.minor = cpu_to_be32(si->item_minors[type]);
del_ikey.ino = cpu_to_be64(ino);
scoutfs_key_init(&del, &del_ikey, sizeof(del_ikey));
ret = scoutfs_item_delete(sb, &del, NULL);
del_lock = find_index_lock(lock_list, type, si->item_majors[type],
si->item_minors[type], ino);
ret = scoutfs_item_delete(sb, &del, del_lock->end);
if (ret) {
err = scoutfs_item_delete(sb, &ins, NULL);
err = scoutfs_item_delete(sb, &ins, ins_lock->end);
BUG_ON(err);
}
return ret;
}
static int update_indices(struct super_block *sb,
struct scoutfs_inode_info *si, u64 ino, umode_t mode,
struct scoutfs_inode *sinode,
struct list_head *lock_list)
{
struct index_update {
u8 type;
u64 major;
u32 minor;
} *upd, upds[] = {
{ SCOUTFS_INODE_INDEX_SIZE_TYPE,
le64_to_cpu(sinode->size), 0 },
{ SCOUTFS_INODE_INDEX_META_SEQ_TYPE,
le64_to_cpu(sinode->meta_seq), 0 },
{ SCOUTFS_INODE_INDEX_DATA_SEQ_TYPE,
le64_to_cpu(sinode->data_seq), 0 },
};
int ret;
int i;
for (i = 0, upd = upds; i < ARRAY_SIZE(upds); i++, upd++) {
if (!inode_has_index(mode, upd->type))
continue;
ret = update_index_items(sb, si, ino, upd->type, upd->major,
upd->minor, lock_list);
if (ret)
break;
}
return ret;
}
/*
* Every time we modify the inode in memory we copy it to its inode
* item. This lets us write out items without having to track down
* dirty vfs inodes.
*
* The caller makes sure that the item is dirty and pinned so they don't
* have to deal with errors and unwinding after they've modified the
* vfs inode and get here.
* have to deal with errors and unwinding after they've modified the vfs
* inode and get here.
*
* Index items that track inode fields are updated here as we update the
* inode item. The caller must have acquired locks on all the index
* items that might change.
*/
void scoutfs_update_inode_item(struct inode *inode, struct scoutfs_lock *lock)
void scoutfs_update_inode_item(struct inode *inode, struct scoutfs_lock *lock,
struct list_head *lock_list)
{
struct scoutfs_inode_info *si = SCOUTFS_I(inode);
struct super_block *sb = inode->i_sb;
@@ -617,16 +730,7 @@ void scoutfs_update_inode_item(struct inode *inode, struct scoutfs_lock *lock)
/* only race with other inode field stores once */
store_inode(&sinode, inode);
ret = update_index(sb, si, ino, SCOUTFS_INODE_INDEX_SIZE_TYPE,
le64_to_cpu(sinode.size), 0, si->item_size, 0) ?:
update_index(sb, si, ino, SCOUTFS_INODE_INDEX_META_SEQ_TYPE,
le64_to_cpu(sinode.meta_seq), 0,
si->item_meta_seq, 0);
if (ret == 0 && S_ISREG(inode->i_mode))
ret = update_index(sb, si, ino,
SCOUTFS_INODE_INDEX_DATA_SEQ_TYPE,
le64_to_cpu(sinode.data_seq), 0,
si->item_data_seq, 0);
ret = update_indices(sb, si, ino, inode->i_mode, &sinode, lock_list);
BUG_ON(ret);
scoutfs_inode_init_key(&key, &ikey, ino);
@@ -644,12 +748,251 @@ void scoutfs_update_inode_item(struct inode *inode, struct scoutfs_lock *lock)
mutex_unlock(&si->item_mutex);
}
/*
* We map the item to coarse locks here. This reduces the number of
* locks we track and means that when we later try to find the lock that
* covers an item we can deal with the item update changing a little
* (seq, size) while still being covered. It does mean we have to share
* some logic with lock naming.
*/
static int add_index_lock(struct list_head *list, u64 ino, u8 type, u64 major,
u32 minor)
{
struct index_lock *ind_lock;
scoutfs_lock_clamp_inode_index(type, &major, &minor, &ino);
list_for_each_entry(ind_lock, list, head) {
if (ind_lock->type == type && ind_lock->major == major &&
ind_lock->minor == minor && ind_lock->ino == ino) {
return 0;
}
}
ind_lock = kzalloc(sizeof(struct index_lock), GFP_NOFS);
if (!ind_lock)
return -ENOMEM;
ind_lock->type = type;
ind_lock->major = major;
ind_lock->minor = minor;
ind_lock->ino = ino;
list_add(&ind_lock->head, list);
return 0;
}
static int prepare_index_items(struct scoutfs_inode_info *si,
struct list_head *list, u64 ino, umode_t mode,
u8 type, u64 major, u32 minor)
{
int ret;
if (will_ins_index(si, type, major, minor)) {
ret = add_index_lock(list, ino, type, major, minor);
if (ret)
return ret;
}
if (will_del_index(si, type, major, minor)) {
ret = add_index_lock(list, ino, type, si->item_majors[type],
si->item_minors[type]);
if (ret)
return ret;
}
return 0;
}
/*
* Return the data seq that we expect to see in the updated inode. The
* caller tells us if they know they're going to update it. If the
* inode doesn't exist it'll also get the current data_seq.
*/
static u64 upd_data_seq(struct scoutfs_sb_info *sbi,
struct scoutfs_inode_info *si, bool set_data_seq)
{
if (!si || !si->have_item || set_data_seq)
return sbi->trans_seq;
return si->item_majors[SCOUTFS_INODE_INDEX_DATA_SEQ_TYPE];
}
/*
* Prepare locks that will cover the inode index items that will be
* modified when this inode's item is updated during the upcoming
* transaction.
*
* To lock the index items that will be created we need to predict the
* new indexed values. We assume that the meta seq will always be set
* to the current seq. This will usually be a nop in a running
* transaction. The caller tells us what the size will be and whether
* data_seq will also be set to the current transaction.
*/
static int prepare_indices(struct super_block *sb, struct list_head *list,
struct scoutfs_inode_info *si, u64 ino,
umode_t mode, u64 new_size, bool set_data_seq)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct index_update {
u8 type;
u64 major;
u32 minor;
} *upd, upds[] = {
{ SCOUTFS_INODE_INDEX_SIZE_TYPE, new_size, 0},
{ SCOUTFS_INODE_INDEX_META_SEQ_TYPE, sbi->trans_seq, 0},
{ SCOUTFS_INODE_INDEX_DATA_SEQ_TYPE,
upd_data_seq(sbi, si, set_data_seq), 0},
};
int ret;
int i;
for (i = 0, upd = upds; i < ARRAY_SIZE(upds); i++, upd++) {
if (!inode_has_index(mode, upd->type))
continue;
ret = prepare_index_items(si, list, ino, mode,
upd->type, upd->major, upd->minor);
if (ret)
break;
}
return ret;
}
int scoutfs_inode_index_prepare(struct super_block *sb, struct list_head *list,
struct inode *inode, u64 new_size,
bool set_data_seq)
{
struct scoutfs_inode_info *si = SCOUTFS_I(inode);
return prepare_indices(sb, list, si, scoutfs_ino(inode),
inode->i_mode, new_size, set_data_seq);
}
/*
* This is used to initially create the index items for a newly created
* inode. We don't have a populated vfs inode yet. The existing
* indexed values don't matter because it's 'have_item' is false. It
* will try to create all the appropriate index items.
*/
int scoutfs_inode_index_prepare_ino(struct super_block *sb,
struct list_head *list, u64 ino,
umode_t mode, u64 new_size)
{
return prepare_indices(sb, list, NULL, ino, mode, new_size, true);
}
/*
* Prepare the locks needed to delete all the index items associated
* with the inode. We know the items have to exist and can skip straight
* to adding locks for each of them.
*/
static int prepare_index_deletion(struct super_block *sb,
struct list_head *list, u64 ino,
umode_t mode, struct scoutfs_inode *sinode)
{
struct index_item {
u8 type;
u64 major;
u32 minor;
} *ind, inds[] = {
{ SCOUTFS_INODE_INDEX_SIZE_TYPE,
le64_to_cpu(sinode->size), 0 },
{ SCOUTFS_INODE_INDEX_META_SEQ_TYPE,
le64_to_cpu(sinode->meta_seq), 0 },
{ SCOUTFS_INODE_INDEX_DATA_SEQ_TYPE,
le64_to_cpu(sinode->data_seq), 0 },
};
int ret;
int i;
for (i = 0, ind = inds; i < ARRAY_SIZE(inds); i++, ind++) {
if (!inode_has_index(mode, ind->type))
continue;
ret = add_index_lock(list, ino, ind->type, ind->major,
ind->minor);
if (ret)
break;
}
return ret;
}
/*
* Sample the transaction sequence before we start checking it to see if
* indexed meta seq and data seq items will change.
*/
int scoutfs_inode_index_start(struct super_block *sb, u64 *seq)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
/* XXX this feels racey in a bad way :) */
*seq = sbi->trans_seq;
return 0;
}
/*
* Acquire the prepared index locks and hold the transaction. If the
* sequence number changes as we enter the transaction then we need to
* retry so that we can use the new seq to prepare locks.
*
* Returns > 0 if the seq changed and the locks should be retried.
*/
int scoutfs_inode_index_lock_hold(struct super_block *sb,
struct list_head *list, u64 seq,
const struct scoutfs_item_count cnt)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct index_lock *ind_lock;
int ret = 0;
list_sort(NULL, list, cmp_index_lock);
list_for_each_entry(ind_lock, list, head) {
ret = scoutfs_lock_inode_index(sb, DLM_LOCK_EX, ind_lock->type,
ind_lock->major, ind_lock->ino,
&ind_lock->lock);
if (ret)
goto out;
}
ret = scoutfs_hold_trans(sb, cnt);
if (ret == 0 && seq != sbi->trans_seq) {
scoutfs_release_trans(sb);
ret = 1;
}
out:
if (ret)
scoutfs_inode_index_unlock(sb, list);
return ret;
}
/*
* Unlocks and frees all the locks on the list.
*/
void scoutfs_inode_index_unlock(struct super_block *sb, struct list_head *list)
{
struct index_lock *ind_lock;
struct index_lock *tmp;
list_for_each_entry_safe(ind_lock, tmp, list, head) {
scoutfs_unlock(sb, ind_lock->lock, DLM_LOCK_EX);
list_del_init(&ind_lock->head);
kfree(ind_lock);
}
}
/* this is called on final inode cleanup so enoent is fine */
static int remove_index(struct super_block *sb, u64 ino, u8 type, u64 major,
u32 minor)
u32 minor, struct list_head *ind_locks)
{
struct scoutfs_inode_index_key ikey;
struct scoutfs_key_buf key;
struct scoutfs_lock *lock;
int ret;
ikey.zone = SCOUTFS_INODE_INDEX_ZONE;
@@ -659,8 +1002,8 @@ static int remove_index(struct super_block *sb, u64 ino, u8 type, u64 major,
ikey.ino = cpu_to_be64(ino);
scoutfs_key_init(&key, &ikey, sizeof(ikey));
/* XXX would be deletion under CW that doesn't need to read */
ret = scoutfs_item_delete(sb, &key, NULL);
lock = find_index_lock(ind_locks, type, major, minor, ino);
ret = scoutfs_item_delete(sb, &key, lock->end);
if (ret == -ENOENT)
ret = 0;
return ret;
@@ -676,18 +1019,19 @@ static int remove_index(struct super_block *sb, u64 ino, u8 type, u64 major,
* the time they get to it, including being deleted.
*/
static int remove_index_items(struct super_block *sb, u64 ino,
struct scoutfs_inode *sinode)
struct scoutfs_inode *sinode,
struct list_head *ind_locks)
{
umode_t mode = le32_to_cpu(sinode->mode);
int ret;
ret = remove_index(sb, ino, SCOUTFS_INODE_INDEX_SIZE_TYPE,
le64_to_cpu(sinode->size), 0) ?:
le64_to_cpu(sinode->size), 0, ind_locks) ?:
remove_index(sb, ino, SCOUTFS_INODE_INDEX_META_SEQ_TYPE,
le64_to_cpu(sinode->meta_seq), 0);
le64_to_cpu(sinode->meta_seq), 0, ind_locks);
if (ret == 0 && S_ISREG(mode))
ret = remove_index(sb, ino, SCOUTFS_INODE_INDEX_DATA_SEQ_TYPE,
le64_to_cpu(sinode->data_seq), 0);
le64_to_cpu(sinode->data_seq), 0, ind_locks);
return ret;
}
@@ -825,13 +1169,14 @@ struct inode *scoutfs_new_inode(struct super_block *sb, struct inode *dir,
ci = SCOUTFS_I(inode);
ci->ino = ino;
ci->meta_seq = 0;
ci->data_seq = 0;
ci->data_version = 0;
ci->next_readdir_pos = SCOUTFS_DIRENT_FIRST_POS;
ci->have_item = false;
atomic64_set(&ci->last_refreshed, scoutfs_lock_refresh_gen(lock));
scoutfs_inode_set_meta_seq(inode);
scoutfs_inode_set_data_seq(inode);
inode->i_ino = ino; /* XXX overflow */
inode_init_owner(inode, dir, mode);
inode_set_bytes(inode, 0);
@@ -890,41 +1235,56 @@ static int remove_orphan_item(struct super_block *sb, u64 ino)
*/
static int delete_inode_items(struct super_block *sb, u64 ino)
{
struct scoutfs_lock *lock = NULL;
struct scoutfs_inode_key ikey;
struct scoutfs_inode sinode;
struct scoutfs_key_buf key;
SCOUTFS_DECLARE_KVEC(val);
LIST_HEAD(ind_locks);
bool release = false;
umode_t mode;
u64 ind_seq;
int ret;
ret = scoutfs_lock_ino(sb, DLM_LOCK_EX, 0, ino, &lock);
if (ret)
return ret;
scoutfs_inode_init_key(&key, &ikey, ino);
scoutfs_kvec_init(val, &sinode, sizeof(sinode));
ret = scoutfs_item_lookup_exact(sb, &key, val, sizeof(sinode), NULL);
ret = scoutfs_item_lookup_exact(sb, &key, val, sizeof(sinode), lock);
if (ret < 0) {
if (ret == -ENOENT)
ret = 0;
return ret;
goto out;
}
/* XXX corruption, inode probably won't be freed without repair */
if (le32_to_cpu(sinode.nlink)) {
scoutfs_warn(sb, "Dangling orphan item for inode %llu.", ino);
return -EIO;
ret = -EIO;
goto out;
}
mode = le32_to_cpu(sinode.mode);
trace_scoutfs_delete_inode(sb, ino, mode);
/* XXX this is obviously not done yet :) */
ret = scoutfs_hold_trans(sb, SIC_DIRTY_INODE());
/* XXX the trans reservation count is obviously bonkers :) */
retry:
ret = scoutfs_inode_index_start(sb, &ind_seq) ?:
prepare_index_deletion(sb, &ind_locks, ino, mode, &sinode) ?:
scoutfs_inode_index_lock_hold(sb, &ind_locks, ind_seq,
SIC_DIRTY_INODE());
if (ret > 0)
goto retry;
if (ret)
goto out;
release = true;
/* first remove index items to try to avoid indexing partial deletion */
ret = remove_index_items(sb, ino, &sinode);
ret = remove_index_items(sb, ino, &sinode, &ind_locks);
if (ret)
goto out;
@@ -941,7 +1301,7 @@ static int delete_inode_items(struct super_block *sb, u64 ino)
goto out;
#endif
ret = scoutfs_item_delete(sb, &key, NULL);
ret = scoutfs_item_delete(sb, &key, lock->end);
if (ret)
goto out;
@@ -949,6 +1309,8 @@ static int delete_inode_items(struct super_block *sb, u64 ino)
out:
if (release)
scoutfs_release_trans(sb);
scoutfs_inode_index_unlock(sb, &ind_locks);
scoutfs_unlock(sb, lock, DLM_LOCK_EX);
return ret;
}

View File

@@ -4,6 +4,7 @@
#include "key.h"
#include "lock.h"
#include "per_task.h"
#include "count.h"
struct scoutfs_lock;
@@ -23,11 +24,8 @@ struct scoutfs_inode_info {
*/
struct mutex item_mutex;
bool have_item;
u64 item_size;
struct timespec item_ctime;
struct timespec item_mtime;
u64 item_meta_seq;
u64 item_data_seq;
u64 item_majors[SCOUTFS_INODE_INDEX_NR];
u32 item_minors[SCOUTFS_INODE_INDEX_NR];
/* updated at on each new lock acquisition */
atomic64_t last_refreshed;
@@ -63,13 +61,29 @@ int scoutfs_orphan_inode(struct inode *inode);
struct inode *scoutfs_iget(struct super_block *sb, u64 ino);
struct inode *scoutfs_ilookup(struct super_block *sb, u64 ino);
int scoutfs_inode_index_start(struct super_block *sb, u64 *seq);
int scoutfs_inode_index_prepare(struct super_block *sb, struct list_head *list,
struct inode *inode, u64 new_size,
bool set_data_seq);
int scoutfs_inode_index_prepare_ino(struct super_block *sb,
struct list_head *list, u64 ino,
umode_t mode, u64 new_size);
int scoutfs_inode_index_lock_hold(struct super_block *sb,
struct list_head *list, u64 seq,
const struct scoutfs_item_count cnt);
void scoutfs_inode_index_unlock(struct super_block *sb, struct list_head *list);
int scoutfs_dirty_inode_item(struct inode *inode, struct scoutfs_lock *lock);
void scoutfs_update_inode_item(struct inode *inode, struct scoutfs_lock *lock);
void scoutfs_update_inode_item(struct inode *inode, struct scoutfs_lock *lock,
struct list_head *ind_locks);
void scoutfs_inode_fill_pool(struct super_block *sb, u64 ino, u64 nr);
int scoutfs_alloc_ino(struct super_block *sb, u64 *ino);
struct inode *scoutfs_new_inode(struct super_block *sb, struct inode *dir,
umode_t mode, dev_t rdev, u64 ino,
struct scoutfs_lock *lock);
void scoutfs_inode_set_meta_seq(struct inode *inode);
void scoutfs_inode_set_data_seq(struct inode *inode);
void scoutfs_inode_inc_data_version(struct inode *inode);

View File

@@ -612,6 +612,42 @@ int scoutfs_lock_global(struct super_block *sb, int mode, int flags, int type,
NULL, NULL, lock);
}
/*
* Set the caller's major, minor, and ino to the start of lock that
* covers the incoming index item. This can be used to discover when
* multiple items map to the same lock.
*/
void scoutfs_lock_clamp_inode_index(u8 type, u64 *major, u32 *minor, u64 *ino)
{
u64 major_mask;
u64 ino_mask;
int bit;
switch(type) {
case SCOUTFS_INODE_INDEX_SIZE_TYPE:
major_mask = 0;
if (*major) {
bit = fls64(*major);
if (bit > 4)
major_mask = (1 << (bit - 4)) - 1;
}
ino_mask = (1 << 12) - 1;
break;
case SCOUTFS_INODE_INDEX_META_SEQ_TYPE:
case SCOUTFS_INODE_INDEX_DATA_SEQ_TYPE:
major_mask = SCOUTFS_LOCK_SEQ_GROUP_MASK;
ino_mask = ~0ULL;
break;
default:
BUG();
}
*major &= ~major_mask;
*minor = 0;
*ino &= ~ino_mask;
}
/*
* map inode index items to locks. The idea is to not have to
* constantly get locks over a reasonable distribution of items, but
@@ -647,7 +683,7 @@ int scoutfs_lock_inode_index(struct super_block *sb, int mode,
case SCOUTFS_INODE_INDEX_META_SEQ_TYPE:
case SCOUTFS_INODE_INDEX_DATA_SEQ_TYPE:
major_mask = (1 << 10) - 1;
major_mask = SCOUTFS_LOCK_SEQ_GROUP_MASK;
ino_mask = ~0ULL;
break;
default:

View File

@@ -36,6 +36,7 @@ int scoutfs_lock_inode(struct super_block *sb, int mode, int flags,
struct inode *inode, struct scoutfs_lock **ret_lock);
int scoutfs_lock_ino(struct super_block *sb, int mode, int flags, u64 ino,
struct scoutfs_lock **ret_lock);
void scoutfs_lock_clamp_inode_index(u8 type, u64 *major, u32 *minor, u64 *ino);
int scoutfs_lock_inode_index(struct super_block *sb, int mode,
u8 type, u64 major, u64 ino,
struct scoutfs_lock **ret_lock);

View File

@@ -782,38 +782,43 @@ TRACE_EVENT(scoutfs_i_callback,
TP_printk("freeing inode %p", __entry->inode)
);
TRACE_EVENT(scoutfs_inode_update_index,
TP_PROTO(struct super_block *sb, __u64 ino, unsigned int have_item,
__u64 now_major, unsigned int now_minor, __u64 then_major,
unsigned int then_minor),
DECLARE_EVENT_CLASS(scoutfs_index_item_class,
TP_PROTO(struct super_block *sb, __u8 type, __u64 major, __u32 minor,
__u64 ino),
TP_ARGS(sb, ino, have_item, now_major, now_minor, then_major,
then_minor),
TP_ARGS(sb, type, major, minor, ino),
TP_STRUCT__entry(
__field(__u64, fsid)
__field(__u8, type)
__field(__u64, major)
__field(__u32, minor)
__field(__u64, ino)
__field(unsigned int, have_item)
__field(__u64, now_major)
__field(unsigned int, now_minor)
__field(__u64, then_major)
__field(unsigned int, then_minor)
),
TP_fast_assign(
__entry->fsid = FSID_ARG(sb);
__entry->type = type;
__entry->major = major;
__entry->minor = minor;
__entry->ino = ino;
__entry->have_item = have_item;
__entry->now_major = now_major;
__entry->now_minor = now_minor;
__entry->then_major = then_major;
__entry->then_minor = then_minor;
),
TP_printk(FSID_FMT" ino %llu have %u now %llu.%u then %llu.%u",
__entry->fsid, __entry->ino, __entry->have_item,
__entry->now_major, __entry->now_minor, __entry->then_major,
__entry->then_minor)
TP_printk("fsid "FSID_FMT" type %u major %llu minor %u ino %llu",
__entry->fsid, __entry->type, __entry->major, __entry->minor,
__entry->ino)
);
DEFINE_EVENT(scoutfs_index_item_class, scoutfs_create_index_item,
TP_PROTO(struct super_block *sb, __u8 type, __u64 major, __u32 minor,
__u64 ino),
TP_ARGS(sb, type, major, minor, ino)
);
DEFINE_EVENT(scoutfs_index_item_class, scoutfs_delete_index_item,
TP_PROTO(struct super_block *sb, __u8 type, __u64 major, __u32 minor,
__u64 ino),
TP_ARGS(sb, type, major, minor, ino)
);
TRACE_EVENT(scoutfs_inode_fill_pool,

View File

@@ -264,10 +264,12 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name,
struct scoutfs_xattr_val_header vh;
size_t name_len = strlen(name);
SCOUTFS_DECLARE_KVEC(val);
struct scoutfs_lock *lck;
struct scoutfs_lock *lck = NULL;
unsigned int bytes;
unsigned int off;
LIST_HEAD(ind_locks);
LIST_HEAD(list);
u64 ind_seq;
u8 part;
int sif;
int ret;
@@ -299,7 +301,7 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name,
ret = scoutfs_item_add_batch(sb, &list, key, val);
if (ret)
goto unlock;
goto out;
}
}
@@ -315,28 +317,36 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name,
else
sif = 0;
ret = scoutfs_hold_trans(sb, SIC_XATTR_SET(name_len, size));
down_write(&si->xattr_rwsem);
retry:
ret = scoutfs_inode_index_start(sb, &ind_seq) ?:
scoutfs_inode_index_prepare(sb, &ind_locks, inode,
i_size_read(inode), false) ?:
scoutfs_inode_index_lock_hold(sb, &ind_locks, ind_seq,
SIC_XATTR_SET(name_len, size));
if (ret > 0)
goto retry;
if (ret)
goto unlock;
down_write(&si->xattr_rwsem);
ret = scoutfs_dirty_inode_item(inode, lck) ?:
scoutfs_item_set_batch(sb, &list, key, last, sif, lck->end);
if (ret == 0) {
/* XXX do these want i_mutex or anything? */
inode_inc_iversion(inode);
inode->i_ctime = CURRENT_TIME;
scoutfs_update_inode_item(inode, lck);
scoutfs_update_inode_item(inode, lck, &ind_locks);
}
up_write(&si->xattr_rwsem);
scoutfs_release_trans(sb);
unlock:
scoutfs_unlock(sb, lck, DLM_LOCK_EX);
up_write(&si->xattr_rwsem);
out:
scoutfs_inode_index_unlock(sb, &ind_locks);
scoutfs_unlock(sb, lck, DLM_LOCK_EX);
scoutfs_item_free_batch(sb, &list);
scoutfs_key_free(sb, key);
scoutfs_key_free(sb, last);