diff --git a/kmod/src/lock.c b/kmod/src/lock.c
index 97d71243..3a0d75cb 100644
--- a/kmod/src/lock.c
+++ b/kmod/src/lock.c
@@ -18,6 +18,7 @@
 #include
 #include
 #include
+#include <linux/rhashtable.h>
 
 #include "super.h"
 #include "lock.h"
@@ -81,7 +82,7 @@ struct lock_info {
	spinlock_t lock;
	bool shutdown;
	bool unmounting;
-	struct rb_root lock_tree;
+	struct rhashtable ht;
	struct rb_root lock_range_tree;
	struct shrinker shrinker;
	struct list_head lru_list;
@@ -266,7 +267,6 @@ static void lock_free(struct lock_info *linfo, struct scoutfs_lock *lock)
	BUG_ON(lock->users[SCOUTFS_LOCK_WRITE]);
	BUG_ON(lock->users[SCOUTFS_LOCK_WRITE_ONLY]);
	BUG_ON(!linfo->shutdown && lock->mode != SCOUTFS_LOCK_NULL);
-	BUG_ON(!RB_EMPTY_NODE(&lock->node));
	BUG_ON(!RB_EMPTY_NODE(&lock->range_node));
	BUG_ON(!list_empty(&lock->lru_head));
	BUG_ON(!list_empty(&lock->inv_head));
@@ -274,7 +274,7 @@ static void lock_free(struct lock_info *linfo, struct scoutfs_lock *lock)
	BUG_ON(!list_empty(&lock->cov_list));
 
	kfree(lock->inode_deletion_data);
-	kfree(lock);
+	kfree_rcu(lock, rcu_head);
 }
 
 static struct scoutfs_lock *lock_alloc(struct super_block *sb,
@@ -295,7 +295,6 @@ static struct scoutfs_lock *lock_alloc(struct super_block *sb,
 
	atomic_set(&lock->refcount, 0);
	spin_lock_init(&lock->lock);
-	RB_CLEAR_NODE(&lock->node);
	RB_CLEAR_NODE(&lock->range_node);
	INIT_LIST_HEAD(&lock->lru_head);
	INIT_LIST_HEAD(&lock->inv_head);
@@ -434,47 +433,55 @@ static struct scoutfs_lock *next_lock_range(struct super_block *sb, struct scout
	return next;
 }
 
+static const struct rhashtable_params lock_ht_params = {
+	.key_len = member_sizeof(struct scoutfs_lock, start),
+	.key_offset = offsetof(struct scoutfs_lock, start),
+	.head_offset = offsetof(struct scoutfs_lock, ht_head),
+};
+
 /*
- * Insert a lock into the lookup rbtree by its start key. If another
- * lock is already present then don't insert and return it instead.
- * Return null if we couldn't insert the lock because it overlaps with
- * an existing lock, and finally return the inserted lock on success.
+ * Insert a lock into the lookup hash table, keyed by its start key. If
+ * another lock is already present then we return -EEXIST and the caller
+ * will retry. The locks are inserted with a 0 refcount so that they
+ * won't be used until they've been inserted into the range tree without
+ * overlaps.
  */
-static struct scoutfs_lock *lock_insert(struct super_block *sb, struct scoutfs_lock *ins)
+static int lock_insert(struct super_block *sb, struct scoutfs_lock *lock)
 {
	DECLARE_LOCK_INFO(sb, linfo);
-	struct scoutfs_lock *lock;
-	struct rb_node *parent;
-	struct rb_node **node;
-	int cmp;
+	int ret;
 
-	assert_spin_locked(&linfo->lock);
+	if (WARN_ON_ONCE(atomic_read(&lock->refcount) != 0))
+		return -EINVAL;
 
-	node = &linfo->lock_tree.rb_node;
-	parent = NULL;
-	while (*node) {
-		parent = *node;
-		lock = container_of(*node, struct scoutfs_lock, node);
-
-		cmp = scoutfs_key_compare(&ins->start, &lock->start);
-		if (cmp < 0)
-			node = &(*node)->rb_left;
-		else if (cmp > 0)
-			node = &(*node)->rb_right;
-		else
-			return lock;
+retry:
+	ret = rhashtable_lookup_insert_fast(&linfo->ht, &lock->ht_head, lock_ht_params);
+	if (ret < 0) {
+		if (ret == -EBUSY) {
+			/* wait for pending rebalance to finish */
+			synchronize_rcu();
+			goto retry;
+		}
	}
 
-	if (!insert_lock_range(sb, ins))
-		return NULL;
+	if (ret == 0) {
+		spin_lock(&linfo->lock);
 
-	rb_link_node(&ins->node, parent, node);
-	rb_insert_color(&ins->node, &linfo->lock_tree);
-	scoutfs_tseq_add(&linfo->tseq_tree, &ins->tseq_entry);
-	__lock_add_lru(linfo, ins);
-	atomic_add(2, &ins->refcount);
+		if (!insert_lock_range(sb, lock)) {
+			ret = -EINVAL;
+		} else {
+			scoutfs_tseq_add(&linfo->tseq_tree, &lock->tseq_entry);
+			__lock_add_lru(linfo, lock);
+			atomic_add(2, &lock->refcount);
+		}
 
-	return ins;
+		spin_unlock(&linfo->lock);
+
+		if (ret < 0)
+			rhashtable_remove_fast(&linfo->ht, &lock->ht_head, lock_ht_params);
+	}
+
+	return ret;
 }
 
 /*
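The params block above is what lets rhashtable do the key handling itself: with .key_len and .key_offset set and no obj_cmpfn, the table hashes and memcmp()s key_len bytes at key_offset, and rhashtable_lookup_insert_fast() atomically refuses duplicates. A minimal standalone sketch of the same pattern, using hypothetical demo_* names rather than scoutfs's types (member_sizeof() appears to be a scoutfs-local spelling of the kernel's sizeof_field()):

#include <linux/rhashtable.h>
#include <linux/rcupdate.h>

struct demo_entry {
	u64 key;			/* fixed-size binary key */
	struct rhash_head head;		/* linkage into the table's buckets */
	struct rcu_head rcu;		/* deferred free via kfree_rcu() */
};

static const struct rhashtable_params demo_params = {
	.key_len = sizeof(u64),		/* hashed and memcmp()ed; no obj_cmpfn */
	.key_offset = offsetof(struct demo_entry, key),
	.head_offset = offsetof(struct demo_entry, head),
};

/* insert unless the key is already present, riding out concurrent resizes */
static int demo_insert(struct rhashtable *ht, struct demo_entry *e)
{
	int ret;

	do {
		ret = rhashtable_lookup_insert_fast(ht, &e->head, demo_params);
		if (ret == -EBUSY)	/* a resize is in flight; wait it out */
			synchronize_rcu();
	} while (ret == -EBUSY);

	return ret;			/* 0, or -EEXIST on a duplicate key */
}

As in lock_insert() above, -EBUSY is transient and retried, while -EEXIST is propagated for the caller to resolve.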
@@ -485,49 +492,18 @@
  */
 static void lock_remove(struct lock_info *linfo, struct scoutfs_lock *lock)
 {
-	assert_spin_locked(&linfo->lock);
-
	WARN_ON_ONCE(atomic_read(&lock->refcount) != 1);
 
-	rb_erase(&lock->node, &linfo->lock_tree);
-	RB_CLEAR_NODE(&lock->node);
+	rhashtable_remove_fast(&linfo->ht, &lock->ht_head, lock_ht_params);
+
+	spin_lock(&linfo->lock);
	rb_erase(&lock->range_node, &linfo->lock_range_tree);
	RB_CLEAR_NODE(&lock->range_node);
+	__lock_del_lru(linfo, lock);
+	spin_unlock(&linfo->lock);
 
	scoutfs_tseq_del(&linfo->tseq_tree, &lock->tseq_entry);
-	__lock_del_lru(linfo, lock);
-}
-
-static struct scoutfs_lock *lock_lookup(struct super_block *sb,
-					struct scoutfs_key *start,
-					struct scoutfs_lock **next)
-{
-	DECLARE_LOCK_INFO(sb, linfo);
-	struct rb_node *node = linfo->lock_tree.rb_node;
-	struct scoutfs_lock *lock;
-	int cmp;
-
-	assert_spin_locked(&linfo->lock);
-
-	if (next)
-		*next = NULL;
-
-	while (node) {
-		lock = container_of(node, struct scoutfs_lock, node);
-
-		cmp = scoutfs_key_compare(start, &lock->start);
-		if (cmp < 0) {
-			if (next)
-				*next = lock;
-			node = node->rb_left;
-		} else if (cmp > 0) {
-			node = node->rb_right;
-		} else {
-			return lock;
-		}
-	}
-
-	return NULL;
 }
 
 /* should be in the core */
@@ -578,9 +554,7 @@ static bool try_remove_null_lock(struct lock_info *linfo, struct scoutfs_lock *l
	if (lock && lock->mode == SCOUTFS_LOCK_NULL &&
	    atomic_cmpxchg(&lock->refcount, 3, 1) == 3) {
-		spin_lock(&linfo->lock);
		lock_remove(linfo, lock);
-		spin_unlock(&linfo->lock);
		return true;
	}
 
	return false;
@@ -596,53 +570,55 @@ static struct scoutfs_lock *find_lock(struct super_block *sb, struct scoutfs_key
	DECLARE_LOCK_INFO(sb, linfo);
	struct scoutfs_lock *lock;
 
-	spin_lock(&linfo->lock);
-	lock = lock_lookup(sb, start, NULL);
-	if (lock) {
+	rcu_read_lock();
+	lock = rhashtable_lookup(&linfo->ht, start, lock_ht_params);
+	if (lock)
		lock = get_lock(lock);
-		if (lock) {
-			__lock_del_lru(linfo, lock);
-			__lock_add_lru(linfo, lock);
-		}
+	rcu_read_unlock();
+
+	if (lock) {
+		spin_lock(&linfo->lock);
+		__lock_del_lru(linfo, lock);
+		__lock_add_lru(linfo, lock);
+		spin_unlock(&linfo->lock);
	}
-	spin_unlock(&linfo->lock);
 
	return lock;
 }
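find_lock() is the classic RCU lookup shape: the pointer returned by rhashtable_lookup() is only guaranteed to stay valid inside the read-side critical section, and because frees now go through kfree_rcu(), a reference taken before rcu_read_unlock() is what lets the object outlive it. A sketch of that shape, reusing the hypothetical demo_* types from the earlier sketch extended with a refcount_t ref member; scoutfs's get_lock() implements its own flavor of the inc-not-zero test on its atomic_t refcount:

#include <linux/rcupdate.h>
#include <linux/refcount.h>

static struct demo_entry *demo_find(struct rhashtable *ht, u64 key)
{
	struct demo_entry *e;

	rcu_read_lock();
	e = rhashtable_lookup(ht, &key, demo_params);
	/*
	 * A writer may unhash and kfree_rcu() the entry at any time; it
	 * can only escape the read-side section if we win the race to
	 * take a reference while the object is still live.
	 */
	if (e && !refcount_inc_not_zero(&e->ref))
		e = NULL;
	rcu_read_unlock();

	return e;
}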
 
 /*
  * Find a lock, allocating and inserting a new lock if it doesn't exist.
+ * Concurrent insertion attempts that fail with -EEXIST will retry
+ * finding the lock. This can return hard errors from insertion.
  */
-static struct scoutfs_lock *find_or_alloc_lock(struct super_block *sb,
-					       struct scoutfs_key *start, struct scoutfs_key *end)
+static int find_or_alloc_lock(struct super_block *sb, struct scoutfs_key *start,
+			      struct scoutfs_key *end, struct scoutfs_lock **lock_ret)
 {
	DECLARE_LOCK_INFO(sb, linfo);
-	struct scoutfs_lock *found;
	struct scoutfs_lock *lock;
	struct scoutfs_lock *ins;
+	int ret = 0;
+
+	while (!(lock = find_lock(sb, start))) {
 
-retry:
-	lock = find_lock(sb, start);
-	while (!lock) {
		ins = lock_alloc(sb, start, end);
-		if (!ins)
+		if (!ins) {
+			ret = -ENOMEM;
			break;
-
-		spin_lock(&linfo->lock);
-		found = lock_insert(sb, ins);
-		lock = found ? get_lock(found) : NULL;
-		spin_unlock(&linfo->lock);
-
-		if (lock != ins)
-			lock_free(linfo, ins);
-
-		if (found && !lock) {
-			cpu_relax();
-			goto retry;
		}
+
+		ret = lock_insert(sb, ins);
+		if (ret < 0) {
+			lock_free(linfo, ins);
+			if (ret != -EEXIST)
+				break;
+		}
+
+		cpu_relax();
	}
 
-	return lock;
+	*lock_ret = lock;
+	return ret;
 }
 
@@ -1071,9 +1047,9 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i
	if (WARN_ON_ONCE(scoutfs_trans_held()))
		return -EDEADLK;
 
-	lock = find_or_alloc_lock(sb, start, end);
-	if (!lock)
-		return -ENOMEM;
+	ret = find_or_alloc_lock(sb, start, end, &lock);
+	if (ret < 0)
+		return ret;
 
	spin_lock(&lock->lock);
@@ -1710,8 +1686,8 @@ void scoutfs_lock_shutdown(struct super_block *sb)
	/* cause current and future lock calls to return errors */
	spin_lock(&linfo->lock);
	linfo->shutdown = true;
-	for (node = rb_first(&linfo->lock_tree); node; node = rb_next(node)) {
-		lock = rb_entry(node, struct scoutfs_lock, node);
+	for (node = rb_first(&linfo->lock_range_tree); node; node = rb_next(node)) {
+		lock = rb_entry(node, struct scoutfs_lock, range_node);
		wake_up(&lock->waitq);
	}
	spin_unlock(&linfo->lock);
@@ -1746,8 +1722,8 @@ void scoutfs_lock_destroy(struct super_block *sb)
 
	/* make sure that no one's actively using locks */
	spin_lock(&linfo->lock);
-	for (node = rb_first(&linfo->lock_tree); node; node = rb_next(node)) {
-		lock = rb_entry(node, struct scoutfs_lock, node);
+	for (node = rb_first(&linfo->lock_range_tree); node; node = rb_next(node)) {
+		lock = rb_entry(node, struct scoutfs_lock, range_node);
 
		for (mode = 0; mode < SCOUTFS_LOCK_NR_MODES; mode++) {
			if (lock->waiters[mode] || lock->users[mode]) {
@@ -1774,9 +1750,9 @@ void scoutfs_lock_destroy(struct super_block *sb)
	 * drop references for any pending work that we've canceled so
	 * that we can tear down the locks.
	 */
-	node = rb_first(&linfo->lock_tree);
+	node = rb_first(&linfo->lock_range_tree);
	while (node) {
-		lock = rb_entry(node, struct scoutfs_lock, node);
+		lock = rb_entry(node, struct scoutfs_lock, range_node);
		node = rb_next(node);
 
		atomic_inc(&lock->refcount);
@@ -1809,6 +1785,8 @@ void scoutfs_lock_destroy(struct super_block *sb)
		WARN_ON_ONCE(!put_lock(linfo, lock));
	}
 
+	rhashtable_destroy(&linfo->ht);
+
	kfree(linfo);
	sbi->lock_info = NULL;
 }
@@ -1825,7 +1803,6 @@ int scoutfs_lock_setup(struct super_block *sb)
 
	linfo->sb = sb;
	spin_lock_init(&linfo->lock);
-	linfo->lock_tree = RB_ROOT;
	linfo->lock_range_tree = RB_ROOT;
	linfo->shrinker.shrink = scoutfs_lock_shrink;
	linfo->shrinker.seeks = DEFAULT_SEEKS;
@@ -1839,6 +1816,13 @@ int scoutfs_lock_setup(struct super_block *sb)
	sbi->lock_info = linfo;
 
	trace_scoutfs_lock_setup(sb, linfo);
 
+	ret = rhashtable_init(&linfo->ht, &lock_ht_params);
+	if (ret < 0) {
+		sbi->lock_info = NULL;
+		kfree(linfo);
+		return ret;
+	}
+
	linfo->tseq_dentry = scoutfs_tseq_create("client_locks", sbi->debug_root,
						 &linfo->tseq_tree);
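On the lifecycle side, the table has to be initialized before the first lookup or insert can run, and destroyed only after every entry has been removed, since rhashtable_destroy() frees the bucket arrays but not the objects linked into them. A sketch of that pairing under the hypothetical demo_* names from the earlier sketches; allocating, initializing, and only then publishing the owning structure also keeps any init failure private to the setup path:

#include <linux/rhashtable.h>
#include <linux/slab.h>
#include <linux/err.h>

struct demo_table {
	struct rhashtable ht;
	/* ... other long-lived state ... */
};

static struct demo_table *demo_setup(void)
{
	struct demo_table *t;
	int ret;

	t = kzalloc(sizeof(*t), GFP_KERNEL);
	if (!t)
		return ERR_PTR(-ENOMEM);

	/* initialize before the structure is visible to anyone else */
	ret = rhashtable_init(&t->ht, &demo_params);
	if (ret < 0) {
		kfree(t);
		return ERR_PTR(ret);
	}

	return t;
}

static void demo_teardown(struct demo_table *t)
{
	/*
	 * Entries must already have been removed and freed;
	 * rhashtable_destroy() only tears down the bucket arrays.
	 * (rhashtable_free_and_destroy() exists for freeing leftovers.)
	 */
	rhashtable_destroy(&t->ht);
	kfree(t);
}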
diff --git a/kmod/src/lock.h b/kmod/src/lock.h
index 6f83c793..a7c9c0fa 100644
--- a/kmod/src/lock.h
+++ b/kmod/src/lock.h
@@ -1,6 +1,8 @@
 #ifndef _SCOUTFS_LOCK_H_
 #define _SCOUTFS_LOCK_H_
 
+#include <linux/rhashtable.h>
+
 #include "key.h"
 #include "tseq.h"
 
@@ -21,9 +23,10 @@ struct scoutfs_lock {
	struct super_block *sb;
	atomic_t refcount;
	spinlock_t lock;
+	struct rcu_head rcu_head;
	struct scoutfs_key start;
	struct scoutfs_key end;
-	struct rb_node node;
+	struct rhash_head ht_head;
	struct rb_node range_node;
	u64 refresh_gen;
	u64 write_seq;
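The two embedded members carry the whole scheme: rhash_head links the lock into the table's buckets, and rcu_head lets kfree_rcu() defer the actual free until concurrent RCU readers are done, which is what lets lock_free() replace the plain kfree(). The removal side of the earlier demo_* sketches shows the pairing:

/* unpublish from the table, then free once current RCU readers finish */
static void demo_remove(struct rhashtable *ht, struct demo_entry *e)
{
	rhashtable_remove_fast(ht, &e->head, demo_params);

	/*
	 * Readers that found the entry under rcu_read_lock() may still
	 * be dereferencing it; kfree_rcu() delays the kfree() until
	 * after a grace period, using the embedded rcu_head.
	 */
	kfree_rcu(e, rcu);
}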