diff --git a/kmod/src/inode.c b/kmod/src/inode.c
index 3553c520..0681850e 100644
--- a/kmod/src/inode.c
+++ b/kmod/src/inode.c
@@ -482,7 +482,7 @@ int scoutfs_complete_truncate(struct inode *inode, struct scoutfs_lock *lock)
 }
 
 /*
- * If we're changing the file size than the contents of the file are
+ * If we're changing the file size then the contents of the file are
  * changing and we increment the data_version.  This would prevent
  * staging because the data_version is per-inode today, not per-extent.
  * So if there are any offline extents within the new size then we need
diff --git a/kmod/src/lock.c b/kmod/src/lock.c
index 1be0c987..a0daa3fd 100644
--- a/kmod/src/lock.c
+++ b/kmod/src/lock.c
@@ -253,6 +253,7 @@ static void lock_free(struct lock_info *linfo, struct scoutfs_lock *lock)
         trace_scoutfs_lock_free(sb, lock);
         scoutfs_inc_counter(sb, lock_free);
 
+        BUG_ON(atomic_read(&lock->refcount) != 0);
         BUG_ON(lock->request_pending);
         BUG_ON(lock->invalidate_pending);
         BUG_ON(lock->waiters[SCOUTFS_LOCK_READ]);
@@ -289,6 +290,7 @@ static struct scoutfs_lock *lock_alloc(struct super_block *sb,
 
         scoutfs_inc_counter(sb, lock_alloc);
 
+        atomic_set(&lock->refcount, 0);
         spin_lock_init(&lock->lock);
         RB_CLEAR_NODE(&lock->node);
         RB_CLEAR_NODE(&lock->range_node);
@@ -342,26 +344,32 @@ static bool lock_counts_match(int granted, unsigned int *counts)
         return true;
 }
 
-/*
- * An idle lock has nothing going on.  It can be present in the lru and
- * can be freed by the final put when it has a null mode.
- */
-static bool lock_idle(struct scoutfs_lock *lock)
+static void __lock_add_lru(struct lock_info *linfo, struct scoutfs_lock *lock)
 {
-        enum scoutfs_lock_mode mode;
+        assert_spin_locked(&linfo->lock);
 
-        if (lock->request_pending || lock->invalidate_pending)
-                return false;
-
-        for (mode = 0; mode < SCOUTFS_LOCK_NR_MODES; mode++) {
-                if (lock->waiters[mode] || lock->users[mode])
-                        return false;
+        if (list_empty(&lock->lru_head)) {
+                list_add_tail(&lock->lru_head, &linfo->lru_list);
+                linfo->lru_nr++;
         }
-
-        return true;
 }
 
-static bool insert_range_node(struct super_block *sb, struct scoutfs_lock *ins)
+static void __lock_del_lru(struct lock_info *linfo, struct scoutfs_lock *lock)
+{
+        assert_spin_locked(&linfo->lock);
+
+        if (!list_empty(&lock->lru_head)) {
+                list_del_init(&lock->lru_head);
+                linfo->lru_nr--;
+        }
+}
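The refcounting discipline that the rest of the patch builds on: lock_alloc() starts a lock at a refcount of 0, insertion into the lookup indexes adds 2, and each active holder adds 1 more, so a get can refuse to resurrect a lock whose index refs are already gone. A minimal userspace model of that rule in C11 atomics (an illustrative sketch, not scoutfs code; obj_get()/obj_put() are hypothetical names):

    #include <stdatomic.h>
    #include <stdbool.h>

    struct obj {
            atomic_int refcount;    /* 0 at alloc, +2 while indexed, +1 per holder */
    };

    /* model of get_lock(): only take a ref while the 2 index refs remain */
    static bool obj_get(struct obj *o)
    {
            int c = atomic_load(&o->refcount);

            while (c >= 2) {
                    /* on failure c is reloaded and the bound is rechecked */
                    if (atomic_compare_exchange_weak(&o->refcount, &c, c + 1))
                            return true;
            }
            return false;
    }

    /* model of put_lock(): the final put is the one that frees */
    static bool obj_put(struct obj *o)
    {
            return atomic_fetch_sub(&o->refcount, 1) == 1;
    }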
+
+/*
+ * Insert the lock into the tree that tracks locks' non-overlapping key
+ * ranges.  Warn if we see an attempt to insert a lock that overlaps
+ * with an existing lock that isn't being freed.
+ */
+static int insert_lock_range(struct super_block *sb, struct scoutfs_lock *ins)
 {
         DECLARE_LOCK_INFO(sb, linfo);
         struct rb_root *root = &linfo->lock_range_tree;
@@ -370,17 +378,28 @@ static bool insert_range_node(struct super_block *sb, struct scoutfs_lock *ins)
         struct scoutfs_lock *lock;
         int cmp;
 
+        assert_spin_locked(&linfo->lock);
+
         while (*node) {
                 parent = *node;
                 lock = container_of(*node, struct scoutfs_lock, range_node);
 
                 cmp = scoutfs_key_compare_ranges(&ins->start, &ins->end,
                                                  &lock->start, &lock->end);
-                if (WARN_ON_ONCE(cmp == 0)) {
-                        scoutfs_warn(sb, "inserting lock start "SK_FMT" end "SK_FMT" overlaps with existing lock start "SK_FMT" end "SK_FMT,
+                if (cmp == 0) {
+                        if (WARN_ON_ONCE(atomic_read(&lock->refcount) >= 2)) {
+                                /* Overlap with an in-use lock */
+                                scoutfs_warn(sb, "inserting lock start "SK_FMT" end "SK_FMT" overlaps with existing lock start "SK_FMT" end "SK_FMT,
                                              SK_ARG(&ins->start), SK_ARG(&ins->end),
                                              SK_ARG(&lock->start), SK_ARG(&lock->end));
-                        return false;
+                                return -EINVAL;
+                        } else {
+                                /*
+                                 * Overlap with a lock that's being freed.  Tell
+                                 * the caller to retry.
+                                 */
+                                return -EEXIST;
+                        }
                 }
 
                 if (cmp < 0)
@@ -389,15 +408,47 @@ static bool insert_range_node(struct super_block *sb, struct scoutfs_lock *ins)
                         node = &(*node)->rb_right;
         }
-
         rb_link_node(&ins->range_node, parent, node);
         rb_insert_color(&ins->range_node, root);
 
-        return true;
+        return 0;
 }
 
-/* returns true if the lock was inserted at its start key */
-static bool lock_insert(struct super_block *sb, struct scoutfs_lock *ins)
+static struct scoutfs_lock *next_lock_range(struct super_block *sb, struct scoutfs_key *key)
+{
+        DECLARE_LOCK_INFO(sb, linfo);
+        struct rb_node *node = linfo->lock_range_tree.rb_node;
+        struct scoutfs_lock *next = NULL;
+        struct scoutfs_lock *lock;
+        int cmp;
+
+        assert_spin_locked(&linfo->lock);
+
+        while (node) {
+                lock = container_of(node, struct scoutfs_lock, range_node);
+
+                cmp = scoutfs_key_compare(key, &lock->start);
+                if (cmp < 0) {
+                        next = lock;
+                        node = node->rb_left;
+                } else if (cmp > 0) {
+                        node = node->rb_right;
+                } else {
+                        return lock;
+                }
+        }
+
+        return next;
+}
+
+/*
+ * Insert a lock into the lookup tree, keyed by its start key.  If
+ * another lock is already present then we return it and the caller
+ * will use it, or retry if it turns out to be dying.  The locks are
+ * inserted with a 0 refcount so that they won't be used until they've
+ * been inserted into the range tree without overlaps.
+ */
+static struct scoutfs_lock *lock_insert(struct super_block *sb, struct scoutfs_lock *ins)
 {
         DECLARE_LOCK_INFO(sb, linfo);
         struct scoutfs_lock *lock;
@@ -419,24 +470,33 @@ static bool lock_insert(struct super_block *sb, struct scoutfs_lock *ins)
                 else if (cmp > 0)
                         node = &(*node)->rb_right;
                 else
-                        return false;
+                        return lock;
         }
 
-        if (!insert_range_node(sb, ins))
-                return false;
+        if (insert_lock_range(sb, ins) != 0)
+                return NULL;
 
         rb_link_node(&ins->node, parent, node);
         rb_insert_color(&ins->node, &linfo->lock_tree);
-
         scoutfs_tseq_add(&linfo->tseq_tree, &ins->tseq_entry);
+        __lock_add_lru(linfo, ins);
+        atomic_add(2, &ins->refcount);
 
-        return true;
+        return ins;
 }
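Overlap detection in insert_lock_range() falls straight out of the rbtree descent because the range comparison treats any two overlapping ranges as equal. Roughly, with integer endpoints standing in for struct scoutfs_key (a hypothetical simplification of what scoutfs_key_compare_ranges() does, not the real implementation):

    #include <stdint.h>

    /* 0 means "the ranges overlap", so landing on an equal node is a collision */
    static int range_cmp(uint64_t a_start, uint64_t a_end,
                         uint64_t b_start, uint64_t b_end)
    {
            if (a_end < b_start)
                    return -1;      /* a ends entirely before b starts */
            if (a_start > b_end)
                    return 1;       /* a starts entirely after b ends */
            return 0;               /* any overlap compares equal */
    }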
 
+/*
+ * Remove the lock from all the active indexes.  The caller has already
+ * established the exclusive ability to remove by atomically removing
+ * the 2 refs that were added by insertion.  There should be no other
+ * references once those refs have been removed.
+ */
 static void lock_remove(struct lock_info *linfo, struct scoutfs_lock *lock)
 {
         assert_spin_locked(&linfo->lock);
 
+        WARN_ON_ONCE(atomic_read(&lock->refcount) != 1);
+
         rb_erase(&lock->node, &linfo->lock_tree);
         RB_CLEAR_NODE(&lock->node);
         rb_erase(&lock->range_node, &linfo->lock_range_tree);
@@ -445,117 +505,129 @@ static void lock_remove(struct lock_info *linfo, struct scoutfs_lock *lock)
         scoutfs_tseq_del(&linfo->tseq_tree, &lock->tseq_entry);
 }
 
-static struct scoutfs_lock *lock_lookup(struct super_block *sb,
-                                        struct scoutfs_key *start,
-                                        struct scoutfs_lock **next)
+/* should be in the core */
+static int atomic_add_unless_lessthan(atomic_t *v, int a, int u)
 {
-        DECLARE_LOCK_INFO(sb, linfo);
-        struct rb_node *node = linfo->lock_tree.rb_node;
-        struct scoutfs_lock *lock;
-        int cmp;
+        int c, old;
 
-        assert_spin_locked(&linfo->lock);
+        c = atomic_read(v);
+        for (;;) {
+                if (unlikely(c < (u)))
+                        break;
+                old = atomic_cmpxchg((v), c, c + (a));
+                if (likely(old == c))
+                        return 1;
+                c = old;
+        }
 
-        if (next)
-                *next = NULL;
+        return 0;
+}
 
-        while (node) {
-                lock = container_of(node, struct scoutfs_lock, node);
-
-                cmp = scoutfs_key_compare(start, &lock->start);
-                if (cmp < 0) {
-                        if (next)
-                                *next = lock;
-                        node = node->rb_left;
-                } else if (cmp > 0) {
-                        node = node->rb_right;
-                } else {
-                        return lock;
-                }
-        }
+/*
+ * Get a reference to a lock that's still active and present in the
+ * lookup index.
+ */
+static struct scoutfs_lock *get_lock(struct scoutfs_lock *lock)
+{
+        if (lock && atomic_add_unless_lessthan(&lock->refcount, 1, 2))
+                return lock;
 
         return NULL;
 }
 
-static void __lock_del_lru(struct lock_info *linfo, struct scoutfs_lock *lock)
-{
-        assert_spin_locked(&linfo->lock);
-
-        if (!list_empty(&lock->lru_head)) {
-                list_del_init(&lock->lru_head);
-                linfo->lru_nr--;
-        }
-}
-
 /*
- * Get a lock and remove it from the lru.  The caller must set state on
- * the lock that indicates that it's busy before dropping the lock.
- * Then later they call add_lru_or_free once they've cleared that state.
+ * The caller has a referenced lock and is holding its spinlock.  If
+ * it's null, and we're the only user, and we're able to atomically
+ * remove the 2 refs for its presence in the lookup index, then we can
+ * lock the lookup index and remove it.  This creates a window where
+ * the lock is in the index but won't allow new references; lookups
+ * and insertions need to be careful.
+ *
+ * This nests the global linfo spinlock under the per-lock spinlock only
+ * to keep callers from having to free on the other side of dropping
+ * the refs and unlocking the lock's spinlock.
  */
-static struct scoutfs_lock *get_lock(struct super_block *sb,
-                                     struct scoutfs_key *start)
+static bool try_remove_null_lock(struct lock_info *linfo, struct scoutfs_lock *lock)
 {
-        DECLARE_LOCK_INFO(sb, linfo);
-        struct scoutfs_lock *lock;
+        assert_spin_locked(&lock->lock);
 
-        assert_spin_locked(&linfo->lock);
-
-        lock = lock_lookup(sb, start, NULL);
-        if (lock)
-                __lock_del_lru(linfo, lock);
-
-        return lock;
-}
-
-/*
- * Get a lock, creating it if it doesn't exist.  The caller must treat
- * the lock like it came from get lock (mark sate, drop lock, clear
- * state, put lock).  Allocated locks aren't on the lru.
- */
-static struct scoutfs_lock *create_lock(struct super_block *sb,
-                                        struct scoutfs_key *start,
-                                        struct scoutfs_key *end)
-{
-        DECLARE_LOCK_INFO(sb, linfo);
-        struct scoutfs_lock *lock;
-
-        assert_spin_locked(&linfo->lock);
-
-        lock = get_lock(sb, start);
-        if (!lock) {
-                spin_unlock(&linfo->lock);
-                lock = lock_alloc(sb, start, end);
+        if (lock && lock->mode == SCOUTFS_LOCK_NULL &&
+            atomic_cmpxchg(&lock->refcount, 3, 1) == 3) {
                 spin_lock(&linfo->lock);
+                lock_remove(linfo, lock);
+                spin_unlock(&linfo->lock);
+                return true;
+        }
+
+        return false;
+}
+
+/*
+ * Search for a lock by its key in the lookup index and return with a
+ * reference held.
+ */
+static struct scoutfs_lock *find_lock(struct super_block *sb, struct scoutfs_key *start)
+{
+        DECLARE_LOCK_INFO(sb, linfo);
+        struct scoutfs_lock *lock;
+
+        spin_lock(&linfo->lock);
+        lock = lock_lookup(sb, start, NULL);
+        if (lock) {
+                lock = get_lock(lock);
                 if (lock) {
-                        if (!lock_insert(sb, lock)) {
-                                lock_free(linfo, lock);
-                                lock = get_lock(sb, start);
-                        }
+                        __lock_del_lru(linfo, lock);
+                        __lock_add_lru(linfo, lock);
+                }
+        }
+        spin_unlock(&linfo->lock);
+
+        return lock;
+}
+
+/*
+ * Find a lock, allocating and inserting a new lock if it doesn't exist.
+ */
+static struct scoutfs_lock *find_or_alloc_lock(struct super_block *sb,
+                                               struct scoutfs_key *start, struct scoutfs_key *end)
+{
+        DECLARE_LOCK_INFO(sb, linfo);
+        struct scoutfs_lock *found;
+        struct scoutfs_lock *lock;
+        struct scoutfs_lock *ins;
+
+retry:
+        lock = find_lock(sb, start);
+        while (!lock) {
+                ins = lock_alloc(sb, start, end);
+                if (!ins)
+                        break;
+
+                spin_lock(&linfo->lock);
+                found = lock_insert(sb, ins);
+                lock = found ? get_lock(found) : NULL;
+                spin_unlock(&linfo->lock);
+
+                if (lock != ins)
+                        lock_free(linfo, ins);
+
+                if (found && !lock) {
+                        cpu_relax();
+                        goto retry;
                 }
         }
 
         return lock;
 }
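The cmpxchg in try_remove_null_lock() above deserves a second look: a caller holding one reference sees a total count of exactly 3 only when the remaining 2 refs are the ones held by the indexes. Swapping 3 for 1 therefore revokes the index refs atomically, and only if no other holder exists, which is what makes the following lock_remove() safe. The same step in isolation, modeled with C11 atomics (an illustrative sketch with a hypothetical name, not the kernel's atomic_t API):

    #include <stdatomic.h>
    #include <stdbool.h>

    /* caller holds 1 ref; 2 more are held by the lookup indexes */
    static bool try_revoke_index_refs(atomic_int *refcount)
    {
            int expected = 3;

            /* succeeds only when no other holder can exist */
            return atomic_compare_exchange_strong(refcount, &expected, 1);
    }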
-/*
- * The caller is done using a lock and has cleared state that used to
- * indicate that the lock wasn't idle.  If it really is idle then we
- * either free it if it's null or put it back on the lru.
- */
-static void put_lock(struct lock_info *linfo,struct scoutfs_lock *lock)
+static bool put_lock(struct lock_info *linfo, struct scoutfs_lock *lock)
 {
-        assert_spin_locked(&linfo->lock);
-
-        if (lock_idle(lock)) {
-                if (lock->mode != SCOUTFS_LOCK_NULL) {
-                        list_add_tail(&lock->lru_head, &linfo->lru_list);
-                        linfo->lru_nr++;
-                } else {
-                        lock_remove(linfo, lock);
-                        lock_free(linfo, lock);
-                }
+        if (lock && atomic_dec_and_test(&lock->refcount)) {
+                lock_free(linfo, lock);
+                return true;
         }
+
+        return false;
 }
 
 /*
@@ -589,7 +661,7 @@ static void bug_on_inconsistent_grant_cache(struct super_block *sb,
         if (dirty ||
             (cached && (!lock_mode_can_read(old_mode) ||
                         !lock_mode_can_read(new_mode)))) {
-                scoutfs_err(sb, "granted lock item cache inconsistency, cached %u dirty %u old_mode %d new_mode %d: start "SK_FMT" end "SK_FMT" refresh_gen %llu mode %u waiters: rd %u wr %u wo %u users: rd %u wr %u wo %u",
+                scoutfs_err(sb, "granted lock item cache inconsistency, cached %u dirty %u old_mode %d new_mode %d: start "SK_FMT" end "SK_FMT" refresh_gen %llu mode %u waiters: rd %u wr %u wo %u users: rd %u wr %u wo %u refcnt %d",
                             cached, dirty, old_mode, new_mode,
                             SK_ARG(&lock->start), SK_ARG(&lock->end),
                             lock->refresh_gen, lock->mode,
                             lock->waiters[SCOUTFS_LOCK_READ],
@@ -597,7 +669,8 @@ static void bug_on_inconsistent_grant_cache(struct super_block *sb,
                             lock->waiters[SCOUTFS_LOCK_WRITE_ONLY],
                             lock->users[SCOUTFS_LOCK_READ],
                             lock->users[SCOUTFS_LOCK_WRITE],
-                            lock->users[SCOUTFS_LOCK_WRITE_ONLY]);
+                            lock->users[SCOUTFS_LOCK_WRITE_ONLY],
+                            atomic_read(&lock->refcount));
                 BUG();
         }
 }
@@ -619,10 +692,8 @@ int scoutfs_lock_grant_response(struct super_block *sb,
 
         scoutfs_inc_counter(sb, lock_grant_response);
 
-        spin_lock(&linfo->lock);
-
         /* lock must already be busy with request_pending */
-        lock = lock_lookup(sb, &nl->key, NULL);
+        lock = find_lock(sb, &nl->key);
         BUG_ON(!lock);
         trace_scoutfs_lock_grant_response(sb, lock);
         BUG_ON(!lock->request_pending);
@@ -635,17 +706,17 @@ int scoutfs_lock_grant_response(struct super_block *sb,
                 lock->refresh_gen = atomic64_inc_return(&linfo->next_refresh_gen);
 
         lock->request_pending = 0;
+        put_lock(linfo, lock);
         lock->mode = nl->new_mode;
         lock->write_seq = le64_to_cpu(nl->write_seq);
         trace_scoutfs_lock_granted(sb, lock);
 
+        try_remove_null_lock(linfo, lock);
         spin_unlock(&lock->lock);
         wake_up(&lock->waitq);
         put_lock(linfo, lock);
 
-        spin_unlock(&linfo->lock);
-
         return 0;
 }
@@ -692,11 +763,17 @@ static void lock_invalidate_worker(struct work_struct *work)
 
         scoutfs_inc_counter(sb, lock_invalidate_work);
 
-        spin_lock(&linfo->lock);
+retry:
         spin_lock(&linfo->inv_wlist.lock);
         list_for_each_entry_safe(lock, tmp, &linfo->inv_wlist.list, inv_head) {
-                spin_lock(&lock->lock);
+                /* inversion, usually we get the inv spinlock under the lock spinlock */
+                if (!spin_trylock(&lock->lock)) {
+                        spin_unlock(&linfo->inv_wlist.lock);
+                        cpu_relax();
+                        goto retry;
+                }
+
                 ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head);
                 nl = &ireq->nl;
@@ -714,15 +791,16 @@ static void lock_invalidate_worker(struct work_struct *work)
         }
         spin_unlock(&linfo->inv_wlist.lock);
-        spin_unlock(&linfo->lock);
 
         if (list_empty(&ready))
                 return;
 
-        /* invalidate once the lock is read */
+        /* invalidate once the lock is ready */
         list_for_each_entry(lock, &ready, inv_head) {
+                spin_lock(&lock->lock);
                 ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head);
                 nl = &ireq->nl;
+                spin_unlock(&lock->lock);
 
                 /* only lock protocol, inv can't call subsystems after shutdown */
                 if (!linfo->shutdown) {
@@ -740,9 +818,6 @@ static void lock_invalidate_worker(struct work_struct *work)
         }
 
         /* and finish all the invalidated locks */
-        spin_lock(&linfo->lock);
-        spin_lock(&linfo->inv_wlist.lock);
-
         list_for_each_entry_safe(lock, tmp, &ready, inv_head) {
                 spin_lock(&lock->lock);
                 ireq = list_first_entry(&lock->inv_req_list, struct inv_req, head);
@@ -761,16 +836,16 @@ static void lock_invalidate_worker(struct work_struct *work)
                         wake_up(&lock->waitq);
                 } else {
                         /* another request arrived, back on the list and requeue */
+                        spin_lock(&linfo->inv_wlist.lock);
                         list_move_tail(&lock->inv_head, &linfo->inv_wlist.list);
                         queue_nonempty_work_list(linfo, &linfo->inv_wlist);
+                        spin_unlock(&linfo->inv_wlist.lock);
                 }
 
+                try_remove_null_lock(linfo, lock);
                 spin_unlock(&lock->lock);
                 put_lock(linfo, lock);
         }
-
-        spin_unlock(&linfo->inv_wlist.lock);
-        spin_unlock(&linfo->lock);
 }
 
 /*
@@ -809,8 +884,7 @@ int scoutfs_lock_invalidate_request(struct super_block *sb, u64 net_id,
                 goto out;
         }
 
-        spin_lock(&linfo->lock);
-        lock = get_lock(sb, &nl->key);
+        lock = find_lock(sb, &nl->key);
         if (lock) {
                 spin_lock(&lock->lock);
                 trace_scoutfs_lock_invalidate_request(sb, lock);
@@ -825,9 +899,10 @@ int scoutfs_lock_invalidate_request(struct super_block *sb, u64 net_id,
                         spin_unlock(&linfo->inv_wlist.lock);
                 }
                 list_add_tail(&ireq->head, &lock->inv_req_list);
+                get_lock(lock);
                 spin_unlock(&lock->lock);
+                put_lock(linfo, lock);
         }
-        spin_unlock(&linfo->lock);
 
 out:
         if (!lock) {
@@ -851,9 +926,9 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id,
         DECLARE_LOCK_INFO(sb, linfo);
         struct scoutfs_net_lock_recover *nlr;
         enum scoutfs_lock_mode mode;
+        struct scoutfs_lock *found;
         struct scoutfs_lock *lock;
-        struct scoutfs_lock *next;
-        struct rb_node *node;
+        struct scoutfs_key pos;
         int ret;
         int i;
@@ -865,11 +940,24 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id,
         if (!nlr)
                 return -ENOMEM;
 
-        spin_lock(&linfo->lock);
+        pos = *key;
 
-        lock = lock_lookup(sb, key, &next) ?: next;
+        for (i = 0; i < SCOUTFS_NET_LOCK_MAX_RECOVER_NR; i++) {
 
-        for (i = 0; lock && i < SCOUTFS_NET_LOCK_MAX_RECOVER_NR; i++) {
+                spin_lock(&linfo->lock);
+                found = next_lock_range(sb, &pos);
+                lock = found ? get_lock(found) : NULL;
+                spin_unlock(&linfo->lock);
+
+                /* retry to avoid freeing locks */
+                if (found && !lock) {
+                        cpu_relax();
+                        i--;
+                        continue;
+                }
+
+                if (lock == NULL)
+                        break;
 
                 spin_lock(&lock->lock);
@@ -883,19 +971,15 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id,
                 nlr->locks[i].old_mode = mode;
                 nlr->locks[i].new_mode = mode;
 
-                spin_unlock(&lock->lock);
+                pos = lock->start;
+                scoutfs_key_inc(&pos);
 
-                node = rb_next(&lock->node);
-                if (node)
-                        lock = rb_entry(node, struct scoutfs_lock, node);
-                else
-                        lock = NULL;
+                spin_unlock(&lock->lock);
+                put_lock(linfo, lock);
         }
 
         nlr->nr = cpu_to_le16(i);
 
-        spin_unlock(&linfo->lock);
-
         ret = scoutfs_client_lock_recover_response(sb, net_id, nlr);
         kfree(nlr);
         return ret;
@@ -963,14 +1047,9 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i
         if (WARN_ON_ONCE(scoutfs_trans_held()))
                 return -EDEADLK;
 
-        spin_lock(&linfo->lock);
-
-        /* drops and re-acquires lock if it allocates */
-        lock = create_lock(sb, start, end);
-        if (!lock) {
-                ret = -ENOMEM;
-                goto out_unlock;
-        }
+        lock = find_or_alloc_lock(sb, start, end);
+        if (!lock)
+                return -ENOMEM;
 
         spin_lock(&lock->lock);
@@ -986,6 +1065,7 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i
                 /* the fast path where we can use the granted mode */
                 if (lock_modes_match(lock->mode, mode)) {
                         lock_inc_count(lock->users, mode);
+                        get_lock(lock);
                         *ret_lock = lock;
                         ret = 0;
                         break;
                 }
@@ -1000,13 +1080,13 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i
                 if (!lock->request_pending) {
                         lock->request_pending = 1;
+                        get_lock(lock);
                         should_send = true;
                 } else {
                         should_send = false;
                 }
 
                 spin_unlock(&lock->lock);
-                spin_unlock(&linfo->lock);
 
                 if (should_send) {
                         nl.key = lock->start;
@@ -1015,9 +1095,9 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i
                         ret = scoutfs_client_lock_request(sb, &nl);
                         if (ret) {
-                                spin_lock(&linfo->lock);
                                 spin_lock(&lock->lock);
                                 lock->request_pending = 0;
+                                put_lock(linfo, lock);
                                 break;
                         }
                         scoutfs_inc_counter(sb, lock_grant_request);
@@ -1033,7 +1113,6 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i
                         ret = 0;
                 }
 
-                spin_lock(&linfo->lock);
                 spin_lock(&lock->lock);
                 if (ret)
                         break;
@@ -1048,9 +1127,6 @@ static int lock_key_range(struct super_block *sb, enum scoutfs_lock_mode mode, i
         wake_up(&lock->waitq);
         put_lock(linfo, lock);
 
-out_unlock:
-        spin_unlock(&linfo->lock);
-
         if (ret && ret != -EAGAIN && ret != -ERESTARTSYS)
                 scoutfs_inc_counter(sb, lock_lock_error);
@@ -1321,7 +1397,6 @@ void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock, enum scou
 
         scoutfs_inc_counter(sb, lock_unlock);
 
-        spin_lock(&linfo->lock);
         spin_lock(&lock->lock);
 
         lock_dec_count(lock->users, mode);
@@ -1337,8 +1412,6 @@ void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock, enum scou
         spin_unlock(&lock->lock);
         wake_up(&lock->waitq);
         put_lock(linfo, lock);
-
-        spin_unlock(&linfo->lock);
 }
 
 void scoutfs_lock_init_coverage(struct scoutfs_lock_coverage *cov)
@@ -1406,7 +1479,7 @@ void scoutfs_lock_del_coverage(struct super_block *sb,
  * with the access mode and the access key must be in the lock's key
  * range.
  *
- * This is called by lock holders who's use of the lock must be preventing
+ * This is called by lock holders whose use of the lock must be preventing
 * the mode and keys from changing.
 */
 bool scoutfs_lock_protected(struct scoutfs_lock *lock, struct scoutfs_key *key,
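The trylock/goto-retry dance in lock_invalidate_worker() above is the standard escape from a lock-order inversion: invalidation walks the list while holding inv_wlist.lock, but everywhere else the per-lock spinlock is taken first. Reduced to its shape with pthreads (an illustrative sketch with hypothetical names, not kernel code):

    #include <pthread.h>
    #include <sched.h>

    static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_mutex_t obj_lock = PTHREAD_MUTEX_INITIALIZER;

    /* usual order is obj_lock before list_lock, so this path must not block */
    static void walk_list_safely(void (*visit)(void))
    {
    retry:
            pthread_mutex_lock(&list_lock);
            if (pthread_mutex_trylock(&obj_lock) != 0) {
                    /* inverted order: drop the list lock and start over */
                    pthread_mutex_unlock(&list_lock);
                    sched_yield();
                    goto retry;
            }
            visit();
            pthread_mutex_unlock(&obj_lock);
            pthread_mutex_unlock(&list_lock);
    }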
@@ -1443,6 +1516,8 @@ static void lock_shrink_worker(struct work_struct *work)
         spin_unlock(&linfo->shrink_wlist.lock);
 
         list_for_each_entry_safe(lock, tmp, &list, shrink_head) {
+
+                spin_lock(&lock->lock);
                 list_del_init(&lock->shrink_head);
 
                 /* unlocked lock access, but should be stable since we queued */
@@ -1450,12 +1525,13 @@ static void lock_shrink_worker(struct work_struct *work)
                 nl.old_mode = lock->mode;
                 nl.new_mode = SCOUTFS_LOCK_NULL;
 
+                spin_unlock(&lock->lock);
+
                 ret = scoutfs_client_lock_request(sb, &nl);
                 if (ret) {
                         /* oh well, not freeing */
                         scoutfs_inc_counter(sb, lock_shrink_aborted);
 
-                        spin_lock(&linfo->lock);
                         spin_lock(&lock->lock);
 
                         lock->request_pending = 0;
@@ -1463,8 +1539,6 @@ static void lock_shrink_worker(struct work_struct *work)
                         spin_unlock(&lock->lock);
                         wake_up(&lock->waitq);
                         put_lock(linfo, lock);
-
-                        spin_unlock(&linfo->lock);
                 }
         }
 }
@@ -1481,11 +1555,17 @@ static unsigned long lock_count_objects(struct shrinker *shrink,
 }
 
 /*
- * Start the shrinking process for locks on the lru.  If a lock is on
- * the lru then it can't have any active users.  We don't want to block
- * or allocate here so all we do is get the lock, mark it request
- * pending, and kick off the work.  The work sends a null request and
- * eventually the lock is freed by its response.
+ * Start the shrinking process for locks on the lru.  Locks are always
+ * on the lru so we skip any locks that are being used by any other
+ * references.  Lock put/free defines nesting of the linfo spinlock
+ * inside the lock's spinlock so we're careful to honor that here.  Our
+ * reference to the lock protects its presence on the lru so we can
+ * always resume iterating from it after dropping and reacquiring the
+ * linfo lock.
+ *
+ * We don't want to block or allocate here so all we do is get the lock,
+ * mark it request pending, and kick off the work.  The work sends a
+ * null request and eventually the lock is freed by its response.
  *
  * Only a racing lock attempt that isn't matched can prevent the lock
  * from being freed.  It'll block waiting to send its request for its
@@ -1497,39 +1577,48 @@ static unsigned long lock_scan_objects(struct shrinker *shrink,
 {
         struct lock_info *linfo = KC_SHRINKER_CONTAINER_OF(shrink, struct lock_info);
         struct super_block *sb = linfo->sb;
-        struct scoutfs_lock *lock;
-        struct scoutfs_lock *tmp;
+        struct scoutfs_lock *lock = NULL;
         unsigned long freed = 0;
         unsigned long nr = sc->nr_to_scan;
 
         scoutfs_inc_counter(sb, lock_scan_objects);
 
         spin_lock(&linfo->lock);
-        spin_lock(&linfo->shrink_wlist.lock);
 
-        list_for_each_entry_safe(lock, tmp, &linfo->lru_list, lru_head) {
+        lock = list_first_entry_or_null(&linfo->lru_list, struct scoutfs_lock, lru_head);
+        while (lock && nr > 0) {
 
-                BUG_ON(!lock_idle(lock));
-                BUG_ON(lock->mode == SCOUTFS_LOCK_NULL);
-                BUG_ON(!list_empty(&lock->shrink_head));
+                if (get_lock(lock)) {
+                        spin_unlock(&linfo->lock);
 
-                if (nr-- == 0)
-                        break;
+                        spin_lock(&lock->lock);
+                        if (lock->mode != SCOUTFS_LOCK_NULL && atomic_read(&lock->refcount) == 3) {
+                                lock->request_pending = 1;
+                                spin_lock(&linfo->shrink_wlist.lock);
+                                list_add_tail(&lock->shrink_head, &linfo->shrink_wlist.list);
+                                spin_unlock(&linfo->shrink_wlist.lock);
+                                get_lock(lock);
+                                nr--;
+                                freed++;
+                        }
+                        spin_unlock(&lock->lock);
+                        put_lock(linfo, lock);
 
-                __lock_del_lru(linfo, lock);
-                lock->request_pending = 1;
-                freed++;
-                list_add_tail(&lock->shrink_head, &linfo->shrink_wlist.list);
+                        spin_lock(&linfo->lock);
+                }
 
-                scoutfs_inc_counter(sb, lock_shrink_attempted);
-                trace_scoutfs_lock_shrink(sb, lock);
+                if (lock->lru_head.next != &linfo->lru_list)
+                        lock = list_next_entry(lock, lru_head);
+                else
+                        lock = NULL;
         }
 
-        queue_nonempty_work_list(linfo, &linfo->shrink_wlist);
-
-        spin_unlock(&linfo->shrink_wlist.lock);
         spin_unlock(&linfo->lock);
 
+        spin_lock(&linfo->shrink_wlist.lock);
+        queue_nonempty_work_list(linfo, &linfo->shrink_wlist);
+        spin_unlock(&linfo->shrink_wlist.lock);
+
         trace_scoutfs_lock_shrink_exit(sb, sc->nr_to_scan, freed);
         return freed;
 }
@@ -1594,13 +1683,12 @@ static u64 get_held_lock_refresh_gen(struct super_block *sb, struct scoutfs_key
         if (!linfo)
                 return 0;
 
-        spin_lock(&linfo->lock);
-        lock = lock_lookup(sb, start, NULL);
+        lock = find_lock(sb, start);
         if (lock) {
                 if (lock_mode_can_read(lock->mode))
                         refresh_gen = lock->refresh_gen;
+                put_lock(linfo, lock);
         }
-        spin_unlock(&linfo->lock);
 
         return refresh_gen;
 }
@@ -1689,7 +1777,6 @@ void scoutfs_lock_destroy(struct super_block *sb)
 
         trace_scoutfs_lock_destroy(sb, linfo);
 
-        /* make sure that no one's actively using locks */
         spin_lock(&linfo->lock);
         for (node = rb_first(&linfo->lock_tree); node; node = rb_next(node)) {
@@ -1717,45 +1804,43 @@ void scoutfs_lock_destroy(struct super_block *sb)
         /*
          * Usually lock_free is only called once locks are idle but all
          * locks are idle by definition during shutdown.  We need to
-         * manually update the lock's state to reflect that we've given
-         * up on pending work that would otherwise prevent free from
-         * being called (and would trip assertions in our manual calling
-         * of free).
+         * drop references for any pending work that we've canceled so
+         * that we can tear down the locks.
          */
-        spin_lock(&linfo->lock);
-
         node = rb_first(&linfo->lock_tree);
         while (node) {
                 lock = rb_entry(node, struct scoutfs_lock, node);
                 node = rb_next(node);
 
+                atomic_inc(&lock->refcount);
+
                 list_for_each_entry_safe(ireq, ireq_tmp, &lock->inv_req_list, head) {
                         list_del_init(&ireq->head);
-                        put_lock(linfo, ireq->lock);
+                        put_lock(linfo, lock);
                         kfree(ireq);
                 }
 
-                lock->request_pending = 0;
-                if (!list_empty(&lock->lru_head))
-                        __lock_del_lru(linfo, lock);
+                if (lock->request_pending) {
+                        lock->request_pending = 0;
+                        put_lock(linfo, lock);
+                }
 
-                spin_lock(&linfo->inv_wlist.lock);
                 if (!list_empty(&lock->inv_head)) {
                         list_del_init(&lock->inv_head);
                         lock->invalidate_pending = 0;
                 }
-                spin_unlock(&linfo->inv_wlist.lock);
 
-                spin_lock(&linfo->shrink_wlist.lock);
-                if (!list_empty(&lock->shrink_head))
+                if (!list_empty(&lock->shrink_head)) {
                         list_del_init(&lock->shrink_head);
-                spin_unlock(&linfo->shrink_wlist.lock);
+                        put_lock(linfo, lock);
+                }
 
+                /* manually forcing removal for non-null locks */
+                atomic_sub(2, &lock->refcount);
                 lock_remove(linfo, lock);
-                lock_free(linfo, lock);
-        }
-
-        spin_unlock(&linfo->lock);
+                WARN_ON_ONCE(!put_lock(linfo, lock));
+        }
 
         kfree(linfo);
         sbi->lock_info = NULL;
diff --git a/kmod/src/lock.h b/kmod/src/lock.h
index a04f02e5..dc6bd0f1 100644
--- a/kmod/src/lock.h
+++ b/kmod/src/lock.h
@@ -19,6 +19,7 @@ struct inode_deletion_lock_data;
  */
 struct scoutfs_lock {
         struct super_block *sb;
+        atomic_t refcount;
         spinlock_t lock;
         struct scoutfs_key start;
         struct scoutfs_key end;
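Taken together, the refcount transitions the patch establishes can be walked end to end: 0 at alloc, +2 at insert, +1 per holder, cmpxchg(3, 1) to pull the index refs, and a final put that frees at 0. A hypothetical userspace demo of that accounting (C11 atomics standing in for the kernel's atomic_t; not scoutfs code):

    #include <assert.h>
    #include <stdatomic.h>

    int main(void)
    {
            atomic_int refcount;

            atomic_init(&refcount, 0);              /* lock_alloc() */
            atomic_fetch_add(&refcount, 2);         /* lock_insert() */

            assert(atomic_load(&refcount) >= 2);    /* get_lock() can succeed */
            atomic_fetch_add(&refcount, 1);         /* one active holder */

            int expected = 3;                       /* try_remove_null_lock() */
            assert(atomic_compare_exchange_strong(&refcount, &expected, 1));

            /* final put_lock(): hits 0, which is when lock_free() would run */
            assert(atomic_fetch_sub(&refcount, 1) == 1);
            return 0;
    }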