Handle back to back invalidation requests

The client's incoming lock invalidation request handler triggers a
BUG_ON if it gets a request for a lock that is already processing a
previous invalidation request.  The server is supposed to only send
one request at a time.

The problem is that the batched invalidation request handling will send
responses outside of spinlock coverage before reacquirin the lock and
finishing processing once the response send has been successful.

This gives a window for another invalidation request to arrive after the
response was sent but before the invalidation finished processing.  This
triggers the bug.

The fix is to mark the lock such that we can recognize a valid second
request arriving after we send the response but before we finish
processing.  If it arrives we'll continue invalidation processing with
the arguments from the new request.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2021-04-22 17:00:50 -07:00
parent cbb031bb5d
commit a27c54568c

View File

@@ -713,7 +713,9 @@ int scoutfs_lock_grant_response(struct super_block *sb,
/* /*
* Each lock has received a lock invalidation request from the server * Each lock has received a lock invalidation request from the server
* which specifies a new mode for the lock. The server will only send * which specifies a new mode for the lock. The server will only send
* one invalidation request at a time for each lock. * one invalidation request at a time for each lock. The server can
* send another invalidate request after we send the response but before
* we reacquire the lock and finish invalidation.
* *
* This is an unsolicited request from the server so it can arrive at * This is an unsolicited request from the server so it can arrive at
* any time after we make the server aware of the lock by initially * any time after we make the server aware of the lock by initially
@@ -803,6 +805,9 @@ static void lock_invalidate_worker(struct work_struct *work)
ret = lock_invalidate(sb, lock, nl->old_mode, nl->new_mode); ret = lock_invalidate(sb, lock, nl->old_mode, nl->new_mode);
BUG_ON(ret); BUG_ON(ret);
/* allow another request after we respond but before we finish */
lock->inv_net_id = 0;
/* respond with the key and modes from the request */ /* respond with the key and modes from the request */
ret = scoutfs_client_lock_response(sb, net_id, nl); ret = scoutfs_client_lock_response(sb, net_id, nl);
BUG_ON(ret); BUG_ON(ret);
@@ -814,11 +819,13 @@ static void lock_invalidate_worker(struct work_struct *work)
spin_lock(&linfo->lock); spin_lock(&linfo->lock);
list_for_each_entry_safe(lock, tmp, &ready, inv_head) { list_for_each_entry_safe(lock, tmp, &ready, inv_head) {
list_del_init(&lock->inv_head);
lock->invalidate_pending = 0;
trace_scoutfs_lock_invalidated(sb, lock); trace_scoutfs_lock_invalidated(sb, lock);
wake_up(&lock->waitq); if (lock->inv_net_id == 0) {
/* finish if another request didn't arrive */
list_del_init(&lock->inv_head);
lock->invalidate_pending = 0;
wake_up(&lock->waitq);
}
put_lock(linfo, lock); put_lock(linfo, lock);
} }
@@ -833,34 +840,47 @@ out:
} }
/* /*
* Record an incoming invalidate request from the server and add its lock * Record an incoming invalidate request from the server and add its
* to the list for processing. * lock to the list for processing. This request can be from a new
* server and racing with invalidation that frees from an old server.
* It's fine to not find the requested lock and send an immediate
* response.
* *
* This is trusting the server and will crash if it's sent bad requests :/ * The invalidation process drops the linfo lock to send responses. The
* moment it does so we can receive another invalidation request (the
* server can ask us to go from write->read then read->null). We allow
* for one chain like this but it's a bug if we receive more concurrent
* invalidation requests than that. The server should be only sending
* one at a time.
*/ */
int scoutfs_lock_invalidate_request(struct super_block *sb, u64 net_id, int scoutfs_lock_invalidate_request(struct super_block *sb, u64 net_id,
struct scoutfs_net_lock *nl) struct scoutfs_net_lock *nl)
{ {
DECLARE_LOCK_INFO(sb, linfo); DECLARE_LOCK_INFO(sb, linfo);
struct scoutfs_lock *lock; struct scoutfs_lock *lock;
int ret = 0;
scoutfs_inc_counter(sb, lock_invalidate_request); scoutfs_inc_counter(sb, lock_invalidate_request);
spin_lock(&linfo->lock); spin_lock(&linfo->lock);
lock = get_lock(sb, &nl->key); lock = get_lock(sb, &nl->key);
BUG_ON(!lock);
if (lock) { if (lock) {
BUG_ON(lock->invalidate_pending); BUG_ON(lock->inv_net_id != 0);
lock->invalidate_pending = 1;
lock->inv_nl = *nl;
lock->inv_net_id = net_id; lock->inv_net_id = net_id;
list_add_tail(&lock->inv_head, &linfo->inv_list); lock->inv_nl = *nl;
if (list_empty(&lock->inv_head)) {
list_add_tail(&lock->inv_head, &linfo->inv_list);
lock->invalidate_pending = 1;
}
trace_scoutfs_lock_invalidate_request(sb, lock); trace_scoutfs_lock_invalidate_request(sb, lock);
queue_inv_work(linfo); queue_inv_work(linfo);
} }
spin_unlock(&linfo->lock); spin_unlock(&linfo->lock);
return 0; if (!lock)
ret = scoutfs_client_lock_response(sb, net_id, nl);
return ret;
} }
/* /*