scoutfs: have net.c commit btree blocks

Convert the net server metadata dirtying and committing code to use the
btree instead of the ring.  It has to be careful to set up and tear down
the btree info as the server starts up and shuts down.
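
The request-processing commit pattern this establishes looks roughly
like the sketch below (distilled from the hunks that follow; the
handler and mutation names are hypothetical stand-ins, the locking and
commit calls are from the patch):

	static int example_process_request(struct super_block *sb,
					   struct net_info *nti)
	{
		struct commit_waiter cw;
		int ret;

		/* hold the rwsem read side across dependent changes */
		down_read(&nti->commit_rwsem);
		ret = example_dirty_btree_items(sb); /* hypothetical mutation */
		if (ret == 0)
			queue_commit_work(nti, &cw);
		up_read(&nti->commit_rwsem);

		/* a single commit work batches all waiters' dirty blocks */
		if (ret == 0)
			ret = wait_for_commit(&cw);
		return ret;
	}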

This fixes up some questionable setup/teardown changes made in the
previous patches that converted the manifest and allocator.  We could
rebase the patches to merge those fixes back in, but given that the
previous patches don't work at all without the net updates, it might
not be worth the trouble.
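
For reference, the setup/teardown pairing ends up as below (condensed
from the hunks that follow; the ?: chain short-circuits on the first
setup error):

	/* server load, in scoutfs_net_proc_func() */
	ret = scoutfs_read_supers(sb, &SCOUTFS_SB(sb)->super) ?:
	      scoutfs_btree_setup(sb) ?:
	      scoutfs_manifest_setup(sb) ?:
	      scoutfs_alloc_setup(sb) ?:
	      scoutfs_compact_setup(sb);

	/* server shutdown, in destroy_server_state() */
	scoutfs_compact_destroy(sb);
	scoutfs_alloc_destroy(sb);
	scoutfs_manifest_destroy(sb);
	scoutfs_btree_destroy(sb);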

Signed-off-by: Zach Brown <zab@versity.com>
Author: Zach Brown
Date:   2017-06-30 13:18:33 -07:00
Parent: ff5a094833
Commit: c2f13ccf24

@@ -80,10 +80,10 @@ struct net_info {
         struct sock_info *listening_sinf;
         bool server_loaded;
 
-        /* server commits ring changes while processing requests */
-        struct rw_semaphore ring_commit_rwsem;
-        struct llist_head ring_commit_waiters;
-        struct work_struct ring_commit_work;
+        /* server commits metadata while processing requests */
+        struct rw_semaphore commit_rwsem;
+        struct llist_head commit_waiters;
+        struct work_struct commit_work;
 
         /* level 0 segment addition waits for it to clear */
         wait_queue_head_t waitq;
@@ -283,12 +283,12 @@ struct commit_waiter {
  */
 static void queue_commit_work(struct net_info *nti, struct commit_waiter *cw)
 {
-        lockdep_assert_held(&nti->ring_commit_rwsem);
+        lockdep_assert_held(&nti->commit_rwsem);
 
         cw->ret = 0;
         init_completion(&cw->comp);
-        llist_add(&cw->node, &nti->ring_commit_waiters);
-        queue_work(nti->proc_wq, &nti->ring_commit_work);
+        llist_add(&cw->node, &nti->commit_waiters);
+        queue_work(nti->proc_wq, &nti->commit_work);
 }
 
 static int wait_for_commit(struct commit_waiter *cw)
@@ -306,9 +306,9 @@ static int wait_for_commit(struct commit_waiter *cw)
  *
  * Getting that batching right is bound up in the concurrency of request
  * processing so a clear way to implement the batched commits is to
- * implement commits with work funcs like the processing. This ring
- * commit work is queued on the non-reentrant proc_wq so there will only
- * ever be one commit executing at a time.
+ * implement commits with work funcs like the processing. This commit
+ * work is queued on the non-reentrant proc_wq so there will only ever
+ * be one commit executing at a time.
 *
 * Processing paths acquire the rwsem for reading while they're making
 * multiple dependent changes. When they're done and want it persistent
@@ -317,17 +317,16 @@ static int wait_for_commit(struct commit_waiter *cw)
 * performs the commit. Readers can run concurrently with these
 * commits.
 */
-static void scoutfs_net_ring_commit_func(struct work_struct *work)
+static void scoutfs_net_commit_func(struct work_struct *work)
 {
-        struct net_info *nti = container_of(work, struct net_info,
-                                            ring_commit_work);
+        struct net_info *nti = container_of(work, struct net_info, commit_work);
         struct super_block *sb = nti->sb;
         struct commit_waiter *cw;
         struct commit_waiter *pos;
         struct llist_node *node;
         int ret;
 
-        down_write(&nti->ring_commit_rwsem);
+        down_write(&nti->commit_rwsem);
 
         if (scoutfs_btree_has_dirty(sb)) {
                 ret = scoutfs_alloc_apply_pending(sb) ?:
@@ -344,7 +343,7 @@ static void scoutfs_net_ring_commit_func(struct work_struct *work)
                 ret = 0;
         }
 
-        node = llist_del_all(&nti->ring_commit_waiters);
+        node = llist_del_all(&nti->commit_waiters);
 
         /* waiters always wait on completion, cw could be free after complete */
         llist_for_each_entry_safe(cw, pos, node, node) {
@@ -352,7 +351,7 @@ static void scoutfs_net_ring_commit_func(struct work_struct *work)
                 complete(&cw->comp);
         }
 
-        up_write(&nti->ring_commit_rwsem);
+        up_write(&nti->commit_rwsem);
 }
 
 static struct send_buf *alloc_sbuf(unsigned data_len)
@@ -391,7 +390,7 @@ static struct send_buf *process_bulk_alloc(struct super_block *sb, void *req,
         ns = (void *)sbuf->nh->data;
         ns->nr = cpu_to_le16(SCOUTFS_BULK_ALLOC_COUNT);
 
-        down_read(&nti->ring_commit_rwsem);
+        down_read(&nti->commit_rwsem);
 
         for (i = 0; i < SCOUTFS_BULK_ALLOC_COUNT; i++) {
                 ret = scoutfs_alloc_segno(sb, &segno);
@@ -408,7 +407,7 @@ static struct send_buf *process_bulk_alloc(struct super_block *sb, void *req,
         if (ret == 0)
                 queue_commit_work(nti, &cw);
 
-        up_read(&nti->ring_commit_rwsem);
+        up_read(&nti->commit_rwsem);
 
         if (ret == 0)
                 ret = wait_for_commit(&cw);
@@ -521,12 +520,12 @@ static struct send_buf *process_record_segment(struct super_block *sb,
         }
 
 retry:
-        down_read(&nti->ring_commit_rwsem);
+        down_read(&nti->commit_rwsem);
         scoutfs_manifest_lock(sb);
 
         if (scoutfs_manifest_level0_full(sb)) {
                 scoutfs_manifest_unlock(sb);
-                up_read(&nti->ring_commit_rwsem);
+                up_read(&nti->commit_rwsem);
                 /* XXX waits indefinitely? io errors? */
                 wait_event(nti->waitq, !scoutfs_manifest_level0_full(sb));
                 goto retry;
@@ -539,7 +538,7 @@ retry:
         if (ret == 0)
                 queue_commit_work(nti, &cw);
 
-        up_read(&nti->ring_commit_rwsem);
+        up_read(&nti->commit_rwsem);
 
         if (ret == 0)
                 ret = wait_for_commit(&cw);
@@ -575,13 +574,13 @@ static struct send_buf *process_alloc_segno(struct super_block *sb,
                 goto out;
         }
 
-        down_read(&nti->ring_commit_rwsem);
+        down_read(&nti->commit_rwsem);
 
         ret = scoutfs_alloc_segno(sb, &segno);
         if (ret == 0)
                 queue_commit_work(nti, &cw);
 
-        up_read(&nti->ring_commit_rwsem);
+        up_read(&nti->commit_rwsem);
 
         if (ret == 0)
                 ret = wait_for_commit(&cw);
@@ -627,7 +626,7 @@ static struct send_buf *process_alloc_inodes(struct super_block *sb,
         if (!sbuf)
                 return ERR_PTR(-ENOMEM);
 
-        down_read(&nti->ring_commit_rwsem);
+        down_read(&nti->commit_rwsem);
 
         spin_lock(&sbi->next_ino_lock);
         ino = le64_to_cpu(super->next_ino);
@@ -636,7 +635,7 @@ static struct send_buf *process_alloc_inodes(struct super_block *sb,
         spin_unlock(&sbi->next_ino_lock);
 
         queue_commit_work(nti, &cw);
-        up_read(&nti->ring_commit_rwsem);
+        up_read(&nti->commit_rwsem);
 
         ret = wait_for_commit(&cw);
 
@@ -700,7 +699,7 @@ static struct send_buf *process_advance_seq(struct super_block *sb,
                 goto out;
         }
 
-        down_read(&nti->ring_commit_rwsem);
+        down_read(&nti->commit_rwsem);
 
         spin_lock(&nti->seq_lock);
 
@@ -725,7 +724,7 @@ static struct send_buf *process_advance_seq(struct super_block *sb,
         spin_unlock(&nti->seq_lock);
 
         queue_commit_work(nti, &cw);
-        up_read(&nti->ring_commit_rwsem);
+        up_read(&nti->commit_rwsem);
 
         ret = wait_for_commit(&cw);
 out:
@@ -882,8 +881,8 @@ static void destroy_server_state(struct super_block *sb)
         scoutfs_compact_destroy(sb);
         scoutfs_alloc_destroy(sb);
         /* XXX this drops dirty data on the floor.. has it committed? */
-        scoutfs_btree_write_complete(sb);
         scoutfs_manifest_destroy(sb);
+        scoutfs_btree_destroy(sb);
 
         /* XXX these should be persistent and reclaimed during recovery */
         list_for_each_entry_safe(ps, tmp, &nti->pending_seqs, head) {
@@ -912,7 +911,7 @@ static void scoutfs_net_proc_func(struct work_struct *work)
         mutex_lock(&nti->mutex);
         if (!nti->server_loaded) {
                 ret = scoutfs_read_supers(sb, &SCOUTFS_SB(sb)->super) ?:
-                      scoutfs_btree_prepare_write(sb) ?:
+                      scoutfs_btree_setup(sb) ?:
                       scoutfs_manifest_setup(sb) ?:
                       scoutfs_alloc_setup(sb) ?:
                       scoutfs_compact_setup(sb);
@@ -1229,7 +1228,7 @@ static void scoutfs_net_shutdown_func(struct work_struct *work)
         if (sinf == nti->listening_sinf) {
                 nti->listening_sinf = NULL;
 
-                /* shutdown the server, processing won't leave rings dirty */
+                /* shutdown the server, processing won't leave dirty metadata */
                 destroy_server_state(sb);
                 nti->server_loaded = false;
 
@@ -1418,9 +1417,9 @@ u64 *scoutfs_net_bulk_alloc(struct super_block *sb)
 *
 * This is a short circuit that's called directly by a work function
 * that's only queued on the server. It makes compaction work inside
- * the ring update consistency mechanics inside net message processing
- * and demonstrates the moving pieces that we'd need to cut up into a
- * series of messages and replies.
+ * the commit consistency mechanics inside net message processing and
+ * demonstrates the moving pieces that we'd need to cut up into a series
+ * of messages and replies.
 *
 * The compaction work caller cleans up everything on errors.
 */
@@ -1433,11 +1432,11 @@ int scoutfs_net_get_compaction(struct super_block *sb, void *curs)
         int nr;
         int i;
 
-        down_read(&nti->ring_commit_rwsem);
+        down_read(&nti->commit_rwsem);
 
         nr = scoutfs_manifest_next_compact(sb, curs);
         if (nr <= 0) {
-                up_read(&nti->ring_commit_rwsem);
+                up_read(&nti->commit_rwsem);
                 return nr;
         }
 
@@ -1451,7 +1450,7 @@ int scoutfs_net_get_compaction(struct super_block *sb, void *curs)
         if (ret == 0)
                 queue_commit_work(nti, &cw);
 
-        up_read(&nti->ring_commit_rwsem);
+        up_read(&nti->commit_rwsem);
 
         if (ret == 0)
                 ret = wait_for_commit(&cw);
@@ -1479,7 +1478,7 @@ int scoutfs_net_finish_compaction(struct super_block *sb, void *curs,
         bool level0_was_full;
         int ret;
 
-        down_read(&nti->ring_commit_rwsem);
+        down_read(&nti->commit_rwsem);
 
         level0_was_full = scoutfs_manifest_level0_full(sb);
 
@@ -1490,7 +1489,7 @@ int scoutfs_net_finish_compaction(struct super_block *sb, void *curs,
                 wake_up(&nti->waitq);
         }
 
-        up_read(&nti->ring_commit_rwsem);
+        up_read(&nti->commit_rwsem);
 
         if (ret == 0)
                 ret = wait_for_commit(&cw);
@@ -2066,9 +2065,9 @@ int scoutfs_net_setup(struct super_block *sb)
         INIT_LIST_HEAD(&nti->to_send);
         nti->next_id = 1;
         INIT_DELAYED_WORK(&nti->server_work, scoutfs_net_server_func);
-        init_rwsem(&nti->ring_commit_rwsem);
-        init_llist_head(&nti->ring_commit_waiters);
-        INIT_WORK(&nti->ring_commit_work, scoutfs_net_ring_commit_func);
+        init_rwsem(&nti->commit_rwsem);
+        init_llist_head(&nti->commit_waiters);
+        INIT_WORK(&nti->commit_work, scoutfs_net_commit_func);
         init_waitqueue_head(&nti->waitq);
         spin_lock_init(&nti->seq_lock);
         INIT_LIST_HEAD(&nti->pending_seqs);