scoutfs: add scoutfs_item_prev()

Add scoutfs_item_prev() for searching for an item before a given key.

This wasn't implemented initially because it's rarely needed, and for a
long time the segment reading and item cache population code had a
strong bias towards iterating forward from the given search key.

Since we now limit item cache reading to the keys covered by locks and
read in entire segments, it's easy to iterate backwards through keys
just as scoutfs_item_next() iterates forwards.
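
As an illustrative sketch (not part of this change), a caller holding a
read lock could walk backwards by decrementing the returned key between
calls, mirroring forward iteration; the walk_backwards() helper and its
shape are hypothetical:

/* hypothetical caller sketch, not from this commit */
static int walk_backwards(struct super_block *sb, struct scoutfs_key *key,
			  struct scoutfs_key *first, struct kvec *val,
			  struct scoutfs_lock *lock)
{
	int ret;

	for (;;) {
		ret = scoutfs_item_prev(sb, key, first, val, lock);
		if (ret < 0) {
			if (ret == -ENOENT)
				ret = 0;  /* no more items under this lock */
			break;
		}

		/* ... use *key and the copied value here ... */

		if (scoutfs_key_compare(key, first) == 0)
			break;		/* returned the first allowed key */
		scoutfs_key_dec(key);	/* continue before the returned key */
	}

	return ret;
}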

The only remaining forward iteration bias was in check_range(), which
only gave callers the end of the cached range it found.  It's extended
here to also give callers the start of that range.
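
For reference, a minimal sketch of the reworked check_range() contract;
the key_is_cached() wrapper is hypothetical, only check_range() itself
is part of this change:

/* hypothetical wrapper, not from this commit */
static bool key_is_cached(struct super_block *sb, struct item_cache *cac,
			  struct scoutfs_key *key)
{
	/* either output pointer may also be NULL if it isn't needed */
	struct scoutfs_key start;
	struct scoutfs_key end;

	if (check_range(sb, &cac->ranges, key, &start, &end)) {
		/* hit: [start, end] is the cached range covering *key */
		return true;
	}

	/* miss: *key is in an uncached hole, start and end were zeroed */
	return false;
}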

Signed-off-by: Zach Brown <zab@versity.com>
Author: Zach Brown <zab@versity.com>
Date: 2018-06-20 15:31:24 -07:00
commit d53ec115bc (parent 600ecd9fad)
3 changed files with 255 additions and 29 deletions


@@ -242,6 +242,15 @@ static struct cached_item *next_item(struct rb_root *root,
 	return walk_items(root, key, &prev, &next) ?: next;
 }
 
+static struct cached_item *prev_item(struct rb_root *root,
+				     struct scoutfs_key *key)
+{
+	struct cached_item *prev;
+	struct cached_item *next;
+
+	return walk_items(root, key, &prev, &next) ?: prev;
+}
+
 /*
  * We store the dirty bits in a single value so that the simple
  * augmented rbtree implementation gets a single scalar value to compare
@@ -581,36 +590,35 @@ static struct cached_range *walk_ranges(struct rb_root *root,
 }
 
 /*
- * Return true if the given key is covered by a cached range.  end is
- * set to the end of the cached range.
+ * Return true if the given key is covered by a cached range.  start and
+ * end are set to the existing cached range.
  *
- * Return false if the given key isn't covered by a cached range and is
- * instead in an uncached hole.  end is set to the start of the next
- * cached range.
+ * Return false if the key is not covered by a range.  start and end are
+ * set to zero.  (Nothing uses these today; this is to avoid tracing
+ * uninitialized keys in this case.)
 */
 static bool check_range(struct super_block *sb, struct rb_root *root,
-			struct scoutfs_key *key,
+			struct scoutfs_key *key, struct scoutfs_key *start,
 			struct scoutfs_key *end)
 {
 	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
 	struct item_cache *cac = sbi->item_cache;
-	struct cached_range *next;
 	struct cached_range *rng;
 
-	rng = walk_ranges(&cac->ranges, key, NULL, &next);
+	rng = walk_ranges(&cac->ranges, key, NULL, NULL);
 	if (rng) {
 		scoutfs_inc_counter(sb, item_range_hit);
+		if (start)
+			*start = rng->start;
 		if (end)
 			*end = rng->end;
 		return true;
 	}
 
-	if (end) {
-		if (next)
-			*end = next->start;
-		else
-			scoutfs_key_set_ones(end);
-	}
+	if (start)
+		scoutfs_key_set_zeros(start);
+	if (end)
+		scoutfs_key_set_zeros(end);
 
 	scoutfs_inc_counter(sb, item_range_miss);
 	return false;
@@ -823,7 +831,7 @@ int scoutfs_item_lookup(struct super_block *sb, struct scoutfs_key *key,
 			ret = copy_item_val(val, item);
 		else
 			ret = 0;
-	} else if (check_range(sb, &cac->ranges, key, NULL)) {
+	} else if (check_range(sb, &cac->ranges, key, NULL, NULL)) {
 		ret = -ENOENT;
 	} else {
 		ret = -ENODATA;
@@ -929,14 +937,14 @@ static struct cached_item *item_for_next(struct rb_root *root,
  * Return the next item starting with the given key and returning the
  * last key at most.
  *
- * If the end key is specified then it limits items that can be read
- * into the cache.  If it's less than the last key then it also limits
- * iteration.  These are different values because locking granularity
- * can be smaller or larger than the iteration.  Callers shouldn't have
- * to be aware of that relationship.
+ * The range covered by the lock also limits the last item that can be
+ * returned.  -ENOENT can be returned when there are no next items
+ * covered by the lock but there are still items before the last key
+ * outside of the lock.  The caller needs to know to reacquire the next
+ * lock to continue iteration.
  *
- * -ENOENT is returned if there are no items between the given and
- * last/end keys.
+ * -ENOENT is returned if there are no items between the given and last
+ * keys inside the range covered by the lock.
  *
  * The next item's key is copied to the caller's key.  The caller is
  * responsible for dealing with key lengths and truncation.
@@ -979,7 +987,7 @@ int scoutfs_item_next(struct super_block *sb, struct scoutfs_key *key,
 
 	for(;;) {
 		/* see if we have cache coverage of our iterator pos */
-		cached = check_range(sb, &cac->ranges, &pos, &range_end);
+		cached = check_range(sb, &cac->ranges, &pos, NULL, &range_end);
 
 		trace_scoutfs_item_next_range_check(sb, !!cached, key,
 						    &pos, last, &lock->end,
@@ -1033,6 +1041,173 @@ out:
 	return ret;
 }
 
+/*
+ * Return the prev linked node in the tree that isn't a deletion item
+ * and which is still within the first allowed key value.
+ */
+static struct cached_item *prev_item_node(struct rb_root *root,
+					  struct cached_item *item,
+					  struct scoutfs_key *first)
+{
+	struct rb_node *node;
+
+	while (item) {
+		node = rb_prev(&item->node);
+		if (!node) {
+			item = NULL;
+			break;
+		}
+
+		item = container_of(node, struct cached_item, node);
+		if (scoutfs_key_compare(&item->key, first) < 0) {
+			item = NULL;
+			break;
+		}
+
+		if (!item->deletion)
+			break;
+	}
+
+	return item;
+}
+
+/*
+ * Find the prev item to return from the "_prev" item interface.  It's
+ * the prev item from the key that isn't a deletion item and is within
+ * the bounds of the start of the cached range and the caller's first
+ * key.
+ */
+static struct cached_item *item_for_prev(struct rb_root *root,
+					 struct scoutfs_key *key,
+					 struct scoutfs_key *range_start,
+					 struct scoutfs_key *first)
+{
+	struct cached_item *item;
+
+	/* limit by the greater of the two */
+	if (range_start && scoutfs_key_compare(range_start, first) > 0)
+		first = range_start;
+
+	item = prev_item(root, key);
+	if (item) {
+		if (scoutfs_key_compare(&item->key, first) < 0)
+			item = NULL;
+		else if (item->deletion)
+			item = prev_item_node(root, item, first);
+	}
+
+	return item;
+}
+
+/*
+ * Return the prev item starting with the given key and returning the
+ * first key at least.
+ *
+ * The range covered by the lock also limits the first item that can be
+ * returned.  -ENOENT can be returned when there are no prev items
+ * covered by the lock but there are still items after the first key
+ * outside of the lock.  The caller needs to know to reacquire the
+ * previous lock to continue iteration.
+ *
+ * -ENOENT is returned if there are no items between the given and
+ * first key inside the range covered by the lock.
+ *
+ * The prev item's key is copied to the caller's key.  The caller is
+ * responsible for dealing with key lengths and truncation.
+ *
+ * The prev item's value is copied into the caller's value.  The number
+ * of value bytes copied is returned.  The copied value can be truncated
+ * by the caller's value buffer length.
+ */
+int scoutfs_item_prev(struct super_block *sb, struct scoutfs_key *key,
+		      struct scoutfs_key *first, struct kvec *val,
+		      struct scoutfs_lock *lock)
+{
+	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
+	struct item_cache *cac = sbi->item_cache;
+	struct scoutfs_key range_start;
+	struct scoutfs_key pos;
+	struct cached_item *item;
+	unsigned long flags;
+	bool cached;
+	int ret;
+
+	/* use the lock's start key as the first key if it's closer */
+	if (scoutfs_key_compare(&lock->start, first) > 0)
+		first = &lock->start;
+
+	/* convenience to avoid searching if caller iterates past their first */
+	if (scoutfs_key_compare(key, first) < 0) {
+		ret = -ENOENT;
+		goto out;
+	}
+
+	if (WARN_ON_ONCE(!lock_coverage(lock, key, DLM_LOCK_PR))) {
+		ret = -EINVAL;
+		goto out;
+	}
+
+	pos = *key;
+
+	spin_lock_irqsave(&cac->lock, flags);
+
+	for(;;) {
+		/* see if we have cache coverage of our iterator pos */
+		cached = check_range(sb, &cac->ranges, &pos,
+				     &range_start, NULL);
+
+		trace_scoutfs_item_prev_range_check(sb, !!cached, key,
+						    &pos, first, &lock->start,
+						    &range_start);
+
+		if (!cached) {
+			/* populate missing cached range starting at pos */
+			spin_unlock_irqrestore(&cac->lock, flags);
+			ret = scoutfs_manifest_read_items(sb, &pos,
+							  &lock->start,
+							  &lock->end);
+			spin_lock_irqsave(&cac->lock, flags);
+			if (ret)
+				break;
+			else
+				continue;
+		}
+
+		/* see if there's an item in the cached range from pos */
+		item = item_for_prev(&cac->items, &pos, &range_start, first);
+		if (!item) {
+			if (scoutfs_key_compare(&range_start, first) > 0) {
+				/* keep searching before empty cached range */
+				pos = range_start;
+				scoutfs_key_dec(&pos);
+				continue;
+			}
+
+			/* no item and cache covers first, done */
+			ret = -ENOENT;
+			break;
+		}
+
+		/* we have a prev item inside the cached range, done */
+		*key = item->key;
+		if (val) {
+			item_referenced(cac, item);
+			ret = copy_item_val(val, item);
+		} else {
+			ret = 0;
+		}
+		break;
+	}
+
+	spin_unlock_irqrestore(&cac->lock, flags);
+out:
+	trace_scoutfs_item_prev_ret(sb, ret);
+	return ret;
+}
+
 /*
  * Create a new dirty item in the cache.  Returns -EEXIST if an item
  * already exists with the given key.
@@ -1061,7 +1236,7 @@ int scoutfs_item_create(struct super_block *sb, struct scoutfs_key *key,
 	do {
 		spin_lock_irqsave(&cac->lock, flags);
 
-		if (!check_range(sb, &cac->ranges, key, NULL)) {
+		if (!check_range(sb, &cac->ranges, key, NULL, NULL)) {
 			ret = -ENODATA;
 		} else {
 			ret = insert_item(sb, cac, item, false, false);
@@ -1263,7 +1438,7 @@ int scoutfs_item_dirty(struct super_block *sb, struct scoutfs_key *key,
 	if (item) {
 		mark_item_dirty(sb, cac, item);
 		ret = 0;
-	} else if (check_range(sb, &cac->ranges, key, NULL)) {
+	} else if (check_range(sb, &cac->ranges, key, NULL, NULL)) {
 		ret = -ENOENT;
 	} else {
 		ret = -ENODATA;
@@ -1320,7 +1495,7 @@ int scoutfs_item_update(struct super_block *sb, struct scoutfs_key *key,
 		item->val_len = val ? val->iov_len : 0;
 		mark_item_dirty(sb, cac, item);
 		ret = 0;
-	} else if (check_range(sb, &cac->ranges, key, NULL)) {
+	} else if (check_range(sb, &cac->ranges, key, NULL, NULL)) {
 		ret = -ENOENT;
 	} else {
 		ret = -ENODATA;
@@ -1371,7 +1546,7 @@ int scoutfs_item_delete(struct super_block *sb, struct scoutfs_key *key,
 	if (item) {
 		delete_item(sb, cac, item);
 		ret = 0;
-	} else if (check_range(sb, &cac->ranges, key, NULL)) {
+	} else if (check_range(sb, &cac->ranges, key, NULL, NULL)) {
 		ret = -ENOENT;
 	} else {
 		ret = -ENODATA;
@@ -1483,7 +1658,7 @@ int scoutfs_item_delete_save(struct super_block *sb,
 		delete_item(sb, cac, del);
 		del = NULL;
 		ret = 0;
-	} else if (check_range(sb, &cac->ranges, key, NULL)) {
+	} else if (check_range(sb, &cac->ranges, key, NULL, NULL)) {
 		ret = -ENOENT;
 	} else {
 		ret = -ENODATA;
@@ -1532,7 +1707,7 @@ int scoutfs_item_restore(struct super_block *sb, struct list_head *list,
 		mode = item_is_dirty(item) ? DLM_LOCK_EX : DLM_LOCK_PR;
 		if (WARN_ON_ONCE(!lock_coverage(lock, &item->key, mode)) ||
 		    WARN_ON_ONCE(!check_range(sb, &cac->ranges, &item->key,
-					      NULL))) {
+					      NULL, NULL))) {
 			ret = -EINVAL;
 			goto out;
 		}


@@ -14,6 +14,9 @@ int scoutfs_item_lookup_exact(struct super_block *sb,
 int scoutfs_item_next(struct super_block *sb, struct scoutfs_key *key,
 		      struct scoutfs_key *last, struct kvec *val,
 		      struct scoutfs_lock *lock);
+int scoutfs_item_prev(struct super_block *sb, struct scoutfs_key *key,
+		      struct scoutfs_key *first, struct kvec *val,
+		      struct scoutfs_lock *lock);
 int scoutfs_item_create(struct super_block *sb, struct scoutfs_key *key,
 			struct kvec *val, struct scoutfs_lock *lock);
 int scoutfs_item_create_force(struct super_block *sb,


@@ -352,6 +352,24 @@ TRACE_EVENT(scoutfs_item_next_ret,
 	TP_printk(FSID_FMT" ret %d", __entry->fsid, __entry->ret)
 );
 
+TRACE_EVENT(scoutfs_item_prev_ret,
+	TP_PROTO(struct super_block *sb, int ret),
+	TP_ARGS(sb, ret),
+
+	TP_STRUCT__entry(
+		__field(__u64, fsid)
+		__field(int, ret)
+	),
+
+	TP_fast_assign(
+		__entry->fsid = FSID_ARG(sb);
+		__entry->ret = ret;
+	),
+
+	TP_printk("fsid "FSID_FMT" ret %d", __entry->fsid, __entry->ret)
+);
+
 TRACE_EVENT(scoutfs_erase_item,
 	TP_PROTO(struct super_block *sb, void *item),
@@ -1865,6 +1883,36 @@ TRACE_EVENT(scoutfs_item_next_range_check,
 		  SK_ARG(&__entry->end), SK_ARG(&__entry->range_end))
 );
 
+TRACE_EVENT(scoutfs_item_prev_range_check,
+	TP_PROTO(struct super_block *sb, int cached,
+		 struct scoutfs_key *key, struct scoutfs_key *pos,
+		 struct scoutfs_key *first, struct scoutfs_key *start,
+		 struct scoutfs_key *range_start),
+	TP_ARGS(sb, cached, key, pos, first, start, range_start),
+	TP_STRUCT__entry(
+		__field(void *, sb)
+		__field(int, cached)
+		__field_struct(struct scoutfs_key, key)
+		__field_struct(struct scoutfs_key, pos)
+		__field_struct(struct scoutfs_key, first)
+		__field_struct(struct scoutfs_key, start)
+		__field_struct(struct scoutfs_key, range_start)
+	),
+	TP_fast_assign(
+		__entry->sb = sb;
+		__entry->cached = cached;
+		scoutfs_key_copy_or_zeros(&__entry->key, key);
+		scoutfs_key_copy_or_zeros(&__entry->pos, pos);
+		scoutfs_key_copy_or_zeros(&__entry->first, first);
+		scoutfs_key_copy_or_zeros(&__entry->start, start);
+		scoutfs_key_copy_or_zeros(&__entry->range_start, range_start);
+	),
+	TP_printk("sb %p cached %d key "SK_FMT" pos "SK_FMT" first "SK_FMT" start "SK_FMT" range_start "SK_FMT,
+		  __entry->sb, __entry->cached, SK_ARG(&__entry->key),
+		  SK_ARG(&__entry->pos), SK_ARG(&__entry->first),
+		  SK_ARG(&__entry->start), SK_ARG(&__entry->range_start))
+);
+
 DECLARE_EVENT_CLASS(scoutfs_shrink_exit_class,
 	TP_PROTO(struct super_block *sb, unsigned long nr_to_scan, int ret),
 	TP_ARGS(sb, nr_to_scan, ret),