scoutfs: store allocator regions in btree

Convert the segment allocator to store its free region bitmaps in the
btree.

This is a very straightforward mechanical transformation.  We split the
allocator region into a big-endian index key and the bitmap value
payload.  We're careful to operate on long-aligned copies of the bitmaps
so that native bitops, which require aligned longs, can be used on them.

We can remove all the funky functions that were needed when writing the
ring.  All we're left with is a call to apply the pending allocations to
dirty btree blocks before writing the btree.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2017-06-29 15:57:44 -07:00
parent fc50072cf9
commit ff5a094833
6 changed files with 105 additions and 144 deletions

View File

@@ -17,14 +17,14 @@
#include "super.h"
#include "format.h"
#include "ring.h"
#include "btree.h"
#include "cmp.h"
#include "alloc.h"
#include "counters.h"
/*
* scoutfs allocates segments by storing regions of a bitmap in ring
* nodes.
* scoutfs allocates segments using regions of an allocation bitmap
* stored in btree items.
*
* Freed segments are recorded in nodes in an rbtree. The frees can't
* satisfy allocation until they're committed to prevent overwriting
@@ -40,7 +40,6 @@
struct seg_alloc {
struct rw_semaphore rwsem;
struct rb_root pending_root;
struct scoutfs_ring_info ring;
u64 next_segno;
};
@@ -49,7 +48,8 @@ struct seg_alloc {
struct pending_region {
struct rb_node node;
struct scoutfs_alloc_region reg;
u64 ind;
struct scoutfs_alloc_region_btree_val reg_val;
};
static struct pending_region *find_pending(struct rb_root *root, u64 ind)
@@ -60,9 +60,9 @@ static struct pending_region *find_pending(struct rb_root *root, u64 ind)
while (node) {
pend = container_of(node, struct pending_region, node);
if (ind < le64_to_cpu(pend->reg.index))
if (ind < pend->ind)
node = node->rb_left;
else if (ind > le64_to_cpu(pend->reg.index))
else if (ind > pend->ind)
node = node->rb_right;
else
return pend;
@@ -76,15 +76,14 @@ static void insert_pending(struct rb_root *root, struct pending_region *ins)
struct rb_node **node = &root->rb_node;
struct rb_node *parent = NULL;
struct pending_region *pend;
u64 ind = le64_to_cpu(ins->reg.index);
while (*node) {
parent = *node;
pend = container_of(*node, struct pending_region, node);
if (ind < le64_to_cpu(pend->reg.index))
if (ins->ind < pend->ind)
node = &(*node)->rb_left;
else if (ind > le64_to_cpu(pend->reg.index))
else if (ins->ind > pend->ind)
node = &(*node)->rb_right;
else
BUG();
@@ -94,23 +93,29 @@ static void insert_pending(struct rb_root *root, struct pending_region *ins)
rb_insert_color(&ins->node, root);
}
static bool empty_region(struct scoutfs_alloc_region *reg)
static int copy_region_item(struct scoutfs_alloc_region_btree_key *reg_key,
struct scoutfs_alloc_region_btree_val *reg_val,
struct scoutfs_btree_item_ref *iref)
{
int i;
if (iref->key_len != sizeof(struct scoutfs_alloc_region_btree_key) ||
iref->val_len != sizeof(struct scoutfs_alloc_region_btree_val))
return -EIO;
for (i = 0; i < ARRAY_SIZE(reg->bits); i++) {
if (reg->bits[i])
return false;
}
return true;
memcpy(reg_key, iref->key, iref->key_len);
memcpy(reg_val, iref->val, iref->val_len);
return 0;
}
/*
* We're careful to copy the bitmaps out to aligned versions so that
* we can use native bitops that require aligned longs.
*/
int scoutfs_alloc_segno(struct super_block *sb, u64 *segno)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_alloc_region *reg;
struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
struct scoutfs_alloc_region_btree_key reg_key;
struct scoutfs_alloc_region_btree_val __aligned(sizeof(long)) reg_val;
SCOUTFS_BTREE_ITEM_REF(iref);
DECLARE_SEG_ALLOC(sb, sal);
u64 ind;
int ret;
@@ -132,36 +137,47 @@ int scoutfs_alloc_segno(struct super_block *sb, u64 *segno)
nr = sal->next_segno & SCOUTFS_ALLOC_REGION_MASK;
for (;;) {
reg = scoutfs_ring_lookup_next(&sal->ring, &ind);
if (reg == NULL && ind != 0) {
reg_key.index = cpu_to_be64(ind);
ret = scoutfs_btree_next(sb, &super->alloc_root,
&reg_key, sizeof(reg_key), &iref);
if (ret == -ENOENT && ind != 0) {
ind = 0;
nr = 0;
continue;
}
if (IS_ERR_OR_NULL(reg)) {
if (IS_ERR(reg))
ret = PTR_ERR(reg);
else
if (ret < 0) {
if (ret == -ENOENT)
ret = -ENOSPC;
goto out;
}
nr = find_next_bit_le(reg->bits, SCOUTFS_ALLOC_REGION_BITS, nr);
if (nr < SCOUTFS_ALLOC_REGION_BITS)
ret = copy_region_item(&reg_key, &reg_val, &iref);
scoutfs_btree_put_iref(&iref);
if (ret)
goto out;
ind = be64_to_cpu(reg_key.index);
nr = find_next_bit_le(reg_val.bits, SCOUTFS_ALLOC_REGION_BITS, nr);
if (nr < SCOUTFS_ALLOC_REGION_BITS) {
break;
}
/* possible for nr to be after all free bits, keep going */
ind++;
nr = 0;
}
scoutfs_ring_dirty(&sal->ring, reg);
clear_bit_le(nr, reg_val.bits);
ind = le64_to_cpu(reg->index);
clear_bit_le(nr, reg->bits);
if (empty_region(reg))
scoutfs_ring_delete(&sal->ring, reg);
if (bitmap_empty((long *)reg_val.bits, SCOUTFS_ALLOC_REGION_BITS))
ret = scoutfs_btree_delete(sb, &super->alloc_root,
&reg_key, sizeof(reg_key));
else
ret = scoutfs_btree_update(sb, &super->alloc_root,
&reg_key, sizeof(reg_key),
&reg_val, sizeof(reg_val));
if (ret)
goto out;
*segno = (ind << SCOUTFS_ALLOC_REGION_SHIFT) + nr;
sal->next_segno = *segno + 1;
@@ -180,12 +196,11 @@ out:
/*
* Record newly freed segments in pending regions. These are applied to
* ring nodes as the transaction commits.
* persistent regions in btree items as the transaction commits.
*/
int scoutfs_alloc_free(struct super_block *sb, u64 segno)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
struct pending_region *pend;
DECLARE_SEG_ALLOC(sb, sal);
u64 ind;
@@ -205,11 +220,11 @@ int scoutfs_alloc_free(struct super_block *sb, u64 segno)
goto out;
}
pend->reg.index = cpu_to_le64(ind);
pend->ind = ind;
insert_pending(&sal->pending_root, pend);
}
set_bit_le(nr, pend->reg.bits);
set_bit_le(nr, pend->reg_val.bits);
scoutfs_inc_counter(sb, alloc_free);
le64_add_cpu(&super->free_segs, 1);
ret = 0;
@@ -221,91 +236,73 @@ out:
return ret;
}
static void or_region_bits(struct scoutfs_alloc_region *dst,
struct scoutfs_alloc_region *src)
{
int i;
for (i = 0; i < ARRAY_SIZE(dst->bits); i++)
dst->bits[i] |= src->bits[i];
}
int scoutfs_alloc_has_dirty(struct super_block *sb)
{
DECLARE_SEG_ALLOC(sb, sal);
int ret;
down_write(&sal->rwsem);
ret = !!(scoutfs_ring_has_dirty(&sal->ring) ||
!RB_EMPTY_ROOT(&sal->pending_root));
up_write(&sal->rwsem);
return ret;
}
/*
* First we apply the pending frees to create the final set of dirty
* region nodes and then ask the ring to write them to the ring.
* Apply the pending frees to create the final set of dirty btree
* blocks. The caller will write the btree blocks. We're destroying
* the pending free record here so from this point on the pending free
* blocks could be visible to allocation. The caller can't finish with
* the transaction until the btree is written successfully.
*/
int scoutfs_alloc_submit_write(struct super_block *sb,
struct scoutfs_bio_completion *comp)
int scoutfs_alloc_apply_pending(struct super_block *sb)
{
struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
DECLARE_SEG_ALLOC(sb, sal);
struct scoutfs_alloc_region *reg;
struct pending_region *pend;
struct rb_node *node;
u64 ind;
struct scoutfs_alloc_region_btree_key reg_key;
struct scoutfs_alloc_region_btree_val __aligned(sizeof(long)) reg_val;
SCOUTFS_BTREE_ITEM_REF(iref);
int ret;
down_write(&sal->rwsem);
ret = 0;
while ((node = rb_first(&sal->pending_root))) {
pend = container_of(node, struct pending_region, node);
ind = le64_to_cpu(pend->reg.index);
/* see if we have a region for this index */
reg_key.index = cpu_to_be64(pend->ind);
ret = scoutfs_btree_lookup(sb, &super->alloc_root,
&reg_key, sizeof(reg_key), &iref);
if (ret == -ENOENT) {
/* create a new item if we don't */
ret = scoutfs_btree_insert(sb, &super->alloc_root,
&reg_key, sizeof(reg_key),
&pend->reg_val,
sizeof(pend->reg_val));
} else if (ret == 0) {
/* and update the existing item if we do */
ret = copy_region_item(&reg_key, &reg_val, &iref);
scoutfs_btree_put_iref(&iref);
if (ret)
break;
reg = scoutfs_ring_lookup(&sal->ring, &ind);
if (!reg) {
reg = scoutfs_ring_insert(&sal->ring, &ind,
sizeof(struct scoutfs_alloc_region));
if (!reg) {
ret = -ENOMEM;
goto out;
}
bitmap_or((long *)reg_val.bits, (long *)reg_val.bits,
(long *)pend->reg_val.bits,
SCOUTFS_ALLOC_REGION_BITS);
memset(reg, 0, sizeof(struct scoutfs_alloc_region));
reg->index = cpu_to_le64(ind);
ret = scoutfs_btree_update(sb, &super->alloc_root,
&reg_key, sizeof(reg_key),
&reg_val, sizeof(reg_val));
}
or_region_bits(reg, &pend->reg);
scoutfs_ring_dirty(&sal->ring, reg);
if (ret < 0)
break;
rb_erase(&pend->node, &sal->pending_root);
kfree(pend);
}
ret = scoutfs_ring_submit_write(sb, &sal->ring, comp);
out:
up_write(&sal->rwsem);
return ret;
}
void scoutfs_alloc_write_complete(struct super_block *sb)
{
DECLARE_SEG_ALLOC(sb, sal);
down_write(&sal->rwsem);
scoutfs_ring_write_complete(&sal->ring);
up_write(&sal->rwsem);
}
/*
* Return the number of blocks free for statfs.
*/
u64 scoutfs_alloc_bfree(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
DECLARE_SEG_ALLOC(sb, sal);
u64 bfree;
@@ -316,31 +313,13 @@ u64 scoutfs_alloc_bfree(struct super_block *sb)
return bfree;
}
static int alloc_ring_compare_key(void *key, void *data)
{
u64 *ind = key;
struct scoutfs_alloc_region *reg = data;
return scoutfs_cmp_u64s(*ind, le64_to_cpu(reg->index));
}
static int alloc_ring_compare_data(void *A, void *B)
{
struct scoutfs_alloc_region *a = A;
struct scoutfs_alloc_region *b = B;
return scoutfs_cmp_u64s(le64_to_cpu(a->index), le64_to_cpu(b->index));
}
int scoutfs_alloc_setup(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct seg_alloc *sal;
int ret;
/* bits need to be aligned so hosts can use native bitops */
BUILD_BUG_ON(offsetof(struct scoutfs_alloc_region, bits) &
BUILD_BUG_ON(offsetof(struct scoutfs_alloc_region_btree_val, bits) &
(sizeof(long) - 1));
sal = kzalloc(sizeof(struct seg_alloc), GFP_KERNEL);
@@ -349,14 +328,6 @@ int scoutfs_alloc_setup(struct super_block *sb)
init_rwsem(&sal->rwsem);
sal->pending_root = RB_ROOT;
scoutfs_ring_init(&sal->ring, &super->alloc_ring,
alloc_ring_compare_key, alloc_ring_compare_data);
ret = scoutfs_ring_load(sb, &sal->ring);
if (ret) {
kfree(sal);
return ret;
}
/* XXX read next_segno from super? */
@@ -373,7 +344,6 @@ void scoutfs_alloc_destroy(struct super_block *sb)
struct rb_node *node;
if (sal) {
scoutfs_ring_destroy(&sal->ring);
while ((node = rb_first(&sal->pending_root))) {
pend = container_of(node, struct pending_region, node);
rb_erase(&pend->node, &sal->pending_root);

View File

@@ -2,15 +2,11 @@
#define _SCOUTFS_ALLOC_H_
struct scoutfs_alloc_region;
struct scoutfs_bio_completion;
int scoutfs_alloc_segno(struct super_block *sb, u64 *segno);
int scoutfs_alloc_free(struct super_block *sb, u64 segno);
int scoutfs_alloc_has_dirty(struct super_block *sb);
int scoutfs_alloc_submit_write(struct super_block *sb,
struct scoutfs_bio_completion *comp);
void scoutfs_alloc_write_complete(struct super_block *sb);
int scoutfs_alloc_apply_pending(struct super_block *sb);
u64 scoutfs_alloc_bfree(struct super_block *sb);
int scoutfs_alloc_setup(struct super_block *sb);

View File

@@ -1769,6 +1769,7 @@ int scoutfs_btree_write_dirty(struct super_block *sb)
struct scoutfs_btree_ring *bring = &super->bring;
struct scoutfs_btree_root *roots[] = {
&super->manifest.root,
&super->alloc_root,
NULL,
};
struct scoutfs_btree_root *root;

View File

@@ -208,12 +208,12 @@ struct scoutfs_manifest_btree_val {
#define SCOUTFS_ALLOC_REGION_BITS (1 << SCOUTFS_ALLOC_REGION_SHIFT)
#define SCOUTFS_ALLOC_REGION_MASK (SCOUTFS_ALLOC_REGION_BITS - 1)
/*
* The bits need to be aligned so that the host can use native long
* bitops on the bits in memory.
*/
struct scoutfs_alloc_region {
__le64 index;
struct scoutfs_alloc_region_btree_key {
__be64 index;
} __packed;
/* The bits need to be aligned so that the hosts can use native long bit ops */
struct scoutfs_alloc_region_btree_val {
__le64 bits[SCOUTFS_ALLOC_REGION_BITS / 64];
} __packed;
@@ -421,7 +421,7 @@ struct scoutfs_super_block {
__le64 ring_gen;
struct scoutfs_btree_ring bring;
__le64 next_seg_seq;
struct scoutfs_ring_descriptor alloc_ring;
struct scoutfs_btree_root alloc_root;
struct scoutfs_manifest manifest;
struct scoutfs_inet_addr server_addr;
} __packed;

View File

@@ -322,27 +322,22 @@ static void scoutfs_net_ring_commit_func(struct work_struct *work)
struct net_info *nti = container_of(work, struct net_info,
ring_commit_work);
struct super_block *sb = nti->sb;
struct scoutfs_bio_completion comp;
struct commit_waiter *cw;
struct commit_waiter *pos;
struct llist_node *node;
int ret;
scoutfs_bio_init_comp(&comp);
down_write(&nti->ring_commit_rwsem);
if (scoutfs_btree_has_dirty(sb)) {
ret = scoutfs_btree_write_dirty(sb) ?:
scoutfs_alloc_submit_write(sb, &comp) ?:
scoutfs_bio_wait_comp(sb, &comp) ?:
ret = scoutfs_alloc_apply_pending(sb) ?:
scoutfs_btree_write_dirty(sb) ?:
scoutfs_write_dirty_super(sb);
/* we'd need to loop or something */
BUG_ON(ret);
scoutfs_btree_write_complete(sb);
scoutfs_alloc_write_complete(sb);
scoutfs_advance_dirty_super(sb);
} else {

View File

@@ -357,8 +357,7 @@ static unsigned most_blocks(unsigned long bytes)
unsigned long space;
space = SCOUTFS_BLOCK_SIZE -
sizeof(struct scoutfs_ring_block) -
sizeof(struct scoutfs_alloc_region);
sizeof(struct scoutfs_ring_block);
return DIV_ROUND_UP(bytes, space);
}