Lock btree merges

The btree locking wasn't covering the merge candidate block before the
siblings were locked.  In that unlocked code it could compact the block
corrupting it for whatever other tree walk might only have the merge
candidate locked after having unlocked the parent.

This extends locking coverage to merge and split attempts by acquiring
the block lock immediately after we read it.  Split doesn't have to lock
its destination block but it does have to know to unlock the block on
errors.  Merge has to more carefully lock both of its existing blocks in
a consistent order.

To clearly implement this we simplify the locking helpers to just unlock
and lock a given block, falling back to the btree rwsem if there isn't a
block.

I started down this road while chasing allocator bugs that manifested as
tree corruption.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2016-11-03 14:23:04 -07:00
parent a77f88386c
commit 44588c1d8b

View File

@@ -442,14 +442,13 @@ static void compact_items(struct scoutfs_btree_block *bt)
* tree. This gives us the opportunity to cascade block locks down the
* tree. We first lock the root. Then we lock the first block and
* unlock the root. Then lock the next block and unlock the first
* block. And so on down the tree. Except for that brief transition
* function the btree walk always holds a single lock on either the root
* or a block at a level. After contention on the root and first block
* we have lots of concurrency down paths of the tree to the leaves.
* block. And so on down the tree. After contention on the root and
* first block we have lots of concurrency down paths of the tree to the
* leaves.
*
* As we walk down the tree we have to split or merge. While we do this
* we hold the parent block lock. We have to also lock the sibling
* blocks. We always acquire them left to right to avoid deadlocks.
* Merging during descent has to lock the sibling block that it's
* pulling items from. It has to acquire these nested locks in
* consistent tree order.
*
* The cow tree updates let us skip block locking entirely for stable
* blocks because they're read only. The block layer only has to worry
@@ -458,22 +457,13 @@ static void compact_items(struct scoutfs_btree_block *bt)
* transaction and we store the block lock there. The block layer
* ignores our locking attempts for read-only blocks.
*
* And all of the blocks referenced by the stable super will be stable
* so we only try to lock at all when working with the dirty super.
*
* lockdep has to not be freaked out by all of this. The cascading
* block locks really make it angry without annotation so we add classes
* for each level and use nested subclasses for the locking of siblings
* during split and merge.
*
* We also use the btree API for the block allocator. This introduces
* nesting btree allocator calls inside main fs metadata btree calls.
* The locking would be safe as the blocks will never be in both trees
* but lockdep would think they're the same class and get raise
* warnings. We'd need to have tree level classes for all the trees.
* It turns out that the allocator has to maintain multi-item
* consistency across its entire tree so it has a tree-wide lock. We
* don't have to lock the btree at all when we're working on the
* allocator roots. They're the only non-metadata roots so far so we
* invert the test and only lock the btree when we're working on the
* main metadata btree root.
* during merge.
*/
static void set_block_lock_class(struct buffer_head *bh, int level)
@@ -485,85 +475,42 @@ static void set_block_lock_class(struct buffer_head *bh, int level)
#endif
}
static void lock_root(struct super_block *sb, struct scoutfs_btree_root *root,
bool write)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (root == &sbi->super.btree_root) {
if (write)
down_write(&sbi->btree_rwsem);
else
down_read(&sbi->btree_rwsem);
}
}
static void unlock_root(struct super_block *sb, bool write)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (write)
up_write(&sbi->btree_rwsem);
else
up_read(&sbi->btree_rwsem);
}
static void unlock_level(struct super_block *sb,
struct scoutfs_btree_root *root,
struct buffer_head *bh, bool write)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (root == &sbi->super.btree_root) {
if (bh)
scoutfs_block_unlock(bh, write);
else
unlock_root(sb, write);
}
}
static void lock_next_level(struct super_block *sb,
static void lock_tree_block(struct super_block *sb,
struct scoutfs_btree_root *root,
struct buffer_head *par_bh,
struct buffer_head *bh, bool write)
struct buffer_head *bh, bool write, int subclass)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (root == &sbi->super.btree_root) {
scoutfs_block_lock(bh, write, 0);
unlock_level(sb, root, par_bh, write);
if (bh) {
scoutfs_block_lock(bh, write, subclass);
} else {
if (write)
down_write(&sbi->btree_rwsem);
else
down_read(&sbi->btree_rwsem);
}
}
}
static void lock_siblings(struct super_block *sb,
struct scoutfs_btree_root *root,
struct buffer_head *left, struct buffer_head *right,
bool write)
static void unlock_tree_block(struct super_block *sb,
struct scoutfs_btree_root *root,
struct buffer_head *bh, bool write)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (root == &sbi->super.btree_root) {
scoutfs_block_lock(left, write, 0);
scoutfs_block_lock(right, write, 1);
if (bh) {
scoutfs_block_unlock(bh, write);
} else {
if (write)
up_write(&sbi->btree_rwsem);
else
up_read(&sbi->btree_rwsem);
}
}
}
static void unlock_siblings(struct super_block *sb,
struct scoutfs_btree_root *root,
struct buffer_head *left, struct buffer_head *right,
bool write)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (root == &sbi->super.btree_root) {
scoutfs_block_unlock(left, write);
scoutfs_block_unlock(right, write);
}
}
/* sorting relies on masking pointers to find the containing block */
static inline struct buffer_head *check_bh_alignment(struct buffer_head *bh)
{
@@ -678,14 +625,13 @@ static void create_parent_item(struct scoutfs_btree_block *parent,
* We split to the left so that the greatest key in the existing block
* doesn't change so we don't have to update the key in its parent item.
*
* If the search key falls in the new split block then we return it
* to the caller to walk through.
* If the search key falls in the new split block then we return it to
* the caller to walk through.
*
* The locking in the case where we add the first parent is a little wonky.
* We're creating a parent block that the walk doesn't know about. It
* holds the tree mutex while we add the parent ref and then will lock
* the child that we return. It's skipping locking the new parent as it
* descends but that's fine.
* The caller has the parent (or root) and our block locked. We don't
* have to lock the blocks we allocate while we have the references to
* them locked. We only need to lock the new sibling if we return it
* instead of our given block for the caller to continue descent.
*/
static struct buffer_head *try_split(struct super_block *sb,
struct scoutfs_btree_root *root,
@@ -701,7 +647,6 @@ static struct buffer_head *try_split(struct super_block *sb,
struct buffer_head *par_bh = NULL;
struct scoutfs_key maximal;
unsigned int all_bytes;
bool swap_return = false;
if (level)
val_len = sizeof(struct scoutfs_block_ref);
@@ -718,6 +663,7 @@ static struct buffer_head *try_split(struct super_block *sb,
/* alloc split neighbour first to avoid unwinding tree growth */
left_bh = alloc_tree_block(sb);
if (IS_ERR(left_bh)) {
unlock_tree_block(sb, root, right_bh, true);
scoutfs_block_put(right_bh);
return left_bh;
}
@@ -728,6 +674,7 @@ static struct buffer_head *try_split(struct super_block *sb,
if (IS_ERR(par_bh)) {
free_tree_block(sb, left->hdr.blkno);
scoutfs_block_put(left_bh);
unlock_tree_block(sb, root, right_bh, true);
scoutfs_block_put(right_bh);
return par_bh;
}
@@ -739,26 +686,21 @@ static struct buffer_head *try_split(struct super_block *sb,
create_parent_item(parent, parent_pos, right, &maximal);
}
lock_siblings(sb, root, left_bh, right_bh, true);
move_items(left, right, false, used_total(right) / 2);
create_parent_item(parent, parent_pos, left, greatest_key(left));
parent_pos++; /* not that anything uses it again :P */
if (scoutfs_key_cmp(key, greatest_key(left)) <= 0) {
/* insertion will go to the new left block */
swap_return = true;
unlock_tree_block(sb, root, right_bh, true);
lock_tree_block(sb, root, left_bh, true, 0);
swap(right_bh, left_bh);
} else {
/* insertion will still go through us, might need to compact */
if (contig_free(right) < all_bytes)
compact_items(right);
}
unlock_siblings(sb, root, left_bh, right_bh, true);
if (swap_return)
swap(right_bh, left_bh);
scoutfs_block_put(par_bh);
scoutfs_block_put(left_bh);
@@ -777,8 +719,10 @@ static struct buffer_head *try_split(struct super_block *sb,
* from both easily. We have to unlock and release our buffer to return
* an error.
*
* The caller only has the parent locked. They'll lock whichever
* block we return.
* The caller locks the parent and our given block. We need to
* lock sibling blocks in consistent tree order. Our common case
* has us pulling from our left sibling so we prefer to lock blocks
* from right to left. Splitting doesn't hold both sibling locks.
*
* We free sibling or parent btree block blknos if we drain them of items.
* They're dirtied either by descent or before we start migrating items
@@ -821,10 +765,19 @@ static struct buffer_head *try_merge(struct super_block *sb,
}
sib_bt = bh_data(sib_bh);
if (move_right)
lock_siblings(sb, root, sib_bh, bh, true);
else
lock_siblings(sb, root, bh, sib_bh, true);
if (!move_right) {
unlock_tree_block(sb, root, bh, true);
lock_tree_block(sb, root, sib_bh, true, 0);
lock_tree_block(sb, root, bh, true, 1);
if (reclaimable_free(bt) <= SCOUTFS_BTREE_FREE_LIMIT) {
unlock_tree_block(sb, root, sib_bh, true);
scoutfs_block_put(sib_bh);
return bh;
}
} else {
lock_tree_block(sb, root, sib_bh, true, 1);
}
if (used_total(sib_bt) <= reclaimable_free(bt))
to_move = used_total(sib_bt);
@@ -866,11 +819,7 @@ static struct buffer_head *try_merge(struct super_block *sb,
free_tree_block(sb, parent->hdr.blkno);
}
if (move_right)
unlock_siblings(sb, root, sib_bh, bh, true);
else
unlock_siblings(sb, root, bh, sib_bh, true);
unlock_tree_block(sb, root, sib_bh, true);
scoutfs_block_put(sib_bh);
return bh;
@@ -1065,7 +1014,7 @@ static struct buffer_head *btree_walk(struct super_block *sb,
if (prev_key)
scoutfs_key_set_zero(prev_key);
lock_root(sb, root, dirty);
lock_tree_block(sb, root, NULL, dirty, 0);
ref = &root->ref;
level = root->height;
@@ -1075,8 +1024,10 @@ static struct buffer_head *btree_walk(struct super_block *sb,
bh = ERR_PTR(-ENOENT);
} else {
bh = grow_tree(sb, root);
if (!IS_ERR(bh))
lock_next_level(sb, root, NULL, bh, dirty);
if (!IS_ERR(bh)) {
lock_tree_block(sb, root, bh, dirty, 0);
unlock_tree_block(sb, root, NULL, dirty);
}
}
goto out;
}
@@ -1105,6 +1056,8 @@ static struct buffer_head *btree_walk(struct super_block *sb,
break;
}
lock_tree_block(sb, root, bh, dirty, 0);
if (op == WALK_INSERT)
bh = try_split(sb, root, level, key, val_len, parent,
pos, bh);
@@ -1113,7 +1066,7 @@ static struct buffer_head *btree_walk(struct super_block *sb,
if (IS_ERR(bh))
break;
lock_next_level(sb, root, par_bh, bh, dirty);
unlock_tree_block(sb, root, par_bh, dirty);
if (!level)
break;
@@ -1163,7 +1116,7 @@ static struct buffer_head *btree_walk(struct super_block *sb,
out:
if (IS_ERR(bh))
unlock_level(sb, root, par_bh, dirty);
unlock_tree_block(sb, root, par_bh, dirty);
scoutfs_block_put(par_bh);
return bh;
@@ -1201,7 +1154,7 @@ int scoutfs_btree_lookup(struct super_block *sb,
ret = -ENOENT;
}
unlock_level(sb, root, bh, false);
unlock_tree_block(sb, root, bh, false);
scoutfs_block_put(bh);
trace_printk("key "CKF" ret %d\n", CKA(key), ret);
@@ -1257,7 +1210,7 @@ int scoutfs_btree_insert(struct super_block *sb,
ret = -EEXIST;
}
unlock_level(sb, root, bh, true);
unlock_tree_block(sb, root, bh, true);
scoutfs_block_put(bh);
return ret;
@@ -1305,7 +1258,7 @@ int scoutfs_btree_delete(struct super_block *sb,
ret = -ENOENT;
}
unlock_level(sb, root, bh, true);
unlock_tree_block(sb, root, bh, true);
scoutfs_block_put(bh);
out:
@@ -1373,7 +1326,7 @@ static int btree_next(struct super_block *sb, struct scoutfs_btree_root *root,
pos = find_pos_after_seq(bt, &key, 0, seq, op);
if (pos >= bt->nr_items) {
key = next_key;
unlock_level(sb, root, bh, false);
unlock_tree_block(sb, root, bh, false);
scoutfs_block_put(bh);
continue;
}
@@ -1391,7 +1344,7 @@ static int btree_next(struct super_block *sb, struct scoutfs_btree_root *root,
ret = -ENOENT;
}
unlock_level(sb, root, bh, false);
unlock_tree_block(sb, root, bh, false);
scoutfs_block_put(bh);
break;
}
@@ -1465,7 +1418,7 @@ int scoutfs_btree_prev(struct super_block *sb, struct scoutfs_btree_root *root,
/* walk to the prev leaf if we hit the front of this leaf */
if (pos == 0 && cmp != 0) {
unlock_level(sb, root, bh, false);
unlock_tree_block(sb, root, bh, false);
scoutfs_block_put(bh);
if (scoutfs_key_is_zero(&key))
break;
@@ -1487,7 +1440,7 @@ int scoutfs_btree_prev(struct super_block *sb, struct scoutfs_btree_root *root,
ret = 0;
}
unlock_level(sb, root, bh, false);
unlock_tree_block(sb, root, bh, false);
scoutfs_block_put(bh);
break;
}
@@ -1525,7 +1478,7 @@ int scoutfs_btree_dirty(struct super_block *sb,
ret = -ENOENT;
}
unlock_level(sb, root, bh, true);
unlock_tree_block(sb, root, bh, true);
scoutfs_block_put(bh);
trace_printk("key "CKF" ret %d\n", CKA(key), ret);
@@ -1570,7 +1523,7 @@ int scoutfs_btree_update(struct super_block *sb,
ret = -ENOENT;
}
unlock_level(sb, root, bh, true);
unlock_tree_block(sb, root, bh, true);
scoutfs_block_put(bh);
return ret;