scoutfs: refactor server commit locking

Server processing paths had open coded management of holding and
applying transactions.  Refactor that into hold_commit() and
apply_commit() helpers.  It makes the code a whole lot clearer and gives
us a place in hold_commit() to add code that needs to be run before
anything is modified in a commit on the server.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2020-02-24 10:01:54 -08:00
committed by Zach Brown
parent ce7f7bdbd3
commit 093f8ead58

View File

@@ -64,6 +64,7 @@ struct server_info {
struct rw_semaphore commit_rwsem;
struct llist_head commit_waiters;
struct work_struct commit_work;
bool prepared_commit;
/* server tracks seq use */
struct rw_semaphore seq_rwsem;
@@ -109,14 +110,44 @@ static void stop_server(struct server_info *server)
}
/*
* This is called while still holding the rwsem that prevents commits so
* that the caller can be sure to be woken by the next commit after they
* queue and release the lock.
* Hold the shared rwsem that lets multiple holders modify blocks in the
* current commit and prevents the commit worker from acquiring the
* exclusive write lock to write the commit. This can fail for the
* first holder failing to prepare a new commit.
*/
static int hold_commit(struct super_block *sb)
{
DECLARE_SERVER_INFO(sb, server);
int ret = 0;
down_read(&server->commit_rwsem);
while (!server->prepared_commit) {
up_read(&server->commit_rwsem);
down_write(&server->commit_rwsem);
server->prepared_commit = true;
ret = 0;
up_write(&server->commit_rwsem);
if (ret)
break;
down_read(&server->commit_rwsem);
}
return ret;
}
/*
* This is called while holding the commit and returns once the commit
* is successfully written. Many holders can all wait for all holders
* to drain before their shared commit is applied and they're all woken.
*
* It's important to realize that the caller's commit_waiter list node
* might be serviced by a currently running commit work while queueing
* another work run in the future. This caller can return from
* wait_for_commit() while the commit_work is still queued.
* It's important to realize that our commit_waiter list node might be
* serviced by a currently executing commit work that is blocked waiting
* for the holders to release the commit_rwsem. This caller can return
* from wait_for_commit() while another future commit_work is still
* queued.
*
* This could queue delayed work but we're first trying to have batching
* work by having concurrent modification line up behind a commit in
@@ -124,24 +155,26 @@ static void stop_server(struct server_info *server)
* will race to make their changes and they'll all be applied by the
* next commit after that.
*/
static void queue_commit_work(struct server_info *server,
struct commit_waiter *cw)
static int apply_commit(struct super_block *sb, int err)
{
lockdep_assert_held(&server->commit_rwsem);
DECLARE_SERVER_INFO(sb, server);
struct commit_waiter cw;
cw->ret = 0;
init_completion(&cw->comp);
llist_add(&cw->node, &server->commit_waiters);
queue_work(server->wq, &server->commit_work);
}
if (err == 0) {
cw.ret = 0;
init_completion(&cw.comp);
llist_add(&cw.node, &server->commit_waiters);
queue_work(server->wq, &server->commit_work);
}
/*
* Wait for a commit during request processing and return its status.
*/
static inline int wait_for_commit(struct commit_waiter *cw)
{
wait_for_completion(&cw->comp);
return cw->ret;
up_read(&server->commit_rwsem);
if (err == 0) {
wait_for_completion(&cw.comp);
err = cw.ret;
}
return err;
}
/*
@@ -157,11 +190,10 @@ static void update_free_blocks(__le64 *blocks, struct scoutfs_radix_root *prev,
}
/*
* A core function of request processing is to modify the manifest and
* allocator. Often the processing needs to make the modifications
* persistent before replying. We'd like to batch these commits as much
* as is reasonable so that we don't degrade to a few IO round trips per
* request.
* Concurrent request processing dirties blocks in a commit and makes
* the modifications persistent before replying. We'd like to batch
* these commits as much as is reasonable so that we don't degrade to a
* few IO round trips per request.
*
* Getting that batching right is bound up in the concurrency of request
* processing so a clear way to implement the batched commits is to
@@ -210,6 +242,7 @@ static void scoutfs_server_commit_func(struct work_struct *work)
goto out;
}
server->prepared_commit = false;
ret = 0;
out:
node = llist_del_all(&server->commit_waiters);
@@ -228,11 +261,9 @@ static int server_alloc_inodes(struct super_block *sb,
struct scoutfs_net_connection *conn,
u8 cmd, u64 id, void *arg, u16 arg_len)
{
DECLARE_SERVER_INFO(sb, server);
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_net_inode_alloc ial = { 0, };
struct commit_waiter cw;
__le64 lecount;
u64 ino;
u64 nr;
@@ -245,7 +276,9 @@ static int server_alloc_inodes(struct super_block *sb,
memcpy(&lecount, arg, arg_len);
down_read(&server->commit_rwsem);
ret = hold_commit(sb);
if (ret)
goto out;
spin_lock(&sbi->next_ino_lock);
ino = le64_to_cpu(super->next_ino);
@@ -253,13 +286,11 @@ static int server_alloc_inodes(struct super_block *sb,
le64_add_cpu(&super->next_ino, nr);
spin_unlock(&sbi->next_ino_lock);
queue_commit_work(server, &cw);
up_read(&server->commit_rwsem);
ial.ino = cpu_to_le64(ino);
ial.nr = cpu_to_le64(nr);
ret = wait_for_commit(&cw);
ret = apply_commit(sb, ret);
if (ret == 0) {
ial.ino = cpu_to_le64(ino);
ial.nr = cpu_to_le64(nr);
}
out:
return scoutfs_net_response(sb, conn, cmd, id, ret, &ial, sizeof(ial));
}
@@ -285,7 +316,6 @@ static int server_get_log_trees(struct super_block *sb,
struct scoutfs_log_trees_key ltk;
struct scoutfs_log_trees_val ltv;
struct scoutfs_log_trees lt;
struct commit_waiter cw;
u64 count;
u64 target;
int ret;
@@ -295,7 +325,9 @@ static int server_get_log_trees(struct super_block *sb,
goto out;
}
down_read(&server->commit_rwsem);
ret = hold_commit(sb);
if (ret)
goto out;
mutex_lock(&server->logs_mutex);
@@ -377,12 +409,7 @@ static int server_get_log_trees(struct super_block *sb,
unlock:
mutex_unlock(&server->logs_mutex);
if (ret == 0)
queue_commit_work(server, &cw);
up_read(&server->commit_rwsem);
if (ret == 0)
ret = wait_for_commit(&cw);
ret = apply_commit(sb, ret);
if (ret == 0) {
lt.meta_avail = ltv.meta_avail;
lt.meta_freed = ltv.meta_freed;
@@ -415,7 +442,6 @@ static int server_commit_log_trees(struct super_block *sb,
struct scoutfs_log_trees_key ltk;
struct scoutfs_log_trees_val ltv;
struct scoutfs_log_trees *lt;
struct commit_waiter cw;
int ret;
if (arg_len != sizeof(struct scoutfs_log_trees)) {
@@ -424,7 +450,10 @@ static int server_commit_log_trees(struct super_block *sb,
}
lt = arg;
down_read(&server->commit_rwsem);
ret = hold_commit(sb);
if (ret < 0)
goto out;
mutex_lock(&server->logs_mutex);
/* find the client's existing item */
@@ -469,11 +498,7 @@ static int server_commit_log_trees(struct super_block *sb,
unlock:
mutex_unlock(&server->logs_mutex);
if (ret == 0)
queue_commit_work(server, &cw);
up_read(&server->commit_rwsem);
if (ret == 0)
ret = wait_for_commit(&cw);
ret = apply_commit(sb, ret);
out:
WARN_ON_ONCE(ret < 0);
return scoutfs_net_response(sb, conn, cmd, id, ret, NULL, 0);
@@ -588,7 +613,6 @@ static int server_advance_seq(struct super_block *sb,
DECLARE_SERVER_INFO(sb, server);
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct commit_waiter cw;
__le64 their_seq;
__le64 next_seq;
struct scoutfs_trans_seq_btree_key tsk;
@@ -601,7 +625,10 @@ static int server_advance_seq(struct super_block *sb,
}
memcpy(&their_seq, arg, sizeof(their_seq));
down_read(&server->commit_rwsem);
ret = hold_commit(sb);
if (ret)
goto out;
down_write(&server->seq_rwsem);
if (their_seq != 0) {
@@ -629,11 +656,7 @@ static int server_advance_seq(struct super_block *sb,
&tsk, sizeof(tsk), NULL, 0);
out:
up_write(&server->seq_rwsem);
if (ret == 0)
queue_commit_work(server, &cw);
up_read(&server->commit_rwsem);
if (ret == 0)
ret = wait_for_commit(&cw);
ret = apply_commit(sb, ret);
return scoutfs_net_response(sb, conn, cmd, id, ret,
&next_seq, sizeof(next_seq));
@@ -936,7 +959,6 @@ static int server_greeting(struct super_block *sb,
struct scoutfs_net_greeting *gr = arg;
struct scoutfs_net_greeting greet;
DECLARE_SERVER_INFO(sb, server);
struct commit_waiter cw;
__le64 umb = 0;
bool reconnecting;
bool first_contact;
@@ -966,7 +988,9 @@ static int server_greeting(struct super_block *sb,
}
if (gr->server_term == 0) {
down_read(&server->commit_rwsem);
ret = hold_commit(sb);
if (ret < 0)
goto send_err;
spin_lock(&server->lock);
umb = super->unmount_barrier;
@@ -977,13 +1001,8 @@ static int server_greeting(struct super_block *sb,
le64_to_cpu(gr->flags));
mutex_unlock(&server->farewell_mutex);
if (ret == 0)
queue_commit_work(server, &cw);
up_read(&server->commit_rwsem);
if (ret == 0) {
ret = wait_for_commit(&cw);
queue_work(server->wq, &server->farewell_work);
}
ret = apply_commit(sb, ret);
queue_work(server->wq, &server->farewell_work);
} else {
umb = gr->unmount_barrier;
}
@@ -1021,15 +1040,13 @@ send_err:
if (le64_to_cpu(gr->server_term) != server->term) {
/* we're now doing two commits per greeting, not great */
down_read(&server->commit_rwsem);
ret = hold_commit(sb);
if (ret)
goto out;
ret = scoutfs_lock_server_greeting(sb, le64_to_cpu(gr->rid),
gr->server_term != 0);
if (ret == 0)
queue_commit_work(server, &cw);
up_read(&server->commit_rwsem);
if (ret == 0)
ret = wait_for_commit(&cw);
ret = apply_commit(sb, ret);
if (ret)
goto out;
}
@@ -1089,7 +1106,6 @@ static void farewell_worker(struct work_struct *work)
struct farewell_request *tmp;
struct farewell_request *fw;
SCOUTFS_BTREE_ITEM_REF(iref);
struct commit_waiter cw;
unsigned int nr_unmounting = 0;
unsigned int nr_mounted = 0;
LIST_HEAD(reqs);
@@ -1174,30 +1190,29 @@ static void farewell_worker(struct work_struct *work)
/* process and send farewell responses */
list_for_each_entry_safe(fw, tmp, &send, entry) {
down_read(&server->commit_rwsem);
ret = hold_commit(sb);
if (ret)
goto out;
ret = scoutfs_lock_server_farewell(sb, fw->rid) ?:
remove_trans_seq(sb, fw->rid) ?:
reclaim_log_trees(sb, fw->rid) ?:
delete_mounted_client(sb, fw->rid);
if (ret == 0)
queue_commit_work(server, &cw);
up_read(&server->commit_rwsem);
if (ret == 0)
ret = wait_for_commit(&cw);
ret = apply_commit(sb, ret);
if (ret)
goto out;
}
/* update the unmount barrier if we deleted all voting clients */
if (deleted && nr_mounted == 0) {
down_read(&server->commit_rwsem);
ret = hold_commit(sb);
if (ret)
goto out;
le64_add_cpu(&super->unmount_barrier, 1);
queue_commit_work(server, &cw);
up_read(&server->commit_rwsem);
ret = wait_for_commit(&cw);
ret = apply_commit(sb, ret);
if (ret)
goto out;
}