From 91acf92666cff8b7f5e26284510f3f657c161ee5 Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Fri, 18 Dec 2020 11:48:44 -0800 Subject: [PATCH] Add client btree merge processing Add the client work which is regularly scheduled to ask the server for log merging work to do. The relatively simple client work gets a request from the server, finds the log roots to merge given the reqeust seq, performs the merge with a btree call and callbacks, and commits the result to the server. Signed-off-by: Zach Brown --- kmod/src/client.c | 20 ++++++ kmod/src/client.h | 4 ++ kmod/src/forest.c | 169 +++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 192 insertions(+), 1 deletion(-) diff --git a/kmod/src/client.c b/kmod/src/client.c index 7a4b4322..68fe4736 100644 --- a/kmod/src/client.c +++ b/kmod/src/client.c @@ -217,6 +217,26 @@ int scoutfs_client_srch_commit_compact(struct super_block *sb, res, sizeof(*res), NULL, 0); } +int scoutfs_client_get_log_merge(struct super_block *sb, + struct scoutfs_log_merge_request *req) +{ + struct client_info *client = SCOUTFS_SB(sb)->client_info; + + return scoutfs_net_sync_request(sb, client->conn, + SCOUTFS_NET_CMD_GET_LOG_MERGE, + NULL, 0, req, sizeof(*req)); +} + +int scoutfs_client_commit_log_merge(struct super_block *sb, + struct scoutfs_log_merge_complete *comp) +{ + struct client_info *client = SCOUTFS_SB(sb)->client_info; + + return scoutfs_net_sync_request(sb, client->conn, + SCOUTFS_NET_CMD_COMMIT_LOG_MERGE, + comp, sizeof(*comp), NULL, 0); +} + int scoutfs_client_send_omap_response(struct super_block *sb, u64 id, struct scoutfs_open_ino_map *map) { diff --git a/kmod/src/client.h b/kmod/src/client.h index f8866abd..1cbcbc1d 100644 --- a/kmod/src/client.h +++ b/kmod/src/client.h @@ -22,6 +22,10 @@ int scoutfs_client_srch_get_compact(struct super_block *sb, struct scoutfs_srch_compact *sc); int scoutfs_client_srch_commit_compact(struct super_block *sb, struct scoutfs_srch_compact *res); +int scoutfs_client_get_log_merge(struct super_block *sb, + struct scoutfs_log_merge_request *req); +int scoutfs_client_commit_log_merge(struct super_block *sb, + struct scoutfs_log_merge_complete *comp); int scoutfs_client_send_omap_response(struct super_block *sb, u64 id, struct scoutfs_open_ino_map *map); int scoutfs_client_open_ino_map(struct super_block *sb, u64 group_nr, diff --git a/kmod/src/forest.c b/kmod/src/forest.c index f88ac5e2..37be80a0 100644 --- a/kmod/src/forest.c +++ b/kmod/src/forest.c @@ -52,6 +52,8 @@ */ struct forest_info { + struct super_block *sb; + struct mutex mutex; struct scoutfs_alloc *alloc; struct scoutfs_block_writer *wri; @@ -60,6 +62,9 @@ struct forest_info { struct mutex srch_mutex; struct scoutfs_srch_file srch_file; struct scoutfs_block *srch_bl; + + struct workqueue_struct *workq; + struct delayed_work log_merge_dwork; }; #define DECLARE_FOREST_INFO(sb, name) \ @@ -572,6 +577,149 @@ void scoutfs_forest_get_btrees(struct super_block *sb, <->bloom_ref); } +/* + * Compare input items to merge by their log item value seq when their + * keys match. + */ +static int merge_cmp(void *a_val, int a_val_len, void *b_val, int b_val_len) +{ + struct scoutfs_log_item_value *a = a_val; + struct scoutfs_log_item_value *b = b_val; + + /* sort merge item by seq */ + return scoutfs_cmp(le64_to_cpu(a->seq), le64_to_cpu(b->seq)); +} + +static bool merge_is_del(void *val, int val_len) +{ + struct scoutfs_log_item_value *liv = val; + + return !!(liv->flags & SCOUTFS_LOG_ITEM_FLAG_DELETION); +} + +#define LOG_MERGE_DELAY_MS (5 * MSEC_PER_SEC) + +/* + * Regularly try to get a log merge request from the server. If we get + * a request we walk the log_trees items to find input trees and pass + * them to btree_merge. All of our work is done in dirty blocks + * allocated from available free blocks that the server gave us. If we + * hit an error then we drop our dirty blocks without writing them and + * send an error flag to the server so they can reclaim our allocators + * and ignore the rest of our work. + */ +static void scoutfs_forest_log_merge_worker(struct work_struct *work) +{ + struct forest_info *finf = container_of(work, struct forest_info, + log_merge_dwork.work); + struct super_block *sb = finf->sb; + struct scoutfs_btree_root_head *rhead = NULL; + struct scoutfs_btree_root_head *tmp; + struct scoutfs_log_merge_complete comp; + struct scoutfs_log_merge_request req; + struct scoutfs_log_trees *lt; + struct scoutfs_block_writer wri; + struct scoutfs_alloc alloc; + SCOUTFS_BTREE_ITEM_REF(iref); + struct scoutfs_key next; + struct scoutfs_key key; + unsigned long delay; + LIST_HEAD(inputs); + int ret; + + ret = scoutfs_client_get_log_merge(sb, &req); + if (ret < 0) + goto resched; + + comp.root = req.root; + comp.start = req.start; + comp.end = req.end; + comp.remain = req.end; + comp.rid = req.rid; + comp.seq = req.seq; + comp.flags = 0; + + scoutfs_alloc_init(&alloc, &req.meta_avail, &req.meta_freed); + scoutfs_block_writer_init(sb, &wri); + + /* find finalized input log trees up to last_seq */ + for (scoutfs_key_init_log_trees(&key, 0, 0); ; scoutfs_key_inc(&key)) { + + if (!rhead) { + rhead = kmalloc(sizeof(*rhead), GFP_NOFS); + if (!rhead) { + ret = -ENOMEM; + goto out; + } + } + + ret = scoutfs_btree_next(sb, &req.logs_root, &key, &iref); + if (ret == 0) { + if (iref.val_len == sizeof(*lt)) { + key = *iref.key; + lt = iref.val; + if ((le64_to_cpu(lt->flags) & + SCOUTFS_LOG_TREES_FINALIZED) && + (le64_to_cpu(lt->max_item_seq) <= + le64_to_cpu(req.last_seq))) { + rhead->root = lt->item_root; + list_add_tail(&rhead->head, &inputs); + rhead = NULL; + } + } else { + ret = -EIO; + } + scoutfs_btree_put_iref(&iref); + } + if (ret < 0) { + if (ret == -ENOENT) { + ret = 0; + break; + } + goto out; + } + } + + /* shouldn't be possible, but it's harmless */ + if (list_empty(&inputs)) { + ret = 0; + goto out; + } + + ret = scoutfs_btree_merge(sb, &alloc, &wri, &req.start, &req.end, + &next, &comp.root, &inputs, merge_cmp, + merge_is_del, + !!(req.flags & cpu_to_le64(SCOUTFS_LOG_MERGE_REQUEST_SUBTREE)), + sizeof(struct scoutfs_log_item_value), + SCOUTFS_LOG_MERGE_DIRTY_BYTE_LIMIT, 10); + if (ret == -ERANGE) { + comp.remain = next; + le64_add_cpu(&comp.flags, SCOUTFS_LOG_MERGE_COMP_REMAIN); + ret = 0; + } + +out: + scoutfs_alloc_prepare_commit(sb, &alloc, &wri); + if (ret == 0) + ret = scoutfs_block_writer_write(sb, &wri); + scoutfs_block_writer_forget_all(sb, &wri); + + comp.meta_avail = alloc.avail; + comp.meta_freed = alloc.freed; + if (ret < 0) + le64_add_cpu(&comp.flags, SCOUTFS_LOG_MERGE_COMP_ERROR); + + ret = scoutfs_client_commit_log_merge(sb, &comp); + + kfree(rhead); + list_for_each_entry_safe(rhead, tmp, &inputs, head) + kfree(rhead); + +resched: + delay = ret == 0 ? 0 : msecs_to_jiffies(LOG_MERGE_DELAY_MS); + queue_delayed_work(finf->workq, &finf->log_merge_dwork, delay); +} + int scoutfs_forest_setup(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); @@ -585,10 +733,23 @@ int scoutfs_forest_setup(struct super_block *sb) } /* the finf fields will be setup as we open a transaction */ + finf->sb = sb; mutex_init(&finf->mutex); mutex_init(&finf->srch_mutex); - + INIT_DELAYED_WORK(&finf->log_merge_dwork, + scoutfs_forest_log_merge_worker); sbi->forest_info = finf; + + finf->workq = alloc_workqueue("scoutfs_log_merge", WQ_NON_REENTRANT | + WQ_UNBOUND | WQ_HIGHPRI, 0); + if (!finf->workq) { + ret = -ENOMEM; + goto out; + } + + queue_delayed_work(finf->workq, &finf->log_merge_dwork, + msecs_to_jiffies(LOG_MERGE_DELAY_MS)); + ret = 0; out: if (ret) @@ -604,6 +765,12 @@ void scoutfs_forest_destroy(struct super_block *sb) if (finf) { scoutfs_block_put(sb, finf->srch_bl); + + if (finf->workq) { + cancel_delayed_work_sync(&finf->log_merge_dwork); + destroy_workqueue(finf->workq); + } + kfree(finf); sbi->forest_info = NULL; }