From 9c2122f7ded1ac9a7103110fac04ced4e6c34a83 Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Fri, 18 Dec 2020 11:40:13 -0800 Subject: [PATCH] Add server btree merge processing This adds the server processing side of the btree merge functionality. The client isn't yet sending the log_merge messages so no merging will be performed. The bulk of the work happens as the server processess a get_log_merge message to build a merge request for the client. It starts a log merge if one isn't in flight. If one is in flight it checks to see if it should be spliced and maybe finished. In the common case it finds the next range to be merged and sends the request to the client to process. The commit_log_merge handler is the completion side of that request. If the request failed then we unwind its resources based on the stored request item. If it succeeds we record it in an item for get_ processing to splice eventually. Then we modify two existing server code paths. First, get_log_tree doesn't just create or use a single existing log btree for a client mount. If the existing log btree is large enough it sets its finalized flag and advances the nr to use a new log btree. That makes the old finalized log btree available for merging. Then we need to be a bit more careful when reclaiming the open log btree for a client. We can't use next to find the only open log btree, we use prev to find the last and make sure that it isn't already finalized. Signed-off-by: Zach Brown --- kmod/src/alloc.h | 10 + kmod/src/scoutfs_trace.h | 110 ++++ kmod/src/server.c | 1141 +++++++++++++++++++++++++++++++++++++- 3 files changed, 1232 insertions(+), 29 deletions(-) diff --git a/kmod/src/alloc.h b/kmod/src/alloc.h index 1e245c5f..9130d086 100644 --- a/kmod/src/alloc.h +++ b/kmod/src/alloc.h @@ -55,6 +55,16 @@ #define SCOUTFS_SERVER_DATA_FILL_LO \ (1ULL * 1024 * 1024 * 1024 >> SCOUTFS_BLOCK_SM_SHIFT) +/* + * Log merge meta allocations are only used for one request and will + * never use more than the dirty limit. + */ +#define SCOUTFS_LOG_MERGE_DIRTY_BYTE_LIMIT (64ULL * 1024 * 1024) +/* a few extra blocks for alloc blocks */ +#define SCOUTFS_SERVER_MERGE_FILL_TARGET \ + ((SCOUTFS_LOG_MERGE_DIRTY_BYTE_LIMIT >> SCOUTFS_BLOCK_LG_SHIFT) + 4) +#define SCOUTFS_SERVER_MERGE_FILL_LO SCOUTFS_SERVER_MERGE_FILL_TARGET + /* * Each of the server meta_alloc roots will try to keep a minimum amount * of free blocks. The server will swap roots when its current avail diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h index 2be1014a..4e58ef7a 100644 --- a/kmod/src/scoutfs_trace.h +++ b/kmod/src/scoutfs_trace.h @@ -2016,6 +2016,116 @@ TRACE_EVENT(scoutfs_trans_seq_last, SCSB_TRACE_ARGS, __entry->s_rid, __entry->trans_seq) ); +TRACE_EVENT(scoutfs_get_log_merge_status, + TP_PROTO(struct super_block *sb, u64 rid, struct scoutfs_key *next_range_key, + u64 nr_requests, u64 nr_complete, u64 last_seq, u64 seq), + + TP_ARGS(sb, rid, next_range_key, nr_requests, nr_complete, last_seq, seq), + + TP_STRUCT__entry( + SCSB_TRACE_FIELDS + __field(__u64, s_rid) + sk_trace_define(next_range_key) + __field(__u64, nr_requests) + __field(__u64, nr_complete) + __field(__u64, last_seq) + __field(__u64, seq) + ), + + TP_fast_assign( + SCSB_TRACE_ASSIGN(sb); + __entry->s_rid = rid; + sk_trace_assign(next_range_key, next_range_key); + __entry->nr_requests = nr_requests; + __entry->nr_complete = nr_complete; + __entry->last_seq = last_seq; + __entry->seq = seq; + ), + + TP_printk(SCSBF" rid %016llx next_range_key "SK_FMT" nr_requests %llu nr_complete %llu last_seq %llu seq %llu", + SCSB_TRACE_ARGS, __entry->s_rid, sk_trace_args(next_range_key), + __entry->nr_requests, __entry->nr_complete, __entry->last_seq, __entry->seq) +); + +TRACE_EVENT(scoutfs_get_log_merge_request, + TP_PROTO(struct super_block *sb, u64 rid, + struct scoutfs_btree_root *root, struct scoutfs_key *start, + struct scoutfs_key *end, u64 last_seq, u64 seq), + + TP_ARGS(sb, rid, root, start, end, last_seq, seq), + + TP_STRUCT__entry( + SCSB_TRACE_FIELDS + __field(__u64, s_rid) + __field(__u64, root_blkno) + __field(__u64, root_seq) + __field(__u8, root_height) + sk_trace_define(start) + sk_trace_define(end) + __field(__u64, last_seq) + __field(__u64, seq) + ), + + TP_fast_assign( + SCSB_TRACE_ASSIGN(sb); + __entry->s_rid = rid; + __entry->root_blkno = le64_to_cpu(root->ref.blkno); + __entry->root_seq = le64_to_cpu(root->ref.seq); + __entry->root_height = root->height; + sk_trace_assign(start, start); + sk_trace_assign(end, end); + __entry->last_seq = last_seq; + __entry->seq = seq; + ), + + TP_printk(SCSBF" rid %016llx root blkno %llu seq %llu height %u start "SK_FMT" end "SK_FMT" last_seq %llu seq %llu", + SCSB_TRACE_ARGS, __entry->s_rid, __entry->root_blkno, + __entry->root_seq, __entry->root_height, + sk_trace_args(start), sk_trace_args(end), __entry->last_seq, + __entry->seq) +); + +TRACE_EVENT(scoutfs_get_log_merge_complete, + TP_PROTO(struct super_block *sb, u64 rid, + struct scoutfs_btree_root *root, struct scoutfs_key *start, + struct scoutfs_key *end, struct scoutfs_key *remain, + u64 seq, u64 flags), + + TP_ARGS(sb, rid, root, start, end, remain, seq, flags), + + TP_STRUCT__entry( + SCSB_TRACE_FIELDS + __field(__u64, s_rid) + __field(__u64, root_blkno) + __field(__u64, root_seq) + __field(__u8, root_height) + sk_trace_define(start) + sk_trace_define(end) + sk_trace_define(remain) + __field(__u64, seq) + __field(__u64, flags) + ), + + TP_fast_assign( + SCSB_TRACE_ASSIGN(sb); + __entry->s_rid = rid; + __entry->root_blkno = le64_to_cpu(root->ref.blkno); + __entry->root_seq = le64_to_cpu(root->ref.seq); + __entry->root_height = root->height; + sk_trace_assign(start, start); + sk_trace_assign(end, end); + sk_trace_assign(remain, remain); + __entry->seq = seq; + __entry->flags = flags; + ), + + TP_printk(SCSBF" rid %016llx root blkno %llu seq %llu height %u start "SK_FMT" end "SK_FMT" remain "SK_FMT" seq %llu flags 0x%llx", + SCSB_TRACE_ARGS, __entry->s_rid, __entry->root_blkno, + __entry->root_seq, __entry->root_height, + sk_trace_args(start), sk_trace_args(end), + sk_trace_args(remain), __entry->seq, __entry->flags) +); + DECLARE_EVENT_CLASS(scoutfs_forest_bloom_class, TP_PROTO(struct super_block *sb, struct scoutfs_key *key, u64 rid, u64 nr, u64 blkno, u64 seq, unsigned int count), diff --git a/kmod/src/server.c b/kmod/src/server.c index d89a83ca..15d3f6ed 100644 --- a/kmod/src/server.c +++ b/kmod/src/server.c @@ -96,6 +96,8 @@ struct server_info { struct scoutfs_block_writer wri; struct mutex logs_mutex; + struct work_struct log_merge_free_work; + struct mutex srch_mutex; struct mutex mounted_clients_mutex; @@ -604,6 +606,35 @@ static void set_extent_zone_bits(struct super_block *sb, void *cb_arg, struct sc mod_extent_bits(cba->zones, cba->zone_blocks, ext->start, ext->len, true); } +static int find_log_trees_item(struct super_block *sb, + struct scoutfs_btree_root *logs_root, + bool call_next, u64 rid, u64 nr, + struct scoutfs_log_trees *lt_ret) +{ + SCOUTFS_BTREE_ITEM_REF(iref); + struct scoutfs_key key; + int ret; + + scoutfs_key_init_log_trees(&key, rid, nr); + if (call_next) + ret = scoutfs_btree_next(sb, logs_root, &key, &iref); + else + ret = scoutfs_btree_prev(sb, logs_root, &key, &iref); + if (ret == 0) { + if (iref.val_len == sizeof(struct scoutfs_log_trees)) { + if (le64_to_cpu(iref.key->sklt_rid) != rid) + ret = -ENOENT; + else + memcpy(lt_ret, iref.val, iref.val_len); + } else { + ret = -EIO; + } + scoutfs_btree_put_iref(&iref); + } + + return ret; +} + /* * Give the client roots to all the trees that they'll use to build * their transaction. @@ -613,6 +644,9 @@ static void set_extent_zone_bits(struct super_block *sb, void *cb_arg, struct sc * trees back into the core allocators. They're were committed with the * previous transaction so they're stable and can now be reused, even by * the server in this commit. + * + * If the committed log trees are large enough we finalize them and make + * them available to log merging. */ static int server_get_log_trees(struct super_block *sb, struct scoutfs_net_connection *conn, @@ -624,10 +658,12 @@ static int server_get_log_trees(struct super_block *sb, __le64 exclusive[SCOUTFS_DATA_ALLOC_ZONE_LE64S]; __le64 vacant[SCOUTFS_DATA_ALLOC_ZONE_LE64S]; struct alloc_extent_cb_args cba; - SCOUTFS_BTREE_ITEM_REF(iref); + struct scoutfs_log_trees fin; struct scoutfs_log_trees lt; struct scoutfs_key key; + bool have_fin = false; u64 data_zone_blocks; + u64 nr; int ret; if (arg_len != 0) { @@ -639,32 +675,55 @@ static int server_get_log_trees(struct super_block *sb, mutex_lock(&server->logs_mutex); - scoutfs_key_init_log_trees(&key, rid, U64_MAX); - - ret = scoutfs_btree_prev(sb, &super->logs_root, &key, &iref); + /* see if we have already have a finalized root from the rid */ + ret = find_log_trees_item(sb, &super->logs_root, true, rid, 0, <); if (ret < 0 && ret != -ENOENT) goto unlock; - if (ret == 0) { - if (iref.val_len == sizeof(struct scoutfs_log_trees)) { - key = *iref.key; - memcpy(<, iref.val, iref.val_len); - if (le64_to_cpu(key.sklt_rid) != rid) - ret = -ENOENT; - } else { - ret = -EIO; - } - scoutfs_btree_put_iref(&iref); - if (ret == -EIO) - goto unlock; + if (ret == 0 && le64_to_cpu(lt.flags) & SCOUTFS_LOG_TREES_FINALIZED) + have_fin = true; + + /* use the last non-finalized root, or start a new one */ + ret = find_log_trees_item(sb, &super->logs_root, false, rid, U64_MAX, + <); + if (ret < 0 && ret != -ENOENT) + goto unlock; + if (ret == 0 && le64_to_cpu(lt.flags) & SCOUTFS_LOG_TREES_FINALIZED) { + ret = -ENOENT; + nr = le64_to_cpu(lt.nr) + 1; + } else if (ret == -ENOENT) { + nr = 1; } - /* initialize new roots if we don't have any */ + /* initialize a new root if we don't have a non-finalized one */ if (ret == -ENOENT) { - key.sklt_rid = cpu_to_le64(rid); - key.sklt_nr = cpu_to_le64(1); memset(<, 0, sizeof(lt)); - lt.rid = key.sklt_rid; - lt.nr = key.sklt_nr; + lt.rid = cpu_to_le64(rid); + lt.nr = cpu_to_le64(nr); + } + + /* finalize an existing root when large enough and don't have one */ + if (lt.item_root.height > 2 && !have_fin) { + fin = lt; + memset(&fin.meta_avail, 0, sizeof(fin.meta_avail)); + memset(&fin.meta_freed, 0, sizeof(fin.meta_freed)); + memset(&fin.data_avail, 0, sizeof(fin.data_avail)); + memset(&fin.data_freed, 0, sizeof(fin.data_freed)); + memset(&fin.srch_file, 0, sizeof(fin.srch_file)); + le64_add_cpu(&fin.flags, SCOUTFS_LOG_TREES_FINALIZED); + + scoutfs_key_init_log_trees(&key, le64_to_cpu(fin.rid), + le64_to_cpu(fin.nr)); + ret = scoutfs_btree_update(sb, &server->alloc, &server->wri, + &super->logs_root, &key, &fin, + sizeof(fin)); + if (ret < 0) + goto unlock; + + memset(<.item_root, 0, sizeof(lt.item_root)); + memset(<.bloom_ref, 0, sizeof(lt.bloom_ref)); + lt.max_item_seq = 0; + le64_add_cpu(<.nr, 1); + lt.flags = 0; } if (get_volopt_val(server, SCOUTFS_VOLOPT_DATA_ALLOC_ZONE_BLOCKS_NR, &data_zone_blocks)) { @@ -708,6 +767,8 @@ static int server_get_log_trees(struct super_block *sb, } /* update client's log tree's item */ + scoutfs_key_init_log_trees(&key, le64_to_cpu(lt.rid), + le64_to_cpu(lt.nr)); ret = scoutfs_btree_force(sb, &server->alloc, &server->wri, &super->logs_root, &key, <, sizeof(lt)); unlock: @@ -811,8 +872,9 @@ static int server_get_roots(struct super_block *sb, /* * A client is being evicted so we want to reclaim resources from their - * log tree items. The item trees and bloom refs stay around to be read - * and eventually merged and we reclaim all the allocator items. + * open log tree item. The item tree and bloom ref stay around to be + * read and we finalize the tree so that it will be merged. We reclaim + * all the allocator items. * * The caller holds the commit rwsem which means we do all this work in * one server commit. We'll need to keep the total amount of blocks in @@ -826,7 +888,7 @@ static int server_get_roots(struct super_block *sb, * We can return an error without fully reclaiming all the log item's * referenced data. */ -static int reclaim_log_trees(struct super_block *sb, u64 rid) +static int reclaim_open_log_tree(struct super_block *sb, u64 rid) { struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; DECLARE_SERVER_INFO(sb, server); @@ -838,14 +900,16 @@ static int reclaim_log_trees(struct super_block *sb, u64 rid) mutex_lock(&server->logs_mutex); - /* find the client's existing item */ - scoutfs_key_init_log_trees(&key, rid, 0); - ret = scoutfs_btree_next(sb, &super->logs_root, &key, &iref); + /* find the client's last open log_tree */ + scoutfs_key_init_log_trees(&key, rid, U64_MAX); + ret = scoutfs_btree_prev(sb, &super->logs_root, &key, &iref); if (ret == 0) { if (iref.val_len == sizeof(struct scoutfs_log_trees)) { key = *iref.key; memcpy(<, iref.val, iref.val_len); - if (le64_to_cpu(key.sklt_rid) != rid) + if ((le64_to_cpu(key.sklt_rid) != rid) || + (le64_to_cpu(lt.flags) & + SCOUTFS_LOG_TREES_FINALIZED)) ret = -ENOENT; } else { ret = -EIO; @@ -876,6 +940,7 @@ static int reclaim_log_trees(struct super_block *sb, u64 rid) /* the mount is no longer writing to the zones */ zero_data_alloc_zone_bits(<); + le64_add_cpu(<.flags, SCOUTFS_LOG_TREES_FINALIZED); err = scoutfs_btree_update(sb, &server->alloc, &server->wri, &super->logs_root, &key, <, sizeof(lt)); @@ -1275,6 +1340,910 @@ out: return scoutfs_net_response(sb, conn, cmd, id, ret, NULL, 0); } +/* + * Log merge range items are stored at the starting fs key of the range. + * The only fs key field that doesn't hold information is the zone, so + * we use the zone to differentiate all types that we store in the log + * merge tree. + */ +static void init_log_merge_key(struct scoutfs_key *key, u8 zone, u64 first, + u64 second) +{ + *key = (struct scoutfs_key) { + .sk_zone = zone, + ._sk_first = cpu_to_le64(first), + ._sk_second = cpu_to_le64(second), + }; +} + +static int next_log_merge_item_key(struct super_block *sb, struct scoutfs_btree_root *root, + u8 zone, struct scoutfs_key *key, void *val, size_t val_len) +{ + SCOUTFS_BTREE_ITEM_REF(iref); + int ret; + + ret = scoutfs_btree_next(sb, root, key, &iref); + if (ret == 0) { + if (iref.key->sk_zone != zone) + ret = -ENOENT; + else if (iref.val_len != val_len) + ret = -EIO; + else + memcpy(val, iref.val, val_len); + scoutfs_btree_put_iref(&iref); + } + + return ret; +} + +static int next_log_merge_item(struct super_block *sb, + struct scoutfs_btree_root *root, + u8 zone, u64 first, u64 second, + void *val, size_t val_len) +{ + struct scoutfs_key key; + + init_log_merge_key(&key, zone, first, second); + return next_log_merge_item_key(sb, root, zone, &key, val, val_len); +} + +/* + * We start a log merge operation if there are any finalized log trees + * whose greatest seq is within the last stable seq. This is called by + * every client's get_log_merge handler at a relatively low frequency + * until a merge starts. + */ +static int start_log_merge(struct super_block *sb, + struct scoutfs_super_block *super, + struct scoutfs_log_merge_status *stat_ret) +{ + struct server_info *server = SCOUTFS_SB(sb)->server_info; + struct scoutfs_log_merge_status stat; + struct scoutfs_log_merge_range rng; + SCOUTFS_BTREE_ITEM_REF(iref); + struct scoutfs_log_trees *lt; + struct scoutfs_key key; + u64 last_seq; + bool start; + int ret; + int err; + + scoutfs_key_init_log_trees(&key, 0, 0); + + ret = get_stable_trans_seq(sb, &last_seq); + if (ret < 0) + goto out; + + scoutfs_key_init_log_trees(&key, 0, 0); + for (start = false; !start; scoutfs_key_inc(&key)) { + ret = scoutfs_btree_next(sb, &super->logs_root, &key, &iref); + if (ret == 0) { + if (iref.val_len == sizeof(*lt)) { + key = *iref.key; + lt = iref.val; + if ((le64_to_cpu(lt->flags) & + SCOUTFS_LOG_TREES_FINALIZED) && + (le64_to_cpu(lt->max_item_seq) <= + last_seq)) { + start = true; + } + } else { + ret = -EIO; + } + scoutfs_btree_put_iref(&iref); + } + if (ret < 0) + goto out; + } + + if (!start) { + ret = -ENOENT; + goto out; + } + + /* add an initial full-range */ + scoutfs_key_set_zeros(&rng.start); + scoutfs_key_set_ones(&rng.end); + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, &rng, sizeof(rng)); + if (ret < 0) + goto out; + + /* and add the merge status item */ + scoutfs_key_set_zeros(&stat.next_range_key); + stat.nr_requests = 0; + stat.nr_complete = 0; + stat.last_seq = cpu_to_le64(last_seq); + stat.seq = cpu_to_le64(scoutfs_server_next_seq(sb)); + + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &stat, sizeof(stat)); + if (ret < 0) { + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + err = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->log_merge, &key); + BUG_ON(err); /* inconsistent */ + } + + /* queue free to see if there's lingering items to process */ + if (ret == 0) + queue_work(server->wq, &server->log_merge_free_work); +out: + if (ret == 0) + *stat_ret = stat; + return ret; +} + +/* Requests drain once we get this many completions to splice */ +#define LOG_MERGE_SPLICE_BATCH 8 + +/* + * Splice the completed subtrees from the clients back into the fs log + * tree as parents. Once they're spliced in, try and rebalance a path + * through them in case they need to be split or joined before the rest + * of their range can be processed. + * + * It's only safe to splice in merged parents when all the requests have + * drained and no requests are relying on stable key ranges of parents + * in the fs root. + * + * It doesn't matter that the fs tree produced by these subtree splices + * itself contains inconsistent items because the subtrees can contain + * fragments of transactions. The read-only finalized log btrees that + * are the source of the spliced items are still preferred by readers. + * It's only once all the finalized items have been merged, and all + * transactions are consistent, that we remove the finalized log trees + * and the fs tree items are used. + * + * As we splice in the subtrees we're implicitly allocating all the + * blocks referenced by the new subtree, and freeing all the blocks + * referenced by the old subtree that's overwritten. These allocs and + * frees were performed by the client as it did cow updates and were + * stored in the allocators that were sent with the completion. We + * merge in those allocators as we splice in the subtree. + * + * We can add back any remaining ranges for any partial completions and + * reset the next range key if there's still work to do. If the + * operation is complete then we tear down the input log_trees items and + * delete the status. + */ +static int splice_log_merge_completions(struct super_block *sb, + struct scoutfs_log_merge_status *stat, + bool no_ranges) +{ + struct server_info *server = SCOUTFS_SB(sb)->server_info; + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct scoutfs_super_block *super = &sbi->super; + struct scoutfs_log_merge_complete comp; + struct scoutfs_log_merge_freeing fr; + struct scoutfs_log_merge_range rng; + struct scoutfs_log_trees lt = {{{0,}}}; + SCOUTFS_BTREE_ITEM_REF(iref); + struct scoutfs_key key; + u64 seq; + int ret; + + /* musn't rebalance fs tree parents while reqs rely on their key bounds */ + if (WARN_ON_ONCE(le64_to_cpu(stat->nr_requests) > 0)) + return -EIO; + + /* + * Splice in all the completed subtrees at the initial parent + * blocks in the main fs_tree before rebalancing any of them. + */ + for (seq = 0; ; seq++) { + + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_COMPLETE_ZONE, seq, + 0, &comp, sizeof(comp)); + if (ret < 0) { + if (ret == -ENOENT) { + ret = 0; + break; + } + goto out; + } + + seq = le64_to_cpu(comp.seq); + + ret = scoutfs_btree_set_parent(sb, &server->alloc, &server->wri, + &super->fs_root, &comp.start, + &comp.root); + if (ret < 0) + goto out; + + mutex_lock(&server->alloc_mutex); + ret = scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &comp.meta_avail) ?: + scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &comp.meta_freed); + mutex_unlock(&server->alloc_mutex); + if (ret < 0) + goto out; + + /* clear allocators */ + memset(&comp.meta_avail, 0, sizeof(comp.meta_avail)); + memset(&comp.meta_freed, 0, sizeof(comp.meta_freed)); + + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_COMPLETE_ZONE, + seq, 0); + ret = scoutfs_btree_update(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &comp, sizeof(comp)); + if (ret < 0) + goto out; + } + + /* + * Now with all the parent blocks spliced in, rebalance items + * amongst parents that needed to split/join and delete the + * completion items, possibly returning ranges to process. + */ + for (seq = 0; ; seq++) { + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_COMPLETE_ZONE, seq, + 0, &comp, sizeof(comp)); + if (ret < 0) { + if (ret == -ENOENT) { + ret = 0; + break; + } + goto out; + } + + seq = le64_to_cpu(comp.seq); + + /* balance when there was a remaining key range */ + if (le64_to_cpu(comp.flags) & SCOUTFS_LOG_MERGE_COMP_REMAIN) { + ret = scoutfs_btree_rebalance(sb, &server->alloc, + &server->wri, + &super->fs_root, + &comp.start); + if (ret < 0) + goto out; + + rng.start = comp.remain; + rng.end = comp.end; + + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_insert(sb, &server->alloc, + &server->wri, + &super->log_merge, &key, + &rng, sizeof(rng)); + if (ret < 0) + goto out; + no_ranges = false; + } + + /* delete the completion item */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_COMPLETE_ZONE, + seq, 0); + ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->log_merge, + &key); + if (ret < 0) + goto out; + } + + /* update the status once all completes are processed */ + scoutfs_key_set_zeros(&stat->next_range_key); + stat->nr_complete = 0; + + /* update counts and done if there's still ranges to process */ + if (!no_ranges) { + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); + ret = scoutfs_btree_update(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + stat, sizeof(*stat)); + goto out; + } + + /* no more ranges, free blooms and add freeing items for free work */ + lt.rid = 0; + lt.nr = 0; + for (;;) { + scoutfs_key_init_log_trees(&key, le64_to_cpu(lt.rid), + le64_to_cpu(lt.nr) + 1); + ret = scoutfs_btree_next(sb, &super->logs_root, &key, &iref); + if (ret == 0) { + if (iref.val_len == sizeof(lt)) { + key = *iref.key; + memcpy(<, iref.val, sizeof(lt)); + } else { + ret = -EIO; + } + scoutfs_btree_put_iref(&iref); + } + if (ret < 0) { + if (ret == -ENOENT) { + ret = 0; + break; + } + goto out; + } + + /* only free the inputs to the log merge that just finished */ + if (!(le64_to_cpu(lt.flags) & SCOUTFS_LOG_TREES_FINALIZED) || + (le64_to_cpu(lt.max_item_seq) > + le64_to_cpu(stat->last_seq))) + continue; + + fr.root = lt.item_root; + scoutfs_key_set_zeros(&fr.key); + fr.seq = cpu_to_le64(scoutfs_server_next_seq(sb)); + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_FREEING_ZONE, + le64_to_cpu(fr.seq), 0); + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &fr, sizeof(fr)); + if (ret < 0) + goto out; + + if (lt.bloom_ref.blkno) { + ret = scoutfs_free_meta(sb, &server->alloc, + &server->wri, + le64_to_cpu(lt.bloom_ref.blkno)); + if (ret < 0) + goto out; + } + + scoutfs_key_init_log_trees(&key, le64_to_cpu(lt.rid), + le64_to_cpu(lt.nr)); + ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->logs_root, &key); + if (ret < 0) + goto out; + } + + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); + ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->log_merge, &key); + if (ret == 0) + queue_work(server->wq, &server->log_merge_free_work); +out: + BUG_ON(ret); /* inconsistent */ + + return ret; +} + +/* + * Search amongst the finalized log roots within the caller's merge seq looking + * for the earliest item within the caller's range. The caller has taken + * care of locking. + */ +static int next_least_log_item(struct super_block *sb, + struct scoutfs_btree_root *logs_root, + u64 seq, struct scoutfs_key *start, + struct scoutfs_key *end, + struct scoutfs_key *next_ret) +{ + struct scoutfs_btree_root item_root; + struct scoutfs_log_trees *lt; + SCOUTFS_BTREE_ITEM_REF(iref); + struct scoutfs_key key; + int ret; + + scoutfs_key_set_ones(next_ret); + + for (scoutfs_key_init_log_trees(&key, 0, 0); ; scoutfs_key_inc(&key)) { + + /* find the next finalized log root within the merge last_seq */ + ret = scoutfs_btree_next(sb, logs_root, &key, &iref); + if (ret == 0) { + if (iref.val_len == sizeof(*lt)) { + key = *iref.key; + lt = iref.val; + if ((le64_to_cpu(lt->flags) & + SCOUTFS_LOG_TREES_FINALIZED) && + (le64_to_cpu(lt->max_item_seq) <= seq)) + item_root = lt->item_root; + else + item_root.ref.blkno = 0; + } else { + ret = -EIO; + } + scoutfs_btree_put_iref(&iref); + } + if (ret < 0) { + if (ret == -ENOENT) + ret = 0; + goto out; + } + if (item_root.ref.blkno == 0) + continue; + + /* see if populated roots have item keys less than than next */ + ret = scoutfs_btree_next(sb, &item_root, start, &iref); + if (ret == 0) { + if (scoutfs_key_compare(iref.key, end) <= 0 && + scoutfs_key_compare(iref.key, next_ret) < 0) + *next_ret = *iref.key; + scoutfs_btree_put_iref(&iref); + } + if (ret < 0) { + if (ret == -ENOENT) + ret = 0; + else + goto out; + } + } + +out: + if (ret == 0 && scoutfs_key_is_ones(next_ret)) + ret = -ENOENT; + + return ret; +} + +/* + * Once a merge is fully completed all of the finalized input log btrees + * are redundant and can be freed. + * + * As merging finishes and the status item is deleted, we also move all + * the finalized roots from log_trees items over into freeing items. + * This work is then kicked off which iterates over all the freeing + * items calling into the btree to free all its referenced blocks, with + * the key tracking partial progress. + * + * The freeing work is reasonably light. We only read the btree blocks + * and add freed blocks to merge back into the core allocators. The + * server can handle this load and we avoid the io overhead and + * complexity of farming it out to clients. + */ +static void server_log_merge_free_work(struct work_struct *work) +{ + struct server_info *server = container_of(work, struct server_info, + log_merge_free_work); + struct super_block *sb = server->sb; + struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; + struct scoutfs_log_merge_freeing fr; + struct scoutfs_key key; + bool commit = false; + int ret = 0; + + /* shutdown waits for us, we'll eventually load set shutting_down */ + while (!server->shutting_down) { + scoutfs_server_hold_commit(sb); + mutex_lock(&server->logs_mutex); + commit = true; + + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_FREEING_ZONE, + 0, 0, &fr, sizeof(fr)); + if (ret < 0) { + if (ret == -ENOENT) + ret = 0; + break; + } + + ret = scoutfs_btree_free_blocks(sb, &server->alloc, + &server->wri, &fr.key, + &fr.root, 10); + if (ret < 0) + break; + + /* freed blocks are in allocator, we *have* to update key */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_FREEING_ZONE, + le64_to_cpu(fr.seq), 0); + if (scoutfs_key_is_ones(&fr.key)) + ret = scoutfs_btree_delete(sb, &server->alloc, + &server->wri, + &super->log_merge, &key); + else + ret = scoutfs_btree_update(sb, &server->alloc, + &server->wri, + &super->log_merge, &key, + &fr, sizeof(fr)); + /* freed blocks are in allocator, we *have* to update fr */ + BUG_ON(ret < 0); + + mutex_unlock(&server->logs_mutex); + ret = scoutfs_server_apply_commit(sb, ret); + commit = false; + if (ret < 0) + break; + } + + if (commit) { + mutex_unlock(&server->logs_mutex); + ret = scoutfs_server_apply_commit(sb, ret); + } + + if (ret < 0) { + scoutfs_err(sb, "server error freeing merged btree blocks: %d", + ret); + stop_server(server); + } + + /* not re-arming, regularly queued by the server during merging */ +} + +/* + * This will return ENOENT to the client if there is no work to do. + */ +static int server_get_log_merge(struct super_block *sb, + struct scoutfs_net_connection *conn, + u8 cmd, u64 id, void *arg, u16 arg_len) +{ + DECLARE_SERVER_INFO(sb, server); + u64 rid = scoutfs_net_client_rid(conn); + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct scoutfs_super_block *super = &sbi->super; + struct scoutfs_log_merge_status stat; + struct scoutfs_log_merge_range rng; + struct scoutfs_log_merge_range remain; + struct scoutfs_log_merge_request req; + struct scoutfs_key par_start; + struct scoutfs_key par_end; + struct scoutfs_key next_key; + struct scoutfs_key key; + bool ins_rng; + bool del_remain; + bool del_req; + bool upd_stat; + bool no_ranges; + bool no_next; + int ret; + int err; + + if (arg_len != 0) + return -EINVAL; + + scoutfs_server_hold_commit(sb); + mutex_lock(&server->logs_mutex); + +restart: + memset(&req, 0, sizeof(req)); + ins_rng = false; + del_remain = false; + del_req = false; + upd_stat = false; + + /* get the status item, maybe creating a new one */ + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0, + &stat, sizeof(stat)); + if (ret == -ENOENT) + ret = start_log_merge(sb, super, &stat); + if (ret < 0) + goto out; + + trace_scoutfs_get_log_merge_status(sb, rid, &stat.next_range_key, + le64_to_cpu(stat.nr_requests), + le64_to_cpu(stat.nr_complete), + le64_to_cpu(stat.last_seq), + le64_to_cpu(stat.seq)); + + /* find the next range, always checking for splicing */ + for (;;) { + key = stat.next_range_key; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = next_log_merge_item_key(sb, &super->log_merge, SCOUTFS_LOG_MERGE_RANGE_ZONE, + &key, &rng, sizeof(rng)); + if (ret < 0 && ret != -ENOENT) + goto out; + + /* maybe splice now that we know if there's ranges */ + no_next = ret == -ENOENT; + no_ranges = scoutfs_key_is_zeros(&stat.next_range_key) && ret == -ENOENT; + if (le64_to_cpu(stat.nr_requests) == 0 && + (no_next || le64_to_cpu(stat.nr_complete) >= LOG_MERGE_SPLICE_BATCH)) { + ret = splice_log_merge_completions(sb, &stat, no_ranges); + if (ret < 0) + goto out; + /* splicing resets key and adds ranges, could finish status */ + goto restart; + } + + /* no ranges from next for requests, future attempts will create or splice */ + if (no_next) { + ret = -ENOENT; + goto out; + } + + /* see if we should back off after splicing might have deleted completions */ + if ((le64_to_cpu(stat.nr_requests) + + le64_to_cpu(stat.nr_complete)) >= LOG_MERGE_SPLICE_BATCH) { + ret = -ENOENT; + goto out; + } + + /* find the next logged item in the next range */ + ret = next_least_log_item(sb, &super->logs_root, + le64_to_cpu(stat.last_seq), + &rng.start, &rng.end, &next_key); + if (ret == 0) + break; + /* drop the range if it contained no logged items */ + if (ret == -ENOENT) { + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_delete(sb, &server->alloc, + &server->wri, + &super->log_merge, &key); + } + if (ret < 0) + goto out; + } + + /* start to build the request that's saved and sent to the client */ + req.logs_root = super->logs_root; + req.last_seq = stat.last_seq; + req.rid = cpu_to_le64(rid); + req.seq = cpu_to_le64(scoutfs_server_next_seq(sb)); + req.flags = 0; + if (super->fs_root.height > 2) + req.flags |= cpu_to_le64(SCOUTFS_LOG_MERGE_REQUEST_SUBTREE); + + /* find the fs_root parent block and its key range */ + ret = scoutfs_btree_get_parent(sb, &super->fs_root, &next_key, + &req.root) ?: + scoutfs_btree_parent_range(sb, &super->fs_root, &next_key, + &par_start, &par_end); + if (ret < 0) + goto out; + + /* start from next item, don't exceed parent key range */ + req.start = next_key; + req.end = rng.end; + if (scoutfs_key_compare(&par_end, &req.end) < 0) + req.end = par_end; + + /* delete the old range */ + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->log_merge, &key); + if (ret < 0) + goto out; + ins_rng = true; + + /* add remaining range if we have to */ + if (scoutfs_key_compare(&rng.end, &req.end) > 0) { + remain.start = req.end; + scoutfs_key_inc(&remain.start); + remain.end = rng.end; + + key = remain.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &remain, sizeof(remain)); + if (ret < 0) + goto out; + del_remain = true; + } + + /* give the client an allocation pool to work with */ + mutex_lock(&server->alloc_mutex); + ret = scoutfs_alloc_fill_list(sb, &server->alloc, &server->wri, + &req.meta_avail, server->meta_avail, + SCOUTFS_SERVER_MERGE_FILL_LO, + SCOUTFS_SERVER_MERGE_FILL_TARGET); + mutex_unlock(&server->alloc_mutex); + if (ret < 0) + goto out; + + /* save the request that will be sent to the client */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_REQUEST_ZONE, rid, + le64_to_cpu(req.seq)); + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &req, sizeof(req)); + if (ret < 0) + goto out; + del_req = true; + + trace_scoutfs_get_log_merge_request(sb, rid, &req.root, + &req.start, &req.end, + le64_to_cpu(req.last_seq), + le64_to_cpu(req.seq)); + + /* make sure next range avoids ranges for parent in use */ + stat.next_range_key = par_end; + if (!scoutfs_key_is_ones(&stat.next_range_key)) + scoutfs_key_inc(&stat.next_range_key); + + /* update the status requests count */ + le64_add_cpu(&stat.nr_requests, 1); + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); + ret = scoutfs_btree_update(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &stat, sizeof(stat)); + if (ret < 0) + goto out; + upd_stat = true; + +out: + if (ret < 0) { + /* undo any our partial item changes */ + if (upd_stat) { + le64_add_cpu(&stat.nr_requests, -1ULL); + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, + 0, 0); + err = scoutfs_btree_update(sb, &server->alloc, + &server->wri, + &super->log_merge, &key, + &stat, sizeof(stat)); + BUG_ON(err); /* inconsistent */ + } + + if (del_req) { + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_REQUEST_ZONE, + rid, le64_to_cpu(req.seq)); + err = scoutfs_btree_delete(sb, &server->alloc, + &server->wri, + &super->log_merge, &key); + BUG_ON(err); /* inconsistent */ + } + + if (del_remain) { + key = remain.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + err = scoutfs_btree_delete(sb, &server->alloc, + &server->wri, + &super->log_merge, &key); + BUG_ON(err); /* inconsistent */ + } + + if (ins_rng) { + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + err = scoutfs_btree_insert(sb, &server->alloc, + &server->wri, + &super->log_merge, &key, + &rng, sizeof(rng)); + BUG_ON(err); /* inconsistent */ + } + + /* reclaim allocation if we failed */ + mutex_lock(&server->alloc_mutex); + err = scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &req.meta_avail); + mutex_unlock(&server->alloc_mutex); + BUG_ON(err); /* inconsistent */ + } + + mutex_unlock(&server->logs_mutex); + ret = scoutfs_server_apply_commit(sb, ret); + + return scoutfs_net_response(sb, conn, cmd, id, ret, &req, sizeof(req)); +} + +/* + * Commit the client's leg merge work. Typically we store the + * completion so that we can later splice it back into the fs root and + * reclaim its allocators later in a batch. If it failed we reclaim it + * immediately. + */ +static int server_commit_log_merge(struct super_block *sb, + struct scoutfs_net_connection *conn, + u8 cmd, u64 id, void *arg, u16 arg_len) +{ + DECLARE_SERVER_INFO(sb, server); + u64 rid = scoutfs_net_client_rid(conn); + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct scoutfs_super_block *super = &sbi->super; + struct scoutfs_log_merge_request orig_req; + struct scoutfs_log_merge_complete *comp; + struct scoutfs_log_merge_status stat; + struct scoutfs_log_merge_range rng; + struct scoutfs_key key; + int ret; + + scoutfs_key_set_zeros(&rng.end); + + if (arg_len != sizeof(struct scoutfs_log_merge_complete)) + return -EINVAL; + comp = arg; + + trace_scoutfs_get_log_merge_complete(sb, rid, &comp->root, + &comp->start, &comp->end, + &comp->remain, + le64_to_cpu(comp->seq), + le64_to_cpu(comp->flags)); + + scoutfs_server_hold_commit(sb); + mutex_lock(&server->logs_mutex); + + /* find the status of the current log merge */ + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0, + &stat, sizeof(stat)); + if (ret < 0) { + WARN_ON_ONCE(ret == -ENOENT); /* inconsistent */ + goto out; + } + + /* find the completion's original saved request */ + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_REQUEST_ZONE, + rid, le64_to_cpu(comp->seq), + &orig_req, sizeof(orig_req)); + if (WARN_ON_ONCE(ret == 0 && (comp->rid != orig_req.rid || + comp->seq != orig_req.seq))) + ret = -ENOENT; /* inconsistency */ + if (ret < 0) { + WARN_ON_ONCE(ret == -ENOENT); /* inconsistency */ + goto out; + } + + /* delete the original request item */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_REQUEST_ZONE, rid, + le64_to_cpu(orig_req.seq)); + ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->log_merge, &key); + if (ret < 0) + goto out; + + if (le64_to_cpu(comp->flags) & SCOUTFS_LOG_MERGE_COMP_ERROR) { + /* restore the range and reclaim the allocator if it failed */ + rng.start = orig_req.start; + rng.end = orig_req.end; + + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &rng, sizeof(rng)); + if (ret < 0) + goto out; + + mutex_lock(&server->alloc_mutex); + ret = scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &orig_req.meta_avail) ?: + scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &orig_req.meta_freed); + mutex_unlock(&server->alloc_mutex); + if (ret < 0) + goto out; + + } else { + /* otherwise store the completion for later splicing */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_COMPLETE_ZONE, + le64_to_cpu(comp->seq), 0); + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + comp, sizeof(*comp)); + if (ret < 0) + goto out; + + le64_add_cpu(&stat.nr_complete, 1ULL); + } + + /* and update the status counts */ + le64_add_cpu(&stat.nr_requests, -1ULL); + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); + ret = scoutfs_btree_update(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &stat, sizeof(stat)); + if (ret < 0) + goto out; + +out: + mutex_unlock(&server->logs_mutex); + ret = scoutfs_server_apply_commit(sb, ret); + BUG_ON(ret < 0); /* inconsistent */ + + return scoutfs_net_response(sb, conn, cmd, id, ret, NULL, 0); +} + /* The server is receiving an omap response from the client */ static int open_ino_map_response(struct super_block *sb, struct scoutfs_net_connection *conn, void *resp, unsigned int resp_len, int error, void *data) @@ -1613,6 +2582,113 @@ static int cancel_srch_compact(struct super_block *sb, u64 rid) return ret; } +/* + * Clean up any log merge requests which have now been abandoned because + * their client was evicted. This is always called on eviction and + * there may have been no merge in progres or our client had no + * outstanding requests. For each pending request, we reclaim its + * allocators, delte its item, and update the status. + * + * The request we cancel might have been the last request which + * prevented batch processing, but we don't check that here. This is in + * the client eviction path and we want that to be as light and + * responsive as possible so we can get back up and running. The next + * client get_log_merge request will see that no more requests are + * outstanding. + * + * The caller holds a commit, but we're responsible for locking. + */ +static int cancel_log_merge(struct super_block *sb, u64 rid) +{ + DECLARE_SERVER_INFO(sb, server); + struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; + struct scoutfs_log_merge_status stat; + struct scoutfs_log_merge_request req; + struct scoutfs_log_merge_range rng; + struct scoutfs_key key; + bool update = false; + u64 seq; + int ret; + + mutex_lock(&server->logs_mutex); + + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0, + &stat, sizeof(stat)); + if (ret < 0) { + if (ret == -ENOENT) + ret = 0; + goto out; + } + + for (seq = 0; ; seq++) { + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_REQUEST_ZONE, rid, + seq, &req, sizeof(req)); + if (ret == 0 && le64_to_cpu(req.rid) != rid) + ret = -ENOENT; + if (ret < 0) { + if (ret == -ENOENT) + ret = 0; + break; + } + + seq = le64_to_cpu(req.seq); + + /* remove request item */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_REQUEST_ZONE, rid, + le64_to_cpu(req.seq)); + ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->log_merge, &key); + if (ret < 0) + goto out; + + /* restore range */ + rng.start = req.start; + rng.end = req.end; + + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_insert(sb, &server->alloc, + &server->wri, + &super->log_merge, &key, + &rng, sizeof(rng)); + if (ret < 0) + goto out; + + /* reclaim allocator */ + mutex_lock(&server->alloc_mutex); + ret = scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &req.meta_avail) ?: + scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &req.meta_freed); + mutex_unlock(&server->alloc_mutex); + if (ret < 0) + goto out; + + /* update count */ + le64_add_cpu(&stat.nr_requests, -1ULL); + update = true; + } + + if (update) { + /* and update the status counts */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); + ret = scoutfs_btree_update(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &stat, sizeof(stat)); + } +out: + mutex_unlock(&server->logs_mutex); + + BUG_ON(ret < 0); /* XXX inconsistent */ + return ret; +} + /* * Farewell processing is async to the request processing work. Shutdown * waits for request processing to finish and then tears down the connection. @@ -1758,8 +2834,9 @@ static int reclaim_rid(struct super_block *sb, u64 rid) /* delete mounted client last, recovery looks for it */ ret = scoutfs_lock_server_farewell(sb, rid) ?: remove_trans_seq(sb, rid) ?: - reclaim_log_trees(sb, rid) ?: + reclaim_open_log_tree(sb, rid) ?: cancel_srch_compact(sb, rid) ?: + cancel_log_merge(sb, rid) ?: scoutfs_omap_remove_rid(sb, rid) ?: delete_mounted_client(sb, rid); @@ -1995,6 +3072,8 @@ static scoutfs_net_request_t server_req_funcs[] = { [SCOUTFS_NET_CMD_LOCK] = server_lock, [SCOUTFS_NET_CMD_SRCH_GET_COMPACT] = server_srch_get_compact, [SCOUTFS_NET_CMD_SRCH_COMMIT_COMPACT] = server_srch_commit_compact, + [SCOUTFS_NET_CMD_GET_LOG_MERGE] = server_get_log_merge, + [SCOUTFS_NET_CMD_COMMIT_LOG_MERGE] = server_commit_log_merge, [SCOUTFS_NET_CMD_OPEN_INO_MAP] = server_open_ino_map, [SCOUTFS_NET_CMD_GET_VOLOPT] = server_get_volopt, [SCOUTFS_NET_CMD_SET_VOLOPT] = server_set_volopt, @@ -2367,6 +3446,8 @@ shutdown: scoutfs_net_shutdown(sb, conn); server->conn = NULL; + flush_work(&server->log_merge_free_work); + /* stop tracking recovery, cancel timer, flush any fencing */ scoutfs_recov_shutdown(sb); flush_work(&server->fence_pending_recov_work); @@ -2434,6 +3515,7 @@ void scoutfs_server_stop(struct super_block *sb) cancel_work_sync(&server->work); cancel_work_sync(&server->farewell_work); cancel_work_sync(&server->commit_work); + cancel_work_sync(&server->log_merge_free_work); } int scoutfs_server_setup(struct super_block *sb) @@ -2459,6 +3541,7 @@ int scoutfs_server_setup(struct super_block *sb) INIT_WORK(&server->farewell_work, farewell_worker); mutex_init(&server->alloc_mutex); mutex_init(&server->logs_mutex); + INIT_WORK(&server->log_merge_free_work, server_log_merge_free_work); mutex_init(&server->srch_mutex); mutex_init(&server->mounted_clients_mutex); seqcount_init(&server->roots_seqcount);