diff --git a/kmod/src/format.h b/kmod/src/format.h
index f1bc61b8..c097d0c4 100644
--- a/kmod/src/format.h
+++ b/kmod/src/format.h
@@ -153,7 +153,7 @@ enum {
  * isn't unused key space between blocks in a level.  We might search
  * blocks when we didn't need to.
  */
-struct scoutfs_ring_manifest_entry {
+struct scoutfs_manifest_entry {
 	__le64 blkno;
 	__le64 seq;
 	__u8 level;
@@ -163,10 +163,6 @@ struct scoutfs_ring_manifest_entry {
 
 #define SCOUTFS_MANIFESTS_PER_LEVEL 10
 
-struct scoutfs_ring_del_manifest {
-	__le64 blkno;
-} __packed;
-
 /* 2^22 * 10^13 > 2^64 */
 #define SCOUTFS_MAX_LEVEL 13
 
diff --git a/kmod/src/ival.h b/kmod/src/ival.h
index 4e693e61..6c944e0e 100644
--- a/kmod/src/ival.h
+++ b/kmod/src/ival.h
@@ -5,6 +5,11 @@ struct scoutfs_ival_tree {
 	struct rb_root root;
 };
 
+static inline void scoutfs_init_ival_tree(struct scoutfs_ival_tree *tree)
+{
+	tree->root = RB_ROOT;
+}
+
 struct scoutfs_ival {
 	struct rb_node node;
 	struct scoutfs_key start;
@@ -21,6 +26,14 @@ struct scoutfs_ival *scoutfs_next_ival(struct scoutfs_ival_tree *tree,
 				       struct scoutfs_key *end,
 				       struct scoutfs_ival *ival);
 
+/*
+ * Walk all the intervals in postorder.  This lets us free each ival we
+ * see without erasing and rebalancing.  The next ival is saved in tmp
+ * before the body runs so that the current ival can be freed safely.
+ */
+#define foreach_postorder_ival_safe(itree, ival, tmp) \
+	rbtree_postorder_for_each_entry_safe(ival, tmp, &(itree)->root, node)
+
 // struct rb_node {
 // 	long unsigned int __rb_parent_color;	/*	0	8 */
 // 	struct rb_node *	rb_right;	/*	8	8 */
 
diff --git a/kmod/src/manifest.c b/kmod/src/manifest.c
index 23054330..236bf911 100644
--- a/kmod/src/manifest.c
+++ b/kmod/src/manifest.c
@@ -13,366 +13,272 @@
 #include
 #include
 #include
+#include <linux/sort.h>
 
 #include "super.h"
 #include "format.h"
 #include "manifest.h"
 #include "key.h"
 #include "ring.h"
+#include "ival.h"
 #include "scoutfs_trace.h"
 
 /*
- * The manifest organizes log segment blocks into a tree structure.
+ * The manifest organizes log segments into levels of item indexes.  New
+ * segments arrive at level 0 which can have many segments with
+ * overlapping keys.  Then segments are merged into progressively larger
+ * higher levels which do not have segments with overlapping keys.
  *
- * Each level of the tree contains an ordered list of log segments whose
- * item keys don't overlap.  The first level (level 0) of the tree is
- * the exception whose segments can have key ranges that overlap.
- *
- * We also store pointers to the manifest entries in a radix tree
- * indexed by their block number so that we can easily update existing
- * entries.
- *
- * Level 0 segments are stored in the list with the most recent at the
- * head of the list.  Level 0's rb tree will always be empty.
+ * All the segments for all the levels are stored in one interval tree.
+ * This lets reads find all the overlapping segments in all levels with
+ * one tree walk instead of walks per level.  It also lets us move
+ * segments around the levels by updating their level field rather than
+ * removing them from one level index and adding them to another.
  */
 
 struct scoutfs_manifest {
 	spinlock_t lock;
-
-	struct radix_tree_root blkno_radix;
-	struct list_head level_zero;
-
-	struct scoutfs_level {
-		struct rb_root root;
-		u64 count;
-	} levels[SCOUTFS_MAX_LEVEL + 1];
+	struct scoutfs_ival_tree itree;
 };
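The reason one interval tree can replace the per-level lists and rb trees is that a single stabbing query visits every segment whose key range covers a point, whatever its level.  A minimal sketch of such a walk using the scoutfs_next_ival() iterator declared in ival.h above; the helper name and the counting are illustrative only:

	static unsigned count_segments_at(struct scoutfs_ival_tree *tree,
					  struct scoutfs_key *key)
	{
		struct scoutfs_ival *ival = NULL;
		unsigned count = 0;

		/* each call returns the next interval overlapping [key, key] */
		while ((ival = scoutfs_next_ival(tree, key, key, ival)))
			count++;

		return count;
	}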
+/*
+ * There's some redundancy between the interval struct and the manifest
+ * entry struct.  If we embedded both we'd duplicate fields, and memory
+ * is precious here.  So we store a native combination of the two.
+ */
 struct scoutfs_manifest_node {
-	struct rb_node node;
-	struct list_head head;
-
-	struct scoutfs_ring_manifest_entry ment;
+	struct scoutfs_ival ival;
+	u64 blkno;
+	u64 seq;
+	unsigned char level;
 };
 
-static void insert_mnode(struct rb_root *root,
-			 struct scoutfs_manifest_node *ins)
-{
-	struct rb_node **node = &root->rb_node;
-	struct scoutfs_manifest_node *mnode;
-	struct rb_node *parent = NULL;
-	int cmp;
-
-	while (*node) {
-		parent = *node;
-		mnode = rb_entry(*node, struct scoutfs_manifest_node, node);
-
-		cmp = scoutfs_key_cmp(&ins->ment.first, &mnode->ment.first);
-		if (cmp < 0)
-			node = &(*node)->rb_left;
-		else
-			node = &(*node)->rb_right;
-	}
-
-	rb_link_node(&ins->node, parent, node);
-	rb_insert_color(&ins->node, root);
-}
-
-static struct scoutfs_manifest_node *find_mnode(struct rb_root *root,
-						struct scoutfs_key *key)
-{
-	struct rb_node *node = root->rb_node;
-	struct scoutfs_manifest_node *mnode;
-	int cmp;
-
-	while (node) {
-		mnode = rb_entry(node, struct scoutfs_manifest_node, node);
-
-		cmp = scoutfs_cmp_key_range(key, &mnode->ment.first,
-					    &mnode->ment.last);
-		if (cmp < 0)
-			node = node->rb_left;
-		else if (cmp > 0)
-			node = node->rb_right;
-		else
-			return mnode;
-	}
-
-	return NULL;
-}
-
 /*
- * Find a manifest node at the given block number and return it after
- * removing it from either the level 0 list or level rb trees.  It's
- * left in the blkno radix.
+ * Remove an exact match of the entry from the manifest.  It's normal
+ * for ring replay to try to remove an entry that doesn't exist if ring
+ * wrapping and manifest deletion combine in just the right way.
  */
-static struct scoutfs_manifest_node *unlink_mnode(struct scoutfs_manifest *mani,
-						  u64 blkno)
-
-{
-	struct scoutfs_manifest_node *mnode;
-
-	mnode = radix_tree_lookup(&mani->blkno_radix, blkno);
-	if (mnode) {
-		trace_scoutfs_delete_manifest(&mnode->ment);
-
-		if (!list_empty(&mnode->head))
-			list_del_init(&mnode->head);
-		if (!RB_EMPTY_NODE(&mnode->node)) {
-			rb_erase(&mnode->node,
-				 &mani->levels[mnode->ment.level].root);
-			mani->levels[mnode->ment.level].count--;
-			RB_CLEAR_NODE(&mnode->node);
-		}
-	}
-
-	return mnode;
-}
-
-/*
- * This is called during ring replay.  Because of the way the ring works
- * we can get deletion entries for segments that we don't yet have
- * in the replayed ring state.
- */ -void scoutfs_delete_manifest(struct super_block *sb, u64 blkno) +static void delete_manifest(struct super_block *sb, + struct scoutfs_manifest_entry *ment) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct scoutfs_manifest *mani = sbi->mani; struct scoutfs_manifest_node *mnode; + struct scoutfs_ival *ival; + + ival = NULL; + while ((ival = scoutfs_next_ival(&mani->itree, &ment->first, + &ment->last, ival))) { + mnode = container_of(ival, struct scoutfs_manifest_node, ival); + + if (mnode->blkno == le64_to_cpu(ment->blkno) && + mnode->seq == le64_to_cpu(ment->seq) && + !scoutfs_key_cmp(&ment->first, &mnode->ival.start) && + !scoutfs_key_cmp(&ment->last, &mnode->ival.end)) + break; + } + + if (ival) { + trace_scoutfs_delete_manifest(ment); + + scoutfs_remove_ival(&mani->itree, &mnode->ival); + kfree(mnode); + } +} + +void scoutfs_delete_manifest(struct super_block *sb, + struct scoutfs_manifest_entry *ment) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct scoutfs_manifest *mani = sbi->mani; spin_lock(&mani->lock); - mnode = unlink_mnode(mani, blkno); - if (mnode) - radix_tree_delete(&mani->blkno_radix, blkno); + delete_manifest(sb, ment); spin_unlock(&mani->lock); - if (mnode) - kfree(mnode); } -/* - * A newly inserted manifest can be inserted at the level - * above the first block that it intersects. - */ -static u8 insertion_level(struct super_block *sb, - struct scoutfs_ring_manifest_entry *ment) +static void insert_manifest(struct super_block *sb, + struct scoutfs_manifest_entry *ment, + struct scoutfs_manifest_node *mnode) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct scoutfs_manifest *mani = sbi->mani; + + trace_scoutfs_insert_manifest(ment); + + mnode->ival.start = ment->first; + mnode->ival.end = ment->last; + mnode->blkno = le64_to_cpu(ment->blkno); + mnode->seq = le64_to_cpu(ment->seq); + mnode->level = ment->level; + + scoutfs_insert_ival(&mani->itree, &mnode->ival); +} + +int scoutfs_insert_manifest(struct super_block *sb, + struct scoutfs_manifest_entry *ment) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct scoutfs_manifest *mani = sbi->mani; struct scoutfs_manifest_node *mnode; - int i; - list_for_each_entry(mnode, &mani->level_zero, head) { - if (scoutfs_cmp_key_ranges(&ment->first, &ment->last, - &mnode->ment.first, - &mnode->ment.last) == 0) - return 0; - } - - /* XXX this <= looks fishy :/ */ - for (i = 1; i <= SCOUTFS_MAX_LEVEL; i++) { - mnode = find_mnode(&mani->levels[i].root, &ment->first); - if (mnode) - break; - if (mani->levels[i].count < SCOUTFS_MANIFESTS_PER_LEVEL) - return i; - } - - return i - 1; -} - -/* - * Insert an manifest entry into the blkno radix and either level 0 list - * or greater level rbtrees as appropriate. The new entry will replace - * any existing entry at its blkno, perhaps with different keys and - * level. - * - * The caller can ask that we find the highest level that the entry can - * be inserted into before it intersects with an existing entry. The - * caller's entry is updated with the new level so they can store it in - * the ring. Doing so here avoids extra ring churn of doing it later in - * merging. 
- */
-static int insert_manifest(struct super_block *sb,
-			   struct scoutfs_ring_manifest_entry *ment,
-			   bool find_level)
-{
-	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
-	struct scoutfs_manifest *mani = sbi->mani;
-	struct scoutfs_manifest_node *mnode;
-	struct scoutfs_manifest_node *found;
-	u64 blkno = le64_to_cpu(ment->blkno);
-	int ret = 0;
-
-	/* allocation/preloading should be cheap enough to always try */
-	mnode = kmalloc(sizeof(struct scoutfs_manifest_node), GFP_NOFS);
+	mnode = kzalloc(sizeof(struct scoutfs_manifest_node), GFP_NOFS);
 	if (!mnode)
 		return -ENOMEM; /* XXX hmm, fatal? prealloc?*/
 
-	ret = radix_tree_preload(GFP_NOFS & ~__GFP_HIGHMEM);
+	spin_lock(&mani->lock);
+	insert_manifest(sb, ment, mnode);
+	spin_unlock(&mani->lock);
+
+	return 0;
+}
+
+/*
+ * The caller has inserted a temporary manifest entry while they were
+ * dirtying a segment.  It's done now and they want the final segment
+ * range stored in the manifest and logged in the ring.
+ *
+ * If this returns an error then nothing has changed.
+ *
+ * XXX we'd also need to add stale manifest entries to the ring
+ * XXX In the future we'd send it to the leader
+ */
+int scoutfs_finalize_manifest(struct super_block *sb,
+			      struct scoutfs_manifest_entry *existing,
+			      struct scoutfs_manifest_entry *updated)
+{
+	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
+	struct scoutfs_manifest *mani = sbi->mani;
+	struct scoutfs_manifest_node *mnode;
+	int ret;
+
+	mnode = kzalloc(sizeof(struct scoutfs_manifest_node), GFP_NOFS);
+	if (!mnode)
+		return -ENOMEM; /* XXX hmm, fatal? prealloc? */
+
+	ret = scoutfs_dirty_ring_entry(sb, SCOUTFS_RING_ADD_MANIFEST,
+				       updated,
+				       sizeof(struct scoutfs_manifest_entry));
 	if (ret) {
 		kfree(mnode);
 		return ret;
 	}
 
-	INIT_LIST_HEAD(&mnode->head);
-	RB_CLEAR_NODE(&mnode->node);
-
 	spin_lock(&mani->lock);
-
-	/* reuse found to avoid radix delete/insert churn */
-	found = unlink_mnode(mani, blkno);
-	if (!found) {
-		radix_tree_insert(&mani->blkno_radix, blkno, mnode);
-	} else {
-		swap(found, mnode);
-	}
-
-	/* careful to find our level after deleting old blkno ment */
-	if (find_level)
-		ment->level = insertion_level(sb, ment);
-
-	trace_scoutfs_insert_manifest(ment);
-
-	mnode->ment = *ment;
-	if (ment->level) {
-		insert_mnode(&mani->levels[ment->level].root, mnode);
-		mani->levels[ment->level].count++;
-	} else {
-		list_add(&mnode->head, &mani->level_zero);
-	}
-
+	delete_manifest(sb, existing);
+	insert_manifest(sb, updated, mnode);
 	spin_unlock(&mani->lock);
 
-	radix_tree_preload_end();
-	kfree(found);
 	return 0;
 }
 
-/* Index an existing entry */
-int scoutfs_insert_manifest(struct super_block *sb,
-			    struct scoutfs_ring_manifest_entry *ment)
+/* sorted by increasing level then decreasing seq */
+static int cmp_ments(const void *A, const void *B)
 {
-	return insert_manifest(sb, ment, false);
+	const struct scoutfs_manifest_entry *a = A;
+	const struct scoutfs_manifest_entry *b = B;
+	int cmp;
+
+	cmp = (int)a->level - (int)b->level;
+	if (cmp)
+		return cmp;
+
+	if (le64_to_cpu(a->seq) > le64_to_cpu(b->seq))
+		return -1;
+	if (le64_to_cpu(a->seq) < le64_to_cpu(b->seq))
+		return 1;
+	return 0;
+}
+
+static void swap_ments(void *A, void *B, int size)
+{
+	struct scoutfs_manifest_entry *a = A;
+	struct scoutfs_manifest_entry *b = B;
+
+	swap(*a, *b);
 }
 
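The comparator and swap helpers above feed sort() to produce the search order documented below.  A worked illustration of the ordering, using made-up (level, seq) pairs:

	/*
	 * Entries collected as (level, seq):
	 *	(1, 3), (0, 5), (0, 9), (0, 2)
	 * after sort(ments, 4, sizeof(ments[0]), cmp_ments, swap_ments):
	 *	(0, 9), (0, 5), (0, 2), (1, 3)
	 * so a search visits the newest level 0 segments first and then
	 * each higher level in turn.
	 */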
 /*
- * Add an entry for a newly written segment to the indexes and record it
- * in the ring.  The entry can be modified by insertion.
+ * Give the caller an allocated array of manifest entries that intersect
+ * their search key.  The array is sorted in the order used to search
+ * for the most recent item: decreasing sequence within level 0, then
+ * increasing levels.
  *
- * XXX we'd also need to add stale manifest entry's to the ring
- * XXX In the future we'd send it to the leader
+ * The live manifest can change while the caller walks their array but
+ * the segments will not be reclaimed and the caller has grants that
+ * protect their items in the segments even if the segments shift over
+ * time.
+ *
+ * The number of elements in the array is returned, or a negative error,
+ * and the array is not allocated if 0 is returned.
+ *
+ * XXX need to actually keep the segments from being reclaimed
  */
-int scoutfs_new_manifest(struct super_block *sb,
-			 struct scoutfs_ring_manifest_entry *ment)
-{
-	int ret;
-
-	ret = insert_manifest(sb, ment, true);
-	if (!ret) {
-		ret = scoutfs_dirty_ring_entry(sb, SCOUTFS_RING_ADD_MANIFEST,
-					       ment, sizeof(*ment));
-		if (ret)
-			scoutfs_delete_manifest(sb, le64_to_cpu(ment->blkno));
-	}
-
-	return ret;
-}
-
-/*
- * Fill the caller's ment with the next log segment in the manifest that
- * might contain the given range.  The caller initializes the ment to
- * zeros to find the first log segment.
- *
- * This can return multiple log segments from level 0 in decreasing age.
- * Then it can return at most one log segment in each level that
- * intersects the given range.
- *
- * Returns true if an entry was found and is now described in ment,
- * false when there are no more segments that contain the range.
- *
- * XXX could use the l0 seq to walk the list and skipb locks we've
- * already seen.  I'm not sure that we'll be able to keep manifest
- * entries pinned while we're away blocking.  We might fail to find the
- * last entry's block in the radix when we return.
- */ -bool scoutfs_foreach_range_segment(struct super_block *sb, - struct scoutfs_key *first, - struct scoutfs_key *last, - struct scoutfs_ring_manifest_entry *ment) +int scoutfs_manifest_find_key(struct super_block *sb, struct scoutfs_key *key, + struct scoutfs_manifest_entry **ments_ret) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct scoutfs_manifest *mani = sbi->mani; + struct scoutfs_manifest_entry *ments; struct scoutfs_manifest_node *mnode; - bool found = false; + struct scoutfs_ival *ival; + unsigned nr; int i; - if (ment->level >= SCOUTFS_MAX_LEVEL) - return false; + /* make a reasonably large initial guess */ + i = 16; + ments = NULL; + do { + kfree(ments); + nr = i; + ments = kmalloc(nr * sizeof(struct scoutfs_manifest_entry), + GFP_NOFS); + if (!ments) + return -ENOMEM; - spin_lock(&mani->lock); - - if (ment->level == 0) { - if (ment->blkno) { - mnode = radix_tree_lookup(&mani->blkno_radix, - le64_to_cpu(ment->blkno)); - mnode = list_next_entry(mnode, head); - } else { - mnode = list_first_entry(&mani->level_zero, - struct scoutfs_manifest_node, - head); - } - - list_for_each_entry_from(mnode, &mani->level_zero, head) { - if (scoutfs_cmp_key_ranges(first, last, - &mnode->ment.first, - &mnode->ment.last) == 0) { - *ment = mnode->ment; - found = true; - break; + spin_lock(&mani->lock); + i = 0; + ival = NULL; + while ((ival = scoutfs_next_ival(&mani->itree, key, key, + ival))) { + if (i < nr) { + mnode = container_of(ival, + struct scoutfs_manifest_node, ival); + ments[i].blkno = cpu_to_le64(mnode->blkno); + ments[i].seq = cpu_to_le64(mnode->seq); + ments[i].level = mnode->level; + ments[i].first = ival->start; + ments[i].last = ival->end; } + i++; } + spin_unlock(&mani->lock); + + } while (i > nr); + + if (i) { + sort(ments, i, sizeof(struct scoutfs_manifest_entry), + cmp_ments, swap_ments); + } else { + kfree(ments); + ments = NULL; } - if (!found) { - /* - * The log segments in the each level fully cover the - * key range and don't overlap. So we will always find - * a segment that matches whatever key we look for. We - * look for the start of the range because iterators are - * walk the keyspace sequentially. - */ - for (i = ment->level + 1; i <= SCOUTFS_MAX_LEVEL; i++) { - mnode = find_mnode(&mani->levels[i].root, first); - if (mnode) { - *ment = mnode->ment; - found = true; - break; - } - } - if (!found) - ment->level = SCOUTFS_MAX_LEVEL; - } - - spin_unlock(&mani->lock); - - return found; + *ments_ret = ments; + return i; } int scoutfs_setup_manifest(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct scoutfs_manifest *mani; - int i; - mani = kmalloc(sizeof(struct scoutfs_manifest), GFP_KERNEL); + mani = kzalloc(sizeof(struct scoutfs_manifest), GFP_KERNEL); if (!mani) return -ENOMEM; spin_lock_init(&mani->lock); - INIT_RADIX_TREE(&mani->blkno_radix, GFP_NOFS); - INIT_LIST_HEAD(&mani->level_zero); - - for (i = 0; i < ARRAY_SIZE(mani->levels); i++) - mani->levels[i].root = RB_ROOT; + scoutfs_init_ival_tree(&mani->itree); sbi->mani = mani; @@ -380,34 +286,21 @@ int scoutfs_setup_manifest(struct super_block *sb) } /* - * This is called once the manifest will no longer be used. We iterate - * over the blkno radix deleting radix entries and freeing manifest - * nodes. + * This is called once the manifest will no longer be used. 
  */
 void scoutfs_destroy_manifest(struct super_block *sb)
 {
 	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
 	struct scoutfs_manifest *mani = sbi->mani;
-	struct scoutfs_manifest_node *mnodes[16];
-	unsigned long first_index = 0;
-	int ret;
-	int i;
+	struct scoutfs_ival *ival;
+	struct scoutfs_ival *tmp;
 
-	for (;;) {
-		ret = radix_tree_gang_lookup(&mani->blkno_radix,
-					     (void **)mnodes, first_index,
-					     ARRAY_SIZE(mnodes));
-		if (!ret)
-			break;
+	if (mani) {
+		foreach_postorder_ival_safe(&mani->itree, ival, tmp)
+			kfree(ival);
 
-		for (i = 0; i < ret; i++) {
-			first_index = le64_to_cpu(mnodes[i]->ment.blkno);
-			radix_tree_delete(&mani->blkno_radix, first_index);
-			kfree(mnodes[i]);
-		}
-		first_index++;
+		kfree(mani);
+		sbi->mani = NULL;
 	}
-
-	kfree(sbi->mani);
-	sbi->mani = NULL;
 }
diff --git a/kmod/src/manifest.h b/kmod/src/manifest.h
index ea3eef18..5223f069 100644
--- a/kmod/src/manifest.h
+++ b/kmod/src/manifest.h
@@ -5,14 +5,14 @@
 int scoutfs_setup_manifest(struct super_block *sb);
 void scoutfs_destroy_manifest(struct super_block *sb);
 int scoutfs_insert_manifest(struct super_block *sb,
-			    struct scoutfs_ring_manifest_entry *ment);
-int scoutfs_new_manifest(struct super_block *sb,
-			 struct scoutfs_ring_manifest_entry *ment);
-void scoutfs_delete_manifest(struct super_block *sb, u64 blkno);
+			    struct scoutfs_manifest_entry *ment);
+void scoutfs_delete_manifest(struct super_block *sb,
+			     struct scoutfs_manifest_entry *ment);
+int scoutfs_finalize_manifest(struct super_block *sb,
+			      struct scoutfs_manifest_entry *existing,
+			      struct scoutfs_manifest_entry *updated);
 
-bool scoutfs_foreach_range_segment(struct super_block *sb,
-				   struct scoutfs_key *first,
-				   struct scoutfs_key *last,
-				   struct scoutfs_ring_manifest_entry *ment);
+int scoutfs_manifest_find_key(struct super_block *sb, struct scoutfs_key *key,
+			      struct scoutfs_manifest_entry **ments_ret);
 
 #endif
diff --git a/kmod/src/ring.c b/kmod/src/ring.c
index 6743fb9f..cbb36835 100644
--- a/kmod/src/ring.c
+++ b/kmod/src/ring.c
@@ -28,8 +28,7 @@ static int replay_ring_block(struct super_block *sb, struct buffer_head *bh)
 {
 	struct scoutfs_ring_block *ring = (void *)bh->b_data;
 	struct scoutfs_ring_entry *ent = (void *)(ring + 1);
-	struct scoutfs_ring_manifest_entry *ment;
-	struct scoutfs_ring_del_manifest *del;
+	struct scoutfs_manifest_entry *ment;
 	struct scoutfs_ring_bitmap *bm;
 	int ret = 0;
 	int i;
@@ -43,8 +42,8 @@ static int replay_ring_block(struct super_block *sb, struct buffer_head *bh)
 			ret = scoutfs_insert_manifest(sb, ment);
 			break;
 		case SCOUTFS_RING_DEL_MANIFEST:
-			del = (void *)(ent + 1);
-			scoutfs_delete_manifest(sb, le64_to_cpu(del->blkno));
+			ment = (void *)(ent + 1);
+			scoutfs_delete_manifest(sb, ment);
 			break;
 		case SCOUTFS_RING_BITMAP:
 			bm = (void *)(ent + 1);
diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h
index 51da5c23..44f312c3 100644
--- a/kmod/src/scoutfs_trace.h
+++ b/kmod/src/scoutfs_trace.h
@@ -25,6 +25,7 @@
 #include
 
 #include "key.h"
+#include "format.h"
 
 TRACE_EVENT(scoutfs_bloom_hit,
 	TP_PROTO(struct scoutfs_key *key),
@@ -189,7 +190,7 @@ TRACE_EVENT(scoutfs_write_super,
 );
 
 TRACE_EVENT(scoutfs_insert_manifest,
-	TP_PROTO(struct scoutfs_ring_manifest_entry *ment),
+	TP_PROTO(struct scoutfs_manifest_entry *ment),
 
 	TP_ARGS(ment),
 
@@ -225,7 +226,7 @@ TRACE_EVENT(scoutfs_insert_manifest,
 );
 
 TRACE_EVENT(scoutfs_delete_manifest,
-	TP_PROTO(struct scoutfs_ring_manifest_entry *ment),
+	TP_PROTO(struct scoutfs_manifest_entry *ment),
 
 	TP_ARGS(ment),
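Before the segment.c changes below, the calling convention worth spelling out is that scoutfs_manifest_find_key() transfers ownership of the sorted entry array to the caller.  A minimal consumer sketch, with search_segment() standing in for whatever per-segment work the caller does (hypothetical helper, error handling trimmed):

	struct scoutfs_manifest_entry *ments;
	int nr;
	int i;

	nr = scoutfs_manifest_find_key(sb, key, &ments);
	if (nr < 0)
		return nr;

	/* newest level 0 entries first, then each higher level */
	for (i = 0; i < nr; i++)
		search_segment(sb, le64_to_cpu(ments[i].blkno), key);

	/* nr == 0 leaves ments NULL, which kfree() tolerates */
	kfree(ments);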
diff --git a/kmod/src/segment.c b/kmod/src/segment.c
index c5504626..0591c7f7 100644
--- a/kmod/src/segment.c
+++ b/kmod/src/segment.c
@@ -66,7 +66,7 @@ struct scoutfs_item_iter {
 	struct buffer_head *bh;
 	struct scoutfs_item *item;
 	u64 blkno;
-	bool restart_after;
+	struct scoutfs_key after_seg;
 };
 
 void scoutfs_put_iter_list(struct list_head *list)
@@ -138,8 +138,6 @@ static bool try_lock_dirty_mutex(struct super_block *sb, u64 blkno)
  * then search through the item keys.  The first matching item we find
  * is returned.
  *
- * XXX lock the dirty log segment?
- *
  * -ENOENT is returned if the item isn't present.  The caller needs to put
  * the ref if we return success.
  */
@@ -147,12 +145,15 @@ int scoutfs_read_item(struct super_block *sb, struct scoutfs_key *key,
 		      struct scoutfs_item_ref *ref)
 {
 	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
-	struct scoutfs_ring_manifest_entry ment;
 	struct scoutfs_item *item = NULL;
 	struct scoutfs_bloom_bits bits;
+	struct scoutfs_manifest_entry *ments;
 	struct buffer_head *bh;
 	bool locked;
+	u64 blkno;
 	int ret;
+	int nr;
+	int i;
 
 	/* XXX hold manifest */
 
@@ -160,13 +161,19 @@ int scoutfs_read_item(struct super_block *sb, struct scoutfs_key *key,
 	item = NULL;
 	ret = -ENOENT;
 
-	memset(&ment, 0, sizeof(struct scoutfs_ring_manifest_entry));
-	while (scoutfs_foreach_range_segment(sb, key, key, &ment)) {
+	nr = scoutfs_manifest_find_key(sb, key, &ments);
+	if (nr < 0)
+		return nr;
+	if (nr == 0)
+		return -ENOENT;
+
+	for (i = 0; i < nr; i++) {
 		/* XXX read-ahead all bloom blocks */
 
+		blkno = le64_to_cpu(ments[i].blkno);
+
 		/* XXX verify seqs */
-		ret = scoutfs_test_bloom_bits(sb, le64_to_cpu(ment.blkno),
-					      key, &bits);
+		ret = scoutfs_test_bloom_bits(sb, blkno, key, &bits);
 		if (ret < 0)
 			break;
 		if (!ret) {
@@ -176,9 +183,8 @@ int scoutfs_read_item(struct super_block *sb, struct scoutfs_key *key,
 
 		/* XXX read-ahead all item header blocks */
 
-		locked = try_lock_dirty_mutex(sb, le64_to_cpu(ment.blkno));
-		ret = scoutfs_skip_lookup(sb, le64_to_cpu(ment.blkno), key,
-					  &bh, &item);
+		locked = try_lock_dirty_mutex(sb, blkno);
+		ret = scoutfs_skip_lookup(sb, blkno, key, &bh, &item);
 		if (locked)
 			mutex_unlock(&sbi->dirty_mutex);
 		if (ret) {
@@ -189,12 +195,14 @@ int scoutfs_read_item(struct super_block *sb, struct scoutfs_key *key,
 		break;
 	}
 
+	kfree(ments);
+
 	/* XXX release manifest */
 
 	/* XXX read-ahead all value blocks? */
 
 	if (!ret) {
-		ret = populate_ref(sb, le64_to_cpu(ment.blkno), bh, item, ref);
+		ret = populate_ref(sb, blkno, bh, item, ref);
 		brelse(bh);
 	}
 
@@ -312,49 +320,49 @@ static int start_dirty_segment(struct super_block *sb, u64 blkno)
 }
 
 /*
- * As we fill a dirty segment we don't know which keys it's going to
- * contain.  We add a manifest entry in memory that has it contain all
- * items so that reading will know to search the dirty segment.
+ * As we start to fill a dirty segment we don't know which keys it's
+ * going to contain.  We add a manifest entry in memory that has it
+ * contain all items so that reading will know to search the dirty
+ * segment.
  *
  * Once it's finalized we know the specific range of items it contains
  * and we update the manifest entry in memory for that range and write
 * that to the ring.
+ *
+ * Inserting the updated entry can fail: if we deleted the old entry
+ * and then the insert failed, reinserting the original could fail too.
+ * Instead we briefly allow two manifest entries for the same segment.
*/ static int update_dirty_segment_manifest(struct super_block *sb, u64 blkno, bool all_items) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); - struct scoutfs_ring_manifest_entry ment; + struct scoutfs_manifest_entry ment; + struct scoutfs_manifest_entry updated; struct scoutfs_item_block *iblk; struct buffer_head *bh; - int ret; ment.blkno = cpu_to_le64(blkno); ment.seq = sbi->super.hdr.seq; ment.level = 0; - - if (all_items) { - memset(&ment.first, 0, sizeof(struct scoutfs_key)); - memset(&ment.last, ~0, sizeof(struct scoutfs_key)); - } else { - bh = scoutfs_read_block(sb, blkno + SCOUTFS_BLOOM_BLOCKS); - if (!bh) { - ret = -EIO; - goto out; - } - - iblk = (void *)bh->b_data; - ment.first = iblk->first; - ment.last = iblk->last; - brelse(bh); - } + memset(&ment.first, 0, sizeof(struct scoutfs_key)); + memset(&ment.last, ~0, sizeof(struct scoutfs_key)); if (all_items) - ret = scoutfs_insert_manifest(sb, &ment); - else - ret = scoutfs_new_manifest(sb, &ment); -out: - return ret; + return scoutfs_insert_manifest(sb, &ment); + + bh = scoutfs_read_block(sb, blkno + SCOUTFS_BLOOM_BLOCKS); + if (!bh) + return -EIO; + + updated = ment; + + iblk = (void *)bh->b_data; + updated.first = iblk->first; + updated.last = iblk->last; + brelse(bh); + + return scoutfs_finalize_manifest(sb, &ment, &updated); } /* @@ -670,41 +678,40 @@ int scoutfs_delete_item(struct super_block *sb, struct scoutfs_item_ref *ref) * * We put the segment references and iteration cursors in a list in the * caller so that they can find many next items by advancing the cursors - * without having to walk the manifest and perform initial binary + * without having to walk the manifest and perform initial skip list * searches in each segment. * * The caller is responsible for putting the item ref if we return * success. -ENOENT is returned if there are no more items in the * search range. - * - * XXX this is wonky. We don't want to search the manifest for the - * range, just the initial value. Then we record the last key in - * segments we finish and only restart if least is > that or there are - * no least. We have to advance the first key when restarting the - * search. */ int scoutfs_next_item(struct super_block *sb, struct scoutfs_key *first, struct scoutfs_key *last, struct list_head *iter_list, struct scoutfs_item_ref *ref) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); - struct scoutfs_ring_manifest_entry ment; + struct scoutfs_manifest_entry *ments = NULL; + struct scoutfs_key key = *first; + struct scoutfs_key least_hole; struct scoutfs_item_iter *least; struct scoutfs_item_iter *iter; struct scoutfs_item_iter *pos; bool locked; int ret; + int nr; + int i; restart: if (list_empty(iter_list)) { + /* find all the segments that may contain the key */ + ret = scoutfs_manifest_find_key(sb, &key, &ments); + if (ret == 0) + ret = -ENOENT; + if (ret < 0) + goto out; + nr = ret; - /* - * Find all the segments that intersect the search range - * and find the next item in the block from the start - * of the range. - */ - memset(&ment, 0, sizeof(struct scoutfs_ring_manifest_entry)); - while (scoutfs_foreach_range_segment(sb, first, last, &ment)) { + for (i = 0; i < nr; i++) { iter = kzalloc(sizeof(struct scoutfs_item_iter), GFP_NOFS); if (!iter) { @@ -712,38 +719,32 @@ restart: goto out; } - /* - * We will restart the walk of the manifest blocks if - * we iterate over all the items in this block without - * exhausting the search range. 
- */
-			if (ment.level > 0 &&
-			    scoutfs_key_cmp(&ment.last, last) < 0)
-				iter->restart_after = true;
-
-			iter->blkno = le64_to_cpu(ment.blkno);
+			iter->blkno = le64_to_cpu(ments[i].blkno);
+			iter->after_seg = ments[i].last;
+			scoutfs_inc_key(&iter->after_seg);
 			list_add_tail(&iter->list, iter_list);
 		}
-		if (list_empty(iter_list)) {
-			ret = -ENOENT;
-			goto out;
-		}
+
+		kfree(ments);
+		ments = NULL;
 	}
 
+	memset(&least_hole, ~0, sizeof(least_hole));
 	least = NULL;
-	ret = 0;
 	list_for_each_entry_safe(iter, pos, iter_list, list) {
 		locked = try_lock_dirty_mutex(sb, iter->blkno);
 
-		/* search towards the first key if we haven't yet */
+		/* search towards the key if we haven't yet */
 		if (!iter->item) {
-			ret = scoutfs_skip_search(sb, iter->blkno, first,
+			ret = scoutfs_skip_search(sb, iter->blkno, &key,
 						  &iter->bh, &iter->item);
+		} else {
+			ret = 0;
 		}
 
-		/* then iterate until we find or pass the first key */
-		while (!ret && scoutfs_key_cmp(&iter->item->key, first) < 0) {
+		/* then iterate until we find or pass the key */
+		while (!ret && scoutfs_key_cmp(&iter->item->key, &key) < 0) {
 			ret = scoutfs_skip_next(sb, iter->blkno, &iter->bh,
 						&iter->item);
 		}
 
@@ -751,44 +752,68 @@ restart:
 		if (locked)
 			mutex_unlock(&sbi->dirty_mutex);
 
-		/* we're done with this block if we past the last key */
-		while (!ret && scoutfs_key_cmp(&iter->item->key, last) > 0) {
+		/* we're done with this segment if it has an item after last */
+		if (!ret && scoutfs_key_cmp(&iter->item->key, last) > 0) {
+			list_del_init(&iter->list);
 			brelse(iter->bh);
-			iter->bh = NULL;
-			iter->item = NULL;
-			ret = -ENOENT;
+			kfree(iter);
+			continue;
 		}
 
+		/*
+		 * If we run out of items in a segment then we don't know
+		 * the state of the keys after that segment in its level.
+		 * If the hole after the segment is still inside the
+		 * search range then we may need to search it: the next
+		 * item lies in the hole if the least item of the
+		 * remaining segments falls after it.
+		 */
 		if (ret == -ENOENT) {
-			if (iter->restart_after) {
-				/* need next block at this level */
-				scoutfs_put_iter_list(iter_list);
-				goto restart;
-			} else {
-				/* this level is done */
-				list_del_init(&iter->list);
-				brelse(iter->bh);
-				kfree(iter);
-				continue;
-			}
-		}
-		if (ret)
-			goto out;
+			if (scoutfs_key_cmp(&iter->after_seg, last) <= 0 &&
+			    scoutfs_key_cmp(&iter->after_seg, &least_hole) < 0)
+				least_hole = iter->after_seg;
 
-		/* remember the most recent smallest key from the first */
+			list_del_init(&iter->list);
+			brelse(iter->bh);
+			kfree(iter);
+			continue;
+		}
+
+		if (ret)
+			goto out;
+
+		/* remember the most recent smallest key */
 		if (!least ||
 		    scoutfs_key_cmp(&iter->item->key, &least->item->key) < 0)
 			least = iter;
 	}
 
+	/* if we had a gap before the least item then we need a new search */
+	if (least && scoutfs_key_cmp(&least_hole, &least->item->key) < 0) {
+		scoutfs_put_iter_list(iter_list);
+		key = least_hole;
+		goto restart;
+	}
+
+	/*
+	 * If every segment was exhausted while a hole was still inside
+	 * the search range then restart from the hole rather than
+	 * returning -ENOENT early.
+	 */
+	if (!least && scoutfs_key_cmp(&least_hole, last) <= 0) {
+		scoutfs_put_iter_list(iter_list);
+		key = least_hole;
+		goto restart;
+	}
+
 	if (least)
 		ret = populate_ref(sb, least->blkno, least->bh, least->item,
 				   ref);
 	else
 		ret = -ENOENT;
 out:
+	kfree(ments);
 	if (ret)
 		scoutfs_put_iter_list(iter_list);
 
 	return ret;
-
 }
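Since the restart logic above is subtle, here is how a caller is expected to drive the iterator.  This is a sketch only: the layout of struct scoutfs_item_ref and its put helper aren't shown in this patch, so ref.item and scoutfs_put_ref() below are stand-ins:

	struct scoutfs_item_ref ref;
	struct scoutfs_key key = *first;
	LIST_HEAD(iter_list);
	int ret;

	while ((ret = scoutfs_next_item(sb, &key, last, &iter_list,
					&ref)) == 0) {
		/* consume the item, then advance past its key */
		key = ref.item->key;		/* stand-in accessor */
		scoutfs_inc_key(&key);
		scoutfs_put_ref(&ref);		/* stand-in put helper */
	}
	scoutfs_put_iter_list(&iter_list);
	if (ret == -ENOENT)
		ret = 0;

The cursors cached in iter_list are what make this loop cheap: each call usually advances an already-positioned skip list cursor instead of redoing the manifest walk, and only a hole after an exhausted segment forces the restart path above.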