scoutfs: allow more concurrent btree locking

The btree locking so far was a quick interim measure to get the rest of
the system going.  We want to clean it up both for correctness and
performance but also to make way for using the btree for block
allocation.

We were unconditionally using the buffer head lock for tree block
locking.  This is bad for at least four reasons:  it's invisible to
lockdep, it doesn't allow concurrent reads, it doesn't allow reading
while a block is being written during the transaction, and it's not
necessary at all when the for stable read-only blocks.

Instead we add a rwsem to the buffer head private which we use to lock
the block when it's writable.  We clean up the locking functions to make
it clearer that btree_walk holds one lock at a time and either returns
it to the caller with the buffer head or unlocks the parent if its
returning an error.

We also add the missing sibling block locking during splits and merges.
Locking the parent prevented walks from descending down our path but it
didn't protect against previous walks that were already down at our
sibling's level.

Getting all this working with lockdep adds a bit more class/subclass
plumbing calls but nothing too ornerous.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2016-09-21 09:59:53 -07:00
parent bb3a5742f4
commit cf0199da00
5 changed files with 245 additions and 73 deletions

View File

@@ -42,6 +42,8 @@ struct block_bh_private {
struct super_block *sb;
struct buffer_head *bh;
struct rb_node node;
struct rw_semaphore rwsem;
bool rwsem_class;
};
enum {
@@ -146,6 +148,9 @@ static int insert_bhp(struct super_block *sb, struct buffer_head *bh)
bhp->bh = bh;
get_bh(bh);
bh->b_private = bhp;
/* lockdep class can be set by callers that use the lock */
init_rwsem(&bhp->rwsem);
bhp->rwsem_class = false;
spin_lock_irqsave(&sbi->block_lock, flags);
insert_bhp_rb(&sbi->block_dirty_tree, bh);
@@ -306,13 +311,6 @@ int scoutfs_block_write_dirty(struct super_block *sb)
atomic_inc(&sbi->block_writes);
scoutfs_block_set_crc(bh);
/*
* XXX submit_bh() forces us to lock the block while IO is
* in flight. This is unfortunate because we use the buffer
* head lock to serialize access to btree block contents.
* We should fix that and only use the buffer head lock
* when the APIs force us to.
*/
lock_buffer(bh);
bh->b_end_io = block_write_end_io;
@@ -486,3 +484,42 @@ void scoutfs_block_zero(struct buffer_head *bh, size_t off)
if (off < SCOUTFS_BLOCK_SIZE)
memset((char *)bh->b_data + off, 0, SCOUTFS_BLOCK_SIZE - off);
}
void scoutfs_block_set_lock_class(struct buffer_head *bh,
struct lock_class_key *class)
{
struct block_bh_private *bhp = bh->b_private;
if (bhp && !bhp->rwsem_class) {
lockdep_set_class(&bhp->rwsem, class);
bhp->rwsem_class = true;
}
}
void scoutfs_block_lock(struct buffer_head *bh, bool write, int subclass)
{
struct block_bh_private *bhp = bh->b_private;
trace_printk("lock write %d bhp %p\n", write, bhp);
if (bhp) {
if (write)
down_write_nested(&bhp->rwsem, subclass);
else
down_read_nested(&bhp->rwsem, subclass);
}
}
void scoutfs_block_unlock(struct buffer_head *bh, bool write)
{
struct block_bh_private *bhp = bh->b_private;
trace_printk("unlock write %d bhp %p\n", write, bhp);
if (bhp) {
if (write)
up_write(&bhp->rwsem);
else
up_read(&bhp->rwsem);
}
}

View File

@@ -19,6 +19,11 @@ int scoutfs_block_write_dirty(struct super_block *sb);
void scoutfs_block_set_crc(struct buffer_head *bh);
void scoutfs_block_zero(struct buffer_head *bh, size_t off);
void scoutfs_block_set_lock_class(struct buffer_head *bh,
struct lock_class_key *class);
void scoutfs_block_lock(struct buffer_head *bh, bool write, int subclass);
void scoutfs_block_unlock(struct buffer_head *bh, bool write);
/* XXX seems like this should be upstream :) */
static inline void *bh_data(struct buffer_head *bh)
{

View File

@@ -417,6 +417,141 @@ static void compact_items(struct scoutfs_btree_block *bt)
sort_key_cmp, sort_off_swap);
}
/*
* Let's talk about btree locking.
*
* The main metadata btree has lots of callers who want concurrency.
* They have their own locks that protect multi item consistency -- say
* an inode's i_mutex protecting the items related to a given inode.
* But it's our responsibility to lock the btree itself.
*
* Our btree operations are implemented with a single walk down the
* tree. This gives us the opportunity to cascade block locks down the
* tree. We first lock the root. Then we lock the first block and
* unlock the root. Then lock the next block and unlock the first
* block. And so on down the tree. Except for that brief transition
* function the btree walk always holds a single lock on either the root
* or a block at a level. After contention on the root and first block
* we have lots of concurrency down paths of the tree to the leaves.
*
* As we walk down the tree we have to split or merge. While we do this
* we hold the parent block lock. We have to also lock the sibling
* blocks. We always acquire them left to right to avoid deadlocks.
*
* The cow tree updates let us skip block locking entirely for stable
* blocks because they're read only. The block layer only has to worry
* about locking blocks that could be written to. While they're
* writable they have a buffer_head private that pins them in the
* transaction and we store the block lock there. The block layer
* ignores our locking attempts for read-only blocks.
*
* lockdep has to not be freaked out by all of this. The cascading
* block locks really make it angry without annotation so we add classes
* for each level and use nested subclasses for the locking of siblings
* during split and merge.
*
* We also use the btree API for the block allocator. This introduces
* nesting btree allocator calls inside main fs metadata btree calls.
* The locking would be safe as the blocks will never be in both trees
* but lockdep would think they're the same class and get raise
* warnings. We'd need to have tree level classes for all the trees.
* It turns out that the allocator has to maintain multi-item
* consistency across its entire tree so it has a tree-wide lock. We
* don't have to lock the btree at all when we're working on the
* allocator roots. They're the only non-metadata roots so far so we
* invert the test and only lock the btree when we're working on the
* main metadata btree root.
*/
static void set_block_lock_class(struct buffer_head *bh, int level)
{
#ifdef CONFIG_LOCKDEP
static struct lock_class_key tree_depth_classes[SCOUTFS_BTREE_MAX_DEPTH];
scoutfs_block_set_lock_class(bh, &tree_depth_classes[level]);
#endif
}
static void lock_root(struct super_block *sb, struct scoutfs_btree_root *root,
bool write)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (root == &sbi->super.btree_root) {
if (write)
down_write(&sbi->btree_rwsem);
else
down_read(&sbi->btree_rwsem);
}
}
static void unlock_root(struct super_block *sb, bool write)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (write)
up_write(&sbi->btree_rwsem);
else
up_read(&sbi->btree_rwsem);
}
static void unlock_level(struct super_block *sb,
struct scoutfs_btree_root *root,
struct buffer_head *bh, bool write)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (root == &sbi->super.btree_root) {
if (bh)
scoutfs_block_unlock(bh, write);
else
unlock_root(sb, write);
}
}
static void lock_next_level(struct super_block *sb,
struct scoutfs_btree_root *root,
struct buffer_head *par_bh,
struct buffer_head *bh, bool write)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (root == &sbi->super.btree_root) {
scoutfs_block_lock(bh, write, 0);
unlock_level(sb, root, par_bh, write);
}
}
static void lock_siblings(struct super_block *sb,
struct scoutfs_btree_root *root,
struct buffer_head *left, struct buffer_head *right,
bool write)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (root == &sbi->super.btree_root) {
scoutfs_block_lock(left, write, 0);
scoutfs_block_lock(right, write, 1);
}
}
static void unlock_siblings(struct super_block *sb,
struct scoutfs_btree_root *root,
struct buffer_head *left, struct buffer_head *right,
bool write)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (root == &sbi->super.btree_root) {
scoutfs_block_unlock(left, write);
scoutfs_block_unlock(right, write);
}
}
/* sorting relies on masking pointers to find the containing block */
static inline struct buffer_head *check_bh_alignment(struct buffer_head *bh)
{
@@ -477,12 +612,14 @@ static struct buffer_head *grow_tree(struct super_block *sb,
root->height++;
root->ref.blkno = hdr->blkno;
root->ref.seq = hdr->seq;
set_block_lock_class(bh, root->height - 1);
}
return bh;
}
static struct buffer_head *get_block_ref(struct super_block *sb,
static struct buffer_head *get_block_ref(struct super_block *sb, int level,
struct scoutfs_block_ref *ref,
bool dirty)
{
@@ -493,6 +630,9 @@ static struct buffer_head *get_block_ref(struct super_block *sb,
else
bh = scoutfs_block_read_ref(sb, ref);
if (!IS_ERR(bh))
set_block_lock_class(bh, level);
return check_bh_alignment(bh);
}
@@ -549,6 +689,7 @@ static struct buffer_head *try_split(struct super_block *sb,
struct buffer_head *par_bh = NULL;
struct scoutfs_key maximal;
unsigned int all_bytes;
bool swap_return = false;
if (level)
val_len = sizeof(struct scoutfs_block_ref);
@@ -586,23 +727,28 @@ static struct buffer_head *try_split(struct super_block *sb,
create_parent_item(parent, parent_pos, right, &maximal);
}
lock_siblings(sb, root, left_bh, right_bh, true);
move_items(left, right, false, used_total(right) / 2);
create_parent_item(parent, parent_pos, left, greatest_key(left));
parent_pos++; /* not that anything uses it again :P */
if (scoutfs_key_cmp(key, greatest_key(left)) <= 0) {
/* insertion will go to the new left block */
scoutfs_block_put(right_bh);
right_bh = left_bh;
swap_return = true;
} else {
scoutfs_block_put(left_bh);
/* insertion will still go through us, might need to compact */
if (contig_free(right) < all_bytes)
compact_items(right);
}
unlock_siblings(sb, root, left_bh, right_bh, true);
if (swap_return)
swap(right_bh, left_bh);
scoutfs_block_put(par_bh);
scoutfs_block_put(left_bh);
return right_bh;
}
@@ -631,7 +777,7 @@ static struct buffer_head *try_split(struct super_block *sb,
static struct buffer_head *try_merge(struct super_block *sb,
struct scoutfs_btree_root *root,
struct scoutfs_btree_block *parent,
unsigned int pos,
int level, unsigned int pos,
struct buffer_head *bh)
{
struct scoutfs_btree_block *bt = bh_data(bh);
@@ -655,7 +801,7 @@ static struct buffer_head *try_merge(struct super_block *sb,
}
sib_item = pos_item(parent, sib_pos);
sib_bh = get_block_ref(sb, (void *)sib_item->val, true);
sib_bh = get_block_ref(sb, level, (void *)sib_item->val, true);
if (IS_ERR(sib_bh)) {
/* XXX do we need to unlock this? don't think so */
scoutfs_block_put(bh);
@@ -663,6 +809,11 @@ static struct buffer_head *try_merge(struct super_block *sb,
}
sib_bt = bh_data(sib_bh);
if (move_right)
lock_siblings(sb, root, sib_bh, bh, true);
else
lock_siblings(sb, root, bh, sib_bh, true);
if (used_total(sib_bt) <= reclaimable_free(bt))
to_move = used_total(sib_bt);
else
@@ -703,6 +854,11 @@ static struct buffer_head *try_merge(struct super_block *sb,
free_tree_block(sb, parent->hdr.blkno);
}
if (move_right)
unlock_siblings(sb, root, sib_bh, bh, true);
else
unlock_siblings(sb, root, bh, sib_bh, true);
scoutfs_block_put(sib_bh);
return bh;
@@ -716,44 +872,6 @@ enum {
WALK_DIRTY,
};
static inline void lock_root(struct scoutfs_sb_info *sbi, bool dirty)
{
if (dirty)
down_write(&sbi->btree_rwsem);
else
down_read(&sbi->btree_rwsem);
}
static inline void unlock_root(struct scoutfs_sb_info *sbi, bool dirty)
{
if (dirty)
up_write(&sbi->btree_rwsem);
else
up_read(&sbi->btree_rwsem);
}
/*
* As we descend we lock parent blocks (or the root), then lock the child,
* then unlock the parent.
*/
static inline void lock_block(struct scoutfs_sb_info *sbi,
struct buffer_head *bh, bool dirty)
{
if (bh == NULL)
lock_root(sbi, dirty);
else
lock_buffer(bh);
}
static inline void unlock_block(struct scoutfs_sb_info *sbi,
struct buffer_head *bh, bool dirty)
{
if (bh == NULL)
unlock_root(sbi, dirty);
else
unlock_buffer(bh);
}
static u64 item_block_ref_seq(struct scoutfs_btree_item *item)
{
struct scoutfs_block_ref *ref = (void *)item->val;
@@ -914,7 +1032,6 @@ static struct buffer_head *btree_walk(struct super_block *sb,
struct scoutfs_key *next_key,
unsigned int val_len, u64 seq, int op)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_btree_block *parent = NULL;
struct buffer_head *par_bh = NULL;
struct buffer_head *bh = NULL;
@@ -932,7 +1049,7 @@ static struct buffer_head *btree_walk(struct super_block *sb,
if (next_key)
scoutfs_set_max_key(next_key);
lock_block(sbi, par_bh, dirty);
lock_root(sb, root, dirty);
ref = &root->ref;
level = root->height;
@@ -943,17 +1060,16 @@ static struct buffer_head *btree_walk(struct super_block *sb,
} else {
bh = grow_tree(sb, root);
if (!IS_ERR(bh))
lock_block(sbi, bh, dirty);
lock_next_level(sb, root, NULL, bh, dirty);
}
unlock_block(sbi, par_bh, dirty);
return bh;
goto out;
}
/* skip the whole tree if the root ref's seq is old */
if (op == WALK_NEXT_SEQ && le64_to_cpu(ref->seq) < seq) {
unlock_block(sbi, par_bh, dirty);
return ERR_PTR(-ENOENT);
bh = ERR_PTR(-ENOENT);
goto out;
}
scoutfs_set_key(&small, 0, 0, 0);
@@ -961,7 +1077,7 @@ static struct buffer_head *btree_walk(struct super_block *sb,
while (level--) {
/* XXX hmm, need to think about retry */
bh = get_block_ref(sb, ref, dirty);
bh = get_block_ref(sb, level, ref, dirty);
if (IS_ERR(bh))
break;
@@ -977,17 +1093,15 @@ static struct buffer_head *btree_walk(struct super_block *sb,
bh = try_split(sb, root, level, key, val_len, parent,
pos, bh);
if ((op == WALK_DELETE) && parent)
bh = try_merge(sb, root, parent, pos, bh);
bh = try_merge(sb, root, parent, level, pos, bh);
if (IS_ERR(bh))
break;
lock_block(sbi, bh, dirty);
lock_next_level(sb, root, par_bh, bh, dirty);
if (!level)
break;
/* unlock parent before searching so others can use it */
unlock_block(sbi, par_bh, dirty);
scoutfs_block_put(par_bh);
par_bh = bh;
parent = bh_data(par_bh);
@@ -1027,7 +1141,9 @@ static struct buffer_head *btree_walk(struct super_block *sb,
large = item->key;
}
unlock_block(sbi, par_bh, dirty);
out:
if (IS_ERR(bh))
unlock_level(sb, root, par_bh, dirty);
scoutfs_block_put(par_bh);
return bh;
@@ -1066,7 +1182,7 @@ int scoutfs_btree_lookup(struct super_block *sb,
ret = -ENOENT;
}
unlock_block(NULL, bh, false);
unlock_level(sb, root, bh, false);
scoutfs_block_put(bh);
trace_printk("key "CKF" ret %d\n", CKA(key), ret);
@@ -1120,7 +1236,7 @@ int scoutfs_btree_insert(struct super_block *sb,
ret = -EEXIST;
}
unlock_block(NULL, bh, true);
unlock_level(sb, root, bh, true);
scoutfs_block_put(bh);
return ret;
@@ -1166,7 +1282,7 @@ int scoutfs_btree_delete(struct super_block *sb,
ret = -ENOENT;
}
unlock_block(NULL, bh, true);
unlock_level(sb, root, bh, true);
scoutfs_block_put(bh);
out:
@@ -1234,7 +1350,7 @@ static int btree_next(struct super_block *sb, struct scoutfs_btree_root *root,
pos = find_pos_after_seq(bt, &key, 0, seq, op);
if (pos >= bt->nr_items) {
key = next_key;
unlock_block(NULL, bh, false);
unlock_level(sb, root, bh, false);
scoutfs_block_put(bh);
continue;
}
@@ -1252,7 +1368,7 @@ static int btree_next(struct super_block *sb, struct scoutfs_btree_root *root,
ret = -ENOENT;
}
unlock_block(NULL, bh, false);
unlock_level(sb, root, bh, false);
scoutfs_block_put(bh);
break;
}
@@ -1311,7 +1427,7 @@ int scoutfs_btree_dirty(struct super_block *sb,
ret = -ENOENT;
}
unlock_block(NULL, bh, true);
unlock_level(sb, root, bh, true);
scoutfs_block_put(bh);
trace_printk("key "CKF" ret %d\n", CKA(key), ret);
@@ -1353,7 +1469,7 @@ int scoutfs_btree_update(struct super_block *sb,
ret = -ENOENT;
}
unlock_block(NULL, bh, true);
unlock_level(sb, root, bh, true);
scoutfs_block_put(bh);
return ret;

View File

@@ -156,6 +156,21 @@ struct scoutfs_btree_item {
(member_sizeof(struct scoutfs_btree_block, item_offs[0]) + \
sizeof(struct scoutfs_btree_item)))
/*
* We can calculate the max tree depth by calculating how many leaf
* blocks the tree could reference. The block device can only reference
* 2^64 bytes. The tallest parent tree has half full parent blocks.
*
* So we have the relation:
*
* ceil(max_items / 2) ^ (max_depth - 1) >= 2^64 / block_size
*
* and solve for depth:
*
* max_depth = log(ceil(max_items / 2), 2^64 / block_size) + 1
*/
#define SCOUTFS_BTREE_MAX_DEPTH 10
#define SCOUTFS_UUID_BYTES 16
struct scoutfs_super_block {

View File

@@ -27,7 +27,6 @@ struct scoutfs_sb_info {
struct mutex buddy_mutex;
atomic_t buddy_count;
/* XXX there will be a lot more of these :) */
struct rw_semaphore btree_rwsem;
atomic_t trans_holds;