diff --git a/kmod/src/alloc.c b/kmod/src/alloc.c index 6fef77c5..0a938174 100644 --- a/kmod/src/alloc.c +++ b/kmod/src/alloc.c @@ -17,14 +17,14 @@ #include "super.h" #include "format.h" -#include "ring.h" +#include "btree.h" #include "cmp.h" #include "alloc.h" #include "counters.h" /* - * scoutfs allocates segments by storing regions of a bitmap in ring - * nodes. + * scoutfs allocates segments using regions of an allocation bitmap + * stored in btree items. * * Freed segments are recorded in nodes in an rbtree. The frees can't * satisfy allocation until they're committed to prevent overwriting @@ -40,7 +40,6 @@ struct seg_alloc { struct rw_semaphore rwsem; struct rb_root pending_root; - struct scoutfs_ring_info ring; u64 next_segno; }; @@ -49,7 +48,8 @@ struct seg_alloc { struct pending_region { struct rb_node node; - struct scoutfs_alloc_region reg; + u64 ind; + struct scoutfs_alloc_region_btree_val reg_val; }; static struct pending_region *find_pending(struct rb_root *root, u64 ind) @@ -60,9 +60,9 @@ static struct pending_region *find_pending(struct rb_root *root, u64 ind) while (node) { pend = container_of(node, struct pending_region, node); - if (ind < le64_to_cpu(pend->reg.index)) + if (ind < pend->ind) node = node->rb_left; - else if (ind > le64_to_cpu(pend->reg.index)) + else if (ind > pend->ind) node = node->rb_right; else return pend; @@ -76,15 +76,14 @@ static void insert_pending(struct rb_root *root, struct pending_region *ins) struct rb_node **node = &root->rb_node; struct rb_node *parent = NULL; struct pending_region *pend; - u64 ind = le64_to_cpu(ins->reg.index); while (*node) { parent = *node; pend = container_of(*node, struct pending_region, node); - if (ind < le64_to_cpu(pend->reg.index)) + if (ins->ind < pend->ind) node = &(*node)->rb_left; - else if (ind > le64_to_cpu(pend->reg.index)) + else if (ins->ind > pend->ind) node = &(*node)->rb_right; else BUG(); @@ -94,23 +93,29 @@ static void insert_pending(struct rb_root *root, struct pending_region 
*ins) rb_insert_color(&ins->node, root); } -static bool empty_region(struct scoutfs_alloc_region *reg) +static int copy_region_item(struct scoutfs_alloc_region_btree_key *reg_key, + struct scoutfs_alloc_region_btree_val *reg_val, + struct scoutfs_btree_item_ref *iref) { - int i; + if (iref->key_len != sizeof(struct scoutfs_alloc_region_btree_key) || + iref->val_len != sizeof(struct scoutfs_alloc_region_btree_val)) + return -EIO; - for (i = 0; i < ARRAY_SIZE(reg->bits); i++) { - if (reg->bits[i]) - return false; - } - - return true; + memcpy(reg_key, iref->key, iref->key_len); + memcpy(reg_val, iref->val, iref->val_len); + return 0; } +/* + * We're careful to copy the bitmaps out to aligned versions so that + * we can use native bitops that require aligned longs. + */ int scoutfs_alloc_segno(struct super_block *sb, u64 *segno) { - struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); - struct scoutfs_super_block *super = &sbi->super; - struct scoutfs_alloc_region *reg; + struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; + struct scoutfs_alloc_region_btree_key reg_key; + struct scoutfs_alloc_region_btree_val __aligned(sizeof(long)) reg_val; + SCOUTFS_BTREE_ITEM_REF(iref); DECLARE_SEG_ALLOC(sb, sal); u64 ind; int ret; @@ -132,36 +137,47 @@ int scoutfs_alloc_segno(struct super_block *sb, u64 *segno) nr = sal->next_segno & SCOUTFS_ALLOC_REGION_MASK; for (;;) { - reg = scoutfs_ring_lookup_next(&sal->ring, &ind); - if (reg == NULL && ind != 0) { + reg_key.index = cpu_to_be64(ind); + ret = scoutfs_btree_next(sb, &super->alloc_root, + &reg_key, sizeof(reg_key), &iref); + if (ret == -ENOENT && ind != 0) { ind = 0; nr = 0; continue; } - if (IS_ERR_OR_NULL(reg)) { - if (IS_ERR(reg)) - ret = PTR_ERR(reg); - else + if (ret < 0) { + if (ret == -ENOENT) ret = -ENOSPC; goto out; } - nr = find_next_bit_le(reg->bits, SCOUTFS_ALLOC_REGION_BITS, nr); - if (nr < SCOUTFS_ALLOC_REGION_BITS) + ret = copy_region_item(&reg_key, &reg_val, &iref); + scoutfs_btree_put_iref(&iref); + if (ret) + goto 
out; + + ind = be64_to_cpu(reg_key.index); + nr = find_next_bit_le(reg_val.bits, SCOUTFS_ALLOC_REGION_BITS, nr); + if (nr < SCOUTFS_ALLOC_REGION_BITS) { break; + } /* possible for nr to be after all free bits, keep going */ ind++; nr = 0; } - scoutfs_ring_dirty(&sal->ring, reg); + clear_bit_le(nr, reg_val.bits); - ind = le64_to_cpu(reg->index); - - clear_bit_le(nr, reg->bits); - if (empty_region(reg)) - scoutfs_ring_delete(&sal->ring, reg); + if (bitmap_empty((long *)reg_val.bits, SCOUTFS_ALLOC_REGION_BITS)) + ret = scoutfs_btree_delete(sb, &super->alloc_root, + &reg_key, sizeof(reg_key)); + else + ret = scoutfs_btree_update(sb, &super->alloc_root, + &reg_key, sizeof(reg_key), + &reg_val, sizeof(reg_val)); + if (ret) + goto out; *segno = (ind << SCOUTFS_ALLOC_REGION_SHIFT) + nr; sal->next_segno = *segno + 1; @@ -180,12 +196,11 @@ out: /* * Record newly freed sgements in pending regions. These are applied to - * ring nodes as the transaction commits. + * persistent regions in btree items as the transaction commits. 
*/ int scoutfs_alloc_free(struct super_block *sb, u64 segno) { - struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); - struct scoutfs_super_block *super = &sbi->super; + struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; struct pending_region *pend; DECLARE_SEG_ALLOC(sb, sal); u64 ind; @@ -205,11 +220,11 @@ int scoutfs_alloc_free(struct super_block *sb, u64 segno) goto out; } - pend->reg.index = cpu_to_le64(ind); + pend->ind = ind; insert_pending(&sal->pending_root, pend); } - set_bit_le(nr, pend->reg.bits); + set_bit_le(nr, pend->reg_val.bits); scoutfs_inc_counter(sb, alloc_free); le64_add_cpu(&super->free_segs, 1); ret = 0; @@ -221,91 +236,73 @@ out: return ret; } -static void or_region_bits(struct scoutfs_alloc_region *dst, - struct scoutfs_alloc_region *src) -{ - int i; - - for (i = 0; i < ARRAY_SIZE(dst->bits); i++) - dst->bits[i] |= src->bits[i]; -} - -int scoutfs_alloc_has_dirty(struct super_block *sb) -{ - DECLARE_SEG_ALLOC(sb, sal); - int ret; - - down_write(&sal->rwsem); - ret = !!(scoutfs_ring_has_dirty(&sal->ring) || - !RB_EMPTY_ROOT(&sal->pending_root)); - up_write(&sal->rwsem); - - return ret; -} - /* - * First we apply the pending frees to create the final set of dirty - * region nodes and then ask the ring to write them to the ring. + * Apply the pending frees to create the final set of dirty btree + * blocks. The caller will write the btree blocks. We're destroying + * the pending free record here so from this point on the pending free + * blocks could be visible to allocation. The caller can't finish with + * the transaction until the btree is written successfully. 
*/ -int scoutfs_alloc_submit_write(struct super_block *sb, - struct scoutfs_bio_completion *comp) +int scoutfs_alloc_apply_pending(struct super_block *sb) { + struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; DECLARE_SEG_ALLOC(sb, sal); - struct scoutfs_alloc_region *reg; struct pending_region *pend; struct rb_node *node; - u64 ind; + struct scoutfs_alloc_region_btree_key reg_key; + struct scoutfs_alloc_region_btree_val __aligned(sizeof(long)) reg_val; + SCOUTFS_BTREE_ITEM_REF(iref); int ret; down_write(&sal->rwsem); + ret = 0; while ((node = rb_first(&sal->pending_root))) { pend = container_of(node, struct pending_region, node); - ind = le64_to_cpu(pend->reg.index); + /* see if we have a region for this index */ + reg_key.index = cpu_to_be64(pend->ind); + ret = scoutfs_btree_lookup(sb, &super->alloc_root, + &reg_key, sizeof(reg_key), &iref); + if (ret == -ENOENT) { + /* create a new item if we don't */ + ret = scoutfs_btree_insert(sb, &super->alloc_root, + &reg_key, sizeof(reg_key), + &pend->reg_val, + sizeof(pend->reg_val)); + } else if (ret == 0) { + /* and update the existing item if we do */ + ret = copy_region_item(&reg_key, &reg_val, &iref); + scoutfs_btree_put_iref(&iref); + if (ret) + break; - reg = scoutfs_ring_lookup(&sal->ring, &ind); - if (!reg) { - reg = scoutfs_ring_insert(&sal->ring, &ind, - sizeof(struct scoutfs_alloc_region)); - if (!reg) { - ret = -ENOMEM; - goto out; - } + bitmap_or((long *)reg_val.bits, (long *)reg_val.bits, + (long *)pend->reg_val.bits, + SCOUTFS_ALLOC_REGION_BITS); - memset(reg, 0, sizeof(struct scoutfs_alloc_region)); - reg->index = cpu_to_le64(ind); + ret = scoutfs_btree_update(sb, &super->alloc_root, + &reg_key, sizeof(reg_key), + &reg_val, sizeof(reg_val)); } - - or_region_bits(reg, &pend->reg); - scoutfs_ring_dirty(&sal->ring, reg); + if (ret < 0) + break; rb_erase(&pend->node, &sal->pending_root); kfree(pend); } - ret = scoutfs_ring_submit_write(sb, &sal->ring, comp); -out: up_write(&sal->rwsem); + return ret; } -void 
scoutfs_alloc_write_complete(struct super_block *sb) -{ - DECLARE_SEG_ALLOC(sb, sal); - - down_write(&sal->rwsem); - scoutfs_ring_write_complete(&sal->ring); - up_write(&sal->rwsem); -} - /* * Return the number of blocks free for statfs. */ u64 scoutfs_alloc_bfree(struct super_block *sb) { - struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); - struct scoutfs_super_block *super = &sbi->super; + struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; DECLARE_SEG_ALLOC(sb, sal); u64 bfree; @@ -316,31 +313,13 @@ u64 scoutfs_alloc_bfree(struct super_block *sb) return bfree; } -static int alloc_ring_compare_key(void *key, void *data) -{ - u64 *ind = key; - struct scoutfs_alloc_region *reg = data; - - return scoutfs_cmp_u64s(*ind, le64_to_cpu(reg->index)); -} - -static int alloc_ring_compare_data(void *A, void *B) -{ - struct scoutfs_alloc_region *a = A; - struct scoutfs_alloc_region *b = B; - - return scoutfs_cmp_u64s(le64_to_cpu(a->index), le64_to_cpu(b->index)); -} - int scoutfs_alloc_setup(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); - struct scoutfs_super_block *super = &sbi->super; struct seg_alloc *sal; - int ret; /* bits need to be aligned so hosts can use native bitops */ - BUILD_BUG_ON(offsetof(struct scoutfs_alloc_region, bits) & + BUILD_BUG_ON(offsetof(struct scoutfs_alloc_region_btree_val, bits) & (sizeof(long) - 1)); sal = kzalloc(sizeof(struct seg_alloc), GFP_KERNEL); @@ -349,14 +328,6 @@ int scoutfs_alloc_setup(struct super_block *sb) init_rwsem(&sal->rwsem); sal->pending_root = RB_ROOT; - scoutfs_ring_init(&sal->ring, &super->alloc_ring, - alloc_ring_compare_key, alloc_ring_compare_data); - - ret = scoutfs_ring_load(sb, &sal->ring); - if (ret) { - kfree(sal); - return ret; - } /* XXX read next_segno from super? 
*/ @@ -373,7 +344,6 @@ void scoutfs_alloc_destroy(struct super_block *sb) struct rb_node *node; if (sal) { - scoutfs_ring_destroy(&sal->ring); while ((node = rb_first(&sal->pending_root))) { pend = container_of(node, struct pending_region, node); rb_erase(&pend->node, &sal->pending_root); diff --git a/kmod/src/alloc.h b/kmod/src/alloc.h index bb185d90..d6c8a5b0 100644 --- a/kmod/src/alloc.h +++ b/kmod/src/alloc.h @@ -2,15 +2,11 @@ #define _SCOUTFS_ALLOC_H_ struct scoutfs_alloc_region; -struct scoutfs_bio_completion; int scoutfs_alloc_segno(struct super_block *sb, u64 *segno); int scoutfs_alloc_free(struct super_block *sb, u64 segno); -int scoutfs_alloc_has_dirty(struct super_block *sb); -int scoutfs_alloc_submit_write(struct super_block *sb, - struct scoutfs_bio_completion *comp); -void scoutfs_alloc_write_complete(struct super_block *sb); +int scoutfs_alloc_apply_pending(struct super_block *sb); u64 scoutfs_alloc_bfree(struct super_block *sb); int scoutfs_alloc_setup(struct super_block *sb); diff --git a/kmod/src/btree.c b/kmod/src/btree.c index 55a2a0a8..e6ad7f71 100644 --- a/kmod/src/btree.c +++ b/kmod/src/btree.c @@ -1769,6 +1769,7 @@ int scoutfs_btree_write_dirty(struct super_block *sb) struct scoutfs_btree_ring *bring = &super->bring; struct scoutfs_btree_root *roots[] = { &super->manifest.root, + &super->alloc_root, NULL, }; struct scoutfs_btree_root *root; diff --git a/kmod/src/format.h b/kmod/src/format.h index 7ca2f2e3..0f5b8638 100644 --- a/kmod/src/format.h +++ b/kmod/src/format.h @@ -208,12 +208,12 @@ struct scoutfs_manifest_btree_val { #define SCOUTFS_ALLOC_REGION_BITS (1 << SCOUTFS_ALLOC_REGION_SHIFT) #define SCOUTFS_ALLOC_REGION_MASK (SCOUTFS_ALLOC_REGION_BITS - 1) -/* - * The bits need to be aligned so that the host can use native long - * bitops on the bits in memory. 
- */ -struct scoutfs_alloc_region { - __le64 index; +struct scoutfs_alloc_region_btree_key { + __be64 index; +} __packed; + +/* The bits need to be aligned so that the hosts can use native long bit ops */ +struct scoutfs_alloc_region_btree_val { __le64 bits[SCOUTFS_ALLOC_REGION_BITS / 64]; } __packed; @@ -421,7 +421,7 @@ struct scoutfs_super_block { __le64 ring_gen; struct scoutfs_btree_ring bring; __le64 next_seg_seq; - struct scoutfs_ring_descriptor alloc_ring; + struct scoutfs_btree_root alloc_root; struct scoutfs_manifest manifest; struct scoutfs_inet_addr server_addr; } __packed; diff --git a/kmod/src/net.c b/kmod/src/net.c index 09a4d8b1..53a5e6f3 100644 --- a/kmod/src/net.c +++ b/kmod/src/net.c @@ -322,27 +322,22 @@ static void scoutfs_net_ring_commit_func(struct work_struct *work) struct net_info *nti = container_of(work, struct net_info, ring_commit_work); struct super_block *sb = nti->sb; - struct scoutfs_bio_completion comp; struct commit_waiter *cw; struct commit_waiter *pos; struct llist_node *node; int ret; - scoutfs_bio_init_comp(&comp); - down_write(&nti->ring_commit_rwsem); if (scoutfs_btree_has_dirty(sb)) { - ret = scoutfs_btree_write_dirty(sb) ?: - scoutfs_alloc_submit_write(sb, &comp) ?: - scoutfs_bio_wait_comp(sb, &comp) ?: + ret = scoutfs_alloc_apply_pending(sb) ?: + scoutfs_btree_write_dirty(sb) ?: scoutfs_write_dirty_super(sb); /* we'd need to loop or something */ BUG_ON(ret); scoutfs_btree_write_complete(sb); - scoutfs_alloc_write_complete(sb); scoutfs_advance_dirty_super(sb); } else { diff --git a/kmod/src/ring.c b/kmod/src/ring.c index 26e256f6..1e5f8e78 100644 --- a/kmod/src/ring.c +++ b/kmod/src/ring.c @@ -357,8 +357,7 @@ static unsigned most_blocks(unsigned long bytes) unsigned long space; space = SCOUTFS_BLOCK_SIZE - - sizeof(struct scoutfs_ring_block) - - sizeof(struct scoutfs_alloc_region); + sizeof(struct scoutfs_ring_block); return DIV_ROUND_UP(bytes, space); }