diff --git a/kmod/src/block.c b/kmod/src/block.c index 7c049a9a..0f6269af 100644 --- a/kmod/src/block.c +++ b/kmod/src/block.c @@ -42,6 +42,8 @@ struct block_bh_private { struct super_block *sb; struct buffer_head *bh; struct rb_node node; + struct rw_semaphore rwsem; + bool rwsem_class; }; enum { @@ -146,6 +148,9 @@ static int insert_bhp(struct super_block *sb, struct buffer_head *bh) bhp->bh = bh; get_bh(bh); bh->b_private = bhp; + /* lockdep class can be set by callers that use the lock */ + init_rwsem(&bhp->rwsem); + bhp->rwsem_class = false; spin_lock_irqsave(&sbi->block_lock, flags); insert_bhp_rb(&sbi->block_dirty_tree, bh); @@ -306,13 +311,6 @@ int scoutfs_block_write_dirty(struct super_block *sb) atomic_inc(&sbi->block_writes); scoutfs_block_set_crc(bh); - /* - * XXX submit_bh() forces us to lock the block while IO is - * in flight. This is unfortunate because we use the buffer - * head lock to serialize access to btree block contents. - * We should fix that and only use the buffer head lock - * when the APIs force us to. - */ lock_buffer(bh); bh->b_end_io = block_write_end_io; @@ -486,3 +484,42 @@ void scoutfs_block_zero(struct buffer_head *bh, size_t off) if (off < SCOUTFS_BLOCK_SIZE) memset((char *)bh->b_data + off, 0, SCOUTFS_BLOCK_SIZE - off); } + +void scoutfs_block_set_lock_class(struct buffer_head *bh, + struct lock_class_key *class) +{ + struct block_bh_private *bhp = bh->b_private; + + if (bhp && !bhp->rwsem_class) { + lockdep_set_class(&bhp->rwsem, class); + bhp->rwsem_class = true; + } +} + +void scoutfs_block_lock(struct buffer_head *bh, bool write, int subclass) +{ + struct block_bh_private *bhp = bh->b_private; + + trace_printk("lock write %d bhp %p\n", write, bhp); + + if (bhp) { + if (write) + down_write_nested(&bhp->rwsem, subclass); + else + down_read_nested(&bhp->rwsem, subclass); + } +} + +void scoutfs_block_unlock(struct buffer_head *bh, bool write) +{ + struct block_bh_private *bhp = bh->b_private; + + trace_printk("unlock write %d bhp %p\n", write, bhp); + + if (bhp) { + if (write) + up_write(&bhp->rwsem); + else + up_read(&bhp->rwsem); + } +} diff --git a/kmod/src/block.h b/kmod/src/block.h index d431d90e..32ea0f38 100644 --- a/kmod/src/block.h +++ b/kmod/src/block.h @@ -19,6 +19,11 @@ int scoutfs_block_write_dirty(struct super_block *sb); void scoutfs_block_set_crc(struct buffer_head *bh); void scoutfs_block_zero(struct buffer_head *bh, size_t off); +void scoutfs_block_set_lock_class(struct buffer_head *bh, + struct lock_class_key *class); +void scoutfs_block_lock(struct buffer_head *bh, bool write, int subclass); +void scoutfs_block_unlock(struct buffer_head *bh, bool write); + /* XXX seems like this should be upstream :) */ static inline void *bh_data(struct buffer_head *bh) { diff --git a/kmod/src/btree.c b/kmod/src/btree.c index b3498858..fbd40f7c 100644 --- a/kmod/src/btree.c +++ b/kmod/src/btree.c @@ -417,6 +417,141 @@ static void compact_items(struct scoutfs_btree_block *bt) sort_key_cmp, sort_off_swap); } + +/* + * Let's talk about btree locking. + * + * The main metadata btree has lots of callers who want concurrency. + * They have their own locks that protect multi item consistency -- say + * an inode's i_mutex protecting the items related to a given inode. + * But it's our responsibility to lock the btree itself. + * + * Our btree operations are implemented with a single walk down the + * tree. This gives us the opportunity to cascade block locks down the + * tree. We first lock the root. Then we lock the first block and + * unlock the root. 
Then lock the next block and unlock the first
+ * block.  And so on down the tree.  Except for that brief transition
+ * the btree walk always holds a single lock on either the root or a
+ * block at a given level.  After contention on the root and first
+ * block we have lots of concurrency down paths of the tree to the
+ * leaves.
+ *
+ * As we walk down the tree we may have to split or merge blocks.
+ * While we do this we hold the parent block lock.  We also have to
+ * lock the sibling blocks, and we always acquire them left to right to
+ * avoid deadlocks.
+ *
+ * The cow tree updates let us skip block locking entirely for stable
+ * blocks because they're read-only.  The block layer only has to worry
+ * about locking blocks that could be written to.  While they're
+ * writable they have a buffer_head private that pins them in the
+ * transaction and we store the block lock there.  The block layer
+ * ignores our locking attempts for read-only blocks.
+ *
+ * lockdep must not be freaked out by all of this.  The cascading block
+ * locks really make it angry without annotation so we add lockdep
+ * classes for each level and use nested subclasses for the locking of
+ * siblings during split and merge.
+ *
+ * We also use the btree API for the block allocator.  This nests btree
+ * allocator calls inside main fs metadata btree calls.  The locking
+ * would be safe, as a block will never be in both trees, but lockdep
+ * would think they're the same class and raise warnings.  We'd need
+ * per-tree lock classes for all the trees.  It turns out that the
+ * allocator has to maintain multi-item consistency across its entire
+ * tree, so it has a tree-wide lock.  We don't have to lock the btree
+ * at all when we're working on the allocator roots.  They're the only
+ * non-metadata roots so far, so we invert the test and only lock the
+ * btree when we're working on the main metadata btree root.
+ */ + +static void set_block_lock_class(struct buffer_head *bh, int level) +{ +#ifdef CONFIG_LOCKDEP + static struct lock_class_key tree_depth_classes[SCOUTFS_BTREE_MAX_DEPTH]; + + scoutfs_block_set_lock_class(bh, &tree_depth_classes[level]); +#endif +} + +static void lock_root(struct super_block *sb, struct scoutfs_btree_root *root, + bool write) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + + if (root == &sbi->super.btree_root) { + if (write) + down_write(&sbi->btree_rwsem); + else + down_read(&sbi->btree_rwsem); + } +} + +static void unlock_root(struct super_block *sb, bool write) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + + if (write) + up_write(&sbi->btree_rwsem); + else + up_read(&sbi->btree_rwsem); +} + +static void unlock_level(struct super_block *sb, + struct scoutfs_btree_root *root, + struct buffer_head *bh, bool write) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + + if (root == &sbi->super.btree_root) { + if (bh) + scoutfs_block_unlock(bh, write); + else + unlock_root(sb, write); + } +} + +static void lock_next_level(struct super_block *sb, + struct scoutfs_btree_root *root, + struct buffer_head *par_bh, + struct buffer_head *bh, bool write) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + + if (root == &sbi->super.btree_root) { + scoutfs_block_lock(bh, write, 0); + + unlock_level(sb, root, par_bh, write); + } +} + +static void lock_siblings(struct super_block *sb, + struct scoutfs_btree_root *root, + struct buffer_head *left, struct buffer_head *right, + bool write) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + + if (root == &sbi->super.btree_root) { + scoutfs_block_lock(left, write, 0); + scoutfs_block_lock(right, write, 1); + } +} + +static void unlock_siblings(struct super_block *sb, + struct scoutfs_btree_root *root, + struct buffer_head *left, struct buffer_head *right, + bool write) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + + if (root == &sbi->super.btree_root) { + scoutfs_block_unlock(left, write); + scoutfs_block_unlock(right, write); + } +} + + + /* sorting relies on masking pointers to find the containing block */ static inline struct buffer_head *check_bh_alignment(struct buffer_head *bh) { @@ -477,12 +612,14 @@ static struct buffer_head *grow_tree(struct super_block *sb, root->height++; root->ref.blkno = hdr->blkno; root->ref.seq = hdr->seq; + + set_block_lock_class(bh, root->height - 1); } return bh; } -static struct buffer_head *get_block_ref(struct super_block *sb, +static struct buffer_head *get_block_ref(struct super_block *sb, int level, struct scoutfs_block_ref *ref, bool dirty) { @@ -493,6 +630,9 @@ static struct buffer_head *get_block_ref(struct super_block *sb, else bh = scoutfs_block_read_ref(sb, ref); + if (!IS_ERR(bh)) + set_block_lock_class(bh, level); + return check_bh_alignment(bh); } @@ -549,6 +689,7 @@ static struct buffer_head *try_split(struct super_block *sb, struct buffer_head *par_bh = NULL; struct scoutfs_key maximal; unsigned int all_bytes; + bool swap_return = false; if (level) val_len = sizeof(struct scoutfs_block_ref); @@ -586,23 +727,28 @@ static struct buffer_head *try_split(struct super_block *sb, create_parent_item(parent, parent_pos, right, &maximal); } + lock_siblings(sb, root, left_bh, right_bh, true); + move_items(left, right, false, used_total(right) / 2); create_parent_item(parent, parent_pos, left, greatest_key(left)); parent_pos++; /* not that anything uses it again :P */ if (scoutfs_key_cmp(key, greatest_key(left)) <= 0) { /* insertion will go to the new left block */ - 
scoutfs_block_put(right_bh); - right_bh = left_bh; + swap_return = true; } else { - scoutfs_block_put(left_bh); - /* insertion will still go through us, might need to compact */ if (contig_free(right) < all_bytes) compact_items(right); } + unlock_siblings(sb, root, left_bh, right_bh, true); + + if (swap_return) + swap(right_bh, left_bh); + scoutfs_block_put(par_bh); + scoutfs_block_put(left_bh); return right_bh; } @@ -631,7 +777,7 @@ static struct buffer_head *try_split(struct super_block *sb, static struct buffer_head *try_merge(struct super_block *sb, struct scoutfs_btree_root *root, struct scoutfs_btree_block *parent, - unsigned int pos, + int level, unsigned int pos, struct buffer_head *bh) { struct scoutfs_btree_block *bt = bh_data(bh); @@ -655,7 +801,7 @@ static struct buffer_head *try_merge(struct super_block *sb, } sib_item = pos_item(parent, sib_pos); - sib_bh = get_block_ref(sb, (void *)sib_item->val, true); + sib_bh = get_block_ref(sb, level, (void *)sib_item->val, true); if (IS_ERR(sib_bh)) { /* XXX do we need to unlock this? don't think so */ scoutfs_block_put(bh); @@ -663,6 +809,11 @@ static struct buffer_head *try_merge(struct super_block *sb, } sib_bt = bh_data(sib_bh); + if (move_right) + lock_siblings(sb, root, sib_bh, bh, true); + else + lock_siblings(sb, root, bh, sib_bh, true); + if (used_total(sib_bt) <= reclaimable_free(bt)) to_move = used_total(sib_bt); else @@ -703,6 +854,11 @@ static struct buffer_head *try_merge(struct super_block *sb, free_tree_block(sb, parent->hdr.blkno); } + if (move_right) + unlock_siblings(sb, root, sib_bh, bh, true); + else + unlock_siblings(sb, root, bh, sib_bh, true); + scoutfs_block_put(sib_bh); return bh; @@ -716,44 +872,6 @@ enum { WALK_DIRTY, }; -static inline void lock_root(struct scoutfs_sb_info *sbi, bool dirty) -{ - if (dirty) - down_write(&sbi->btree_rwsem); - else - down_read(&sbi->btree_rwsem); -} - -static inline void unlock_root(struct scoutfs_sb_info *sbi, bool dirty) -{ - if (dirty) - up_write(&sbi->btree_rwsem); - else - up_read(&sbi->btree_rwsem); -} - -/* - * As we descend we lock parent blocks (or the root), then lock the child, - * then unlock the parent. 
- */ -static inline void lock_block(struct scoutfs_sb_info *sbi, - struct buffer_head *bh, bool dirty) -{ - if (bh == NULL) - lock_root(sbi, dirty); - else - lock_buffer(bh); -} - -static inline void unlock_block(struct scoutfs_sb_info *sbi, - struct buffer_head *bh, bool dirty) -{ - if (bh == NULL) - unlock_root(sbi, dirty); - else - unlock_buffer(bh); -} - static u64 item_block_ref_seq(struct scoutfs_btree_item *item) { struct scoutfs_block_ref *ref = (void *)item->val; @@ -914,7 +1032,6 @@ static struct buffer_head *btree_walk(struct super_block *sb, struct scoutfs_key *next_key, unsigned int val_len, u64 seq, int op) { - struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct scoutfs_btree_block *parent = NULL; struct buffer_head *par_bh = NULL; struct buffer_head *bh = NULL; @@ -932,7 +1049,7 @@ static struct buffer_head *btree_walk(struct super_block *sb, if (next_key) scoutfs_set_max_key(next_key); - lock_block(sbi, par_bh, dirty); + lock_root(sb, root, dirty); ref = &root->ref; level = root->height; @@ -943,17 +1060,16 @@ static struct buffer_head *btree_walk(struct super_block *sb, } else { bh = grow_tree(sb, root); if (!IS_ERR(bh)) - lock_block(sbi, bh, dirty); + lock_next_level(sb, root, NULL, bh, dirty); } - unlock_block(sbi, par_bh, dirty); - return bh; + goto out; } /* skip the whole tree if the root ref's seq is old */ if (op == WALK_NEXT_SEQ && le64_to_cpu(ref->seq) < seq) { - unlock_block(sbi, par_bh, dirty); - return ERR_PTR(-ENOENT); + bh = ERR_PTR(-ENOENT); + goto out; } scoutfs_set_key(&small, 0, 0, 0); @@ -961,7 +1077,7 @@ static struct buffer_head *btree_walk(struct super_block *sb, while (level--) { /* XXX hmm, need to think about retry */ - bh = get_block_ref(sb, ref, dirty); + bh = get_block_ref(sb, level, ref, dirty); if (IS_ERR(bh)) break; @@ -977,17 +1093,15 @@ static struct buffer_head *btree_walk(struct super_block *sb, bh = try_split(sb, root, level, key, val_len, parent, pos, bh); if ((op == WALK_DELETE) && parent) - bh = try_merge(sb, root, parent, pos, bh); + bh = try_merge(sb, root, parent, level, pos, bh); if (IS_ERR(bh)) break; - lock_block(sbi, bh, dirty); + lock_next_level(sb, root, par_bh, bh, dirty); if (!level) break; - /* unlock parent before searching so others can use it */ - unlock_block(sbi, par_bh, dirty); scoutfs_block_put(par_bh); par_bh = bh; parent = bh_data(par_bh); @@ -1027,7 +1141,9 @@ static struct buffer_head *btree_walk(struct super_block *sb, large = item->key; } - unlock_block(sbi, par_bh, dirty); +out: + if (IS_ERR(bh)) + unlock_level(sb, root, par_bh, dirty); scoutfs_block_put(par_bh); return bh; @@ -1066,7 +1182,7 @@ int scoutfs_btree_lookup(struct super_block *sb, ret = -ENOENT; } - unlock_block(NULL, bh, false); + unlock_level(sb, root, bh, false); scoutfs_block_put(bh); trace_printk("key "CKF" ret %d\n", CKA(key), ret); @@ -1120,7 +1236,7 @@ int scoutfs_btree_insert(struct super_block *sb, ret = -EEXIST; } - unlock_block(NULL, bh, true); + unlock_level(sb, root, bh, true); scoutfs_block_put(bh); return ret; @@ -1166,7 +1282,7 @@ int scoutfs_btree_delete(struct super_block *sb, ret = -ENOENT; } - unlock_block(NULL, bh, true); + unlock_level(sb, root, bh, true); scoutfs_block_put(bh); out: @@ -1234,7 +1350,7 @@ static int btree_next(struct super_block *sb, struct scoutfs_btree_root *root, pos = find_pos_after_seq(bt, &key, 0, seq, op); if (pos >= bt->nr_items) { key = next_key; - unlock_block(NULL, bh, false); + unlock_level(sb, root, bh, false); scoutfs_block_put(bh); continue; } @@ -1252,7 +1368,7 @@ static int 
btree_next(struct super_block *sb, struct scoutfs_btree_root *root, ret = -ENOENT; } - unlock_block(NULL, bh, false); + unlock_level(sb, root, bh, false); scoutfs_block_put(bh); break; } @@ -1311,7 +1427,7 @@ int scoutfs_btree_dirty(struct super_block *sb, ret = -ENOENT; } - unlock_block(NULL, bh, true); + unlock_level(sb, root, bh, true); scoutfs_block_put(bh); trace_printk("key "CKF" ret %d\n", CKA(key), ret); @@ -1353,7 +1469,7 @@ int scoutfs_btree_update(struct super_block *sb, ret = -ENOENT; } - unlock_block(NULL, bh, true); + unlock_level(sb, root, bh, true); scoutfs_block_put(bh); return ret; diff --git a/kmod/src/format.h b/kmod/src/format.h index 932a9d2b..fd0cbd62 100644 --- a/kmod/src/format.h +++ b/kmod/src/format.h @@ -156,6 +156,21 @@ struct scoutfs_btree_item { (member_sizeof(struct scoutfs_btree_block, item_offs[0]) + \ sizeof(struct scoutfs_btree_item))) +/* + * We can calculate the max tree depth by calculating how many leaf + * blocks the tree could reference. The block device can only reference + * 2^64 bytes. The tallest parent tree has half full parent blocks. + * + * So we have the relation: + * + * ceil(max_items / 2) ^ (max_depth - 1) >= 2^64 / block_size + * + * and solve for depth: + * + * max_depth = log(ceil(max_items / 2), 2^64 / block_size) + 1 + */ +#define SCOUTFS_BTREE_MAX_DEPTH 10 + #define SCOUTFS_UUID_BYTES 16 struct scoutfs_super_block { diff --git a/kmod/src/super.h b/kmod/src/super.h index 79c2168f..6f550d0c 100644 --- a/kmod/src/super.h +++ b/kmod/src/super.h @@ -27,7 +27,6 @@ struct scoutfs_sb_info { struct mutex buddy_mutex; atomic_t buddy_count; - /* XXX there will be a lot more of these :) */ struct rw_semaphore btree_rwsem; atomic_t trans_holds;
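
Reviewer note, not part of the patch: a minimal sketch of how the new block
lock API in block.h is intended to be used, mirroring lock_next_level() and
lock_siblings() in btree.c above.  The walk_ctx struct and the
descend_one_level()/lock_two_siblings() helpers are made up for illustration
only.

/* Illustrative sketch only -- the real callers are in btree.c above. */
#include <linux/buffer_head.h>
#include <linux/lockdep.h>
#include "block.h"

/* hypothetical walk state, just for this sketch */
struct walk_ctx {
	struct super_block *sb;
	struct buffer_head *locked;	/* currently locked block, NULL at the root */
	bool write;
};

/* lock coupling: take the child's lock before dropping the parent's */
static void descend_one_level(struct walk_ctx *ctx, struct buffer_head *child,
			      struct lock_class_key *level_class)
{
	/* each tree level gets its own lockdep class so the chain doesn't warn */
	scoutfs_block_set_lock_class(child, level_class);

	scoutfs_block_lock(child, ctx->write, 0);
	if (ctx->locked)
		scoutfs_block_unlock(ctx->locked, ctx->write);
	ctx->locked = child;
}

/* split/merge: siblings share a level's class, so use subclasses, left first */
static void lock_two_siblings(struct buffer_head *left,
			      struct buffer_head *right, bool write)
{
	scoutfs_block_lock(left, write, 0);
	scoutfs_block_lock(right, write, 1);
}

Read-only blocks have no buffer_head private, so scoutfs_block_lock() and
scoutfs_block_unlock() quietly do nothing for them, which is why the sketch
doesn't need to distinguish stable from writable blocks.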
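
Also not part of the patch: a quick userspace sanity check of the
SCOUTFS_BTREE_MAX_DEPTH bound derived in the new format.h comment.  The
block_size and max_items values below are placeholders for illustration only
(the real values come from SCOUTFS_BLOCK_SIZE and the max item count in
format.h); the point is the shape of the calculation: the smallest depth whose
half-full fanout covers every possible leaf block, i.e. a ceiling on the log
plus one for the leaf level.

/* build with: cc -o depth depth.c -lm */
#include <math.h>
#include <stdio.h>

int main(void)
{
	double block_size = 4096;	/* placeholder, not the real constant */
	double max_items = 170;		/* placeholder, not the real constant */
	double fanout = ceil(max_items / 2);	/* half-full parent blocks */

	/* most leaf blocks a 2^64 byte device could reference */
	double leaves = pow(2, 64) / block_size;

	/* smallest depth with fanout^(depth - 1) >= leaves */
	double depth = ceil(log(leaves) / log(fanout)) + 1;

	printf("max depth bound: %.0f\n", depth);
	return 0;
}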