From 093f8ead58312dd0da3bd0eba0e943a8c455ff9e Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Mon, 24 Feb 2020 10:01:54 -0800 Subject: [PATCH] scoutfs: refactor server commit locking Server processing paths had open coded management of holding and applying transactions. Refactor that into hold_commit() and apply_commit() helpers. It makes the code a whole lot clearer and gives us a place in hold_commit() to add code that needs to be run before anything is modified in a commit on the server. Signed-off-by: Zach Brown --- kmod/src/server.c | 187 +++++++++++++++++++++++++--------------------- 1 file changed, 101 insertions(+), 86 deletions(-) diff --git a/kmod/src/server.c b/kmod/src/server.c index ba6c0342..39e38cd8 100644 --- a/kmod/src/server.c +++ b/kmod/src/server.c @@ -64,6 +64,7 @@ struct server_info { struct rw_semaphore commit_rwsem; struct llist_head commit_waiters; struct work_struct commit_work; + bool prepared_commit; /* server tracks seq use */ struct rw_semaphore seq_rwsem; @@ -109,14 +110,44 @@ static void stop_server(struct server_info *server) } /* - * This is called while still holding the rwsem that prevents commits so - * that the caller can be sure to be woken by the next commit after they - * queue and release the lock. + * Hold the shared rwsem that lets multiple holders modify blocks in the + * current commit and prevents the commit worker from acquiring the + * exclusive write lock to write the commit. This can fail for the + * first holder failing to prepare a new commit. + */ +static int hold_commit(struct super_block *sb) +{ + DECLARE_SERVER_INFO(sb, server); + int ret = 0; + + down_read(&server->commit_rwsem); + + while (!server->prepared_commit) { + up_read(&server->commit_rwsem); + down_write(&server->commit_rwsem); + + server->prepared_commit = true; + ret = 0; + + up_write(&server->commit_rwsem); + if (ret) + break; + down_read(&server->commit_rwsem); + } + + return ret; +} + +/* + * This is called while holding the commit and returns once the commit + * is successfully written. Many holders can all wait for all holders + * to drain before their shared commit is applied and they're all woken. * - * It's important to realize that the caller's commit_waiter list node - * might be serviced by a currently running commit work while queueing - * another work run in the future. This caller can return from - * wait_for_commit() while the commit_work is still queued. + * It's important to realize that our commit_waiter list node might be + * serviced by a currently executing commit work that is blocked waiting + * for the holders to release the commit_rwsem. This caller can return + * from wait_for_commit() while another future commit_work is still + * queued. * * This could queue delayed work but we're first trying to have batching * work by having concurrent modification line up behind a commit in @@ -124,24 +155,26 @@ static void stop_server(struct server_info *server) * will race to make their changes and they'll all be applied by the * next commit after that. */ -static void queue_commit_work(struct server_info *server, - struct commit_waiter *cw) +static int apply_commit(struct super_block *sb, int err) { - lockdep_assert_held(&server->commit_rwsem); + DECLARE_SERVER_INFO(sb, server); + struct commit_waiter cw; - cw->ret = 0; - init_completion(&cw->comp); - llist_add(&cw->node, &server->commit_waiters); - queue_work(server->wq, &server->commit_work); -} + if (err == 0) { + cw.ret = 0; + init_completion(&cw.comp); + llist_add(&cw.node, &server->commit_waiters); + queue_work(server->wq, &server->commit_work); + } -/* - * Wait for a commit during request processing and return its status. - */ -static inline int wait_for_commit(struct commit_waiter *cw) -{ - wait_for_completion(&cw->comp); - return cw->ret; + up_read(&server->commit_rwsem); + + if (err == 0) { + wait_for_completion(&cw.comp); + err = cw.ret; + } + + return err; } /* @@ -157,11 +190,10 @@ static void update_free_blocks(__le64 *blocks, struct scoutfs_radix_root *prev, } /* - * A core function of request processing is to modify the manifest and - * allocator. Often the processing needs to make the modifications - * persistent before replying. We'd like to batch these commits as much - * as is reasonable so that we don't degrade to a few IO round trips per - * request. + * Concurrent request processing dirties blocks in a commit and makes + * the modifications persistent before replying. We'd like to batch + * these commits as much as is reasonable so that we don't degrade to a + * few IO round trips per request. * * Getting that batching right is bound up in the concurrency of request * processing so a clear way to implement the batched commits is to @@ -210,6 +242,7 @@ static void scoutfs_server_commit_func(struct work_struct *work) goto out; } + server->prepared_commit = false; ret = 0; out: node = llist_del_all(&server->commit_waiters); @@ -228,11 +261,9 @@ static int server_alloc_inodes(struct super_block *sb, struct scoutfs_net_connection *conn, u8 cmd, u64 id, void *arg, u16 arg_len) { - DECLARE_SERVER_INFO(sb, server); struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct scoutfs_super_block *super = &sbi->super; struct scoutfs_net_inode_alloc ial = { 0, }; - struct commit_waiter cw; __le64 lecount; u64 ino; u64 nr; @@ -245,7 +276,9 @@ static int server_alloc_inodes(struct super_block *sb, memcpy(&lecount, arg, arg_len); - down_read(&server->commit_rwsem); + ret = hold_commit(sb); + if (ret) + goto out; spin_lock(&sbi->next_ino_lock); ino = le64_to_cpu(super->next_ino); @@ -253,13 +286,11 @@ static int server_alloc_inodes(struct super_block *sb, le64_add_cpu(&super->next_ino, nr); spin_unlock(&sbi->next_ino_lock); - queue_commit_work(server, &cw); - up_read(&server->commit_rwsem); - - ial.ino = cpu_to_le64(ino); - ial.nr = cpu_to_le64(nr); - - ret = wait_for_commit(&cw); + ret = apply_commit(sb, ret); + if (ret == 0) { + ial.ino = cpu_to_le64(ino); + ial.nr = cpu_to_le64(nr); + } out: return scoutfs_net_response(sb, conn, cmd, id, ret, &ial, sizeof(ial)); } @@ -285,7 +316,6 @@ static int server_get_log_trees(struct super_block *sb, struct scoutfs_log_trees_key ltk; struct scoutfs_log_trees_val ltv; struct scoutfs_log_trees lt; - struct commit_waiter cw; u64 count; u64 target; int ret; @@ -295,7 +325,9 @@ static int server_get_log_trees(struct super_block *sb, goto out; } - down_read(&server->commit_rwsem); + ret = hold_commit(sb); + if (ret) + goto out; mutex_lock(&server->logs_mutex); @@ -377,12 +409,7 @@ static int server_get_log_trees(struct super_block *sb, unlock: mutex_unlock(&server->logs_mutex); - if (ret == 0) - queue_commit_work(server, &cw); - up_read(&server->commit_rwsem); - if (ret == 0) - ret = wait_for_commit(&cw); - + ret = apply_commit(sb, ret); if (ret == 0) { lt.meta_avail = ltv.meta_avail; lt.meta_freed = ltv.meta_freed; @@ -415,7 +442,6 @@ static int server_commit_log_trees(struct super_block *sb, struct scoutfs_log_trees_key ltk; struct scoutfs_log_trees_val ltv; struct scoutfs_log_trees *lt; - struct commit_waiter cw; int ret; if (arg_len != sizeof(struct scoutfs_log_trees)) { @@ -424,7 +450,10 @@ static int server_commit_log_trees(struct super_block *sb, } lt = arg; - down_read(&server->commit_rwsem); + ret = hold_commit(sb); + if (ret < 0) + goto out; + mutex_lock(&server->logs_mutex); /* find the client's existing item */ @@ -469,11 +498,7 @@ static int server_commit_log_trees(struct super_block *sb, unlock: mutex_unlock(&server->logs_mutex); - if (ret == 0) - queue_commit_work(server, &cw); - up_read(&server->commit_rwsem); - if (ret == 0) - ret = wait_for_commit(&cw); + ret = apply_commit(sb, ret); out: WARN_ON_ONCE(ret < 0); return scoutfs_net_response(sb, conn, cmd, id, ret, NULL, 0); @@ -588,7 +613,6 @@ static int server_advance_seq(struct super_block *sb, DECLARE_SERVER_INFO(sb, server); struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct scoutfs_super_block *super = &sbi->super; - struct commit_waiter cw; __le64 their_seq; __le64 next_seq; struct scoutfs_trans_seq_btree_key tsk; @@ -601,7 +625,10 @@ static int server_advance_seq(struct super_block *sb, } memcpy(&their_seq, arg, sizeof(their_seq)); - down_read(&server->commit_rwsem); + ret = hold_commit(sb); + if (ret) + goto out; + down_write(&server->seq_rwsem); if (their_seq != 0) { @@ -629,11 +656,7 @@ static int server_advance_seq(struct super_block *sb, &tsk, sizeof(tsk), NULL, 0); out: up_write(&server->seq_rwsem); - if (ret == 0) - queue_commit_work(server, &cw); - up_read(&server->commit_rwsem); - if (ret == 0) - ret = wait_for_commit(&cw); + ret = apply_commit(sb, ret); return scoutfs_net_response(sb, conn, cmd, id, ret, &next_seq, sizeof(next_seq)); @@ -936,7 +959,6 @@ static int server_greeting(struct super_block *sb, struct scoutfs_net_greeting *gr = arg; struct scoutfs_net_greeting greet; DECLARE_SERVER_INFO(sb, server); - struct commit_waiter cw; __le64 umb = 0; bool reconnecting; bool first_contact; @@ -966,7 +988,9 @@ static int server_greeting(struct super_block *sb, } if (gr->server_term == 0) { - down_read(&server->commit_rwsem); + ret = hold_commit(sb); + if (ret < 0) + goto send_err; spin_lock(&server->lock); umb = super->unmount_barrier; @@ -977,13 +1001,8 @@ static int server_greeting(struct super_block *sb, le64_to_cpu(gr->flags)); mutex_unlock(&server->farewell_mutex); - if (ret == 0) - queue_commit_work(server, &cw); - up_read(&server->commit_rwsem); - if (ret == 0) { - ret = wait_for_commit(&cw); - queue_work(server->wq, &server->farewell_work); - } + ret = apply_commit(sb, ret); + queue_work(server->wq, &server->farewell_work); } else { umb = gr->unmount_barrier; } @@ -1021,15 +1040,13 @@ send_err: if (le64_to_cpu(gr->server_term) != server->term) { /* we're now doing two commits per greeting, not great */ - down_read(&server->commit_rwsem); + ret = hold_commit(sb); + if (ret) + goto out; ret = scoutfs_lock_server_greeting(sb, le64_to_cpu(gr->rid), gr->server_term != 0); - if (ret == 0) - queue_commit_work(server, &cw); - up_read(&server->commit_rwsem); - if (ret == 0) - ret = wait_for_commit(&cw); + ret = apply_commit(sb, ret); if (ret) goto out; } @@ -1089,7 +1106,6 @@ static void farewell_worker(struct work_struct *work) struct farewell_request *tmp; struct farewell_request *fw; SCOUTFS_BTREE_ITEM_REF(iref); - struct commit_waiter cw; unsigned int nr_unmounting = 0; unsigned int nr_mounted = 0; LIST_HEAD(reqs); @@ -1174,30 +1190,29 @@ static void farewell_worker(struct work_struct *work) /* process and send farewell responses */ list_for_each_entry_safe(fw, tmp, &send, entry) { - - down_read(&server->commit_rwsem); + ret = hold_commit(sb); + if (ret) + goto out; ret = scoutfs_lock_server_farewell(sb, fw->rid) ?: remove_trans_seq(sb, fw->rid) ?: reclaim_log_trees(sb, fw->rid) ?: delete_mounted_client(sb, fw->rid); - if (ret == 0) - queue_commit_work(server, &cw); - up_read(&server->commit_rwsem); - if (ret == 0) - ret = wait_for_commit(&cw); + ret = apply_commit(sb, ret); if (ret) goto out; } /* update the unmount barrier if we deleted all voting clients */ if (deleted && nr_mounted == 0) { - down_read(&server->commit_rwsem); + ret = hold_commit(sb); + if (ret) + goto out; + le64_add_cpu(&super->unmount_barrier, 1); - queue_commit_work(server, &cw); - up_read(&server->commit_rwsem); - ret = wait_for_commit(&cw); + + ret = apply_commit(sb, ret); if (ret) goto out; }