Add per-cluster lock spinlock

Add a spinlock to the scoutfs_lock cluster lock to protect its state.
This replaces the use of the mount-wide lock_info spinlock.
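
A rough sketch of the shape of the change, heavily abridged (the real
struct scoutfs_lock has many more fields; the user count here is only
illustrative):

	#include <linux/spinlock.h>

	/* abridged stand-in for struct scoutfs_lock */
	struct scoutfs_lock_sketch {
		spinlock_t lock;	/* protects this cluster lock's state */
		unsigned int users;	/* illustrative per-lock state */
	};

	static void sketch_lock_alloc(struct scoutfs_lock_sketch *lk)
	{
		/* lock_alloc() initializes the new spinlock the same way */
		spin_lock_init(&lk->lock);
		lk->users = 0;
	}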

In practice, for now, this largely mirrors the continued use of the
lock_info spinlock, which is still needed to protect the mount-wide
structures used during put_lock. That will be fixed in future patches
as the use of global structures is reduced.
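
For example, the unlock path now nests the two, with the mount-wide
lock still taken first (a simplified sketch of the ordering in
scoutfs_unlock(), reusing the abridged struct above; the decrement
stands in for lock_dec_count()):

	struct lock_info_sketch {
		spinlock_t lock;	/* the mount-wide lock_info spinlock */
	};

	static void sketch_unlock(struct lock_info_sketch *linfo,
				  struct scoutfs_lock_sketch *lk)
	{
		spin_lock(&linfo->lock);
		spin_lock(&lk->lock);
		lk->users--;
		spin_unlock(&lk->lock);
		/* wake_up() and put_lock() still run here under linfo->lock */
		spin_unlock(&linfo->lock);
	}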

Signed-off-by: Zach Brown <zab@versity.com>
Author: Zach Brown
Date:   2022-05-20 09:45:51 -07:00
Commit: f30e545870
Parent: cf046f26c4
2 changed files with 38 additions and 13 deletions


@@ -253,12 +253,9 @@ static void lock_free(struct lock_info *linfo, struct scoutfs_lock *lock)
{
	struct super_block *sb = lock->sb;
	assert_spin_locked(&linfo->lock);
	trace_scoutfs_lock_free(sb, lock);
	scoutfs_inc_counter(sb, lock_free);
	/* manually checking lock_idle gives identifying line numbers */
	BUG_ON(lock->request_pending);
	BUG_ON(lock->invalidate_pending);
	BUG_ON(lock->waiters[SCOUTFS_LOCK_READ]);
@@ -295,6 +292,7 @@ static struct scoutfs_lock *lock_alloc(struct super_block *sb,
	scoutfs_inc_counter(sb, lock_alloc);
+	spin_lock_init(&lock->lock);
	RB_CLEAR_NODE(&lock->node);
	RB_CLEAR_NODE(&lock->range_node);
	INIT_LIST_HEAD(&lock->lru_head);
@@ -633,6 +631,8 @@ int scoutfs_lock_grant_response(struct super_block *sb,
	bug_on_inconsistent_grant_cache(sb, lock, nl->old_mode, nl->new_mode);
+	spin_lock(&lock->lock);
	if (!lock_mode_can_read(nl->old_mode) && lock_mode_can_read(nl->new_mode))
		lock->refresh_gen = atomic64_inc_return(&linfo->next_refresh_gen);
@@ -641,6 +641,8 @@ int scoutfs_lock_grant_response(struct super_block *sb,
	lock->write_seq = le64_to_cpu(nl->write_seq);
	trace_scoutfs_lock_granted(sb, lock);
+	spin_unlock(&lock->lock);
	wake_up(&lock->waitq);
	put_lock(linfo, lock);
@@ -696,19 +698,21 @@ static void lock_invalidate_worker(struct work_struct *work)
	spin_lock(&linfo->inv_wlist.lock);
	list_for_each_entry_safe(lock, tmp, &linfo->inv_wlist.list, inv_head) {
+		spin_lock(&lock->lock);
		ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head);
		nl = &ireq->nl;
		/* wait until incompatible holders unlock */
-		if (!lock_counts_match(nl->new_mode, lock->users))
-			continue;
-		/* set the new mode, no incompatible users during inval, recov needs old */
-		lock->invalidating_mode = lock->mode;
-		lock->mode = nl->new_mode;
-		/* move everyone that's ready to our private list */
-		list_move_tail(&lock->inv_head, &ready);
+		if (lock_counts_match(nl->new_mode, lock->users)) {
+			/* set the new mode, no incompatible users during inval, recov needs old */
+			lock->invalidating_mode = lock->mode;
+			lock->mode = nl->new_mode;
+			/* move everyone that's ready to our private list */
+			list_move_tail(&lock->inv_head, &ready);
+		}
+		spin_unlock(&lock->lock);
	}
	spin_unlock(&linfo->inv_wlist.lock);
@@ -742,6 +746,7 @@ static void lock_invalidate_worker(struct work_struct *work)
	spin_lock(&linfo->inv_wlist.lock);
	list_for_each_entry_safe(lock, tmp, &ready, inv_head) {
+		spin_lock(&lock->lock);
		ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head);
		trace_scoutfs_lock_invalidated(sb, lock);
@@ -762,6 +767,7 @@ static void lock_invalidate_worker(struct work_struct *work)
			queue_nonempty_work_list(linfo, &linfo->inv_wlist);
		}
+		spin_unlock(&lock->lock);
		put_lock(linfo, lock);
	}
@@ -808,6 +814,7 @@ int scoutfs_lock_invalidate_request(struct super_block *sb, u64 net_id,
	spin_lock(&linfo->lock);
	lock = get_lock(sb, &nl->key);
	if (lock) {
+		spin_lock(&lock->lock);
		trace_scoutfs_lock_invalidate_request(sb, lock);
		ireq->lock = lock;
		ireq->net_id = net_id;
@@ -820,6 +827,7 @@ int scoutfs_lock_invalidate_request(struct super_block *sb, u64 net_id,
			spin_unlock(&linfo->inv_wlist.lock);
		}
		list_add_tail(&ireq->head, &lock->inv_req_list);
+		spin_unlock(&lock->lock);
	}
	spin_unlock(&linfo->lock);
@@ -865,6 +873,8 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id,
	for (i = 0; lock && i < SCOUTFS_NET_LOCK_MAX_RECOVER_NR; i++) {
+		spin_lock(&lock->lock);
		if (lock->invalidating_mode != SCOUTFS_LOCK_NULL)
			mode = lock->invalidating_mode;
		else
@@ -875,6 +885,8 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id,
		nlr->locks[i].old_mode = mode;
		nlr->locks[i].new_mode = mode;
+		spin_unlock(&lock->lock);
		node = rb_next(&lock->node);
		if (node)
			lock = rb_entry(node, struct scoutfs_lock, node);
@@ -897,10 +909,10 @@ static bool lock_wait_cond(struct super_block *sb, struct scoutfs_lock *lock,
	DECLARE_LOCK_INFO(sb, linfo);
	bool wake;
-	spin_lock(&linfo->lock);
+	spin_lock(&lock->lock);
	wake = linfo->shutdown || lock_modes_match(lock->mode, mode) ||
	       !lock->request_pending;
-	spin_unlock(&linfo->lock);
+	spin_unlock(&lock->lock);
	if (!wake)
		scoutfs_inc_counter(sb, lock_wait);
@@ -962,6 +974,8 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i
		goto out_unlock;
	}
+	spin_lock(&lock->lock);
	/* the waiters count is only used by debugging output */
	lock_inc_count(lock->waiters, mode);
@@ -993,6 +1007,7 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i
			should_send = false;
		}
+		spin_unlock(&lock->lock);
		spin_unlock(&linfo->lock);
		if (should_send) {
@@ -1003,6 +1018,7 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i
			ret = scoutfs_client_lock_request(sb, &nl);
			if (ret) {
				spin_lock(&linfo->lock);
+				spin_lock(&lock->lock);
				lock->request_pending = 0;
				break;
			}
@@ -1020,6 +1036,7 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i
		}
		spin_lock(&linfo->lock);
+		spin_lock(&lock->lock);
		if (ret)
			break;
	}
@@ -1028,6 +1045,8 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i
	if (ret == 0)
		trace_scoutfs_lock_locked(sb, lock);
+	spin_unlock(&lock->lock);
	wake_up(&lock->waitq);
	put_lock(linfo, lock);
@@ -1286,18 +1305,20 @@ void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock, enum scou
	scoutfs_inc_counter(sb, lock_unlock);
	spin_lock(&linfo->lock);
+	spin_lock(&lock->lock);
	lock_dec_count(lock->users, mode);
	if (lock_mode_can_write(mode))
		lock->dirty_trans_seq = scoutfs_trans_sample_seq(sb);
	trace_scoutfs_lock_unlock(sb, lock);
-	wake_up(&lock->waitq);
	spin_lock(&linfo->inv_wlist.lock);
	queue_nonempty_work_list(linfo, &linfo->inv_wlist);
	spin_unlock(&linfo->inv_wlist.lock);
+	spin_unlock(&lock->lock);
+	wake_up(&lock->waitq);
	put_lock(linfo, lock);
	spin_unlock(&linfo->lock);
@@ -1418,8 +1439,11 @@ static void lock_shrink_worker(struct work_struct *work)
		scoutfs_inc_counter(sb, lock_shrink_aborted);
		spin_lock(&linfo->lock);
+		spin_lock(&lock->lock);
		lock->request_pending = 0;
+		spin_unlock(&lock->lock);
		wake_up(&lock->waitq);
		put_lock(linfo, lock);


@@ -19,6 +19,7 @@ struct inode_deletion_lock_data;
 */
struct scoutfs_lock {
	struct super_block *sb;
+	spinlock_t lock;
	struct scoutfs_key start;
	struct scoutfs_key end;
	struct rb_node node;