scoutfs: store the manifest in an interval tree

Now that we have the interval tree we can use it to store the manifest.
Instead of having different indexes for each level we store all the
levels in one index.

This simplifies the code quite a bit.  In particular, we won't have to
special case merging between level 0 and 1 quite as much because level 0
is no longer a special list.

We have a strong motivation to keep the manifest small.  So we get
rid of the blkno radix.  It wasn't wise to trade off more manifest
storage to make the ring a bit smaller.  We can store full manifests
in the ring instead of just the block numbers.

We rework the new_manifest interace that adds a final manifest entry
and logs it.  The ring entry addition and manifest update are atomic.

We're about to implement merging which will permute the manifest.  Read
methods won't be able to iterate over levels while racing with merging.
We change the manifest key search interface to return a full set of all
the segments that intersect the key.

The next item interface now knows how to restart the search if hits
the end of a segment on one level and the next least key is in another
segment and greater than the end of that completed segment.

There was also a very crazy cut+paste bug where next item was testing
that the item is past the last search key with a while instead of an if.
It'd spin throwing list_del_init() and brelse() debugging warnings.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2016-04-02 17:27:58 -07:00
parent 20cc8c220c
commit a07b41fa8b
7 changed files with 333 additions and 418 deletions

View File

@@ -153,7 +153,7 @@ enum {
* isn't unused key space between blocks in a level. We might search
* blocks when we didn't need to.
*/
struct scoutfs_ring_manifest_entry {
struct scoutfs_manifest_entry {
__le64 blkno;
__le64 seq;
__u8 level;
@@ -163,10 +163,6 @@ struct scoutfs_ring_manifest_entry {
#define SCOUTFS_MANIFESTS_PER_LEVEL 10
struct scoutfs_ring_del_manifest {
__le64 blkno;
} __packed;
/* 2^22 * 10^13 > 2^64 */
#define SCOUTFS_MAX_LEVEL 13

View File

@@ -5,6 +5,11 @@ struct scoutfs_ival_tree {
struct rb_root root;
};
static inline void scoutfs_init_ival_tree(struct scoutfs_ival_tree *tree)
{
tree->root = RB_ROOT;
}
struct scoutfs_ival {
struct rb_node node;
struct scoutfs_key start;
@@ -21,6 +26,16 @@ struct scoutfs_ival *scoutfs_next_ival(struct scoutfs_ival_tree *tree,
struct scoutfs_key *end,
struct scoutfs_ival *ival);
/*
* Walk all the intervals in postorder. This lets us free each ival we
* see without erasing and rebalancing.
*/
#define foreach_postorder_ival_safe(itree, ival, node, tmp) \
for (node = rb_first_postorder(&(itree)->root); \
ival = container_of(node, struct scoutfs_ival, node), \
(node && (tmp = *node, 1)), node; \
node = rb_next_postorder(&tmp))
// struct rb_node {
// long unsigned int __rb_parent_color; /* 0 8 */
// struct rb_node * rb_right; /* 8 8 */

View File

@@ -13,366 +13,272 @@
#include <linux/fs.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/sort.h>
#include "super.h"
#include "format.h"
#include "manifest.h"
#include "key.h"
#include "ring.h"
#include "ival.h"
#include "scoutfs_trace.h"
/*
* The manifest organizes log segment blocks into a tree structure.
* The manifest organizes log segments into levels of item indexes. New
* segments arrive at level 0 which can have many segments with
* overlapping keys. Then segments are merged into progressively larger
* higher levels which do not have segments with overlapping keys.
*
* Each level of the tree contains an ordered list of log segments whose
* item keys don't overlap. The first level (level 0) of the tree is
* the exception whose segments can have key ranges that overlap.
*
* We also store pointers to the manifest entries in a radix tree
* indexed by their block number so that we can easily update existing
* entries.
*
* Level 0 segments are stored in the list with the most recent at the
* head of the list. Level 0's rb tree will always be empty.
* All the segments for all the levels are stored in one interval tree.
* This lets reads find all the overlapping segments in all levels with
* one tree walk instead of walks per level. It also lets us move
* segments around the levels by updating their level field rather than
* removing them from one level index and adding them to another.
*/
struct scoutfs_manifest {
spinlock_t lock;
struct radix_tree_root blkno_radix;
struct list_head level_zero;
struct scoutfs_level {
struct rb_root root;
u64 count;
} levels[SCOUTFS_MAX_LEVEL + 1];
struct scoutfs_ival_tree itree;
};
/*
* There's some redundancy between the interval struct and the manifest
* entry struct. If we re-use both we duplicate fields and memory
* pressure is precious here. So we have a native combination of the
* two.
*/
struct scoutfs_manifest_node {
struct rb_node node;
struct list_head head;
struct scoutfs_ring_manifest_entry ment;
struct scoutfs_ival ival;
u64 blkno;
u64 seq;
unsigned char level;
};
static void insert_mnode(struct rb_root *root,
struct scoutfs_manifest_node *ins)
{
struct rb_node **node = &root->rb_node;
struct scoutfs_manifest_node *mnode;
struct rb_node *parent = NULL;
int cmp;
while (*node) {
parent = *node;
mnode = rb_entry(*node, struct scoutfs_manifest_node, node);
cmp = scoutfs_key_cmp(&ins->ment.first, &mnode->ment.first);
if (cmp < 0)
node = &(*node)->rb_left;
else
node = &(*node)->rb_right;
}
rb_link_node(&ins->node, parent, node);
rb_insert_color(&ins->node, root);
}
static struct scoutfs_manifest_node *find_mnode(struct rb_root *root,
struct scoutfs_key *key)
{
struct rb_node *node = root->rb_node;
struct scoutfs_manifest_node *mnode;
int cmp;
while (node) {
mnode = rb_entry(node, struct scoutfs_manifest_node, node);
cmp = scoutfs_cmp_key_range(key, &mnode->ment.first,
&mnode->ment.last);
if (cmp < 0)
node = node->rb_left;
else if (cmp > 0)
node = node->rb_right;
else
return mnode;
}
return NULL;
}
/*
* Find a manifest node at the given block number and return it after
* removing it from either the level 0 list or level rb trees. It's
* left in the blkno radix.
* Remove an exact match of the entry from the manifest. It's normal
* for ring replay can try to remove an entry that doesn't exist if ring
* wrapping and manifest deletion combine in just the right way.
*/
static struct scoutfs_manifest_node *unlink_mnode(struct scoutfs_manifest *mani,
u64 blkno)
{
struct scoutfs_manifest_node *mnode;
mnode = radix_tree_lookup(&mani->blkno_radix, blkno);
if (mnode) {
trace_scoutfs_delete_manifest(&mnode->ment);
if (!list_empty(&mnode->head))
list_del_init(&mnode->head);
if (!RB_EMPTY_NODE(&mnode->node)) {
rb_erase(&mnode->node,
&mani->levels[mnode->ment.level].root);
mani->levels[mnode->ment.level].count--;
RB_CLEAR_NODE(&mnode->node);
}
}
return mnode;
}
/*
* This is called during ring replay. Because of the way the ring works
* we can get deletion entries for segments that we don't yet have
* in the replayed ring state.
*/
void scoutfs_delete_manifest(struct super_block *sb, u64 blkno)
static void delete_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *ment)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
struct scoutfs_manifest_node *mnode;
struct scoutfs_ival *ival;
ival = NULL;
while ((ival = scoutfs_next_ival(&mani->itree, &ment->first,
&ment->last, ival))) {
mnode = container_of(ival, struct scoutfs_manifest_node, ival);
if (mnode->blkno == le64_to_cpu(ment->blkno) &&
mnode->seq == le64_to_cpu(ment->seq) &&
!scoutfs_key_cmp(&ment->first, &mnode->ival.start) &&
!scoutfs_key_cmp(&ment->last, &mnode->ival.end))
break;
}
if (ival) {
trace_scoutfs_delete_manifest(ment);
scoutfs_remove_ival(&mani->itree, &mnode->ival);
kfree(mnode);
}
}
void scoutfs_delete_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *ment)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
spin_lock(&mani->lock);
mnode = unlink_mnode(mani, blkno);
if (mnode)
radix_tree_delete(&mani->blkno_radix, blkno);
delete_manifest(sb, ment);
spin_unlock(&mani->lock);
if (mnode)
kfree(mnode);
}
/*
* A newly inserted manifest can be inserted at the level
* above the first block that it intersects.
*/
static u8 insertion_level(struct super_block *sb,
struct scoutfs_ring_manifest_entry *ment)
static void insert_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *ment,
struct scoutfs_manifest_node *mnode)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
trace_scoutfs_insert_manifest(ment);
mnode->ival.start = ment->first;
mnode->ival.end = ment->last;
mnode->blkno = le64_to_cpu(ment->blkno);
mnode->seq = le64_to_cpu(ment->seq);
mnode->level = ment->level;
scoutfs_insert_ival(&mani->itree, &mnode->ival);
}
int scoutfs_insert_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *ment)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
struct scoutfs_manifest_node *mnode;
int i;
list_for_each_entry(mnode, &mani->level_zero, head) {
if (scoutfs_cmp_key_ranges(&ment->first, &ment->last,
&mnode->ment.first,
&mnode->ment.last) == 0)
return 0;
}
/* XXX this <= looks fishy :/ */
for (i = 1; i <= SCOUTFS_MAX_LEVEL; i++) {
mnode = find_mnode(&mani->levels[i].root, &ment->first);
if (mnode)
break;
if (mani->levels[i].count < SCOUTFS_MANIFESTS_PER_LEVEL)
return i;
}
return i - 1;
}
/*
* Insert an manifest entry into the blkno radix and either level 0 list
* or greater level rbtrees as appropriate. The new entry will replace
* any existing entry at its blkno, perhaps with different keys and
* level.
*
* The caller can ask that we find the highest level that the entry can
* be inserted into before it intersects with an existing entry. The
* caller's entry is updated with the new level so they can store it in
* the ring. Doing so here avoids extra ring churn of doing it later in
* merging.
*/
static int insert_manifest(struct super_block *sb,
struct scoutfs_ring_manifest_entry *ment,
bool find_level)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
struct scoutfs_manifest_node *mnode;
struct scoutfs_manifest_node *found;
u64 blkno = le64_to_cpu(ment->blkno);
int ret = 0;
/* allocation/preloading should be cheap enough to always try */
mnode = kmalloc(sizeof(struct scoutfs_manifest_node), GFP_NOFS);
mnode = kzalloc(sizeof(struct scoutfs_manifest_node), GFP_NOFS);
if (!mnode)
return -ENOMEM; /* XXX hmm, fatal? prealloc?*/
ret = radix_tree_preload(GFP_NOFS & ~__GFP_HIGHMEM);
spin_lock(&mani->lock);
insert_manifest(sb, ment, mnode);
spin_unlock(&mani->lock);
return 0;
}
/*
* The caller has inserted a temporary manifest entry while they were
* dirtying a segment. It's done now and they want the final segment
* range stored in the manifest and logged in the ring.
*
* If this returns an error then nothing has changed.
*
* XXX we'd also need to add stale manifest entry's to the ring
* XXX In the future we'd send it to the leader
*/
int scoutfs_finalize_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *existing,
struct scoutfs_manifest_entry *updated)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
struct scoutfs_manifest_node *mnode;
int ret;
mnode = kzalloc(sizeof(struct scoutfs_manifest_node), GFP_NOFS);
if (!mnode)
return -ENOMEM; /* XXX hmm, fatal? prealloc?*/
ret = scoutfs_dirty_ring_entry(sb, SCOUTFS_RING_ADD_MANIFEST,
updated,
sizeof(struct scoutfs_manifest_entry));
if (ret) {
kfree(mnode);
return ret;
}
INIT_LIST_HEAD(&mnode->head);
RB_CLEAR_NODE(&mnode->node);
spin_lock(&mani->lock);
/* reuse found to avoid radix delete/insert churn */
found = unlink_mnode(mani, blkno);
if (!found) {
radix_tree_insert(&mani->blkno_radix, blkno, mnode);
} else {
swap(found, mnode);
}
/* careful to find our level after deleting old blkno ment */
if (find_level)
ment->level = insertion_level(sb, ment);
trace_scoutfs_insert_manifest(ment);
mnode->ment = *ment;
if (ment->level) {
insert_mnode(&mani->levels[ment->level].root, mnode);
mani->levels[ment->level].count++;
} else {
list_add(&mnode->head, &mani->level_zero);
}
delete_manifest(sb, existing);
insert_manifest(sb, updated, mnode);
spin_unlock(&mani->lock);
radix_tree_preload_end();
kfree(found);
return 0;
}
/* Index an existing entry */
int scoutfs_insert_manifest(struct super_block *sb,
struct scoutfs_ring_manifest_entry *ment)
/* sorted by increasing level then decreasing seq */
static int cmp_ments(const void *A, const void *B)
{
return insert_manifest(sb, ment, false);
const struct scoutfs_manifest_entry *a = A;
const struct scoutfs_manifest_entry *b = B;
int cmp;
cmp = (int)a->level - (int)b->level;
if (cmp)
return cmp;
if (le64_to_cpu(a->seq) > le64_to_cpu(b->seq))
return -1;
if (le64_to_cpu(a->seq) < le64_to_cpu(b->seq))
return 1;
return 0;
}
static void swap_ments(void *A, void *B, int size)
{
struct scoutfs_manifest_entry *a = A;
struct scoutfs_manifest_entry *b = B;
swap(*a, *b);
}
/*
* Add an entry for a newly written segment to the indexes and record it
* in the ring. The entry can be modified by insertion.
* Give the caller an allocated array of manifest entries that intersect
* their search key. The array is sorted in the order for searching for
* the most recent item: decreasing sequence in level 0 then increasing
* levels.
*
* XXX we'd also need to add stale manifest entry's to the ring
* XXX In the future we'd send it to the leader
* The live manifest can change while the caller walks their array but
* the segments will not be reclaimed and the caller has grants that
* protect their items in the segments even if the segments shift over
* time.
*
* The number of elements in the array is returned, or negative errors,
* and the array is not allocated if 0 is returned.
*
* XXX need to actually keep the segments from being reclaimed
*/
int scoutfs_new_manifest(struct super_block *sb,
struct scoutfs_ring_manifest_entry *ment)
{
int ret;
ret = insert_manifest(sb, ment, true);
if (!ret) {
ret = scoutfs_dirty_ring_entry(sb, SCOUTFS_RING_ADD_MANIFEST,
ment, sizeof(*ment));
if (ret)
scoutfs_delete_manifest(sb, le64_to_cpu(ment->blkno));
}
return ret;
}
/*
* Fill the caller's ment with the next log segment in the manifest that
* might contain the given range. The caller initializes the ment to
* zeros to find the first log segment.
*
* This can return multiple log segments from level 0 in decreasing age.
* Then it can return at most one log segment in each level that
* intersects the given range.
*
* Returns true if an entry was found and is now described in ment,
* false when there are no more segments that contain the range.
*
* XXX could use the l0 seq to walk the list and skipb locks we've
* already seen. I'm not sure that we'll be able to keep manifest
* entries pinned while we're away blocking. We might fail to find the
* last entry's block in the radix when we return.
*/
bool scoutfs_foreach_range_segment(struct super_block *sb,
struct scoutfs_key *first,
struct scoutfs_key *last,
struct scoutfs_ring_manifest_entry *ment)
int scoutfs_manifest_find_key(struct super_block *sb, struct scoutfs_key *key,
struct scoutfs_manifest_entry **ments_ret)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
struct scoutfs_manifest_entry *ments;
struct scoutfs_manifest_node *mnode;
bool found = false;
struct scoutfs_ival *ival;
unsigned nr;
int i;
if (ment->level >= SCOUTFS_MAX_LEVEL)
return false;
/* make a reasonably large initial guess */
i = 16;
ments = NULL;
do {
kfree(ments);
nr = i;
ments = kmalloc(nr * sizeof(struct scoutfs_manifest_entry),
GFP_NOFS);
if (!ments)
return -ENOMEM;
spin_lock(&mani->lock);
if (ment->level == 0) {
if (ment->blkno) {
mnode = radix_tree_lookup(&mani->blkno_radix,
le64_to_cpu(ment->blkno));
mnode = list_next_entry(mnode, head);
} else {
mnode = list_first_entry(&mani->level_zero,
struct scoutfs_manifest_node,
head);
}
list_for_each_entry_from(mnode, &mani->level_zero, head) {
if (scoutfs_cmp_key_ranges(first, last,
&mnode->ment.first,
&mnode->ment.last) == 0) {
*ment = mnode->ment;
found = true;
break;
spin_lock(&mani->lock);
i = 0;
ival = NULL;
while ((ival = scoutfs_next_ival(&mani->itree, key, key,
ival))) {
if (i < nr) {
mnode = container_of(ival,
struct scoutfs_manifest_node, ival);
ments[i].blkno = cpu_to_le64(mnode->blkno);
ments[i].seq = cpu_to_le64(mnode->seq);
ments[i].level = mnode->level;
ments[i].first = ival->start;
ments[i].last = ival->end;
}
i++;
}
spin_unlock(&mani->lock);
} while (i > nr);
if (i) {
sort(ments, i, sizeof(struct scoutfs_manifest_entry),
cmp_ments, swap_ments);
} else {
kfree(ments);
ments = NULL;
}
if (!found) {
/*
* The log segments in the each level fully cover the
* key range and don't overlap. So we will always find
* a segment that matches whatever key we look for. We
* look for the start of the range because iterators are
* walk the keyspace sequentially.
*/
for (i = ment->level + 1; i <= SCOUTFS_MAX_LEVEL; i++) {
mnode = find_mnode(&mani->levels[i].root, first);
if (mnode) {
*ment = mnode->ment;
found = true;
break;
}
}
if (!found)
ment->level = SCOUTFS_MAX_LEVEL;
}
spin_unlock(&mani->lock);
return found;
*ments_ret = ments;
return i;
}
int scoutfs_setup_manifest(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani;
int i;
mani = kmalloc(sizeof(struct scoutfs_manifest), GFP_KERNEL);
mani = kzalloc(sizeof(struct scoutfs_manifest), GFP_KERNEL);
if (!mani)
return -ENOMEM;
spin_lock_init(&mani->lock);
INIT_RADIX_TREE(&mani->blkno_radix, GFP_NOFS);
INIT_LIST_HEAD(&mani->level_zero);
for (i = 0; i < ARRAY_SIZE(mani->levels); i++)
mani->levels[i].root = RB_ROOT;
scoutfs_init_ival_tree(&mani->itree);
sbi->mani = mani;
@@ -380,34 +286,21 @@ int scoutfs_setup_manifest(struct super_block *sb)
}
/*
* This is called once the manifest will no longer be used. We iterate
* over the blkno radix deleting radix entries and freeing manifest
* nodes.
* This is called once the manifest will no longer be used.
*/
void scoutfs_destroy_manifest(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
struct scoutfs_manifest_node *mnodes[16];
unsigned long first_index = 0;
int ret;
int i;
struct scoutfs_ival *ival;
struct rb_node *node;
struct rb_node tmp;
for (;;) {
ret = radix_tree_gang_lookup(&mani->blkno_radix,
(void **)mnodes, first_index,
ARRAY_SIZE(mnodes));
if (!ret)
break;
if (mani) {
foreach_postorder_ival_safe(&mani->itree, ival, node, tmp)
kfree(ival);
for (i = 0; i < ret; i++) {
first_index = le64_to_cpu(mnodes[i]->ment.blkno);
radix_tree_delete(&mani->blkno_radix, first_index);
kfree(mnodes[i]);
}
first_index++;
kfree(mani);
sbi->mani = NULL;
}
kfree(sbi->mani);
sbi->mani = NULL;
}

View File

@@ -5,14 +5,14 @@ int scoutfs_setup_manifest(struct super_block *sb);
void scoutfs_destroy_manifest(struct super_block *sb);
int scoutfs_insert_manifest(struct super_block *sb,
struct scoutfs_ring_manifest_entry *ment);
int scoutfs_new_manifest(struct super_block *sb,
struct scoutfs_ring_manifest_entry *ment);
void scoutfs_delete_manifest(struct super_block *sb, u64 blkno);
struct scoutfs_manifest_entry *ment);
void scoutfs_delete_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *ment);
int scoutfs_finalize_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *existing,
struct scoutfs_manifest_entry *updated);
bool scoutfs_foreach_range_segment(struct super_block *sb,
struct scoutfs_key *first,
struct scoutfs_key *last,
struct scoutfs_ring_manifest_entry *ment);
int scoutfs_manifest_find_key(struct super_block *sb, struct scoutfs_key *key,
struct scoutfs_manifest_entry **ments_ret);
#endif

View File

@@ -28,8 +28,7 @@ static int replay_ring_block(struct super_block *sb, struct buffer_head *bh)
{
struct scoutfs_ring_block *ring = (void *)bh->b_data;
struct scoutfs_ring_entry *ent = (void *)(ring + 1);
struct scoutfs_ring_manifest_entry *ment;
struct scoutfs_ring_del_manifest *del;
struct scoutfs_manifest_entry *ment;
struct scoutfs_ring_bitmap *bm;
int ret = 0;
int i;
@@ -43,8 +42,8 @@ static int replay_ring_block(struct super_block *sb, struct buffer_head *bh)
ret = scoutfs_insert_manifest(sb, ment);
break;
case SCOUTFS_RING_DEL_MANIFEST:
del = (void *)(ent + 1);
scoutfs_delete_manifest(sb, le64_to_cpu(del->blkno));
ment = (void *)(ent + 1);
scoutfs_delete_manifest(sb, ment);
break;
case SCOUTFS_RING_BITMAP:
bm = (void *)(ent + 1);

View File

@@ -25,6 +25,7 @@
#include <linux/tracepoint.h>
#include "key.h"
#include "format.h"
TRACE_EVENT(scoutfs_bloom_hit,
TP_PROTO(struct scoutfs_key *key),
@@ -189,7 +190,7 @@ TRACE_EVENT(scoutfs_write_super,
);
TRACE_EVENT(scoutfs_insert_manifest,
TP_PROTO(struct scoutfs_ring_manifest_entry *ment),
TP_PROTO(struct scoutfs_manifest_entry *ment),
TP_ARGS(ment),
@@ -225,7 +226,7 @@ TRACE_EVENT(scoutfs_insert_manifest,
);
TRACE_EVENT(scoutfs_delete_manifest,
TP_PROTO(struct scoutfs_ring_manifest_entry *ment),
TP_PROTO(struct scoutfs_manifest_entry *ment),
TP_ARGS(ment),

View File

@@ -66,7 +66,7 @@ struct scoutfs_item_iter {
struct buffer_head *bh;
struct scoutfs_item *item;
u64 blkno;
bool restart_after;
struct scoutfs_key after_seg;
};
void scoutfs_put_iter_list(struct list_head *list)
@@ -138,8 +138,6 @@ static bool try_lock_dirty_mutex(struct super_block *sb, u64 blkno)
* then search through the item keys. The first matching item we find
* is returned.
*
* XXX lock the dirty log segment?
*
* -ENOENT is returned if the item isn't present. The caller needs to put
* the ref if we return success.
*/
@@ -147,12 +145,15 @@ int scoutfs_read_item(struct super_block *sb, struct scoutfs_key *key,
struct scoutfs_item_ref *ref)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_ring_manifest_entry ment;
struct scoutfs_item *item = NULL;
struct scoutfs_bloom_bits bits;
struct scoutfs_manifest_entry *ments;
struct buffer_head *bh;
bool locked;
u64 blkno;
int ret;
int nr;
int i;
/* XXX hold manifest */
@@ -160,13 +161,19 @@ int scoutfs_read_item(struct super_block *sb, struct scoutfs_key *key,
item = NULL;
ret = -ENOENT;
memset(&ment, 0, sizeof(struct scoutfs_ring_manifest_entry));
while (scoutfs_foreach_range_segment(sb, key, key, &ment)) {
nr = scoutfs_manifest_find_key(sb, key, &ments);
if (nr < 0)
return nr;
if (nr == 0)
return -ENOENT;
for (i = 0; i < nr; i++) {
/* XXX read-ahead all bloom blocks */
blkno = le64_to_cpu(ments[i].blkno);
/* XXX verify seqs */
ret = scoutfs_test_bloom_bits(sb, le64_to_cpu(ment.blkno),
key, &bits);
ret = scoutfs_test_bloom_bits(sb, blkno, key, &bits);
if (ret < 0)
break;
if (!ret) {
@@ -176,9 +183,8 @@ int scoutfs_read_item(struct super_block *sb, struct scoutfs_key *key,
/* XXX read-ahead all item header blocks */
locked = try_lock_dirty_mutex(sb, le64_to_cpu(ment.blkno));
ret = scoutfs_skip_lookup(sb, le64_to_cpu(ment.blkno), key,
&bh, &item);
locked = try_lock_dirty_mutex(sb, blkno);
ret = scoutfs_skip_lookup(sb, blkno, key, &bh, &item);
if (locked)
mutex_unlock(&sbi->dirty_mutex);
if (ret) {
@@ -189,12 +195,14 @@ int scoutfs_read_item(struct super_block *sb, struct scoutfs_key *key,
break;
}
kfree(ments);
/* XXX release manifest */
/* XXX read-ahead all value blocks? */
if (!ret) {
ret = populate_ref(sb, le64_to_cpu(ment.blkno), bh, item, ref);
ret = populate_ref(sb, blkno, bh, item, ref);
brelse(bh);
}
@@ -312,49 +320,49 @@ static int start_dirty_segment(struct super_block *sb, u64 blkno)
}
/*
* As we fill a dirty segment we don't know which keys it's going to
* contain. We add a manifest entry in memory that has it contain all
* items so that reading will know to search the dirty segment.
* As we start to fill a dirty segment we don't know which keys it's
* going to contain. We add a manifest entry in memory that has it
* contain all items so that reading will know to search the dirty
* segment.
*
* Once it's finalized we know the specific range of items it contains
* and we update the manifest entry in memory for that range and write
* that to the ring.
*
* Inserting the updated segment can fail. If we deleted the segment,
* then insertion failed, then reinserting the original entry could fail.
* Instead we briefly allow two manifest entries for the same segment.
*/
static int update_dirty_segment_manifest(struct super_block *sb, u64 blkno,
bool all_items)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_ring_manifest_entry ment;
struct scoutfs_manifest_entry ment;
struct scoutfs_manifest_entry updated;
struct scoutfs_item_block *iblk;
struct buffer_head *bh;
int ret;
ment.blkno = cpu_to_le64(blkno);
ment.seq = sbi->super.hdr.seq;
ment.level = 0;
if (all_items) {
memset(&ment.first, 0, sizeof(struct scoutfs_key));
memset(&ment.last, ~0, sizeof(struct scoutfs_key));
} else {
bh = scoutfs_read_block(sb, blkno + SCOUTFS_BLOOM_BLOCKS);
if (!bh) {
ret = -EIO;
goto out;
}
iblk = (void *)bh->b_data;
ment.first = iblk->first;
ment.last = iblk->last;
brelse(bh);
}
memset(&ment.first, 0, sizeof(struct scoutfs_key));
memset(&ment.last, ~0, sizeof(struct scoutfs_key));
if (all_items)
ret = scoutfs_insert_manifest(sb, &ment);
else
ret = scoutfs_new_manifest(sb, &ment);
out:
return ret;
return scoutfs_insert_manifest(sb, &ment);
bh = scoutfs_read_block(sb, blkno + SCOUTFS_BLOOM_BLOCKS);
if (!bh)
return -EIO;
updated = ment;
iblk = (void *)bh->b_data;
updated.first = iblk->first;
updated.last = iblk->last;
brelse(bh);
return scoutfs_finalize_manifest(sb, &ment, &updated);
}
/*
@@ -670,41 +678,40 @@ int scoutfs_delete_item(struct super_block *sb, struct scoutfs_item_ref *ref)
*
* We put the segment references and iteration cursors in a list in the
* caller so that they can find many next items by advancing the cursors
* without having to walk the manifest and perform initial binary
* without having to walk the manifest and perform initial skip list
* searches in each segment.
*
* The caller is responsible for putting the item ref if we return
* success. -ENOENT is returned if there are no more items in the
* search range.
*
* XXX this is wonky. We don't want to search the manifest for the
* range, just the initial value. Then we record the last key in
* segments we finish and only restart if least is > that or there are
* no least. We have to advance the first key when restarting the
* search.
*/
int scoutfs_next_item(struct super_block *sb, struct scoutfs_key *first,
struct scoutfs_key *last, struct list_head *iter_list,
struct scoutfs_item_ref *ref)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_ring_manifest_entry ment;
struct scoutfs_manifest_entry *ments = NULL;
struct scoutfs_key key = *first;
struct scoutfs_key least_hole;
struct scoutfs_item_iter *least;
struct scoutfs_item_iter *iter;
struct scoutfs_item_iter *pos;
bool locked;
int ret;
int nr;
int i;
restart:
if (list_empty(iter_list)) {
/* find all the segments that may contain the key */
ret = scoutfs_manifest_find_key(sb, &key, &ments);
if (ret == 0)
ret = -ENOENT;
if (ret < 0)
goto out;
nr = ret;
/*
* Find all the segments that intersect the search range
* and find the next item in the block from the start
* of the range.
*/
memset(&ment, 0, sizeof(struct scoutfs_ring_manifest_entry));
while (scoutfs_foreach_range_segment(sb, first, last, &ment)) {
for (i = 0; i < nr; i++) {
iter = kzalloc(sizeof(struct scoutfs_item_iter),
GFP_NOFS);
if (!iter) {
@@ -712,38 +719,32 @@ restart:
goto out;
}
/*
* We will restart the walk of the manifest blocks if
* we iterate over all the items in this block without
* exhausting the search range.
*/
if (ment.level > 0 &&
scoutfs_key_cmp(&ment.last, last) < 0)
iter->restart_after = true;
iter->blkno = le64_to_cpu(ment.blkno);
iter->blkno = le64_to_cpu(ments[i].blkno);
iter->after_seg = ments[i].last;
scoutfs_inc_key(&iter->after_seg);
list_add_tail(&iter->list, iter_list);
}
if (list_empty(iter_list)) {
ret = -ENOENT;
goto out;
}
kfree(ments);
ments = NULL;
}
memset(&least_hole, ~0, sizeof(least_hole));
least = NULL;
ret = 0;
list_for_each_entry_safe(iter, pos, iter_list, list) {
locked = try_lock_dirty_mutex(sb, iter->blkno);
/* search towards the first key if we haven't yet */
/* search towards the key if we haven't yet */
if (!iter->item) {
ret = scoutfs_skip_search(sb, iter->blkno, first,
ret = scoutfs_skip_search(sb, iter->blkno, &key,
&iter->bh, &iter->item);
} else {
ret = 0;
}
/* then iterate until we find or pass the first key */
while (!ret && scoutfs_key_cmp(&iter->item->key, first) < 0) {
/* then iterate until we find or pass the key */
while (!ret && scoutfs_key_cmp(&iter->item->key, &key) < 0) {
ret = scoutfs_skip_next(sb, iter->blkno,
&iter->bh, &iter->item);
}
@@ -751,44 +752,54 @@ restart:
if (locked)
mutex_unlock(&sbi->dirty_mutex);
/* we're done with this block if we past the last key */
while (!ret && scoutfs_key_cmp(&iter->item->key, last) > 0) {
/* we're done with this segment if it has an item after last */
if (!ret && scoutfs_key_cmp(&iter->item->key, last) > 0) {
list_del_init(&iter->list);
brelse(iter->bh);
iter->bh = NULL;
iter->item = NULL;
ret = -ENOENT;
kfree(iter);
continue;
}
/*
* If we run out of keys in the segment then we don't know
* the state of keys after this segment in this level. If
* the hole after the segment is still inside the search
* range then we might need to search it for the next
* item if the least item of the remaining blocks is
* greater than the hole.
*/
if (ret == -ENOENT) {
if (iter->restart_after) {
/* need next block at this level */
scoutfs_put_iter_list(iter_list);
goto restart;
} else {
/* this level is done */
list_del_init(&iter->list);
brelse(iter->bh);
kfree(iter);
continue;
}
}
if (ret)
goto out;
if (scoutfs_key_cmp(&iter->after_seg, last) <= 0 &&
scoutfs_key_cmp(&iter->after_seg, &least_hole) < 0)
least_hole = iter->after_seg;
/* remember the most recent smallest key from the first */
list_del_init(&iter->list);
brelse(iter->bh);
kfree(iter);
continue;
}
/* remember the most recent smallest key */
if (!least ||
scoutfs_key_cmp(&iter->item->key, &least->item->key) < 0)
least = iter;
}
/* if we had a gap before the least then we need a new search */
if (least && scoutfs_key_cmp(&least_hole, &least->item->key) < 0) {
scoutfs_put_iter_list(iter_list);
key = least_hole;
goto restart;
}
if (least)
ret = populate_ref(sb, least->blkno, least->bh, least->item,
ref);
else
ret = -ENOENT;
out:
kfree(ments);
if (ret)
scoutfs_put_iter_list(iter_list);
return ret;
}