diff --git a/kmod/src/alloc.h b/kmod/src/alloc.h index 1e245c5f..9130d086 100644 --- a/kmod/src/alloc.h +++ b/kmod/src/alloc.h @@ -55,6 +55,16 @@ #define SCOUTFS_SERVER_DATA_FILL_LO \ (1ULL * 1024 * 1024 * 1024 >> SCOUTFS_BLOCK_SM_SHIFT) +/* + * Log merge meta allocations are only used for one request and will + * never use more than the dirty limit. + */ +#define SCOUTFS_LOG_MERGE_DIRTY_BYTE_LIMIT (64ULL * 1024 * 1024) +/* a few extra blocks for alloc blocks */ +#define SCOUTFS_SERVER_MERGE_FILL_TARGET \ + ((SCOUTFS_LOG_MERGE_DIRTY_BYTE_LIMIT >> SCOUTFS_BLOCK_LG_SHIFT) + 4) +#define SCOUTFS_SERVER_MERGE_FILL_LO SCOUTFS_SERVER_MERGE_FILL_TARGET + /* * Each of the server meta_alloc roots will try to keep a minimum amount * of free blocks. The server will swap roots when its current avail diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h index 2be1014a..4e58ef7a 100644 --- a/kmod/src/scoutfs_trace.h +++ b/kmod/src/scoutfs_trace.h @@ -2016,6 +2016,116 @@ TRACE_EVENT(scoutfs_trans_seq_last, SCSB_TRACE_ARGS, __entry->s_rid, __entry->trans_seq) ); +TRACE_EVENT(scoutfs_get_log_merge_status, + TP_PROTO(struct super_block *sb, u64 rid, struct scoutfs_key *next_range_key, + u64 nr_requests, u64 nr_complete, u64 last_seq, u64 seq), + + TP_ARGS(sb, rid, next_range_key, nr_requests, nr_complete, last_seq, seq), + + TP_STRUCT__entry( + SCSB_TRACE_FIELDS + __field(__u64, s_rid) + sk_trace_define(next_range_key) + __field(__u64, nr_requests) + __field(__u64, nr_complete) + __field(__u64, last_seq) + __field(__u64, seq) + ), + + TP_fast_assign( + SCSB_TRACE_ASSIGN(sb); + __entry->s_rid = rid; + sk_trace_assign(next_range_key, next_range_key); + __entry->nr_requests = nr_requests; + __entry->nr_complete = nr_complete; + __entry->last_seq = last_seq; + __entry->seq = seq; + ), + + TP_printk(SCSBF" rid %016llx next_range_key "SK_FMT" nr_requests %llu nr_complete %llu last_seq %llu seq %llu", + SCSB_TRACE_ARGS, __entry->s_rid, sk_trace_args(next_range_key), + __entry->nr_requests, __entry->nr_complete, __entry->last_seq, __entry->seq) +); + +TRACE_EVENT(scoutfs_get_log_merge_request, + TP_PROTO(struct super_block *sb, u64 rid, + struct scoutfs_btree_root *root, struct scoutfs_key *start, + struct scoutfs_key *end, u64 last_seq, u64 seq), + + TP_ARGS(sb, rid, root, start, end, last_seq, seq), + + TP_STRUCT__entry( + SCSB_TRACE_FIELDS + __field(__u64, s_rid) + __field(__u64, root_blkno) + __field(__u64, root_seq) + __field(__u8, root_height) + sk_trace_define(start) + sk_trace_define(end) + __field(__u64, last_seq) + __field(__u64, seq) + ), + + TP_fast_assign( + SCSB_TRACE_ASSIGN(sb); + __entry->s_rid = rid; + __entry->root_blkno = le64_to_cpu(root->ref.blkno); + __entry->root_seq = le64_to_cpu(root->ref.seq); + __entry->root_height = root->height; + sk_trace_assign(start, start); + sk_trace_assign(end, end); + __entry->last_seq = last_seq; + __entry->seq = seq; + ), + + TP_printk(SCSBF" rid %016llx root blkno %llu seq %llu height %u start "SK_FMT" end "SK_FMT" last_seq %llu seq %llu", + SCSB_TRACE_ARGS, __entry->s_rid, __entry->root_blkno, + __entry->root_seq, __entry->root_height, + sk_trace_args(start), sk_trace_args(end), __entry->last_seq, + __entry->seq) +); + +TRACE_EVENT(scoutfs_get_log_merge_complete, + TP_PROTO(struct super_block *sb, u64 rid, + struct scoutfs_btree_root *root, struct scoutfs_key *start, + struct scoutfs_key *end, struct scoutfs_key *remain, + u64 seq, u64 flags), + + TP_ARGS(sb, rid, root, start, end, remain, seq, flags), + + TP_STRUCT__entry( + SCSB_TRACE_FIELDS + 
__field(__u64, s_rid) + __field(__u64, root_blkno) + __field(__u64, root_seq) + __field(__u8, root_height) + sk_trace_define(start) + sk_trace_define(end) + sk_trace_define(remain) + __field(__u64, seq) + __field(__u64, flags) + ), + + TP_fast_assign( + SCSB_TRACE_ASSIGN(sb); + __entry->s_rid = rid; + __entry->root_blkno = le64_to_cpu(root->ref.blkno); + __entry->root_seq = le64_to_cpu(root->ref.seq); + __entry->root_height = root->height; + sk_trace_assign(start, start); + sk_trace_assign(end, end); + sk_trace_assign(remain, remain); + __entry->seq = seq; + __entry->flags = flags; + ), + + TP_printk(SCSBF" rid %016llx root blkno %llu seq %llu height %u start "SK_FMT" end "SK_FMT" remain "SK_FMT" seq %llu flags 0x%llx", + SCSB_TRACE_ARGS, __entry->s_rid, __entry->root_blkno, + __entry->root_seq, __entry->root_height, + sk_trace_args(start), sk_trace_args(end), + sk_trace_args(remain), __entry->seq, __entry->flags) +); + DECLARE_EVENT_CLASS(scoutfs_forest_bloom_class, TP_PROTO(struct super_block *sb, struct scoutfs_key *key, u64 rid, u64 nr, u64 blkno, u64 seq, unsigned int count), diff --git a/kmod/src/server.c b/kmod/src/server.c index d89a83ca..15d3f6ed 100644 --- a/kmod/src/server.c +++ b/kmod/src/server.c @@ -96,6 +98,8 @@ struct server_info { struct scoutfs_block_writer wri; struct mutex logs_mutex; + struct work_struct log_merge_free_work; + struct mutex srch_mutex; struct mutex mounted_clients_mutex; @@ -604,6 +606,35 @@ static void set_extent_zone_bits(struct super_block *sb, void *cb_arg, struct sc mod_extent_bits(cba->zones, cba->zone_blocks, ext->start, ext->len, true); } +static int find_log_trees_item(struct super_block *sb, + struct scoutfs_btree_root *logs_root, + bool call_next, u64 rid, u64 nr, + struct scoutfs_log_trees *lt_ret) +{ + SCOUTFS_BTREE_ITEM_REF(iref); + struct scoutfs_key key; + int ret; + + scoutfs_key_init_log_trees(&key, rid, nr); + if (call_next) + ret = scoutfs_btree_next(sb, logs_root, &key, &iref); + else + ret = scoutfs_btree_prev(sb, logs_root, &key, &iref); + if (ret == 0) { + if (iref.val_len == sizeof(struct scoutfs_log_trees)) { + if (le64_to_cpu(iref.key->sklt_rid) != rid) + ret = -ENOENT; + else + memcpy(lt_ret, iref.val, iref.val_len); + } else { + ret = -EIO; + } + scoutfs_btree_put_iref(&iref); + } + + return ret; +} + /* * Give the client roots to all the trees that they'll use to build * their transaction. @@ -613,6 +644,9 @@ static void set_extent_zone_bits(struct super_block *sb, void *cb_arg, struct sc * trees back into the core allocators. They were committed with the * previous transaction so they're stable and can now be reused, even by * the server in this commit. + * + * If the committed log trees are large enough we finalize them and make + * them available to log merging.
*/ static int server_get_log_trees(struct super_block *sb, struct scoutfs_net_connection *conn, @@ -624,10 +658,12 @@ static int server_get_log_trees(struct super_block *sb, __le64 exclusive[SCOUTFS_DATA_ALLOC_ZONE_LE64S]; __le64 vacant[SCOUTFS_DATA_ALLOC_ZONE_LE64S]; struct alloc_extent_cb_args cba; - SCOUTFS_BTREE_ITEM_REF(iref); + struct scoutfs_log_trees fin; struct scoutfs_log_trees lt; struct scoutfs_key key; + bool have_fin = false; u64 data_zone_blocks; + u64 nr; int ret; if (arg_len != 0) { @@ -639,32 +675,55 @@ static int server_get_log_trees(struct super_block *sb, mutex_lock(&server->logs_mutex); - scoutfs_key_init_log_trees(&key, rid, U64_MAX); - - ret = scoutfs_btree_prev(sb, &super->logs_root, &key, &iref); + /* see if we already have a finalized root from the rid */ + ret = find_log_trees_item(sb, &super->logs_root, true, rid, 0, &lt); if (ret < 0 && ret != -ENOENT) goto unlock; - if (ret == 0) { - if (iref.val_len == sizeof(struct scoutfs_log_trees)) { - key = *iref.key; - memcpy(&lt, iref.val, iref.val_len); - if (le64_to_cpu(key.sklt_rid) != rid) - ret = -ENOENT; - } else { - ret = -EIO; - } - scoutfs_btree_put_iref(&iref); - if (ret == -EIO) - goto unlock; + if (ret == 0 && le64_to_cpu(lt.flags) & SCOUTFS_LOG_TREES_FINALIZED) + have_fin = true; + + /* use the last non-finalized root, or start a new one */ + ret = find_log_trees_item(sb, &super->logs_root, false, rid, U64_MAX, + &lt); + if (ret < 0 && ret != -ENOENT) + goto unlock; + if (ret == 0 && le64_to_cpu(lt.flags) & SCOUTFS_LOG_TREES_FINALIZED) { + ret = -ENOENT; + nr = le64_to_cpu(lt.nr) + 1; + } else if (ret == -ENOENT) { + nr = 1; } - /* initialize new roots if we don't have any */ + /* initialize a new root if we don't have a non-finalized one */ if (ret == -ENOENT) { - key.sklt_rid = cpu_to_le64(rid); - key.sklt_nr = cpu_to_le64(1); memset(&lt, 0, sizeof(lt)); - lt.rid = key.sklt_rid; - lt.nr = key.sklt_nr; + lt.rid = cpu_to_le64(rid); + lt.nr = cpu_to_le64(nr); + } + + /* finalize an existing root when it's large enough and we don't have one */ + if (lt.item_root.height > 2 && !have_fin) { + fin = lt; + memset(&fin.meta_avail, 0, sizeof(fin.meta_avail)); + memset(&fin.meta_freed, 0, sizeof(fin.meta_freed)); + memset(&fin.data_avail, 0, sizeof(fin.data_avail)); + memset(&fin.data_freed, 0, sizeof(fin.data_freed)); + memset(&fin.srch_file, 0, sizeof(fin.srch_file)); + le64_add_cpu(&fin.flags, SCOUTFS_LOG_TREES_FINALIZED); + + scoutfs_key_init_log_trees(&key, le64_to_cpu(fin.rid), + le64_to_cpu(fin.nr)); + ret = scoutfs_btree_update(sb, &server->alloc, &server->wri, + &super->logs_root, &key, &fin, + sizeof(fin)); + if (ret < 0) + goto unlock; + + memset(&lt.item_root, 0, sizeof(lt.item_root)); + memset(&lt.bloom_ref, 0, sizeof(lt.bloom_ref)); + lt.max_item_seq = 0; + le64_add_cpu(&lt.nr, 1); + lt.flags = 0; } if (get_volopt_val(server, SCOUTFS_VOLOPT_DATA_ALLOC_ZONE_BLOCKS_NR, &data_zone_blocks)) { @@ -708,6 +767,8 @@ static int server_get_log_trees(struct super_block *sb, } /* update client's log tree's item */ + scoutfs_key_init_log_trees(&key, le64_to_cpu(lt.rid), + le64_to_cpu(lt.nr)); ret = scoutfs_btree_force(sb, &server->alloc, &server->wri, &super->logs_root, &key, &lt, sizeof(lt)); unlock: @@ -811,8 +872,9 @@ static int server_get_roots(struct super_block *sb, /* * A client is being evicted so we want to reclaim resources from their - * log tree items. The item trees and bloom refs stay around to be read - * and eventually merged and we reclaim all the allocator items. + * open log tree item.
The item tree and bloom ref stay around to be + * read and we finalize the tree so that it will be merged. We reclaim + * all the allocator items. * * The caller holds the commit rwsem which means we do all this work in * one server commit. We'll need to keep the total amount of blocks in @@ -826,7 +888,7 @@ static int server_get_roots(struct super_block *sb, * We can return an error without fully reclaiming all the log item's * referenced data. */ -static int reclaim_log_trees(struct super_block *sb, u64 rid) +static int reclaim_open_log_tree(struct super_block *sb, u64 rid) { struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; DECLARE_SERVER_INFO(sb, server); @@ -838,14 +900,16 @@ static int reclaim_log_trees(struct super_block *sb, u64 rid) mutex_lock(&server->logs_mutex); - /* find the client's existing item */ - scoutfs_key_init_log_trees(&key, rid, 0); - ret = scoutfs_btree_next(sb, &super->logs_root, &key, &iref); + /* find the client's last open log_tree */ + scoutfs_key_init_log_trees(&key, rid, U64_MAX); + ret = scoutfs_btree_prev(sb, &super->logs_root, &key, &iref); if (ret == 0) { if (iref.val_len == sizeof(struct scoutfs_log_trees)) { key = *iref.key; memcpy(&lt, iref.val, iref.val_len); - if (le64_to_cpu(key.sklt_rid) != rid) + if ((le64_to_cpu(key.sklt_rid) != rid) || + (le64_to_cpu(lt.flags) & + SCOUTFS_LOG_TREES_FINALIZED)) ret = -ENOENT; } else { ret = -EIO; } @@ -876,6 +940,7 @@ static int reclaim_log_trees(struct super_block *sb, u64 rid) /* the mount is no longer writing to the zones */ zero_data_alloc_zone_bits(&lt); + le64_add_cpu(&lt.flags, SCOUTFS_LOG_TREES_FINALIZED); err = scoutfs_btree_update(sb, &server->alloc, &server->wri, &super->logs_root, &key, &lt, sizeof(lt)); @@ -1275,6 +1340,910 @@ out: return scoutfs_net_response(sb, conn, cmd, id, ret, NULL, 0); } +/* + * Log merge range items are stored at the starting fs key of the range. + * The only fs key field that doesn't hold information is the zone, so + * we use the zone to differentiate all types that we store in the log + * merge tree. + */ +static void init_log_merge_key(struct scoutfs_key *key, u8 zone, u64 first, + u64 second) +{ + *key = (struct scoutfs_key) { + .sk_zone = zone, + ._sk_first = cpu_to_le64(first), + ._sk_second = cpu_to_le64(second), + }; +} + +static int next_log_merge_item_key(struct super_block *sb, struct scoutfs_btree_root *root, + u8 zone, struct scoutfs_key *key, void *val, size_t val_len) +{ + SCOUTFS_BTREE_ITEM_REF(iref); + int ret; + + ret = scoutfs_btree_next(sb, root, key, &iref); + if (ret == 0) { + if (iref.key->sk_zone != zone) + ret = -ENOENT; + else if (iref.val_len != val_len) + ret = -EIO; + else + memcpy(val, iref.val, val_len); + scoutfs_btree_put_iref(&iref); + } + + return ret; +} + +static int next_log_merge_item(struct super_block *sb, + struct scoutfs_btree_root *root, + u8 zone, u64 first, u64 second, + void *val, size_t val_len) +{ + struct scoutfs_key key; + + init_log_merge_key(&key, zone, first, second); + return next_log_merge_item_key(sb, root, zone, &key, val, val_len); +} + +/* + * We start a log merge operation if there are any finalized log trees + * whose greatest seq is within the last stable seq. This is called by + * every client's get_log_merge handler at a relatively low frequency + * until a merge starts.
+ */ +static int start_log_merge(struct super_block *sb, + struct scoutfs_super_block *super, + struct scoutfs_log_merge_status *stat_ret) +{ + struct server_info *server = SCOUTFS_SB(sb)->server_info; + struct scoutfs_log_merge_status stat; + struct scoutfs_log_merge_range rng; + SCOUTFS_BTREE_ITEM_REF(iref); + struct scoutfs_log_trees *lt; + struct scoutfs_key key; + u64 last_seq; + bool start; + int ret; + int err; + + scoutfs_key_init_log_trees(&key, 0, 0); + + ret = get_stable_trans_seq(sb, &last_seq); + if (ret < 0) + goto out; + + scoutfs_key_init_log_trees(&key, 0, 0); + for (start = false; !start; scoutfs_key_inc(&key)) { + ret = scoutfs_btree_next(sb, &super->logs_root, &key, &iref); + if (ret == 0) { + if (iref.val_len == sizeof(*lt)) { + key = *iref.key; + lt = iref.val; + if ((le64_to_cpu(lt->flags) & + SCOUTFS_LOG_TREES_FINALIZED) && + (le64_to_cpu(lt->max_item_seq) <= + last_seq)) { + start = true; + } + } else { + ret = -EIO; + } + scoutfs_btree_put_iref(&iref); + } + if (ret < 0) + goto out; + } + + if (!start) { + ret = -ENOENT; + goto out; + } + + /* add an initial full-range */ + scoutfs_key_set_zeros(&rng.start); + scoutfs_key_set_ones(&rng.end); + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, &rng, sizeof(rng)); + if (ret < 0) + goto out; + + /* and add the merge status item */ + scoutfs_key_set_zeros(&stat.next_range_key); + stat.nr_requests = 0; + stat.nr_complete = 0; + stat.last_seq = cpu_to_le64(last_seq); + stat.seq = cpu_to_le64(scoutfs_server_next_seq(sb)); + + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &stat, sizeof(stat)); + if (ret < 0) { + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + err = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->log_merge, &key); + BUG_ON(err); /* inconsistent */ + } + + /* queue free work to see if there are lingering items to process */ + if (ret == 0) + queue_work(server->wq, &server->log_merge_free_work); +out: + if (ret == 0) + *stat_ret = stat; + return ret; +} + +/* Requests drain once we get this many completions to splice */ +#define LOG_MERGE_SPLICE_BATCH 8 + +/* + * Splice the completed subtrees from the clients back into the fs + * tree as parents. Once they're spliced in, try to rebalance a path + * through them in case they need to be split or joined before the rest + * of their range can be processed. + * + * It's only safe to splice in merged parents when all the requests have + * drained and no requests are relying on stable key ranges of parents + * in the fs root. + * + * It doesn't matter that the fs tree produced by these subtree splices + * itself contains inconsistent items because the subtrees can contain + * fragments of transactions. The read-only finalized log btrees that + * are the source of the spliced items are still preferred by readers. + * It's only once all the finalized items have been merged, and all + * transactions are consistent, that we remove the finalized log trees + * and the fs tree items are used. + * + * As we splice in the subtrees we're implicitly allocating all the + * blocks referenced by the new subtree, and freeing all the blocks + * referenced by the old subtree that's overwritten.
These allocs and + * frees were performed by the client as it did cow updates and were + * stored in the allocators that were sent with the completion. We + * merge in those allocators as we splice in the subtree. + * + * We can add back any remaining ranges for any partial completions and + * reset the next range key if there's still work to do. If the + * operation is complete then we tear down the input log_trees items and + * delete the status. + */ +static int splice_log_merge_completions(struct super_block *sb, + struct scoutfs_log_merge_status *stat, + bool no_ranges) +{ + struct server_info *server = SCOUTFS_SB(sb)->server_info; + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct scoutfs_super_block *super = &sbi->super; + struct scoutfs_log_merge_complete comp; + struct scoutfs_log_merge_freeing fr; + struct scoutfs_log_merge_range rng; + struct scoutfs_log_trees lt = {{{0,}}}; + SCOUTFS_BTREE_ITEM_REF(iref); + struct scoutfs_key key; + u64 seq; + int ret; + + /* mustn't rebalance fs tree parents while reqs rely on their key bounds */ + if (WARN_ON_ONCE(le64_to_cpu(stat->nr_requests) > 0)) + return -EIO; + + /* + * Splice in all the completed subtrees at the initial parent + * blocks in the main fs_tree before rebalancing any of them. + */ + for (seq = 0; ; seq++) { + + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_COMPLETE_ZONE, seq, + 0, &comp, sizeof(comp)); + if (ret < 0) { + if (ret == -ENOENT) { + ret = 0; + break; + } + goto out; + } + + seq = le64_to_cpu(comp.seq); + + ret = scoutfs_btree_set_parent(sb, &server->alloc, &server->wri, + &super->fs_root, &comp.start, + &comp.root); + if (ret < 0) + goto out; + + mutex_lock(&server->alloc_mutex); + ret = scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &comp.meta_avail) ?: + scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &comp.meta_freed); + mutex_unlock(&server->alloc_mutex); + if (ret < 0) + goto out; + + /* clear allocators */ + memset(&comp.meta_avail, 0, sizeof(comp.meta_avail)); + memset(&comp.meta_freed, 0, sizeof(comp.meta_freed)); + + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_COMPLETE_ZONE, + seq, 0); + ret = scoutfs_btree_update(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &comp, sizeof(comp)); + if (ret < 0) + goto out; + } + + /* + * Now with all the parent blocks spliced in, rebalance items + * amongst parents that needed to split/join and delete the + * completion items, possibly returning ranges to process.
+ */ + for (seq = 0; ; seq++) { + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_COMPLETE_ZONE, seq, + 0, &comp, sizeof(comp)); + if (ret < 0) { + if (ret == -ENOENT) { + ret = 0; + break; + } + goto out; + } + + seq = le64_to_cpu(comp.seq); + + /* balance when there was a remaining key range */ + if (le64_to_cpu(comp.flags) & SCOUTFS_LOG_MERGE_COMP_REMAIN) { + ret = scoutfs_btree_rebalance(sb, &server->alloc, + &server->wri, + &super->fs_root, + &comp.start); + if (ret < 0) + goto out; + + rng.start = comp.remain; + rng.end = comp.end; + + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_insert(sb, &server->alloc, + &server->wri, + &super->log_merge, &key, + &rng, sizeof(rng)); + if (ret < 0) + goto out; + no_ranges = false; + } + + /* delete the completion item */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_COMPLETE_ZONE, + seq, 0); + ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->log_merge, + &key); + if (ret < 0) + goto out; + } + + /* update the status once all completes are processed */ + scoutfs_key_set_zeros(&stat->next_range_key); + stat->nr_complete = 0; + + /* update counts and done if there's still ranges to process */ + if (!no_ranges) { + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); + ret = scoutfs_btree_update(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + stat, sizeof(*stat)); + goto out; + } + + /* no more ranges, free blooms and add freeing items for free work */ + lt.rid = 0; + lt.nr = 0; + for (;;) { + scoutfs_key_init_log_trees(&key, le64_to_cpu(lt.rid), + le64_to_cpu(lt.nr) + 1); + ret = scoutfs_btree_next(sb, &super->logs_root, &key, &iref); + if (ret == 0) { + if (iref.val_len == sizeof(lt)) { + key = *iref.key; + memcpy(&lt, iref.val, sizeof(lt)); + } else { + ret = -EIO; + } + scoutfs_btree_put_iref(&iref); + } + if (ret < 0) { + if (ret == -ENOENT) { + ret = 0; + break; + } + goto out; + } + + /* only free the inputs to the log merge that just finished */ + if (!(le64_to_cpu(lt.flags) & SCOUTFS_LOG_TREES_FINALIZED) || + (le64_to_cpu(lt.max_item_seq) > + le64_to_cpu(stat->last_seq))) + continue; + + fr.root = lt.item_root; + scoutfs_key_set_zeros(&fr.key); + fr.seq = cpu_to_le64(scoutfs_server_next_seq(sb)); + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_FREEING_ZONE, + le64_to_cpu(fr.seq), 0); + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &fr, sizeof(fr)); + if (ret < 0) + goto out; + + if (lt.bloom_ref.blkno) { + ret = scoutfs_free_meta(sb, &server->alloc, + &server->wri, + le64_to_cpu(lt.bloom_ref.blkno)); + if (ret < 0) + goto out; + } + + scoutfs_key_init_log_trees(&key, le64_to_cpu(lt.rid), + le64_to_cpu(lt.nr)); + ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->logs_root, &key); + if (ret < 0) + goto out; + } + + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); + ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->log_merge, &key); + if (ret == 0) + queue_work(server->wq, &server->log_merge_free_work); +out: + BUG_ON(ret); /* inconsistent */ + + return ret; +} + +/* + * Search amongst the finalized log roots within the caller's merge seq looking + * for the earliest item within the caller's range. The caller has taken + * care of locking.
+ */ +static int next_least_log_item(struct super_block *sb, + struct scoutfs_btree_root *logs_root, + u64 seq, struct scoutfs_key *start, + struct scoutfs_key *end, + struct scoutfs_key *next_ret) +{ + struct scoutfs_btree_root item_root; + struct scoutfs_log_trees *lt; + SCOUTFS_BTREE_ITEM_REF(iref); + struct scoutfs_key key; + int ret; + + scoutfs_key_set_ones(next_ret); + + for (scoutfs_key_init_log_trees(&key, 0, 0); ; scoutfs_key_inc(&key)) { + + /* find the next finalized log root within the merge last_seq */ + ret = scoutfs_btree_next(sb, logs_root, &key, &iref); + if (ret == 0) { + if (iref.val_len == sizeof(*lt)) { + key = *iref.key; + lt = iref.val; + if ((le64_to_cpu(lt->flags) & + SCOUTFS_LOG_TREES_FINALIZED) && + (le64_to_cpu(lt->max_item_seq) <= seq)) + item_root = lt->item_root; + else + item_root.ref.blkno = 0; + } else { + ret = -EIO; + } + scoutfs_btree_put_iref(&iref); + } + if (ret < 0) { + if (ret == -ENOENT) + ret = 0; + goto out; + } + if (item_root.ref.blkno == 0) + continue; + + /* see if populated roots have item keys less than next */ + ret = scoutfs_btree_next(sb, &item_root, start, &iref); + if (ret == 0) { + if (scoutfs_key_compare(iref.key, end) <= 0 && + scoutfs_key_compare(iref.key, next_ret) < 0) + *next_ret = *iref.key; + scoutfs_btree_put_iref(&iref); + } + if (ret < 0) { + if (ret == -ENOENT) + ret = 0; + else + goto out; + } + } + +out: + if (ret == 0 && scoutfs_key_is_ones(next_ret)) + ret = -ENOENT; + + return ret; +} + +/* + * Once a merge is fully completed all of the finalized input log btrees + * are redundant and can be freed. + * + * As merging finishes and the status item is deleted, we also move all + * the finalized roots from log_trees items over into freeing items. + * This work is then kicked off which iterates over all the freeing + * items calling into the btree to free all its referenced blocks, with + * the key tracking partial progress. + * + * The freeing work is reasonably light. We only read the btree blocks + * and add freed blocks to merge back into the core allocators. The + * server can handle this load and we avoid the io overhead and + * complexity of farming it out to clients.
+ */ +static void server_log_merge_free_work(struct work_struct *work) +{ + struct server_info *server = container_of(work, struct server_info, + log_merge_free_work); + struct super_block *sb = server->sb; + struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; + struct scoutfs_log_merge_freeing fr; + struct scoutfs_key key; + bool commit = false; + int ret = 0; + + /* shutdown waits for us, we'll eventually see shutting_down set */ + while (!server->shutting_down) { + scoutfs_server_hold_commit(sb); + mutex_lock(&server->logs_mutex); + commit = true; + + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_FREEING_ZONE, + 0, 0, &fr, sizeof(fr)); + if (ret < 0) { + if (ret == -ENOENT) + ret = 0; + break; + } + + ret = scoutfs_btree_free_blocks(sb, &server->alloc, + &server->wri, &fr.key, + &fr.root, 10); + if (ret < 0) + break; + + /* freed blocks are in allocator, we *have* to update key */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_FREEING_ZONE, + le64_to_cpu(fr.seq), 0); + if (scoutfs_key_is_ones(&fr.key)) + ret = scoutfs_btree_delete(sb, &server->alloc, + &server->wri, + &super->log_merge, &key); + else + ret = scoutfs_btree_update(sb, &server->alloc, + &server->wri, + &super->log_merge, &key, + &fr, sizeof(fr)); + /* freed blocks are in allocator, we *have* to update fr */ + BUG_ON(ret < 0); + + mutex_unlock(&server->logs_mutex); + ret = scoutfs_server_apply_commit(sb, ret); + commit = false; + if (ret < 0) + break; + } + + if (commit) { + mutex_unlock(&server->logs_mutex); + ret = scoutfs_server_apply_commit(sb, ret); + } + + if (ret < 0) { + scoutfs_err(sb, "server error freeing merged btree blocks: %d", + ret); + stop_server(server); + } + + /* not re-arming, regularly queued by the server during merging */ +} + +/* + * This will return ENOENT to the client if there is no work to do.
+ */ +static int server_get_log_merge(struct super_block *sb, + struct scoutfs_net_connection *conn, + u8 cmd, u64 id, void *arg, u16 arg_len) +{ + DECLARE_SERVER_INFO(sb, server); + u64 rid = scoutfs_net_client_rid(conn); + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct scoutfs_super_block *super = &sbi->super; + struct scoutfs_log_merge_status stat; + struct scoutfs_log_merge_range rng; + struct scoutfs_log_merge_range remain; + struct scoutfs_log_merge_request req; + struct scoutfs_key par_start; + struct scoutfs_key par_end; + struct scoutfs_key next_key; + struct scoutfs_key key; + bool ins_rng; + bool del_remain; + bool del_req; + bool upd_stat; + bool no_ranges; + bool no_next; + int ret; + int err; + + if (arg_len != 0) + return -EINVAL; + + scoutfs_server_hold_commit(sb); + mutex_lock(&server->logs_mutex); + +restart: + memset(&req, 0, sizeof(req)); + ins_rng = false; + del_remain = false; + del_req = false; + upd_stat = false; + + /* get the status item, maybe creating a new one */ + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0, + &stat, sizeof(stat)); + if (ret == -ENOENT) + ret = start_log_merge(sb, super, &stat); + if (ret < 0) + goto out; + + trace_scoutfs_get_log_merge_status(sb, rid, &stat.next_range_key, + le64_to_cpu(stat.nr_requests), + le64_to_cpu(stat.nr_complete), + le64_to_cpu(stat.last_seq), + le64_to_cpu(stat.seq)); + + /* find the next range, always checking for splicing */ + for (;;) { + key = stat.next_range_key; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = next_log_merge_item_key(sb, &super->log_merge, SCOUTFS_LOG_MERGE_RANGE_ZONE, + &key, &rng, sizeof(rng)); + if (ret < 0 && ret != -ENOENT) + goto out; + + /* maybe splice now that we know if there's ranges */ + no_next = ret == -ENOENT; + no_ranges = scoutfs_key_is_zeros(&stat.next_range_key) && ret == -ENOENT; + if (le64_to_cpu(stat.nr_requests) == 0 && + (no_next || le64_to_cpu(stat.nr_complete) >= LOG_MERGE_SPLICE_BATCH)) { + ret = splice_log_merge_completions(sb, &stat, no_ranges); + if (ret < 0) + goto out; + /* splicing resets key and adds ranges, could finish status */ + goto restart; + } + + /* no ranges from next for requests, future attempts will create or splice */ + if (no_next) { + ret = -ENOENT; + goto out; + } + + /* see if we should back off after splicing might have deleted completions */ + if ((le64_to_cpu(stat.nr_requests) + + le64_to_cpu(stat.nr_complete)) >= LOG_MERGE_SPLICE_BATCH) { + ret = -ENOENT; + goto out; + } + + /* find the next logged item in the next range */ + ret = next_least_log_item(sb, &super->logs_root, + le64_to_cpu(stat.last_seq), + &rng.start, &rng.end, &next_key); + if (ret == 0) + break; + /* drop the range if it contained no logged items */ + if (ret == -ENOENT) { + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_delete(sb, &server->alloc, + &server->wri, + &super->log_merge, &key); + } + if (ret < 0) + goto out; + } + + /* start to build the request that's saved and sent to the client */ + req.logs_root = super->logs_root; + req.last_seq = stat.last_seq; + req.rid = cpu_to_le64(rid); + req.seq = cpu_to_le64(scoutfs_server_next_seq(sb)); + req.flags = 0; + if (super->fs_root.height > 2) + req.flags |= cpu_to_le64(SCOUTFS_LOG_MERGE_REQUEST_SUBTREE); + + /* find the fs_root parent block and its key range */ + ret = scoutfs_btree_get_parent(sb, &super->fs_root, &next_key, + &req.root) ?: + scoutfs_btree_parent_range(sb, &super->fs_root, &next_key, + &par_start, &par_end); + if 
(ret < 0) + goto out; + + /* start from next item, don't exceed parent key range */ + req.start = next_key; + req.end = rng.end; + if (scoutfs_key_compare(&par_end, &req.end) < 0) + req.end = par_end; + + /* delete the old range */ + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->log_merge, &key); + if (ret < 0) + goto out; + ins_rng = true; + + /* add remaining range if we have to */ + if (scoutfs_key_compare(&rng.end, &req.end) > 0) { + remain.start = req.end; + scoutfs_key_inc(&remain.start); + remain.end = rng.end; + + key = remain.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &remain, sizeof(remain)); + if (ret < 0) + goto out; + del_remain = true; + } + + /* give the client an allocation pool to work with */ + mutex_lock(&server->alloc_mutex); + ret = scoutfs_alloc_fill_list(sb, &server->alloc, &server->wri, + &req.meta_avail, server->meta_avail, + SCOUTFS_SERVER_MERGE_FILL_LO, + SCOUTFS_SERVER_MERGE_FILL_TARGET); + mutex_unlock(&server->alloc_mutex); + if (ret < 0) + goto out; + + /* save the request that will be sent to the client */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_REQUEST_ZONE, rid, + le64_to_cpu(req.seq)); + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &req, sizeof(req)); + if (ret < 0) + goto out; + del_req = true; + + trace_scoutfs_get_log_merge_request(sb, rid, &req.root, + &req.start, &req.end, + le64_to_cpu(req.last_seq), + le64_to_cpu(req.seq)); + + /* make sure next range avoids ranges for parent in use */ + stat.next_range_key = par_end; + if (!scoutfs_key_is_ones(&stat.next_range_key)) + scoutfs_key_inc(&stat.next_range_key); + + /* update the status requests count */ + le64_add_cpu(&stat.nr_requests, 1); + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); + ret = scoutfs_btree_update(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &stat, sizeof(stat)); + if (ret < 0) + goto out; + upd_stat = true; + +out: + if (ret < 0) { + /* undo any of our partial item changes */ + if (upd_stat) { + le64_add_cpu(&stat.nr_requests, -1ULL); + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, + 0, 0); + err = scoutfs_btree_update(sb, &server->alloc, + &server->wri, + &super->log_merge, &key, + &stat, sizeof(stat)); + BUG_ON(err); /* inconsistent */ + } + + if (del_req) { + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_REQUEST_ZONE, + rid, le64_to_cpu(req.seq)); + err = scoutfs_btree_delete(sb, &server->alloc, + &server->wri, + &super->log_merge, &key); + BUG_ON(err); /* inconsistent */ + } + + if (del_remain) { + key = remain.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + err = scoutfs_btree_delete(sb, &server->alloc, + &server->wri, + &super->log_merge, &key); + BUG_ON(err); /* inconsistent */ + } + + if (ins_rng) { + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + err = scoutfs_btree_insert(sb, &server->alloc, + &server->wri, + &super->log_merge, &key, + &rng, sizeof(rng)); + BUG_ON(err); /* inconsistent */ + } + + /* reclaim allocation if we failed */ + mutex_lock(&server->alloc_mutex); + err = scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &req.meta_avail); + mutex_unlock(&server->alloc_mutex); + BUG_ON(err); /* inconsistent */ + } + + mutex_unlock(&server->logs_mutex); + ret = scoutfs_server_apply_commit(sb, ret); + + return scoutfs_net_response(sb,
conn, cmd, id, ret, &req, sizeof(req)); +} + +/* + * Commit the client's log merge work. Typically we store the + * completion so that we can splice it back into the fs root and + * reclaim its allocators later in a batch. If it failed we reclaim it + * immediately. + */ +static int server_commit_log_merge(struct super_block *sb, + struct scoutfs_net_connection *conn, + u8 cmd, u64 id, void *arg, u16 arg_len) +{ + DECLARE_SERVER_INFO(sb, server); + u64 rid = scoutfs_net_client_rid(conn); + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct scoutfs_super_block *super = &sbi->super; + struct scoutfs_log_merge_request orig_req; + struct scoutfs_log_merge_complete *comp; + struct scoutfs_log_merge_status stat; + struct scoutfs_log_merge_range rng; + struct scoutfs_key key; + int ret; + + scoutfs_key_set_zeros(&rng.end); + + if (arg_len != sizeof(struct scoutfs_log_merge_complete)) + return -EINVAL; + comp = arg; + + trace_scoutfs_get_log_merge_complete(sb, rid, &comp->root, + &comp->start, &comp->end, + &comp->remain, + le64_to_cpu(comp->seq), + le64_to_cpu(comp->flags)); + + scoutfs_server_hold_commit(sb); + mutex_lock(&server->logs_mutex); + + /* find the status of the current log merge */ + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0, + &stat, sizeof(stat)); + if (ret < 0) { + WARN_ON_ONCE(ret == -ENOENT); /* inconsistent */ + goto out; + } + + /* find the completion's original saved request */ + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_REQUEST_ZONE, + rid, le64_to_cpu(comp->seq), + &orig_req, sizeof(orig_req)); + if (WARN_ON_ONCE(ret == 0 && (comp->rid != orig_req.rid || + comp->seq != orig_req.seq))) + ret = -ENOENT; /* inconsistency */ + if (ret < 0) { + WARN_ON_ONCE(ret == -ENOENT); /* inconsistency */ + goto out; + } + + /* delete the original request item */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_REQUEST_ZONE, rid, + le64_to_cpu(orig_req.seq)); + ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->log_merge, &key); + if (ret < 0) + goto out; + + if (le64_to_cpu(comp->flags) & SCOUTFS_LOG_MERGE_COMP_ERROR) { + /* restore the range and reclaim the allocator if it failed */ + rng.start = orig_req.start; + rng.end = orig_req.end; + + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &rng, sizeof(rng)); + if (ret < 0) + goto out; + + mutex_lock(&server->alloc_mutex); + ret = scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &orig_req.meta_avail) ?: + scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &orig_req.meta_freed); + mutex_unlock(&server->alloc_mutex); + if (ret < 0) + goto out; + + } else { + /* otherwise store the completion for later splicing */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_COMPLETE_ZONE, + le64_to_cpu(comp->seq), 0); + ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + comp, sizeof(*comp)); + if (ret < 0) + goto out; + + le64_add_cpu(&stat.nr_complete, 1ULL); + } + + /* and update the status counts */ + le64_add_cpu(&stat.nr_requests, -1ULL); + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); + ret = scoutfs_btree_update(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &stat, sizeof(stat)); + if (ret < 0) + goto out; + +out: + mutex_unlock(&server->logs_mutex); + ret = scoutfs_server_apply_commit(sb, ret); +
BUG_ON(ret < 0); /* inconsistent */ + + return scoutfs_net_response(sb, conn, cmd, id, ret, NULL, 0); +} + /* The server is receiving an omap response from the client */ static int open_ino_map_response(struct super_block *sb, struct scoutfs_net_connection *conn, void *resp, unsigned int resp_len, int error, void *data) @@ -1613,6 +2582,113 @@ static int cancel_srch_compact(struct super_block *sb, u64 rid) return ret; } +/* + * Clean up any log merge requests which have now been abandoned because + * their client was evicted. This is always called on eviction and + * there may have been no merge in progress, or our client may have had + * no outstanding requests. For each pending request, we reclaim its + * allocators, delete its item, and update the status. + * + * The request we cancel might have been the last request which + * prevented batch processing, but we don't check that here. This is in + * the client eviction path and we want that to be as light and + * responsive as possible so we can get back up and running. The next + * client get_log_merge request will see that no more requests are + * outstanding. + * + * The caller holds a commit, but we're responsible for locking. + */ +static int cancel_log_merge(struct super_block *sb, u64 rid) +{ + DECLARE_SERVER_INFO(sb, server); + struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; + struct scoutfs_log_merge_status stat; + struct scoutfs_log_merge_request req; + struct scoutfs_log_merge_range rng; + struct scoutfs_key key; + bool update = false; + u64 seq; + int ret; + + mutex_lock(&server->logs_mutex); + + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0, + &stat, sizeof(stat)); + if (ret < 0) { + if (ret == -ENOENT) + ret = 0; + goto out; + } + + for (seq = 0; ; seq++) { + ret = next_log_merge_item(sb, &super->log_merge, + SCOUTFS_LOG_MERGE_REQUEST_ZONE, rid, + seq, &req, sizeof(req)); + if (ret == 0 && le64_to_cpu(req.rid) != rid) + ret = -ENOENT; + if (ret < 0) { + if (ret == -ENOENT) + ret = 0; + break; + } + + seq = le64_to_cpu(req.seq); + + /* remove request item */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_REQUEST_ZONE, rid, + le64_to_cpu(req.seq)); + ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, + &super->log_merge, &key); + if (ret < 0) + goto out; + + /* restore range */ + rng.start = req.start; + rng.end = req.end; + + key = rng.start; + key.sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE; + ret = scoutfs_btree_insert(sb, &server->alloc, + &server->wri, + &super->log_merge, &key, + &rng, sizeof(rng)); + if (ret < 0) + goto out; + + /* reclaim allocator */ + mutex_lock(&server->alloc_mutex); + ret = scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &req.meta_avail) ?: + scoutfs_alloc_splice_list(sb, &server->alloc, + &server->wri, + server->other_freed, + &req.meta_freed); + mutex_unlock(&server->alloc_mutex); + if (ret < 0) + goto out; + + /* update count */ + le64_add_cpu(&stat.nr_requests, -1ULL); + update = true; + } + + if (update) { + /* and update the status counts */ + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); + ret = scoutfs_btree_update(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + &stat, sizeof(stat)); + } +out: + mutex_unlock(&server->logs_mutex); + + BUG_ON(ret < 0); /* XXX inconsistent */ + return ret; +} + /* * Farewell processing is async to the request processing work. Shutdown * waits for request processing to finish and then tears down the connection.
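Reviewer note, not part of the patch: a condensed sketch of how the new log_merge btree items are keyed by the handlers above. init_log_merge_key() is added by this patch; the SCOUTFS_LOG_MERGE_*_ZONE constants come from format.h, which this diff doesn't touch. The helper below is illustrative only and simply collects the keying calls that server.c makes in one place.

/*
 * Illustrative summary of log_merge item keying, mirroring the calls above:
 *   status:   a single item at (SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0)
 *   range:    stored at the range's starting fs key with sk_zone set to
 *             SCOUTFS_LOG_MERGE_RANGE_ZONE
 *   request:  (SCOUTFS_LOG_MERGE_REQUEST_ZONE, rid, req.seq)
 *   complete: (SCOUTFS_LOG_MERGE_COMPLETE_ZONE, comp.seq, 0)
 *   freeing:  (SCOUTFS_LOG_MERGE_FREEING_ZONE, fr.seq, 0)
 */
static void log_merge_key_sketch(struct scoutfs_key *key, u64 rid, u64 seq,
				 struct scoutfs_key *range_start)
{
	/* the one status item for the whole merge */
	init_log_merge_key(key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0);

	/* a range item lives at its starting fs key, zone overridden */
	*key = *range_start;
	key->sk_zone = SCOUTFS_LOG_MERGE_RANGE_ZONE;

	/* outstanding requests are keyed by the requesting client and seq */
	init_log_merge_key(key, SCOUTFS_LOG_MERGE_REQUEST_ZONE, rid, seq);

	/* completions and freeing items are keyed by their seq */
	init_log_merge_key(key, SCOUTFS_LOG_MERGE_COMPLETE_ZONE, seq, 0);
	init_log_merge_key(key, SCOUTFS_LOG_MERGE_FREEING_ZONE, seq, 0);
}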
@@ -1758,8 +2834,9 @@ static int reclaim_rid(struct super_block *sb, u64 rid) /* delete mounted client last, recovery looks for it */ ret = scoutfs_lock_server_farewell(sb, rid) ?: remove_trans_seq(sb, rid) ?: - reclaim_log_trees(sb, rid) ?: + reclaim_open_log_tree(sb, rid) ?: cancel_srch_compact(sb, rid) ?: + cancel_log_merge(sb, rid) ?: scoutfs_omap_remove_rid(sb, rid) ?: delete_mounted_client(sb, rid); @@ -1995,6 +3072,8 @@ static scoutfs_net_request_t server_req_funcs[] = { [SCOUTFS_NET_CMD_LOCK] = server_lock, [SCOUTFS_NET_CMD_SRCH_GET_COMPACT] = server_srch_get_compact, [SCOUTFS_NET_CMD_SRCH_COMMIT_COMPACT] = server_srch_commit_compact, + [SCOUTFS_NET_CMD_GET_LOG_MERGE] = server_get_log_merge, + [SCOUTFS_NET_CMD_COMMIT_LOG_MERGE] = server_commit_log_merge, [SCOUTFS_NET_CMD_OPEN_INO_MAP] = server_open_ino_map, [SCOUTFS_NET_CMD_GET_VOLOPT] = server_get_volopt, [SCOUTFS_NET_CMD_SET_VOLOPT] = server_set_volopt, @@ -2367,6 +3446,8 @@ shutdown: scoutfs_net_shutdown(sb, conn); server->conn = NULL; + flush_work(&server->log_merge_free_work); + /* stop tracking recovery, cancel timer, flush any fencing */ scoutfs_recov_shutdown(sb); flush_work(&server->fence_pending_recov_work); @@ -2434,6 +3515,7 @@ void scoutfs_server_stop(struct super_block *sb) cancel_work_sync(&server->work); cancel_work_sync(&server->farewell_work); cancel_work_sync(&server->commit_work); + cancel_work_sync(&server->log_merge_free_work); } int scoutfs_server_setup(struct super_block *sb) @@ -2459,6 +3541,7 @@ int scoutfs_server_setup(struct super_block *sb) INIT_WORK(&server->farewell_work, farewell_worker); mutex_init(&server->alloc_mutex); mutex_init(&server->logs_mutex); + INIT_WORK(&server->log_merge_free_work, server_log_merge_free_work); mutex_init(&server->srch_mutex); mutex_init(&server->mounted_clients_mutex); seqcount_init(&server->roots_seqcount);
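Reviewer note, not part of the patch: a standalone userspace sketch of the merge fill-target arithmetic added to alloc.h at the top of this diff. The 64KiB large metadata block size (a shift of 16) is an assumption for illustration; the real SCOUTFS_BLOCK_LG_SHIFT is defined in format.h and isn't shown in this hunk.

#include <stdio.h>

#define ASSUMED_BLOCK_LG_SHIFT		16	/* assumption: 64KiB metadata blocks */
#define LOG_MERGE_DIRTY_BYTE_LIMIT	(64ULL * 1024 * 1024)
#define SERVER_MERGE_FILL_TARGET \
	((LOG_MERGE_DIRTY_BYTE_LIMIT >> ASSUMED_BLOCK_LG_SHIFT) + 4)

int main(void)
{
	/* 64MiB dirty limit / 64KiB per block = 1024 blocks, plus 4 alloc blocks */
	printf("merge fill target: %llu blocks\n",
	       (unsigned long long)SERVER_MERGE_FILL_TARGET);
	return 0;
}

Under that assumption the target works out to 1028 large blocks per merge request, and SCOUTFS_SERVER_MERGE_FILL_LO is defined to the same value, so the server always tries to hand each merge request a fully filled allocation list.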