diff --git a/kmod/src/client.c b/kmod/src/client.c index 39d98afb..55def453 100644 --- a/kmod/src/client.c +++ b/kmod/src/client.c @@ -357,6 +357,63 @@ out: } } +/* + * Perform a compaction in the client as requested by the server. The + * server has protected the input segments and allocated the output + * segnos for us. This executes in work queued by the client's net + * connection. It only reads and write segments. The server will + * update the manifest and allocators while processing the response. An + * error response includes the compaction id so that the server can + * clean it up. + * + * If we get duplicate requests across a reconnected socket we can have + * two workers performing the same compaction simultaneously. This + * isn't particularly efficient but it's rare and won't corrupt the + * output. Our response can be lost if the socket is shutdown while + * it's in flight, the server deals with this. + */ +static int client_compact(struct super_block *sb, + struct scoutfs_net_connection *conn, + u8 cmd, u64 id, void *arg, u16 arg_len) +{ + struct scoutfs_net_compact_response *resp = NULL; + struct scoutfs_net_compact_request *req; + int ret; + + if (arg_len != sizeof(struct scoutfs_net_compact_request)) { + ret = -EINVAL; + goto out; + } + req = arg; + + trace_scoutfs_client_compact_start(sb, le64_to_cpu(req->id), + req->last_level, req->flags); + + resp = kzalloc(sizeof(struct scoutfs_net_compact_response), GFP_NOFS); + if (!resp) { + ret = -ENOMEM; + } else { + resp->id = req->id; + ret = scoutfs_compact(sb, req, resp); + } + + trace_scoutfs_client_compact_stop(sb, le64_to_cpu(req->id), ret); + + if (ret < 0) + ret = scoutfs_net_response(sb, conn, cmd, id, ret, + &req->id, sizeof(req->id)); + else + ret = scoutfs_net_response(sb, conn, cmd, id, 0, + resp, sizeof(*resp)); + kfree(resp); +out: + return ret; +} + +static scoutfs_net_request_t client_req_funcs[] = { + [SCOUTFS_NET_CMD_COMPACT] = client_compact, +}; + /* * Called when either a connect attempt or established connection times * out and fails. @@ -405,9 +462,8 @@ int scoutfs_client_setup(struct super_block *sb) INIT_DELAYED_WORK(&client->connect_dwork, scoutfs_client_connect_worker); - /* client doesn't process any incoming requests yet */ client->conn = scoutfs_net_alloc_conn(sb, NULL, client_notify_down, 0, - NULL, "client"); + client_req_funcs, "client"); if (!client->conn) { ret = -ENOMEM; goto out; diff --git a/kmod/src/compact.c b/kmod/src/compact.c index b18b11d6..cfca5fd8 100644 --- a/kmod/src/compact.c +++ b/kmod/src/compact.c @@ -13,6 +13,7 @@ #include #include #include +#include #include "super.h" #include "format.h" @@ -30,10 +31,6 @@ * segments in each level of the lsm tree and is what merges duplicate * and deletion keys. * - * When the manifest is modified in a way that requires compaction it - * kicks the compaction thread. The compaction thread calls into the - * manifest to find the segments that need to be compaction. - * * The compaction operation itself always involves a single "upper" * segment at a given level and a limited number of "lower" segments at * the next higher level whose key range intersects with the upper @@ -42,23 +39,10 @@ * Compaction proceeds by iterating over the items in the upper segment * and items in each of the lower segments in sort order. The items * from the two input segments are copied into new output segments in - * sorted order. Item space is reclaimed as duplicate or deletion items - * are removed. - * - * Once the compaction is completed the manifest is updated to remove - * the input segments and add the output segments. Here segment space - * is reclaimed when the input items fit in fewer output segments. + * sorted order. Space is reclaimed as duplicate or deletion items are + * removed and fewer segments are written than were read. */ -struct compact_info { - struct super_block *sb; - struct workqueue_struct *workq; - struct work_struct work; -}; - -#define DECLARE_COMPACT_INFO(sb, name) \ - struct compact_info *name = SCOUTFS_SB(sb)->compact_info - struct compact_seg { struct list_head entry; @@ -72,16 +56,12 @@ struct compact_seg { bool part_of_move; }; -/* - * A compaction request. It's filled up in scoutfs_compact_add() as - * the manifest is wlaked and it finds segments involved in the compaction. - */ struct compact_cursor { struct list_head csegs; /* buffer holds allocations and our returning them */ - u64 segnos[SCOUTFS_COMPACTION_MAX_UPDATE]; - unsigned nr_segnos; + u64 segnos[SCOUTFS_COMPACTION_MAX_OUTPUT]; + unsigned int nr_segnos; u8 lower_level; u8 last_level; @@ -371,6 +351,12 @@ static int compact_segments(struct super_block *sb, break; } + /* didn't get enough segnos */ + if (next_segno >= curs->nr_segnos) { + ret = -ENOSPC; + break; + } + cseg->segno = curs->segnos[next_segno]; curs->segnos[next_segno] = 0; next_segno++; @@ -435,153 +421,216 @@ static int compact_segments(struct super_block *sb, } /* - * Manifest walking is providing the details of the overall compaction - * operation. + * We want all the non-zero segnos sorted at the front of the array + * and the empty segnos all packed at the end. This is easily done by + * subtracting one from both then comparing as usual. All relations hold + * except that 0 becomes the greatest instead of the least. */ -void scoutfs_compact_describe(struct super_block *sb, void *data, - u8 upper_level, u8 last_level, bool sticky) +static int sort_cmp_segnos(const void *A, const void *B) { - struct compact_cursor *curs = data; + const u64 a = *(const u64 *)A - 1; + const u64 b = *(const u64 *)B - 1; - curs->lower_level = upper_level + 1; - curs->last_level = last_level; - curs->sticky = sticky; + return a < b ? -1 : a > b ? 1 : 0; } -/* - * Add a segment involved in the compaction operation. - * - * XXX Today we know that the caller is always adding only one upper segment - * and is then possibly adding all the lower overlapping segments. - */ -int scoutfs_compact_add(struct super_block *sb, void *data, - struct scoutfs_manifest_entry *ment) +static void sort_swap_segnos(void *A, void *B, int size) { - struct compact_cursor *curs = data; - struct compact_seg *cseg; - int ret; + u64 *a = A; + u64 *b = B; - cseg = alloc_cseg(sb, &ment->first, &ment->last); - if (!cseg) { - ret = -ENOMEM; + swap(*a, *b); +} + +static int verify_request(struct super_block *sb, + struct scoutfs_net_compact_request *req) +{ + int ret = -EINVAL; + int nr_segnos; + int nr_ents; + int i; + + /* no unknown flags */ + if (req->flags & ~SCOUTFS_NET_COMPACT_FLAG_STICKY) goto out; + + /* find the number of segments and entries */ + for (i = 0; i < ARRAY_SIZE(req->segnos); i++) { + if (req->segnos[i] == 0) + break; + } + nr_segnos = i; + + for (i = 0; i < ARRAY_SIZE(req->ents); i++) { + if (req->ents[i].segno == 0) + break; + } + nr_ents = i; + + /* must have at least an upper */ + if (nr_ents == 0) + goto out; + + sort(req->segnos, nr_segnos, sizeof(req->segnos[i]), + sort_cmp_segnos, sort_swap_segnos); + + /* segnos must be unique */ + for (i = 1; i < nr_segnos; i++) { + if (req->segnos[i] == req->segnos[i - 1]) + goto out; } - list_add_tail(&cseg->entry, &curs->csegs); + /* if we have a lower it must be under upper */ + if (nr_ents > 1 && (req->ents[1].level != req->ents[0].level + 1)) + goto out; - cseg->segno = ment->segno; - cseg->seq = ment->seq; - cseg->level = ment->level; + /* make sure lower ents are on the same level */ + for (i = 2; i < nr_ents; i++) { + if (req->ents[i].level != req->ents[i - 1].level) + goto out; + } - if (!curs->upper) - curs->upper = cseg; - else if (!curs->lower) - curs->lower = cseg; - if (curs->lower) - curs->last_lower = cseg; + for (i = 1; i < nr_ents; i++) { + /* lowers must overlap with upper */ + if (scoutfs_key_compare_ranges(&req->ents[0].first, + &req->ents[0].last, + &req->ents[i].first, + &req->ents[i].last) != 0) + goto out; + + /* lowers must be on the level below upper */ + if (req->ents[i].level != req->ents[0].level + 1) + goto out; + } + + /* last level must include lowest level */ + if (req->last_level < req->ents[nr_ents - 1].level) + goto out; + + for (i = 2; i < nr_ents; i++) { + /* lowers must be sorted by first key */ + if (scoutfs_key_compare(&req->ents[i].first, + &req->ents[i - 1].first) <= 0) + goto out; + + /* lowers must not overlap with each other */ + if (scoutfs_key_compare_ranges(&req->ents[i].first, + &req->ents[i].last, + &req->ents[i - 1].first, + &req->ents[i - 1].last) == 0) + goto out; + } ret = 0; out: + if (WARN_ON_ONCE(ret < 0)) { + scoutfs_inc_counter(sb, compact_invalid_request); + printk("id %llu last_level %u flags 0x%x\n", + le64_to_cpu(req->id), req->last_level, req->flags); + printk("segnos: "); + for (i = 0; i < ARRAY_SIZE(req->segnos); i++) + printk("%llu ", le64_to_cpu(req->segnos[i])); + printk("\n"); + printk("entries: "); + for (i = 0; i < ARRAY_SIZE(req->ents); i++) { + printk(" [%u] segno %llu seq %llu level %u first "SK_FMT" last "SK_FMT"\n", + i, le64_to_cpu(req->ents[i].segno), + le64_to_cpu(req->ents[i].seq), + req->ents[i].level, + SK_ARG(&req->ents[i].first), + SK_ARG(&req->ents[i].last)); + } + printk("\n"); + } + return ret; } /* - * Give the compaction cursor a segno to allocate from. - */ -void scoutfs_compact_add_segno(struct super_block *sb, void *data, u64 segno) -{ - struct compact_cursor *curs = data; - - curs->segnos[curs->nr_segnos++] = segno; -} - -/* - * Commit the result of a compaction based on the state of the cursor. - * The server caller stops the manifest from being written while we're - * making changes. We lock the manifest to atomically make our changes. + * Translate the compaction request into our native structs that we use + * to perform the compaction. The caller has verified that the request + * satisfies our constraints. * - * The erorr handling is sketchy here because calling the manifest from - * here is temporary. We should be sending a message to the server - * instead of calling the allocator and manifest. + * If we return an error the caller will clean up a partially prepared + * cursor. */ -int scoutfs_compact_commit(struct super_block *sb, void *c, void *r) +static int prepare_curs(struct super_block *sb, struct compact_cursor *curs, + struct scoutfs_net_compact_request *req) { struct scoutfs_manifest_entry ment; - struct compact_cursor *curs = c; - struct list_head *results = r; struct compact_seg *cseg; - int ret; + int ret = 0; int i; - /* free unused segnos that were allocated for the compaction */ - for (i = 0; i < curs->nr_segnos; i++) { - if (curs->segnos[i]) { - ret = scoutfs_server_free_segno(sb, curs->segnos[i]); - BUG_ON(ret); - } + curs->lower_level = req->ents[0].level + 1; + curs->last_level = req->last_level; + curs->sticky = !!(req->flags & SCOUTFS_NET_COMPACT_FLAG_STICKY); + + for (i = 0; i < ARRAY_SIZE(req->segnos); i++) { + if (req->segnos[i] == 0) + break; + curs->segnos[i] = le64_to_cpu(req->segnos[i]); } + curs->nr_segnos = i; - scoutfs_manifest_lock(sb); + for (i = 0; i < ARRAY_SIZE(req->ents); i++) { + if (req->ents[i].segno == 0) + break; - /* delete input segments, probably freeing their segnos */ - list_for_each_entry(cseg, &curs->csegs, entry) { - if (!cseg->part_of_move) { - ret = scoutfs_server_free_segno(sb, cseg->segno); - BUG_ON(ret); + scoutfs_init_ment_from_net(&ment, &req->ents[i]); + + cseg = alloc_cseg(sb, &ment.first, &ment.last); + if (!cseg) { + ret = -ENOMEM; + break; } - scoutfs_manifest_init_entry(&ment, cseg->level, 0, cseg->seq, - &cseg->first, NULL); - ret = scoutfs_manifest_del(sb, &ment); - BUG_ON(ret); + list_add_tail(&cseg->entry, &curs->csegs); + + cseg->segno = ment.segno; + cseg->seq = ment.seq; + cseg->level = ment.level; + + if (!curs->upper) + curs->upper = cseg; + else if (!curs->lower) + curs->lower = cseg; + if (curs->lower) + curs->last_lower = cseg; } - /* add output entries */ - list_for_each_entry(cseg, results, entry) { - /* XXX moved upper segments won't have read the segment :P */ - if (cseg->seg) - scoutfs_seg_init_ment(&ment, cseg->level, cseg->seg); - else - scoutfs_manifest_init_entry(&ment, cseg->level, - cseg->segno, cseg->seq, - &cseg->first, &cseg->last); - ret = scoutfs_manifest_add(sb, &ment); - BUG_ON(ret); - } - - scoutfs_manifest_unlock(sb); - - return 0; + return ret; } /* - * The compaction worker tries to make forward progress with compaction - * every time its kicked. It pretends to send a message requesting - * compaction parameters but in reality the net request function there - * is calling directly into the manifest and back into our compaction - * add routines. + * Perform a compaction by translating the incoming request into our + * working state, iterating over input segments and write output + * segments, then generating the response that describes the output + * segments. * - * We always try to clean up everything on errors. + * The server will either commit our response or cleanup the request + * if we return an error that the caller sends in response. */ -static void scoutfs_compact_func(struct work_struct *work) +int scoutfs_compact(struct super_block *sb, + struct scoutfs_net_compact_request *req, + struct scoutfs_net_compact_response *resp) { - struct compact_info *ci = container_of(work, struct compact_info, work); - struct super_block *sb = ci->sb; struct compact_cursor curs = {{NULL,}}; + struct scoutfs_manifest_entry ment; struct scoutfs_bio_completion comp; struct compact_seg *cseg; LIST_HEAD(results); int ret; int err; + int nr; INIT_LIST_HEAD(&curs.csegs); scoutfs_bio_init_comp(&comp); - ret = scoutfs_client_get_compaction(sb, (void *)&curs); - - /* short circuit no compaction work to do */ - if (ret == 0 && list_empty(&curs.csegs)) - return; + ret = verify_request(sb, req) ?: + prepare_curs(sb, &curs, req); + if (ret) + goto out; /* trace compaction ranges */ list_for_each_entry(cseg, &curs.csegs, entry) { @@ -590,84 +639,40 @@ static void scoutfs_compact_func(struct work_struct *work) &cseg->last); } - if (ret == 0 && !list_empty(&curs.csegs)) { - ret = compact_segments(sb, &curs, &comp, &results); + ret = compact_segments(sb, &curs, &comp, &results); - /* always wait for io completion */ - err = scoutfs_bio_wait_comp(sb, &comp); - if (!ret && err) - ret = err; - } - - /* don't update manifest on error, just free segnos */ - if (ret) { - list_for_each_entry(cseg, &results, entry) { - if (!cseg->part_of_move) - curs.segnos[curs.nr_segnos++] = cseg->segno; - } - free_cseg_list(sb, &curs.csegs); - free_cseg_list(sb, &results); - } - - err = scoutfs_client_finish_compaction(sb, &curs, &results); + /* always wait for io completion */ + err = scoutfs_bio_wait_comp(sb, &comp); if (!ret && err) ret = err; + if (ret) + goto out; + + /* fill entries for written output segments */ + nr = 0; + list_for_each_entry(cseg, &results, entry) { + /* XXX moved upper segments won't have read the segment :P */ + if (cseg->seg) + scoutfs_seg_init_ment(&ment, cseg->level, cseg->seg); + else + scoutfs_manifest_init_entry(&ment, cseg->level, + cseg->segno, cseg->seq, + &cseg->first, &cseg->last); + + trace_scoutfs_compact_output(sb, ment.level, ment.segno, + ment.seq, &ment.first, + &ment.last); + + scoutfs_init_ment_to_net(&resp->ents[nr++], &ment); + } + + ret = 0; +out: + if (ret == -ESTALE) + scoutfs_inc_counter(sb, compact_stale_error); free_cseg_list(sb, &curs.csegs); free_cseg_list(sb, &results); - if (ret == -ESTALE) - scoutfs_inc_counter(sb, compact_stale_error); - - WARN_ON_ONCE(ret && ret != -ESTALE); - trace_scoutfs_compact_func(sb, ret); -} - -void scoutfs_compact_kick(struct super_block *sb) -{ - DECLARE_COMPACT_INFO(sb, ci); - - queue_work(ci->workq, &ci->work); -} - -int scoutfs_compact_setup(struct super_block *sb) -{ - struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); - struct compact_info *ci; - - ci = kzalloc(sizeof(struct compact_info), GFP_KERNEL); - if (!ci) - return -ENOMEM; - - ci->sb = sb; - INIT_WORK(&ci->work, scoutfs_compact_func); - - ci->workq = alloc_workqueue("scoutfs_compact", 0, 1); - if (!ci->workq) { - kfree(ci); - return -ENOMEM; - } - - sbi->compact_info = ci; - - return 0; -} - -/* - * The system should be idle, there should not be any more manifest - * modification which would kick compaction. - */ -void scoutfs_compact_destroy(struct super_block *sb) -{ - struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); - DECLARE_COMPACT_INFO(sb, ci); - - if (ci) { - /* stop compaction from requeueing itself */ - cancel_work_sync(&ci->work); - destroy_workqueue(ci->workq); - sbi->compact_info = NULL; - - kfree(ci); - } + return ret; } diff --git a/kmod/src/compact.h b/kmod/src/compact.h index c163ce56..788cae2c 100644 --- a/kmod/src/compact.h +++ b/kmod/src/compact.h @@ -1,16 +1,8 @@ #ifndef _SCOUTFS_COMPACT_H_ #define _SCOUTFS_COMPACT_H_ -void scoutfs_compact_kick(struct super_block *sb); - -void scoutfs_compact_describe(struct super_block *sb, void *data, - u8 upper_level, u8 last_level, bool sticky); -int scoutfs_compact_add(struct super_block *sb, void *data, - struct scoutfs_manifest_entry *ment); -void scoutfs_compact_add_segno(struct super_block *sb, void *data, u64 segno); -int scoutfs_compact_commit(struct super_block *sb, void *c, void *r); - -int scoutfs_compact_setup(struct super_block *sb); -void scoutfs_compact_destroy(struct super_block *sb); +int scoutfs_compact(struct super_block *sb, + struct scoutfs_net_compact_request *req, + struct scoutfs_net_compact_response *resp); #endif diff --git a/kmod/src/counters.h b/kmod/src/counters.h index 3d5886c0..82bead6b 100644 --- a/kmod/src/counters.h +++ b/kmod/src/counters.h @@ -15,7 +15,9 @@ EXPAND_COUNTER(btree_read_error) \ EXPAND_COUNTER(btree_stale_read) \ EXPAND_COUNTER(btree_write_error) \ + EXPAND_COUNTER(compact_invalid_request) \ EXPAND_COUNTER(compact_operations) \ + EXPAND_COUNTER(compact_segment_busy) \ EXPAND_COUNTER(compact_segment_moved) \ EXPAND_COUNTER(compact_segment_read) \ EXPAND_COUNTER(compact_segment_write_bytes) \ diff --git a/kmod/src/format.h b/kmod/src/format.h index 4d91c250..d7736a38 100644 --- a/kmod/src/format.h +++ b/kmod/src/format.h @@ -369,6 +369,7 @@ struct scoutfs_super_block { struct scoutfs_btree_ring bring; __le64 next_seg_seq; __le64 next_node_id; + __le64 next_compact_id; struct scoutfs_btree_root alloc_root; struct scoutfs_manifest manifest; struct scoutfs_inet_addr server_addr; @@ -548,6 +549,7 @@ enum { SCOUTFS_NET_CMD_GET_LAST_SEQ, SCOUTFS_NET_CMD_GET_MANIFEST_ROOT, SCOUTFS_NET_CMD_STATFS, + SCOUTFS_NET_CMD_COMPACT, SCOUTFS_NET_CMD_UNKNOWN, }; @@ -623,15 +625,49 @@ struct scoutfs_net_extent_list { /* arbitrarily makes a nice ~1k extent list payload */ #define SCOUTFS_NET_EXTENT_LIST_MAX_NR 64 -/* XXX eventually we'll have net compaction and will need agents to agree */ - /* one upper segment and fanout lower segments */ -#define SCOUTFS_COMPACTION_MAX_INPUT (1 + SCOUTFS_MANIFEST_FANOUT) -/* sticky can add one, and so can item page alignment */ -#define SCOUTFS_COMPACTION_SLOP 2 -/* delete all inputs and insert all outputs (same goes for alloc|free segnos) */ -#define SCOUTFS_COMPACTION_MAX_UPDATE \ - (2 * (SCOUTFS_COMPACTION_MAX_INPUT + SCOUTFS_COMPACTION_SLOP)) +#define SCOUTFS_COMPACTION_MAX_INPUT (1 + SCOUTFS_MANIFEST_FANOUT) +/* sticky can split the input and item alignment padding can add a lower */ +#define SCOUTFS_COMPACTION_SEGNO_OVERHEAD 2 +#define SCOUTFS_COMPACTION_MAX_OUTPUT \ + (SCOUTFS_COMPACTION_MAX_INPUT + SCOUTFS_COMPACTION_SEGNO_OVERHEAD) + +/* + * A compact request is sent by the server to the client. It provides + * the input segments and enough allocated segnos to write the results. + * The id uniquely identifies this compaction request and is included in + * the response to clean up its allocated resources. + */ +struct scoutfs_net_compact_request { + __le64 id; + __u8 last_level; + __u8 flags; + __le64 segnos[SCOUTFS_COMPACTION_MAX_OUTPUT]; + struct scoutfs_net_manifest_entry ents[SCOUTFS_COMPACTION_MAX_INPUT]; +} __packed; + +/* + * A sticky compaction has more lower level segments that overlap with + * the end of the upper after the last lower level segment included in + * the compaction. Items left in the upper segment after the last lower + * need to be written to the upper level instead of the lower. The + * upper segment "sticks" in place instead of moving down to the lower + * level. + */ +#define SCOUTFS_NET_COMPACT_FLAG_STICKY (1 << 0) + +/* + * A compact response is sent by the client to the server. It describes + * the written output segments that need to be added to the manifest. + * The server compares the response to the request to free unused + * allocated segnos and input manifest entries. An empty response is + * valid and can happen if, say, the upper input segment completely + * deleted all the items in a single overlapping lower segment. + */ +struct scoutfs_net_compact_response { + __le64 id; + struct scoutfs_net_manifest_entry ents[SCOUTFS_COMPACTION_MAX_OUTPUT]; +} __packed; /* * Scoutfs file handle structure - this can be copied out to userspace diff --git a/kmod/src/manifest.c b/kmod/src/manifest.c index 5ecd9f61..1fa6158a 100644 --- a/kmod/src/manifest.c +++ b/kmod/src/manifest.c @@ -29,6 +29,7 @@ #include "counters.h" #include "triggers.h" #include "client.h" +#include "spbm.h" #include "scoutfs_trace.h" /* @@ -47,6 +48,8 @@ struct manifest { /* calculated on mount, const thereafter */ u64 level_limits[SCOUTFS_MANIFEST_MAX_LEVEL + 1]; + u64 compacts_pending[SCOUTFS_MANIFEST_MAX_LEVEL + 1]; + struct scoutfs_spbm segno_busy; unsigned long flags; @@ -884,6 +887,121 @@ out: return ret; } +static bool level_should_compact(struct super_block *sb, int level) +{ + DECLARE_MANIFEST(sb, mani); + struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; + + BUG_ON(!rwsem_is_locked(&mani->rwsem)); + + return ((s64)le64_to_cpu(super->manifest.level_counts[level]) - + (s64)mani->compacts_pending[level]) > mani->level_limits[level]; +} + +int scoutfs_manifest_should_compact(struct super_block *sb) +{ + DECLARE_MANIFEST(sb, mani); + bool should = false; + int level; + + down_read(&mani->rwsem); + for (level = mani->nr_levels - 1; level >= 0; level--) { + if (level_should_compact(sb, level)) { + should = true; + break; + } + } + up_read(&mani->rwsem); + + return should; +} + +/* + * Record that a compaction operation is in flight. We mark the segnos + * involved so that we don't use them as inputs for other compactions + * and assume that the compaction will delete a segment from its upper + * level when deciding what level to compact. + */ +static int start_compact_request(struct super_block *sb, + struct scoutfs_net_compact_request *req) +{ + DECLARE_MANIFEST(sb, mani); + int level; + int ret = 0; + int i; + + BUG_ON(!rwsem_is_locked(&mani->rwsem)); + + for (i = 0; i < ARRAY_SIZE(req->ents); i++) { + if (req->ents[i].segno == 0) + break; + + ret = scoutfs_spbm_set(&mani->segno_busy, + le64_to_cpu(req->ents[i].segno)); + if (ret) { + while (i-- > 0) + scoutfs_spbm_clear(&mani->segno_busy, + le64_to_cpu(req->ents[i].segno)); + break; + } + } + + if (ret == 0) { + level = req->ents[0].level; + mani->compacts_pending[level]++; + } + + return ret; +} + +/* + * A compaction request has completed. No longer account for it in the + * level pending counts and stop tracking all its segments. + * + * This can be called in error paths with an empty zeroed request and it + * will do nothing. + */ +void scoutfs_manifest_compact_done(struct super_block *sb, + struct scoutfs_net_compact_request *req) +{ + DECLARE_MANIFEST(sb, mani); + int level; + int i; + + down_write(&mani->rwsem); + + for (i = 0; i < ARRAY_SIZE(req->ents); i++) { + if (req->ents[i].segno == 0) + break; + + scoutfs_spbm_clear(&mani->segno_busy, + le64_to_cpu(req->ents[i].segno)); + } + + if (i > 0) { + level = req->ents[0].level; + mani->compacts_pending[level]--; + } + + up_write(&mani->rwsem); +} + +static int add_entry_unless_busy(struct super_block *sb, + struct scoutfs_net_compact_request *req, + unsigned int ind, + struct scoutfs_manifest_entry *ment) +{ + DECLARE_MANIFEST(sb, mani); + + if (scoutfs_spbm_test(&mani->segno_busy, ment->segno)) { + scoutfs_inc_counter(sb, compact_segment_busy); + return -EAGAIN; + } + + scoutfs_init_ment_to_net(&req->ents[ind], ment); + return 0; +} + /* * Give the caller the segments that will be involved in the next * compaction. @@ -898,15 +1016,19 @@ out: * We add all the segments to the compaction caller's data and let it do * its thing. It'll allocate and free segments and update the manifest. * - * Returns the number of input segments or -errno. + * Returns: + * 0: no compactions were needed at the given level + * > 0: number of total imput segments in the compaction + * -EAGAIN: segments were already in a pending compaction + * -errno: fatal error * - * XXX this will get a lot more clever: - * - ensuring concurrent compactions don't overlap + * XXX this could be more clever: * - prioritize segments with deletion or incremental records * - prioritize partial segments * - maybe compact segments by age in a given level */ -int scoutfs_manifest_next_compact(struct super_block *sb, void *data) +static int next_compact_req(struct super_block *sb, int level, + struct scoutfs_net_compact_request *req) { DECLARE_MANIFEST(sb, mani); struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); @@ -918,30 +1040,17 @@ int scoutfs_manifest_next_compact(struct super_block *sb, void *data) SCOUTFS_BTREE_ITEM_REF(iref); SCOUTFS_BTREE_ITEM_REF(over_iref); SCOUTFS_BTREE_ITEM_REF(prev); - struct scoutfs_key zeros; + static struct scoutfs_key zeros; bool wrapped; bool sticky; - int level; int ret; int nr = 0; int i; scoutfs_key_set_zeros(&zeros); + memset(req, 0, sizeof(*req)); - down_write(&mani->rwsem); - - for (level = mani->nr_levels - 1; level >= 0; level--) { - if (le64_to_cpu(super->manifest.level_counts[level]) > - mani->level_limits[level]) - break; - } - - trace_scoutfs_manifest_next_compact(sb, level); - - if (level < 0) { - ret = 0; - goto out; - } + BUG_ON(!rwsem_is_locked(&mani->rwsem)); /* fill ment and ret == 0 if we find an entry at the level */ if (level == 0) { @@ -1003,9 +1112,9 @@ again: } /* add the upper input segment */ - ret = scoutfs_compact_add(sb, data, &ment); + ret = add_entry_unless_busy(sb, req, nr, &ment); if (ret) - goto out; + goto skip; nr++; /* and add a fanout's worth of lower overlapping segments */ @@ -1029,7 +1138,7 @@ again: break; } - ret = scoutfs_compact_add(sb, data, &over); + ret = add_entry_unless_busy(sb, req, nr, &over); if (ret) goto out; nr++; @@ -1042,16 +1151,20 @@ again: if (ret < 0 && ret != -ENOENT) goto out; - scoutfs_compact_describe(sb, data, level, mani->nr_levels - 1, sticky); + req->last_level = mani->nr_levels - 1; + if (sticky) + req->flags |= SCOUTFS_NET_COMPACT_FLAG_STICKY; + ret = start_compact_request(sb, req); + if (ret) + goto out; + + ret = 0; +skip: /* record the next key to start from */ mani->compact_keys[level] = ment.last; scoutfs_key_inc(&mani->compact_keys[level]); - - ret = 0; out: - up_write(&mani->rwsem); - scoutfs_btree_put_iref(&iref); scoutfs_btree_put_iref(&over_iref); scoutfs_btree_put_iref(&prev); @@ -1059,6 +1172,81 @@ out: return ret ?: nr; } +/* + * Find the next segment to compact into its lower overlapping segments. + * Fill out the callers request describing all the segments involved in + * the operation. + * + * First we search for a level to compact. A level needs compaction if + * it has more segments than its limit. We search from the bottom up + * because segments are written at the top when there's space. By + * compacting from the bottom we pull new segments down until there's + * space. If we compacted from the top down then we could create an + * imbalanced top-heavy structure. + * + * At each level we find the segment from a cursor and try to compact it + * into its lower segments. Any of the segments involved could already + * be part of a pending compaction and need to be skipped. In that case + * we move to the next segment at the level. All the segments at the + * level could be busy so we detect when we skip to the first value we + * skipped to and move on. + * + * If we return a filled compact request then we've tracked it. We + * assume it will delete an upper segment and have marked all its segnos + * as busy so they won't be used by future compaction requests. The + * caller must call complete_done when the compact operation completes. + */ +int scoutfs_manifest_next_compact(struct super_block *sb, + struct scoutfs_net_compact_request *req) +{ + DECLARE_MANIFEST(sb, mani); + struct scoutfs_key key = {0,}; + bool first; + int level; + int ret = 0; + + memset(req, 0, sizeof(*req)); + + down_write(&mani->rwsem); + + for (level = mani->nr_levels - 1; level >= 0; level--) { + if (!level_should_compact(sb, level)) + continue; + + first = true; + + for (;;) { + ret = next_compact_req(sb, level, req); + if (ret > 0 || (ret < 0 && ret != -EAGAIN)) + goto out; + if (ret == 0) + break; + + /* remember first skip and keep going */ + if (first) { + first = false; + key = mani->compact_keys[level]; + continue; + } + + /* bail if we looped around */ + if (!scoutfs_key_compare(&key, + &mani->compact_keys[level])) { + ret = 0; + break; + } + } + /* continue to next level */ + } + +out: + up_write(&mani->rwsem); + + trace_scoutfs_manifest_next_compact(sb, level, ret); + + return ret; +} + int scoutfs_manifest_setup(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); @@ -1071,6 +1259,7 @@ int scoutfs_manifest_setup(struct super_block *sb) return -ENOMEM; init_rwsem(&mani->rwsem); + scoutfs_spbm_init(&mani->segno_busy); for (i = 0; i < ARRAY_SIZE(mani->compact_keys); i++) scoutfs_key_set_zeros(&mani->compact_keys[i]); @@ -1101,6 +1290,7 @@ void scoutfs_manifest_destroy(struct super_block *sb) struct manifest *mani = sbi->manifest; if (mani) { + scoutfs_spbm_destroy(&mani->segno_busy); kfree(mani); sbi->manifest = NULL; } diff --git a/kmod/src/manifest.h b/kmod/src/manifest.h index cd1a095d..b3ffdf54 100644 --- a/kmod/src/manifest.h +++ b/kmod/src/manifest.h @@ -38,7 +38,11 @@ int scoutfs_manifest_read_items(struct super_block *sb, int scoutfs_manifest_next_key(struct super_block *sb, struct scoutfs_key *key, struct scoutfs_key *next_key); -int scoutfs_manifest_next_compact(struct super_block *sb, void *data); +int scoutfs_manifest_should_compact(struct super_block *sb); +int scoutfs_manifest_next_compact(struct super_block *sb, + struct scoutfs_net_compact_request *req); +void scoutfs_manifest_compact_done(struct super_block *sb, + struct scoutfs_net_compact_request *req); bool scoutfs_manifest_level0_full(struct super_block *sb); diff --git a/kmod/src/net.c b/kmod/src/net.c index e889574a..da97665e 100644 --- a/kmod/src/net.c +++ b/kmod/src/net.c @@ -348,7 +348,7 @@ static int submit_send(struct super_block *sb, WARN_ON_ONCE(flags & SCOUTFS_NET_FLAGS_UNKNOWN) || WARN_ON_ONCE(net_err >= SCOUTFS_NET_ERR_UNKNOWN) || WARN_ON_ONCE(data_len > SCOUTFS_NET_MAX_DATA_LEN) || - WARN_ON_ONCE(data_len && (!data || net_err)) || + WARN_ON_ONCE(data_len && data == NULL) || WARN_ON_ONCE(net_err && (!(flags & SCOUTFS_NET_FLAG_RESPONSE))) || WARN_ON_ONCE(id == 0 && (flags & SCOUTFS_NET_FLAG_RESPONSE))) return -EINVAL; @@ -430,12 +430,13 @@ static void saw_valid_greeting(struct scoutfs_net_connection *conn, u64 node_id) conn->valid_greeting = 1; conn->node_id = node_id; - if (conn->notify_up) - conn->notify_up(sb, conn, conn->info, node_id); list_splice_tail_init(&conn->resend_queue, &conn->send_queue); queue_work(conn->workq, &conn->send_work); spin_unlock(&conn->lock); + + if (conn->notify_up) + conn->notify_up(sb, conn, conn->info, node_id); } /* @@ -589,10 +590,6 @@ static bool invalid_message(struct scoutfs_net_header *nh) nh->error >= SCOUTFS_NET_ERR_UNKNOWN) return true; - /* errors can't have payloads */ - if (nh->data_len != 0 && nh->error != SCOUTFS_NET_ERR_NONE) - return true; - /* payloads have a limit */ if (le16_to_cpu(nh->data_len) > SCOUTFS_NET_MAX_DATA_LEN) return true; diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h index 962bfca7..132c94f1 100644 --- a/kmod/src/scoutfs_trace.h +++ b/kmod/src/scoutfs_trace.h @@ -998,21 +998,24 @@ TRACE_EVENT(scoutfs_xattr_set, ); TRACE_EVENT(scoutfs_manifest_next_compact, - TP_PROTO(struct super_block *sb, int level), + TP_PROTO(struct super_block *sb, int level, int ret), - TP_ARGS(sb, level), + TP_ARGS(sb, level, ret), TP_STRUCT__entry( __field(__u64, fsid) __field(int, level) + __field(int, ret) ), TP_fast_assign( __entry->fsid = FSID_ARG(sb); __entry->level = level; + __entry->ret = ret; ), - TP_printk(FSID_FMT" level %d", __entry->fsid, __entry->level) + TP_printk(FSID_FMT" level %d ret %d", __entry->fsid, __entry->level, + __entry->ret) ); TRACE_EVENT(scoutfs_advance_dirty_super, @@ -1069,22 +1072,127 @@ TRACE_EVENT(scoutfs_dir_add_next_linkref, __entry->found_dir_ino, __entry->name_len) ); -TRACE_EVENT(scoutfs_compact_func, - TP_PROTO(struct super_block *sb, int ret), +TRACE_EVENT(scoutfs_client_compact_start, + TP_PROTO(struct super_block *sb, u64 id, u8 last_level, u8 flags), - TP_ARGS(sb, ret), + TP_ARGS(sb, id, last_level, flags), TP_STRUCT__entry( __field(__u64, fsid) + __field(__u64, id) + __field(__u8, last_level) + __field(__u8, flags) + ), + + TP_fast_assign( + __entry->fsid = FSID_ARG(sb); + __entry->id = id; + __entry->last_level = last_level; + __entry->flags = flags; + ), + + TP_printk("fsid "FSID_FMT" id %llu last_level %u flags 0x%x", + __entry->fsid, __entry->id, __entry->last_level, + __entry->flags) +); + +TRACE_EVENT(scoutfs_client_compact_stop, + TP_PROTO(struct super_block *sb, u64 id, int ret), + + TP_ARGS(sb, id, ret), + + TP_STRUCT__entry( + __field(__u64, fsid) + __field(__u64, id) __field(int, ret) ), TP_fast_assign( __entry->fsid = FSID_ARG(sb); + __entry->id = id; __entry->ret = ret; ), - TP_printk(FSID_FMT" ret %d", __entry->fsid, __entry->ret) + TP_printk("fsid "FSID_FMT" id %llu ret %d", + __entry->fsid, __entry->id, __entry->ret) +); + +TRACE_EVENT(scoutfs_server_compact_start, + TP_PROTO(struct super_block *sb, u64 id, u8 level, u64 node_id, + unsigned long client_nr, unsigned long server_nr, + unsigned long per_client), + + TP_ARGS(sb, id, level, node_id, client_nr, server_nr, per_client), + + TP_STRUCT__entry( + __field(__u64, fsid) + __field(__u64, id) + __field(__u8, level) + __field(__u64, node_id) + __field(unsigned long, client_nr) + __field(unsigned long, server_nr) + __field(unsigned long, per_client) + ), + + TP_fast_assign( + __entry->fsid = FSID_ARG(sb); + __entry->id = id; + __entry->level = level; + __entry->node_id = node_id; + __entry->client_nr = client_nr; + __entry->server_nr = server_nr; + __entry->per_client = per_client; + ), + + TP_printk("fsid "FSID_FMT" id %llu level %u node_id %llu client_nr %lu server_nr %lu per_client %lu", + __entry->fsid, __entry->id, __entry->level, __entry->node_id, + __entry->client_nr, __entry->server_nr, __entry->per_client) +); + +TRACE_EVENT(scoutfs_server_compact_done, + TP_PROTO(struct super_block *sb, u64 id, u64 node_id, + unsigned long server_nr), + + TP_ARGS(sb, id, node_id, server_nr), + + TP_STRUCT__entry( + __field(__u64, fsid) + __field(__u64, id) + __field(__u64, node_id) + __field(unsigned long, server_nr) + ), + + TP_fast_assign( + __entry->fsid = FSID_ARG(sb); + __entry->id = id; + __entry->node_id = node_id; + __entry->server_nr = server_nr; + ), + + TP_printk("fsid "FSID_FMT" id %llu node_id %llu server_nr %lu", + __entry->fsid, __entry->id, __entry->node_id, + __entry->server_nr) +); + +TRACE_EVENT(scoutfs_server_compact_response, + TP_PROTO(struct super_block *sb, u64 id, int error), + + TP_ARGS(sb, id, error), + + TP_STRUCT__entry( + __field(__u64, fsid) + __field(__u64, id) + __field(int, error) + ), + + TP_fast_assign( + __entry->fsid = FSID_ARG(sb); + __entry->id = id; + __entry->error = error; + ), + + TP_printk("fsid "FSID_FMT" id %llu error %d", + __entry->fsid, __entry->id, __entry->error) ); TRACE_EVENT(scoutfs_write_begin, @@ -1277,6 +1385,12 @@ DEFINE_EVENT(scoutfs_manifest_class, scoutfs_compact_input, TP_ARGS(sb, level, segno, seq, first, last) ); +DEFINE_EVENT(scoutfs_manifest_class, scoutfs_compact_output, + TP_PROTO(struct super_block *sb, u8 level, u64 segno, u64 seq, + struct scoutfs_key *first, struct scoutfs_key *last), + TP_ARGS(sb, level, segno, seq, first, last) +); + DEFINE_EVENT(scoutfs_manifest_class, scoutfs_read_item_segment, TP_PROTO(struct super_block *sb, u8 level, u64 segno, u64 seq, struct scoutfs_key *first, struct scoutfs_key *last), @@ -1694,6 +1808,14 @@ DEFINE_EVENT(scoutfs_work_class, scoutfs_server_commit_work_exit, TP_PROTO(struct super_block *sb, u64 data, int ret), TP_ARGS(sb, data, ret) ); +DEFINE_EVENT(scoutfs_work_class, scoutfs_server_compact_work_enter, + TP_PROTO(struct super_block *sb, u64 data, int ret), + TP_ARGS(sb, data, ret) +); +DEFINE_EVENT(scoutfs_work_class, scoutfs_server_compact_work_exit, + TP_PROTO(struct super_block *sb, u64 data, int ret), + TP_ARGS(sb, data, ret) +); DEFINE_EVENT(scoutfs_work_class, scoutfs_net_proc_work_enter, TP_PROTO(struct super_block *sb, u64 data, int ret), TP_ARGS(sb, data, ret) @@ -2267,6 +2389,39 @@ DEFINE_EVENT(scoutfs_segno_class, scoutfs_free_segno, TP_PROTO(struct super_block *sb, u64 segno), TP_ARGS(sb, segno) ); +DEFINE_EVENT(scoutfs_segno_class, scoutfs_remove_segno, + TP_PROTO(struct super_block *sb, u64 segno), + TP_ARGS(sb, segno) +); + +DECLARE_EVENT_CLASS(scoutfs_server_client_count_class, + TP_PROTO(struct super_block *sb, u64 node_id, unsigned long nr_clients), + + TP_ARGS(sb, node_id, nr_clients), + + TP_STRUCT__entry( + __field(__u64, fsid) + __field(__s64, node_id) + __field(unsigned long, nr_clients) + ), + + TP_fast_assign( + __entry->fsid = FSID_ARG(sb); + __entry->node_id = node_id; + __entry->nr_clients = nr_clients; + ), + + TP_printk("fsid "FSID_FMT" node_id %llu nr_clients %lu", + __entry->fsid, __entry->node_id, __entry->nr_clients) +); +DEFINE_EVENT(scoutfs_server_client_count_class, scoutfs_server_client_up, + TP_PROTO(struct super_block *sb, u64 node_id, unsigned long nr_clients), + TP_ARGS(sb, node_id, nr_clients) +); +DEFINE_EVENT(scoutfs_server_client_count_class, scoutfs_server_client_down, + TP_PROTO(struct super_block *sb, u64 node_id, unsigned long nr_clients), + TP_ARGS(sb, node_id, nr_clients) +); #endif /* _TRACE_SCOUTFS_H */ diff --git a/kmod/src/seg.c b/kmod/src/seg.c index 3b4b33a0..49523b90 100644 --- a/kmod/src/seg.c +++ b/kmod/src/seg.c @@ -315,15 +315,6 @@ out: } -/* - * This just frees the segno for the given seg. It's gross but - * symmetrical with only being able to allocate segnos by allocating a - * seg. We'll probably have to do better. - */ -int scoutfs_seg_free_segno(struct super_block *sb, struct scoutfs_segment *seg) -{ - return scoutfs_server_free_segno(sb, seg->segno); -} /* * The bios submitted by this don't have page references themselves. If diff --git a/kmod/src/seg.h b/kmod/src/seg.h index 74f7f961..58c4a64a 100644 --- a/kmod/src/seg.h +++ b/kmod/src/seg.h @@ -34,7 +34,6 @@ void scoutfs_seg_put(struct scoutfs_segment *seg); int scoutfs_seg_alloc(struct super_block *sb, u64 segno, struct scoutfs_segment **seg_ret); -int scoutfs_seg_free_segno(struct super_block *sb, struct scoutfs_segment *seg); bool scoutfs_seg_fits_single(u32 nr_items, u32 val_bytes); bool scoutfs_seg_append_item(struct super_block *sb, struct scoutfs_segment *seg, struct scoutfs_key *key, struct kvec *val, diff --git a/kmod/src/server.c b/kmod/src/server.c index a91cf005..556229b4 100644 --- a/kmod/src/server.c +++ b/kmod/src/server.c @@ -34,6 +34,11 @@ #include "net.h" #include "endian_swap.h" +/* + * XXX pre commit: + * - comments + */ + /* * Every active mount can act as the server that listens on a net * connection and accepts connections from all the other mounts acting @@ -48,20 +53,19 @@ struct server_info { struct super_block *sb; spinlock_t lock; + wait_queue_head_t waitq; struct workqueue_struct *wq; struct delayed_work dwork; struct completion shutdown_comp; bool bind_warned; + struct scoutfs_net_connection *conn; /* request processing coordinates committing manifest and alloc */ struct rw_semaphore commit_rwsem; struct llist_head commit_waiters; struct work_struct commit_work; - /* adding new segments can have to wait for compaction */ - wait_queue_head_t compaction_waitq; - /* server remembers the stable manifest root for clients */ seqcount_t stable_seqcount; struct scoutfs_btree_root stable_manifest_root; @@ -75,6 +79,13 @@ struct server_info { struct list_head pending_frees; struct list_head clients; + unsigned long nr_clients; + + /* track compaction in flight */ + unsigned long compacts_per_client; + unsigned long nr_compacts; + struct list_head compacts; + struct work_struct compact_work; }; #define DECLARE_SERVER_INFO(sb, name) \ @@ -86,6 +97,7 @@ struct server_info { struct server_client_info { u64 node_id; struct list_head head; + unsigned long nr_compacts; }; struct commit_waiter { @@ -391,7 +403,7 @@ static int free_extent(struct super_block *sb, u64 start, u64 len) * This is called by the compaction code which is running in the server. * The server caller has held all the locks, etc. */ -int scoutfs_server_free_segno(struct super_block *sb, u64 segno) +static int free_segno(struct super_block *sb, u64 segno) { scoutfs_inc_counter(sb, server_free_segno); trace_scoutfs_free_segno(sb, segno); @@ -459,11 +471,51 @@ out: return ret; } +/* + * "allocating" a segno removes an unknown segment from the allocator + * and returns it, "removing" a segno removes a specific segno from the + * allocator. + */ +static int remove_segno(struct super_block *sb, u64 segno) +{ + struct server_info *server = SCOUTFS_SB(sb)->server_info; + struct scoutfs_extent ext; + int ret; + + trace_scoutfs_remove_segno(sb, segno); + + scoutfs_extent_init(&ext, SCOUTFS_FREE_EXTENT_BLKNO_TYPE, 0, + segno << SCOUTFS_SEGMENT_BLOCK_SHIFT, + SCOUTFS_SEGMENT_BLOCKS, 0, 0); + + down_write(&server->alloc_rwsem); + ret = scoutfs_extent_remove(sb, server_extent_io, &ext, NULL); + up_write(&server->alloc_rwsem); + return ret; +} + static void shutdown_server(struct server_info *server) { complete(&server->shutdown_comp); } +/* + * Queue compaction work if clients have capacity for processing + * requests and the manifest knows of levels with too many segments. + */ +static void try_queue_compact(struct server_info *server) +{ + struct super_block *sb = server->sb; + bool can_request; + + spin_lock(&server->lock); + can_request = server->nr_compacts < + (server->nr_clients * server->compacts_per_client); + spin_unlock(&server->lock); + if (can_request && scoutfs_manifest_should_compact(sb)) + queue_work(server->wq, &server->compact_work); +} + /* * This is called while still holding the rwsem that prevents commits so * that the caller can be sure to be woken by the next commit after they @@ -787,8 +839,7 @@ retry: scoutfs_manifest_unlock(sb); up_read(&server->commit_rwsem); /* XXX waits indefinitely? io errors? */ - wait_event(server->compaction_waitq, - !scoutfs_manifest_level0_full(sb)); + wait_event(server->waitq, !scoutfs_manifest_level0_full(sb)); goto retry; } @@ -804,7 +855,7 @@ retry: if (ret == 0) { ret = wait_for_commit(&cw); if (ret == 0) - scoutfs_compact_kick(sb); + try_queue_compact(server); } out: @@ -1044,97 +1095,597 @@ out: return ret; } +/* requests sent to clients are tracked so we can free resources */ +struct compact_request { + struct list_head head; + u64 node_id; + struct scoutfs_net_compact_request req; +}; + /* - * Eventually we're going to have messages that control compaction. - * Each client mount would have long-lived work that sends requests - * which are stuck in processing until there's work to do. They'd get - * their entries, perform the compaction, and send a reply. But we're - * not there yet. - * - * This is a short circuit that's called directly by a work function - * that's only queued on the server. It makes compaction work inside - * the commit consistency mechanics inside request processing and - * demonstrates the moving pieces that we'd need to cut up into a series - * of messages and replies. - * - * The compaction work caller cleans up everything on errors. + * Find a node that can process our compaction request. Return a + * node_id if we found a client and added the compaction to the client + * and server counts. Returns 0 if no suitable clients were found. */ -int scoutfs_client_get_compaction(struct super_block *sb, void *curs) +static u64 compact_request_start(struct super_block *sb, + struct compact_request *cr) { struct server_info *server = SCOUTFS_SB(sb)->server_info; - struct commit_waiter cw; - u64 segno; - int ret = 0; - int nr; - int i; + struct server_client_info *last; + struct server_client_info *sci; + u64 node_id = 0; - down_read(&server->commit_rwsem); + spin_lock(&server->lock); - nr = scoutfs_manifest_next_compact(sb, curs); - if (nr <= 0) { - up_read(&server->commit_rwsem); - return nr; - } + /* XXX no last_entry_or_null? :( */ + if (!list_empty(&server->clients)) + last = list_last_entry(&server->clients, + struct server_client_info, head); + else + last = NULL; - /* allow for expansion slop from sticky and alignment */ - for (i = 0; i < nr + SCOUTFS_COMPACTION_SLOP; i++) { - ret = alloc_segno(sb, &segno); - if (ret < 0) + while ((sci = list_first_entry_or_null(&server->clients, + struct server_client_info, + head)) != NULL) { + list_move_tail(&sci->head, &server->clients); + if (sci->nr_compacts < server->compacts_per_client) { + list_add(&cr->head, &server->compacts); + server->nr_compacts++; + sci->nr_compacts++; + node_id = sci->node_id; + cr->node_id = node_id; + break; + } + if (sci == last) break; - scoutfs_compact_add_segno(sb, curs, segno); } - if (ret == 0) - queue_commit_work(server, &cw); - up_read(&server->commit_rwsem); + trace_scoutfs_server_compact_start(sb, le64_to_cpu(cr->req.id), + cr->req.ents[0].level, node_id, + node_id ? sci->nr_compacts : 0, + server->nr_compacts, + server->compacts_per_client); - if (ret == 0) - ret = wait_for_commit(&cw); + spin_unlock(&server->lock); + + return node_id; +} + +/* + * Find a tracked compact request for the compaction id, remove it from + * the server and client counts, and return it to the caller. + */ +static struct compact_request *compact_request_done(struct super_block *sb, + u64 id) +{ + struct server_info *server = SCOUTFS_SB(sb)->server_info; + struct compact_request *ret = NULL; + struct server_client_info *sci; + struct compact_request *cr; + + spin_lock(&server->lock); + + list_for_each_entry(cr, &server->compacts, head) { + if (le64_to_cpu(cr->req.id) != id) + continue; + + list_for_each_entry(sci, &server->clients, head) { + if (sci->node_id == cr->node_id) { + sci->nr_compacts--; + break; + } + } + + server->nr_compacts--; + list_del_init(&cr->head); + ret = cr; + break; + } + + trace_scoutfs_server_compact_done(sb, id, ret ? ret->node_id : 0, + server->nr_compacts); + + spin_unlock(&server->lock); return ret; } /* - * This is a stub for recording the results of a compaction. We just - * call back into compaction to have it call the manifest and allocator - * updates. + * When a client disconnects we forget the compactions that they had + * in flight so that we have capacity to send compaction requests to the + * remaining clients. * - * In the future we'd encode the manifest and segnos in requests sent to - * the server who'd update the manifest and allocator in request - * processing. - * - * As we finish a compaction we wait level0 writers if it opened up - * space in level 0. + * XXX we do not free their allocated segnos because they could still be + * running and writing to those blocks. To do this safely we'd need + * full recovery procedures with fencing to ensure that they're not able + * to write to those blocks anymore. */ -int scoutfs_client_finish_compaction(struct super_block *sb, void *curs, - void *list) +static void forget_client_compacts(struct super_block *sb, + struct server_client_info *sci) { struct server_info *server = SCOUTFS_SB(sb)->server_info; + struct compact_request *cr; + struct compact_request *pos; + LIST_HEAD(forget); + + spin_lock(&server->lock); + list_for_each_entry_safe(cr, pos, &server->compacts, head) { + if (cr->node_id == sci->node_id) { + sci->nr_compacts--; + server->nr_compacts--; + list_move(&cr->head, &forget); + } + } + spin_unlock(&server->lock); + + list_for_each_entry_safe(cr, pos, &forget, head) { + scoutfs_manifest_compact_done(sb, &cr->req); + list_del_init(&cr->head); + kfree(cr); + } +} + +static int segno_in_ents(__le64 segno, struct scoutfs_net_manifest_entry *ents, + unsigned int nr) +{ + int i; + + for (i = 0; i < nr; i++) { + if (ents[i].segno == 0) + break; + if (segno == ents[i].segno) + return 1; + } + + return 0; +} + +static int remove_segnos(struct super_block *sb, __le64 * __packed segnos, + unsigned int nr, + struct scoutfs_net_manifest_entry *unless, + unsigned int nr_unless, bool cleanup); + +/* + * Free segnos if they're not found in the unless entries. If this + * returns an error then we've cleaned up partial frees on error. This + * panics if it sees an error and can't cleanup on error. + * + * There are variants of this for lots of add/del, alloc/remove data + * structurs. + */ +static int free_segnos(struct super_block *sb, __le64 * __packed segnos, + unsigned int nr, + struct scoutfs_net_manifest_entry *unless, + unsigned int nr_unless, bool cleanup) + +{ + int ret = 0; + int i; + + for (i = 0; i < nr; i++) { + if (segnos[i] == 0) + break; + if (segno_in_ents(segnos[i], unless, nr_unless)) + continue; + + ret = free_segno(sb, le64_to_cpu(segnos[i])); + BUG_ON(ret < 0 && !cleanup); + if (ret < 0) { + remove_segnos(sb, segnos, i, unless, nr_unless, false); + break; + } + } + + return ret; +} + +static int alloc_segnos(struct super_block *sb, __le64 * __packed segnos, + unsigned int nr) + +{ + u64 segno; + int ret = 0; + int i; + + for (i = 0; i < nr; i++) { + ret = alloc_segno(sb, &segno); + if (ret < 0) { + free_segnos(sb, segnos, i, NULL, 0, false); + break; + } + segnos[i] = cpu_to_le64(segno); + } + + return ret; +} + +static int remove_segnos(struct super_block *sb, __le64 * __packed segnos, + unsigned int nr, + struct scoutfs_net_manifest_entry *unless, + unsigned int nr_unless, bool cleanup) + +{ + int ret = 0; + int i; + + for (i = 0; i < nr; i++) { + if (segnos[i] == 0) + break; + if (segno_in_ents(segnos[i], unless, nr_unless)) + continue; + + ret = remove_segno(sb, le64_to_cpu(segnos[i])); + BUG_ON(ret < 0 && !cleanup); + if (ret < 0) { + free_segnos(sb, segnos, i, unless, nr_unless, false); + break; + } + } + + return ret; +} + + +static int remove_entry_segnos(struct super_block *sb, + struct scoutfs_net_manifest_entry *ents, + unsigned int nr, + struct scoutfs_net_manifest_entry *unless, + unsigned int nr_unless, bool cleanup); + +static int free_entry_segnos(struct super_block *sb, + struct scoutfs_net_manifest_entry *ents, + unsigned int nr, + struct scoutfs_net_manifest_entry *unless, + unsigned int nr_unless, bool cleanup) +{ + int ret = 0; + int i; + + for (i = 0; i < nr; i++) { + if (ents[i].segno == 0) + break; + if (segno_in_ents(ents[i].segno, unless, nr_unless)) + continue; + + ret = free_segno(sb, le64_to_cpu(ents[i].segno)); + BUG_ON(ret < 0 && !cleanup); + if (ret < 0) { + remove_entry_segnos(sb, ents, i, unless, nr_unless, + false); + break; + } + } + + return ret; +} + +static int remove_entry_segnos(struct super_block *sb, + struct scoutfs_net_manifest_entry *ents, + unsigned int nr, + struct scoutfs_net_manifest_entry *unless, + unsigned int nr_unless, bool cleanup) +{ + int ret = 0; + int i; + + for (i = 0; i < nr; i++) { + if (ents[i].segno == 0) + break; + if (segno_in_ents(ents[i].segno, unless, nr_unless)) + continue; + + ret = remove_segno(sb, le64_to_cpu(ents[i].segno)); + BUG_ON(ret < 0 && !cleanup); + if (ret < 0) { + free_entry_segnos(sb, ents, i, unless, nr_unless, + false); + break; + } + } + + return ret; +} + +static int del_manifest_entries(struct super_block *sb, + struct scoutfs_net_manifest_entry *ents, + unsigned int nr, bool cleanup); + +static int add_manifest_entries(struct super_block *sb, + struct scoutfs_net_manifest_entry *ents, + unsigned int nr, bool cleanup) +{ + struct scoutfs_manifest_entry ment; + int ret = 0; + int i; + + for (i = 0; i < nr; i++) { + if (ents[i].segno == 0) + break; + + scoutfs_init_ment_from_net(&ment, &ents[i]); + + ret = scoutfs_manifest_add(sb, &ment); + BUG_ON(ret < 0 && !cleanup); + if (ret < 0) { + del_manifest_entries(sb, ents, i, false); + break; + } + } + + return ret; +} + +static int del_manifest_entries(struct super_block *sb, + struct scoutfs_net_manifest_entry *ents, + unsigned int nr, bool cleanup) +{ + struct scoutfs_manifest_entry ment; + int ret = 0; + int i; + + for (i = 0; i < nr; i++) { + if (ents[i].segno == 0) + break; + + scoutfs_init_ment_from_net(&ment, &ents[i]); + + ret = scoutfs_manifest_del(sb, &ment); + BUG_ON(ret < 0 && !cleanup); + if (ret < 0) { + add_manifest_entries(sb, ents, i, false); + break; + } + } + + return ret; +} + +/* + * Process a received compaction response. This is called in concurrent + * processing work context so it's racing with other compaction + * responses and new compaction requests being built and sent. + * + * If the compaction failed then we only have to free the allocated + * output segnos sent in the request. + * + * If the compaction succeeded then we need to delete the input manifest + * entries, add any new output manifest entries, and free allocated + * segnos and input manifest segnos that aren't found in output segnos. + * + * And finally we always remove the compaction from the runtime client + * accounting + * + * As we finish a compaction we wake level0 writers if there's now space + * in level 0 for a new segment. + * + * Errors in processing are taken as an indication that this server is + * no longer able to do its job. We return hard errors which shut down + * the server in the hopes that another healthy server will start up. + * We may want to revisit this. + */ +static int compact_response(struct super_block *sb, + struct scoutfs_net_connection *conn, + void *resp, unsigned int resp_len, + int error, void *data) +{ + struct server_info *server = SCOUTFS_SB(sb)->server_info; + struct scoutfs_net_compact_response *cresp = NULL; + struct compact_request *cr = NULL; + bool level0_was_full = false; + bool add_ents = false; + bool del_ents = false; + bool rem_segnos = false; struct commit_waiter cw; - bool level0_was_full; + __le64 id; int ret; + if (error) { + /* an error response without an id is fatal */ + if (resp_len != sizeof(__le64)) { + ret = -EINVAL; + goto out; + } + + memcpy(&id, resp, resp_len); + + } else { + if (resp_len != sizeof(struct scoutfs_net_compact_response)) { + ret = -EINVAL; + goto out; + } + + cresp = resp; + id = cresp->id; + } + + trace_scoutfs_server_compact_response(sb, le64_to_cpu(id), error); + + /* XXX we only free tracked requests on responses, must still exist */ + cr = compact_request_done(sb, le64_to_cpu(id)); + if (WARN_ON_ONCE(cr == NULL)) { + ret = -ENOENT; + goto out; + } + down_read(&server->commit_rwsem); + scoutfs_manifest_lock(sb); level0_was_full = scoutfs_manifest_level0_full(sb); - ret = scoutfs_compact_commit(sb, curs, list); - if (ret == 0) { - queue_commit_work(server, &cw); - if (level0_was_full && !scoutfs_manifest_level0_full(sb)) - wake_up(&server->compaction_waitq); + if (error) { + ret = 0; + goto cleanup; } - up_read(&server->commit_rwsem); + /* delete old manifest entries */ + ret = del_manifest_entries(sb, cr->req.ents, ARRAY_SIZE(cr->req.ents), + true); + if (ret) + goto cleanup; + add_ents = true; + + /* add new manifest entries */ + ret = add_manifest_entries(sb, cresp->ents, ARRAY_SIZE(cresp->ents), + true); + if (ret) + goto cleanup; + del_ents = true; + + /* free allocated segnos not found in new entries */ + ret = free_segnos(sb, cr->req.segnos, ARRAY_SIZE(cr->req.segnos), + cresp->ents, ARRAY_SIZE(cresp->ents), true); + if (ret) + goto cleanup; + rem_segnos = true; + + /* free input segnos not found in new entries */ + ret = free_entry_segnos(sb, cr->req.ents, ARRAY_SIZE(cr->req.ents), + cresp->ents, ARRAY_SIZE(cresp->ents), true); +cleanup: + /* cleanup partial commits on errors */ + if (ret < 0 && rem_segnos) + remove_segnos(sb, cr->req.segnos, ARRAY_SIZE(cr->req.segnos), + cresp->ents, ARRAY_SIZE(cresp->ents), false); + if (ret < 0 && del_ents) + del_manifest_entries(sb, cresp->ents, ARRAY_SIZE(cresp->ents), + false); + if (ret < 0 && add_ents) + add_manifest_entries(sb, cr->req.ents, + ARRAY_SIZE(cr->req.ents), false); + + /* free all the allocated output segnos if compaction failed */ + if ((error || ret < 0) && cr != NULL) + free_segnos(sb, cr->req.segnos, ARRAY_SIZE(cr->req.segnos), + NULL, 0, false); + + if (ret == 0 && level0_was_full && !scoutfs_manifest_level0_full(sb)) + wake_up(&server->waitq); if (ret == 0) + queue_commit_work(server, &cw); + scoutfs_manifest_unlock(sb); + up_read(&server->commit_rwsem); + + if (cr) { + scoutfs_manifest_compact_done(sb, &cr->req); + kfree(cr); + } + + if (ret == 0) { ret = wait_for_commit(&cw); + if (ret == 0) + try_queue_compact(server); + } - scoutfs_compact_kick(sb); - +out: return ret; } +/* + * The compaction worker executes as the manifest is updated and we see + * that a level has too many segments and clients aren't processing all + * their max number of compaction requests. Only one compaction worker + * executes. + * + * We have the manifest build us a compaction request, find a client to + * send it too, and record it for later completion processing. + * + * The manifest tracks pending compactions and won't use the same + * segments as inputs to multiple compactions. We track the number of + * compactions in flight to each client to keep them balanced. + */ +static void scoutfs_server_compact_worker(struct work_struct *work) +{ + struct server_info *server = container_of(work, struct server_info, + compact_work); + struct super_block *sb = server->sb; + struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; + struct scoutfs_net_compact_request *req; + struct compact_request *cr; + struct commit_waiter cw; + int nr_segnos = 0; + u64 node_id; + __le64 id; + int ret; + + trace_scoutfs_server_compact_work_enter(sb, 0, 0); + + cr = kzalloc(sizeof(struct compact_request), GFP_NOFS); + if (!cr) { + ret = -ENOMEM; + goto out; + } + req = &cr->req; + + /* get the input manifest entries */ + ret = scoutfs_manifest_next_compact(sb, req); + if (ret <= 0) + goto out; + + nr_segnos = ret + SCOUTFS_COMPACTION_SEGNO_OVERHEAD; + + /* get the next id and allocate possible output segnos */ + down_read(&server->commit_rwsem); + + spin_lock(&server->lock); + id = super->next_compact_id; + le64_add_cpu(&super->next_compact_id, 1); + spin_unlock(&server->lock); + + ret = alloc_segnos(sb, req->segnos, nr_segnos); + if (ret == 0) + queue_commit_work(server, &cw); + up_read(&server->commit_rwsem); + if (ret == 0) + ret = wait_for_commit(&cw); + if (ret) + goto out; + + /* try to send to a node with capacity, they can disconnect */ +retry: + req->id = id; + node_id = compact_request_start(sb, cr); + if (node_id == 0) { + ret = 0; + goto out; + } + + /* response processing can complete compaction before this returns */ + ret = scoutfs_net_submit_request_node(sb, server->conn, node_id, + SCOUTFS_NET_CMD_COMPACT, + req, sizeof(*req), + compact_response, NULL, NULL); + if (ret < 0) { + cr = compact_request_done(sb, le64_to_cpu(id)); + BUG_ON(cr == NULL); /* must still be there, no node cleanup */ + } + if (ret == -ENOTCONN) + goto retry; + if (ret < 0) + goto out; + + /* cr is now owned by response processing */ + cr = NULL; + ret = 1; + +out: + if (ret <= 0 && cr != NULL) { + scoutfs_manifest_compact_done(sb, req); + + /* don't need to wait for commit when freeing in cleanup */ + down_read(&server->commit_rwsem); + free_segnos(sb, req->segnos, nr_segnos, NULL, 0, false); + up_read(&server->commit_rwsem); + + kfree(cr); + } + + if (ret > 0) + try_queue_compact(server); + + trace_scoutfs_server_compact_work_exit(sb, 0, ret); +} + /* * This relies on the caller having read the current super and advanced * its seq so that it's dirty. This will go away when we communicate @@ -1172,9 +1723,14 @@ static void server_notify_up(struct super_block *sb, if (node_id != 0) { sci->node_id = node_id; + sci->nr_compacts = 0; spin_lock(&server->lock); list_add_tail(&sci->head, &server->clients); + server->nr_clients++; + trace_scoutfs_server_client_up(sb, node_id, server->nr_clients); spin_unlock(&server->lock); + + try_queue_compact(server); } } @@ -1187,8 +1743,14 @@ static void server_notify_down(struct super_block *sb, if (node_id != 0) { spin_lock(&server->lock); - list_del(&sci->head); + list_del_init(&sci->head); + server->nr_clients--; + trace_scoutfs_server_client_down(sb, node_id, + server->nr_clients); spin_unlock(&server->lock); + + forget_client_compacts(sb, sci); + try_queue_compact(server); } else { shutdown_server(server); } @@ -1264,8 +1826,7 @@ static void scoutfs_server_worker(struct work_struct *work) /* start up the server subsystems before accepting */ ret = scoutfs_btree_setup(sb) ?: - scoutfs_manifest_setup(sb) ?: - scoutfs_compact_setup(sb); + scoutfs_manifest_setup(sb); if (ret) goto shutdown; @@ -1275,6 +1836,7 @@ static void scoutfs_server_worker(struct work_struct *work) scoutfs_info(sb, "server started on "SIN_FMT, SIN_ARG(&sin)); /* start accepting connections and processing work */ + server->conn = conn; scoutfs_net_listen(sb, conn); /* wait for listening down or umount, conn can still be live */ @@ -1285,13 +1847,12 @@ static void scoutfs_server_worker(struct work_struct *work) shutdown: /* wait for request processing */ scoutfs_net_shutdown(sb, conn); + /* drain compact work queued by responses */ + cancel_work_sync(&server->compact_work); /* wait for commit queued by request processing */ flush_work(&server->commit_work); + server->conn = NULL; - /* shut down all the server subsystems */ - scoutfs_compact_destroy(sb); - /* (wait for possible double commit work queued by compaction) */ - flush_work(&server->commit_work); destroy_pending_frees(sb); scoutfs_manifest_destroy(sb); scoutfs_btree_destroy(sb); @@ -1325,19 +1886,22 @@ int scoutfs_server_setup(struct super_block *sb) server->sb = sb; spin_lock_init(&server->lock); + init_waitqueue_head(&server->waitq); init_completion(&server->shutdown_comp); server->bind_warned = false; INIT_DELAYED_WORK(&server->dwork, scoutfs_server_worker); init_rwsem(&server->commit_rwsem); init_llist_head(&server->commit_waiters); INIT_WORK(&server->commit_work, scoutfs_server_commit_func); - init_waitqueue_head(&server->compaction_waitq); seqcount_init(&server->stable_seqcount); spin_lock_init(&server->seq_lock); INIT_LIST_HEAD(&server->pending_seqs); init_rwsem(&server->alloc_rwsem); INIT_LIST_HEAD(&server->pending_frees); INIT_LIST_HEAD(&server->clients); + server->compacts_per_client = 2; + INIT_LIST_HEAD(&server->compacts); + INIT_WORK(&server->compact_work, scoutfs_server_compact_worker); server->wq = alloc_workqueue("scoutfs_server", WQ_UNBOUND | WQ_NON_REENTRANT, 0); diff --git a/kmod/src/server.h b/kmod/src/server.h index 185f6c0b..365469b1 100644 --- a/kmod/src/server.h +++ b/kmod/src/server.h @@ -53,11 +53,6 @@ void scoutfs_init_ment_to_net(struct scoutfs_net_manifest_entry *net_ment, void scoutfs_init_ment_from_net(struct scoutfs_manifest_entry *ment, struct scoutfs_net_manifest_entry *net_ment); -int scoutfs_client_get_compaction(struct super_block *sb, void *curs); -int scoutfs_client_finish_compaction(struct super_block *sb, void *curs, - void *list); -int scoutfs_server_free_segno(struct super_block *sb, u64 segno); - int scoutfs_server_setup(struct super_block *sb); void scoutfs_server_destroy(struct super_block *sb);