diff --git a/kmod/src/item.c b/kmod/src/item.c
index 15f4d326..60c461fb 100644
--- a/kmod/src/item.c
+++ b/kmod/src/item.c
@@ -242,6 +242,15 @@ static struct cached_item *next_item(struct rb_root *root,
 	return walk_items(root, key, &prev, &next) ?: next;
 }
 
+static struct cached_item *prev_item(struct rb_root *root,
+				     struct scoutfs_key *key)
+{
+	struct cached_item *prev;
+	struct cached_item *next;
+
+	return walk_items(root, key, &prev, &next) ?: prev;
+}
+
 /*
  * We store the dirty bits in a single value so that the simple
  * augmented rbtree implementation gets a single scalar value to compare
@@ -581,36 +590,35 @@ static struct cached_range *walk_ranges(struct rb_root *root,
 }
 
 /*
- * Return true if the given key is covered by a cached range.  end is
- * set to the end of the cached range.
+ * Return true if the given key is covered by a cached range.  start and
+ * end are set to the existing cached range.
  *
- * Return false if the given key isn't covered by a cached range and is
- * instead in an uncached hole.  end is set to the start of the next
- * cached range.
+ * Return false if the key is not covered by a range.  start and end are
+ * set to zero.  (Nothing uses these today; this just avoids tracing
+ * uninitialized keys in that case.)
  */
 static bool check_range(struct super_block *sb, struct rb_root *root,
-			struct scoutfs_key *key,
+			struct scoutfs_key *key, struct scoutfs_key *start,
 			struct scoutfs_key *end)
 {
 	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
 	struct item_cache *cac = sbi->item_cache;
-	struct cached_range *next;
 	struct cached_range *rng;
 
-	rng = walk_ranges(&cac->ranges, key, NULL, &next);
+	rng = walk_ranges(&cac->ranges, key, NULL, NULL);
 	if (rng) {
 		scoutfs_inc_counter(sb, item_range_hit);
+		if (start)
+			*start = rng->start;
 		if (end)
 			*end = rng->end;
 		return true;
 	}
 
-	if (end) {
-		if (next)
-			*end = next->start;
-		else
-			scoutfs_key_set_ones(end);
-	}
+	if (start)
+		scoutfs_key_set_zeros(start);
+	if (end)
+		scoutfs_key_set_zeros(end);
 
 	scoutfs_inc_counter(sb, item_range_miss);
 	return false;
@@ -823,7 +831,7 @@ int scoutfs_item_lookup(struct super_block *sb, struct scoutfs_key *key,
 			ret = copy_item_val(val, item);
 		else
 			ret = 0;
-	} else if (check_range(sb, &cac->ranges, key, NULL)) {
+	} else if (check_range(sb, &cac->ranges, key, NULL, NULL)) {
 		ret = -ENOENT;
 	} else {
 		ret = -ENODATA;
@@ -929,14 +937,14 @@ static struct cached_item *item_for_next(struct rb_root *root,
  * Return the next item starting with the given key and returning the
  * last key at most.
  *
- * If the end key is specified then it limits items that can be read
- * into the cache.  If it's less than the last key then it also limits
- * iteration.  These are different values because locking granularity
- * can be smaller or larger than the iteration.  Callers shouldn't have
- * to be aware of that relationship.
+ * The range covered by the lock also limits the last item that can be
+ * returned.  -ENOENT can be returned when there are no next items
+ * covered by the lock but there are still items before the last key
+ * outside of the lock.  The caller needs to know to reacquire the next
+ * lock to continue iteration.
  *
- * -ENOENT is returned if there are no items between the given and
- * last/end keys.
+ * -ENOENT is returned if there are no items between the given and last
+ * keys inside the range covered by the lock.
  *
  * The next item's key is copied to the caller's key.  The caller is
  * responsible for dealing with key lengths and truncation.
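
The -ENOENT contract described in that comment is easiest to see from the caller's side. Below is a minimal, hypothetical caller sketch that is not part of this patch: acquire_lock_for_key() and release_lock() are invented placeholder helpers, and scoutfs_key_inc() is assumed to exist alongside the scoutfs_key_dec() this patch uses. It shows why a caller must tell "no more items under this lock" apart from "no more items at all":

/*
 * Hypothetical sketch: return the next item at or after *key and at
 * most *last.  An -ENOENT while the lock ends before *last only means
 * this lock's range is exhausted, so reacquire coverage and retry.
 */
static int walk_one_forward(struct super_block *sb,
			    struct scoutfs_key *key,
			    struct scoutfs_key *last,
			    struct kvec *val)
{
	struct scoutfs_lock *lock;
	int ret;

	for (;;) {
		lock = acquire_lock_for_key(sb, key);	/* assumed helper */
		ret = scoutfs_item_next(sb, key, last, val, lock);
		if (ret == -ENOENT &&
		    scoutfs_key_compare(&lock->end, last) < 0) {
			/* lock exhausted before last, step past its end */
			*key = lock->end;
			scoutfs_key_inc(key);
			release_lock(sb, lock);		/* assumed helper */
			continue;
		}
		release_lock(sb, lock);
		return ret;	/* bytes copied, -ENOENT, or error */
	}
}

On success the caller consumes the item copied to *key, increments the key, and calls again; only an -ENOENT returned while the lock already covers last means iteration is truly finished.
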
@@ -979,7 +987,7 @@ int scoutfs_item_next(struct super_block *sb, struct scoutfs_key *key,
 
 	for(;;) {
 		/* see if we have cache coverage of our iterator pos */
-		cached = check_range(sb, &cac->ranges, &pos, &range_end);
+		cached = check_range(sb, &cac->ranges, &pos, NULL, &range_end);
 
 		trace_scoutfs_item_next_range_check(sb, !!cached, key, &pos,
 						    last, &lock->end,
@@ -1033,6 +1041,173 @@ out:
 	return ret;
 }
 
+/*
+ * Return the prev linked node in the tree that isn't a deletion item
+ * and which is still within the first allowed key value.
+ */
+static struct cached_item *prev_item_node(struct rb_root *root,
+					  struct cached_item *item,
+					  struct scoutfs_key *first)
+{
+	struct rb_node *node;
+
+	while (item) {
+		node = rb_prev(&item->node);
+		if (!node) {
+			item = NULL;
+			break;
+		}
+
+		item = container_of(node, struct cached_item, node);
+
+		if (scoutfs_key_compare(&item->key, first) < 0) {
+			item = NULL;
+			break;
+		}
+
+		if (!item->deletion)
+			break;
+	}
+
+	return item;
+}
+
+/*
+ * Find the prev item to return from the "_prev" item interface.  It's
+ * the prev item from the key that isn't a deletion item and is within
+ * the bounds of the start of the cache and the caller's first key.
+ */
+static struct cached_item *item_for_prev(struct rb_root *root,
+					 struct scoutfs_key *key,
+					 struct scoutfs_key *range_start,
+					 struct scoutfs_key *first)
+{
+	struct cached_item *item;
+
+	/* limit by the greater of the two */
+	if (range_start && scoutfs_key_compare(range_start, first) > 0)
+		first = range_start;
+
+	item = prev_item(root, key);
+	if (item) {
+		if (scoutfs_key_compare(&item->key, first) < 0)
+			item = NULL;
+		else if (item->deletion)
+			item = prev_item_node(root, item, first);
+	}
+
+	return item;
+}
+
+/*
+ * Return the prev item starting with the given key and returning the
+ * first key at least.
+ *
+ * The range covered by the lock also limits the first item that can be
+ * returned.  -ENOENT can be returned when there are no prev items
+ * covered by the lock but there are still items after the first key
+ * outside of the lock.  The caller needs to know to reacquire the prev
+ * lock to continue iteration.
+ *
+ * -ENOENT is returned if there are no items between the given and
+ * first key inside the range covered by the lock.
+ *
+ * The prev item's key is copied to the caller's key.  The caller is
+ * responsible for dealing with key lengths and truncation.
+ *
+ * The prev item's value is copied into the caller's value.  The number
+ * of value bytes copied is returned.  The copied value can be truncated
+ * by the caller's value buffer length.
+ */
+int scoutfs_item_prev(struct super_block *sb, struct scoutfs_key *key,
+		      struct scoutfs_key *first, struct kvec *val,
+		      struct scoutfs_lock *lock)
+{
+	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
+	struct item_cache *cac = sbi->item_cache;
+	struct scoutfs_key range_start;
+	struct scoutfs_key pos;
+	struct cached_item *item;
+	unsigned long flags;
+	bool cached;
+	int ret;
+
+	/* use the lock's start key as the first key if it's closer */
+	if (scoutfs_key_compare(&lock->start, first) > 0)
+		first = &lock->start;
+
+	/* convenience to avoid searching if caller iterates past their first */
+	if (scoutfs_key_compare(key, first) < 0) {
+		ret = -ENOENT;
+		goto out;
+	}
+
+	if (WARN_ON_ONCE(!lock_coverage(lock, key, DLM_LOCK_PR))) {
+		ret = -EINVAL;
+		goto out;
+	}
+
+	pos = *key;
+
+	spin_lock_irqsave(&cac->lock, flags);
+
+	for(;;) {
+		/* see if we have cache coverage of our iterator pos */
+		cached = check_range(sb, &cac->ranges, &pos,
+				     &range_start, NULL);
+
+		trace_scoutfs_item_prev_range_check(sb, !!cached, key,
+						    &pos, first, &lock->start,
+						    &range_start);
+
+		if (!cached) {
+			/* populate missing cached range around pos */
+			spin_unlock_irqrestore(&cac->lock, flags);
+
+			ret = scoutfs_manifest_read_items(sb, &pos,
+							  &lock->start,
+							  &lock->end);
+
+			spin_lock_irqsave(&cac->lock, flags);
+			if (ret)
+				break;
+			else
+				continue;
+		}
+
+		/* see if there's an item in the cached range from pos */
+		item = item_for_prev(&cac->items, &pos, &range_start, first);
+		if (!item) {
+			if (scoutfs_key_compare(&range_start, first) > 0) {
+				/* keep searching before empty cached range */
+				pos = range_start;
+				scoutfs_key_dec(&pos);
+				continue;
+			}
+
+			/* no item and cache covers first, done */
+			ret = -ENOENT;
+			break;
+		}
+
+		/* we have a prev item inside the cached range, done */
+		*key = item->key;
+		if (val) {
+			item_referenced(cac, item);
+			ret = copy_item_val(val, item);
+		} else {
+			ret = 0;
+		}
+		break;
+	}
+
+	spin_unlock_irqrestore(&cac->lock, flags);
+out:
+
+	trace_scoutfs_item_prev_ret(sb, ret);
+	return ret;
+}
+
 /*
  * Create a new dirty item in the cache.  Returns -EEXIST if an item
  * already exists with the given key.
@@ -1061,7 +1236,7 @@ int scoutfs_item_create(struct super_block *sb, struct scoutfs_key *key,
 	do {
 		spin_lock_irqsave(&cac->lock, flags);
 
-		if (!check_range(sb, &cac->ranges, key, NULL)) {
+		if (!check_range(sb, &cac->ranges, key, NULL, NULL)) {
 			ret = -ENODATA;
 		} else {
 			ret = insert_item(sb, cac, item, false, false);
@@ -1263,7 +1438,7 @@ int scoutfs_item_dirty(struct super_block *sb, struct scoutfs_key *key,
 	if (item) {
 		mark_item_dirty(sb, cac, item);
 		ret = 0;
-	} else if (check_range(sb, &cac->ranges, key, NULL)) {
+	} else if (check_range(sb, &cac->ranges, key, NULL, NULL)) {
 		ret = -ENOENT;
 	} else {
 		ret = -ENODATA;
@@ -1320,7 +1495,7 @@ int scoutfs_item_update(struct super_block *sb, struct scoutfs_key *key,
 		item->val_len = val ? val->iov_len : 0;
 		mark_item_dirty(sb, cac, item);
 		ret = 0;
-	} else if (check_range(sb, &cac->ranges, key, NULL)) {
+	} else if (check_range(sb, &cac->ranges, key, NULL, NULL)) {
 		ret = -ENOENT;
 	} else {
 		ret = -ENODATA;
@@ -1371,7 +1546,7 @@ int scoutfs_item_delete(struct super_block *sb, struct scoutfs_key *key,
 	if (item) {
 		delete_item(sb, cac, item);
 		ret = 0;
-	} else if (check_range(sb, &cac->ranges, key, NULL)) {
+	} else if (check_range(sb, &cac->ranges, key, NULL, NULL)) {
 		ret = -ENOENT;
 	} else {
 		ret = -ENODATA;
@@ -1483,7 +1658,7 @@ int scoutfs_item_delete_save(struct super_block *sb,
 		delete_item(sb, cac, del);
 		del = NULL;
 		ret = 0;
-	} else if (check_range(sb, &cac->ranges, key, NULL)) {
+	} else if (check_range(sb, &cac->ranges, key, NULL, NULL)) {
 		ret = -ENOENT;
 	} else {
 		ret = -ENODATA;
@@ -1532,7 +1707,7 @@ int scoutfs_item_restore(struct super_block *sb, struct list_head *list,
 		mode = item_is_dirty(item) ? DLM_LOCK_EX : DLM_LOCK_PR;
 		if (WARN_ON_ONCE(!lock_coverage(lock, &item->key, mode)) ||
 		    WARN_ON_ONCE(!check_range(sb, &cac->ranges, &item->key,
-					      NULL))) {
+					      NULL, NULL))) {
 			ret = -EINVAL;
 			goto out;
 		}
diff --git a/kmod/src/item.h b/kmod/src/item.h
index 328345b9..c281b895 100644
--- a/kmod/src/item.h
+++ b/kmod/src/item.h
@@ -14,6 +14,9 @@ int scoutfs_item_lookup_exact(struct super_block *sb,
 int scoutfs_item_next(struct super_block *sb, struct scoutfs_key *key,
 		      struct scoutfs_key *last, struct kvec *val,
 		      struct scoutfs_lock *lock);
+int scoutfs_item_prev(struct super_block *sb, struct scoutfs_key *key,
+		      struct scoutfs_key *first, struct kvec *val,
+		      struct scoutfs_lock *lock);
 int scoutfs_item_create(struct super_block *sb, struct scoutfs_key *key,
 		      struct kvec *val, struct scoutfs_lock *lock);
 int scoutfs_item_create_force(struct super_block *sb,
diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h
index 9eb5a75e..b665dbab 100644
--- a/kmod/src/scoutfs_trace.h
+++ b/kmod/src/scoutfs_trace.h
@@ -352,6 +352,24 @@ TRACE_EVENT(scoutfs_item_next_ret,
 	TP_printk(FSID_FMT" ret %d", __entry->fsid, __entry->ret)
 );
 
+TRACE_EVENT(scoutfs_item_prev_ret,
+	TP_PROTO(struct super_block *sb, int ret),
+
+	TP_ARGS(sb, ret),
+
+	TP_STRUCT__entry(
+		__field(__u64, fsid)
+		__field(int, ret)
+	),
+
+	TP_fast_assign(
+		__entry->fsid = FSID_ARG(sb);
+		__entry->ret = ret;
+	),
+
+	TP_printk("fsid "FSID_FMT" ret %d", __entry->fsid, __entry->ret)
+);
+
 TRACE_EVENT(scoutfs_erase_item,
 	TP_PROTO(struct super_block *sb, void *item),
 
@@ -1865,6 +1883,36 @@ TRACE_EVENT(scoutfs_item_next_range_check,
 		  SK_ARG(&__entry->end), SK_ARG(&__entry->range_end))
 );
 
+TRACE_EVENT(scoutfs_item_prev_range_check,
+	TP_PROTO(struct super_block *sb, int cached,
+		 struct scoutfs_key *key, struct scoutfs_key *pos,
+		 struct scoutfs_key *first, struct scoutfs_key *start,
+		 struct scoutfs_key *range_start),
+	TP_ARGS(sb, cached, key, pos, first, start, range_start),
+	TP_STRUCT__entry(
+		__field(void *, sb)
+		__field(int, cached)
+		__field_struct(struct scoutfs_key, key)
+		__field_struct(struct scoutfs_key, pos)
+		__field_struct(struct scoutfs_key, first)
+		__field_struct(struct scoutfs_key, start)
+		__field_struct(struct scoutfs_key, range_start)
+	),
+	TP_fast_assign(
+		__entry->sb = sb;
+		__entry->cached = cached;
+		scoutfs_key_copy_or_zeros(&__entry->key, key);
+		scoutfs_key_copy_or_zeros(&__entry->pos, pos);
+		scoutfs_key_copy_or_zeros(&__entry->first, first);
+		scoutfs_key_copy_or_zeros(&__entry->start, start);
+		scoutfs_key_copy_or_zeros(&__entry->range_start, range_start);
+	),
+	TP_printk("sb %p cached %d key "SK_FMT" pos "SK_FMT" first "SK_FMT" start "SK_FMT" range_start "SK_FMT,
"SK_FMT" pos "SK_FMT" first "SK_FMT" start "SK_FMT" range_start "SK_FMT, + __entry->sb, __entry->cached, SK_ARG(&__entry->key), + SK_ARG(&__entry->pos), SK_ARG(&__entry->first), + SK_ARG(&__entry->start), SK_ARG(&__entry->range_start)) +); + DECLARE_EVENT_CLASS(scoutfs_shrink_exit_class, TP_PROTO(struct super_block *sb, unsigned long nr_to_scan, int ret), TP_ARGS(sb, nr_to_scan, ret),