diff --git a/kmod/src/server.c b/kmod/src/server.c index 6367e350..35cf6162 100644 --- a/kmod/src/server.c +++ b/kmod/src/server.c @@ -2058,6 +2058,13 @@ out: * reset the next range key if there's still work to do. If the * operation is complete then we tear down the input log_trees items and * delete the status. + * + * Processing all the completions can take more than one transaction. + * We return -EINPROGRESS if we have to commit a transaction and the + * caller will apply the commit and immediate call back in so we can + * perform another commit. We need to be very careful to leave the + * status in a state where requests won't be issued at the wrong time + * (by forcing nr_completions to a batch while we delete them). */ static int splice_log_merge_completions(struct super_block *sb, struct scoutfs_log_merge_status *stat, @@ -2070,15 +2077,29 @@ static int splice_log_merge_completions(struct super_block *sb, struct scoutfs_log_merge_range rng; struct scoutfs_log_trees lt = {{{0,}}}; SCOUTFS_BTREE_ITEM_REF(iref); + bool upd_stat = true; + int einprogress = 0; struct scoutfs_key key; char *err_str = NULL; + u32 alloc_low; + u32 tmp; u64 seq; int ret; + int err; /* musn't rebalance fs tree parents while reqs rely on their key bounds */ if (WARN_ON_ONCE(le64_to_cpu(stat->nr_requests) > 0)) return -EIO; + /* + * Be overly conservative about how low the allocator can get + * before we commit. This gives us a lot of work to do in a + * commit while also allowing a pretty big smallest allocator to + * work with the theoretically unbounded alloc list splicing. + */ + scoutfs_alloc_meta_remaining(&server->alloc, &alloc_low, &tmp); + alloc_low = min(alloc_low, tmp) / 4; + /* * Splice in all the completed subtrees at the initial parent * blocks in the main fs_tree before rebalancing any of them. @@ -2100,6 +2121,22 @@ static int splice_log_merge_completions(struct super_block *sb, seq = le64_to_cpu(comp.seq); + /* + * Use having cleared the lists as an indication that + * we've already set the parents and don't need to dirty + * the btree blocks to do it all over again. This is + * safe because there is always an fs block that the + * merge dirties and frees into the meta_freed list. + */ + if (comp.meta_avail.ref.blkno == 0 && comp.meta_freed.ref.blkno == 0) + continue; + + if (scoutfs_alloc_meta_low(sb, &server->alloc, alloc_low)) { + einprogress = -EINPROGRESS; + ret = 0; + goto out; + } + ret = scoutfs_btree_set_parent(sb, &server->alloc, &server->wri, &super->fs_root, &comp.start, &comp.root); @@ -2134,6 +2171,14 @@ static int splice_log_merge_completions(struct super_block *sb, } } + /* + * Once we start rebalancing we force the number of completions + * to a batch so that requests won't be issued. Once we're done + * we clear the completion count and requests can flow again. + */ + if (le64_to_cpu(stat->nr_complete) < LOG_MERGE_SPLICE_BATCH) + stat->nr_complete = cpu_to_le64(LOG_MERGE_SPLICE_BATCH); + /* * Now with all the parent blocks spliced in, rebalance items * amongst parents that needed to split/join and delete the @@ -2155,6 +2200,12 @@ static int splice_log_merge_completions(struct super_block *sb, seq = le64_to_cpu(comp.seq); + if (scoutfs_alloc_meta_low(sb, &server->alloc, alloc_low)) { + einprogress = -EINPROGRESS; + ret = 0; + goto out; + } + /* balance when there was a remaining key range */ if (le64_to_cpu(comp.flags) & SCOUTFS_LOG_MERGE_COMP_REMAIN) { ret = scoutfs_btree_rebalance(sb, &server->alloc, @@ -2194,18 +2245,11 @@ static int splice_log_merge_completions(struct super_block *sb, } } - /* update the status once all completes are processed */ - scoutfs_key_set_zeros(&stat->next_range_key); - stat->nr_complete = 0; - /* update counts and done if there's still ranges to process */ if (!no_ranges) { - init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); - ret = scoutfs_btree_update(sb, &server->alloc, &server->wri, - &super->log_merge, &key, - stat, sizeof(*stat)); - if (ret < 0) - err_str = "update status"; + scoutfs_key_set_zeros(&stat->next_range_key); + stat->nr_complete = 0; + ret = 0; goto out; } @@ -2241,6 +2285,12 @@ static int splice_log_merge_completions(struct super_block *sb, (le64_to_cpu(lt.finalize_seq) < le64_to_cpu(stat->seq)))) continue; + if (scoutfs_alloc_meta_low(sb, &server->alloc, alloc_low)) { + einprogress = -EINPROGRESS; + ret = 0; + goto out; + } + fr.root = lt.item_root; scoutfs_key_set_zeros(&fr.key); fr.seq = cpu_to_le64(scoutfs_server_next_seq(sb)); @@ -2274,9 +2324,10 @@ static int splice_log_merge_completions(struct super_block *sb, } le64_add_cpu(&super->inode_count, le64_to_cpu(lt.inode_count_delta)); - } + /* everything's done, remove the merge operation */ + upd_stat = false; init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, &super->log_merge, &key); @@ -2285,12 +2336,23 @@ static int splice_log_merge_completions(struct super_block *sb, else err_str = "deleting merge status item"; out: + if (upd_stat) { + init_log_merge_key(&key, SCOUTFS_LOG_MERGE_STATUS_ZONE, 0, 0); + err = scoutfs_btree_update(sb, &server->alloc, &server->wri, + &super->log_merge, &key, + stat, sizeof(struct scoutfs_log_merge_status)); + if (err && !ret) { + err_str = "updating merge status item"; + ret = err; + } + } + if (ret < 0) scoutfs_err(sb, "server error %d splicing log merge completion: %s", ret, err_str); BUG_ON(ret); /* inconsistent */ - return ret; + return ret ?: einprogress; } /* @@ -2465,6 +2527,12 @@ static void server_log_merge_free_work(struct work_struct *work) } /* + * Clients regularly ask if there is log merge work to do. We process + * completions inline before responding so that we don't create large + * delays between completion processing and the next request. We don't + * mind if the client get_log_merge request sees high latency, the + * blocked caller has nothing else to do. + * * This will return ENOENT to the client if there is no work to do. */ static int server_get_log_merge(struct super_block *sb, @@ -2532,14 +2600,22 @@ restart: goto out; } - /* maybe splice now that we know if there's ranges */ + /* splice if we have a batch or ran out of ranges */ no_next = ret == -ENOENT; no_ranges = scoutfs_key_is_zeros(&stat.next_range_key) && ret == -ENOENT; if (le64_to_cpu(stat.nr_requests) == 0 && (no_next || le64_to_cpu(stat.nr_complete) >= LOG_MERGE_SPLICE_BATCH)) { ret = splice_log_merge_completions(sb, &stat, no_ranges); - if (ret < 0) + if (ret == -EINPROGRESS) { + mutex_unlock(&server->logs_mutex); + ret = server_apply_commit(sb, &hold, 0); + if (ret < 0) + goto respond; + server_hold_commit(sb, &hold); + mutex_lock(&server->logs_mutex); + } else if (ret < 0) { goto out; + } /* splicing resets key and adds ranges, could finish status */ goto restart; } @@ -2741,6 +2817,7 @@ out: mutex_unlock(&server->logs_mutex); ret = server_apply_commit(sb, &hold, ret); +respond: return scoutfs_net_response(sb, conn, cmd, id, ret, &req, sizeof(req)); }