diff --git a/kmod/src/item.c b/kmod/src/item.c index ca9a555d..86a7e1a5 100644 --- a/kmod/src/item.c +++ b/kmod/src/item.c @@ -22,10 +22,22 @@ #include "manifest.h" #include "item.h" #include "seg.h" +#include "scoutfs_trace.h" + +/* + * A simple rbtree of cached items isolates the item API callers from + * the relatively expensive segment searches. + * + * The item cache uses an rbtree of key ranges to record regions of keys + * that are completely described by the items. This lets it return + * negative lookups cache hits for items that don't exist without having + * to constantly perform expensive segment searches. + */ struct item_cache { spinlock_t lock; - struct rb_root root; + struct rb_root items; + struct rb_root ranges; long nr_dirty_items; long dirty_key_bytes; @@ -35,38 +47,77 @@ struct item_cache { /* * The dirty bits track if the given item is dirty and if its child * subtrees contain any dirty items. + * + * The entry is only used when the items are in a private batch list + * before insertion. */ struct cached_item { - struct rb_node node; + union { + struct rb_node node; + struct list_head entry; + }; long dirty; SCOUTFS_DECLARE_KVEC(key); SCOUTFS_DECLARE_KVEC(val); }; -static struct cached_item *find_item(struct rb_root *root, struct kvec *key) +struct cached_range { + struct rb_node node; + + SCOUTFS_DECLARE_KVEC(start); + SCOUTFS_DECLARE_KVEC(end); +}; + +/* + * Walk the item rbtree and return the item found and the next and + * prev items. + */ +static struct cached_item *walk_items(struct rb_root *root, struct kvec *key, + struct cached_item **prev, + struct cached_item **next) { struct rb_node *node = root->rb_node; - struct rb_node *parent = NULL; struct cached_item *item; int cmp; + *prev = NULL; + *next = NULL; + while (node) { - parent = node; item = container_of(node, struct cached_item, node); cmp = scoutfs_kvec_memcmp(key, item->key); - if (cmp < 0) + if (cmp < 0) { + *next = item; node = node->rb_left; - else if (cmp > 0) + } else if (cmp > 0) { + *prev = item; node = node->rb_right; - else + } else { return item; + } } return NULL; } +static struct cached_item *find_item(struct rb_root *root, struct kvec *key) +{ + struct cached_item *prev; + struct cached_item *next; + + return walk_items(root, key, &prev, &next); +} + +static struct cached_item *next_item(struct rb_root *root, struct kvec *key) +{ + struct cached_item *prev; + struct cached_item *next; + + return walk_items(root, key, &prev, &next) ?: next; +} + /* * We store the dirty bits in a single value so that the simple * augmented rbtree implementation gets a single scalar value to compare @@ -159,16 +210,13 @@ static const struct rb_augment_callbacks scoutfs_item_rb_cb = { }; /* - * Always insert the given item. If there's an existing item it is - * returned. This can briefly leave duplicate items in the tree until - * the caller removes the existing item. + * Try to insert the given item. If there's already an item with the + * insertion key then return -EEXIST. 
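+ *
+ * The caller keeps ownership of ins on failure: scoutfs_item_create()
+ * frees its allocation on -EEXIST and scoutfs_item_insert_batch() puts
+ * rejected duplicates back on the batch list so that
+ * scoutfs_item_free_batch() cleans them up.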
*/ -static struct cached_item *insert_item(struct rb_root *root, - struct cached_item *ins) +static int insert_item(struct rb_root *root, struct cached_item *ins) { struct rb_node **node = &root->rb_node; struct rb_node *parent = NULL; - struct cached_item *existing = NULL; struct cached_item *item; int cmp; @@ -177,57 +225,176 @@ static struct cached_item *insert_item(struct rb_root *root, item = container_of(*node, struct cached_item, node); cmp = scoutfs_kvec_memcmp(ins->key, item->key); - if (cmp == 0) { - BUG_ON(existing); - existing = item; - } - if (cmp < 0) { if (ins->dirty) item->dirty |= LEFT_DIRTY; node = &(*node)->rb_left; - } else { + } else if (cmp > 0) { if (ins->dirty) item->dirty |= RIGHT_DIRTY; node = &(*node)->rb_right; + } else { + return -EEXIST; } } rb_link_node(&ins->node, parent, node); rb_insert_augmented(&ins->node, root, &scoutfs_item_rb_cb); - return existing; + return 0; +} + +/* + * Return true if the given key is covered by a cached range. end is + * set to the end of the cached range. + * + * Return false if the given key isn't covered by a cached range and is + * instead in an uncached hole. end is set to the start of the next + * cached range. + */ +static bool check_range(struct rb_root *root, struct kvec *key, + struct kvec *end) +{ + struct rb_node *node = root->rb_node; + struct cached_range *next = NULL; + struct cached_range *rng; + int cmp; + + while (node) { + rng = container_of(node, struct cached_range, node); + + cmp = scoutfs_kvec_cmp_overlap(key, key, + rng->start, rng->end); + if (cmp < 0) { + next = rng; + node = node->rb_left; + } else if (cmp > 0) { + node = node->rb_right; + } else { + scoutfs_kvec_memcpy_truncate(end, rng->end); + return true; + } + } + + if (next) + scoutfs_kvec_memcpy_truncate(end, next->start); + else + scoutfs_kvec_set_max_key(end); + + return false; +} + +static void free_range(struct cached_range *rng) +{ + if (!IS_ERR_OR_NULL(rng)) { + scoutfs_kvec_kfree(rng->start); + scoutfs_kvec_kfree(rng->end); + kfree(rng); + } +} + +/* + * Insert a new cached range. It might overlap with any number of + * existing cached ranges. As we descend we combine with and free any + * overlapping ranges before restarting the descent. + * + * We're responsible for the ins allocation. We free it if we don't + * insert it in the tree. 
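+ *
+ * For example, inserting [e,p] into a tree holding [a,g] and [m,z]
+ * merges all three: each overlapping descent widens ins, swapping in
+ * a as its start and z as its end while freeing the old range, and
+ * the final clean descent inserts the single combined range [a,z].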
+ */ +static void insert_range(struct rb_root *root, struct cached_range *ins) +{ + struct cached_range *rng; + struct rb_node *parent; + struct rb_node **node; + int start_cmp; + int end_cmp; + int cmp; + +restart: + parent = NULL; + node = &root->rb_node; + while (*node) { + parent = *node; + rng = container_of(*node, struct cached_range, node); + + cmp = scoutfs_kvec_cmp_overlap(ins->start, ins->end, + rng->start, rng->end); + /* simple iteration until we overlap */ + if (cmp < 0) { + node = &(*node)->rb_left; + continue; + } else if (cmp > 0) { + node = &(*node)->rb_right; + continue; + } + + start_cmp = scoutfs_kvec_memcmp(ins->start, rng->start); + end_cmp = scoutfs_kvec_memcmp(ins->end, rng->end); + + /* free our insertion if we're entirely within an existing */ + if (start_cmp >= 0 && end_cmp <= 0) { + free_range(ins); + return; + } + + /* expand to cover partial overlap before freeing */ + if (start_cmp < 0 && end_cmp < 0) + scoutfs_kvec_swap(ins->end, rng->end); + else if (start_cmp > 0 && end_cmp > 0) + scoutfs_kvec_swap(ins->start, rng->start); + + /* remove and free all overlaps and restart the descent */ + rb_erase(&rng->node, root); + free_range(rng); + goto restart; + } + + rb_link_node(&ins->node, parent, node); + rb_insert_color(&ins->node, root); } /* * Find an item with the given key and copy its value into the caller's - * value vector. The amount of bytes copied is returned which can be - * 0 or truncated if the caller's buffer isn't big enough. + * value vector. The amount of bytes copied is returned which can be 0 + * or truncated if the caller's buffer isn't big enough. */ int scoutfs_item_lookup(struct super_block *sb, struct kvec *key, struct kvec *val) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct item_cache *cac = sbi->item_cache; + SCOUTFS_DECLARE_KVEC(end); struct cached_item *item; unsigned long flags; int ret; + trace_scoutfs_item_lookup(sb, key, val); + + ret = scoutfs_kvec_alloc_key(end); + if (ret) + goto out; + do { + scoutfs_kvec_init_key(end); + spin_lock_irqsave(&cac->lock, flags); - item = find_item(&cac->root, key); + item = find_item(&cac->items, key); if (item) ret = scoutfs_kvec_memcpy(val, item->val); - else + else if (check_range(&cac->ranges, key, end)) ret = -ENOENT; + else + ret = -ENODATA; spin_unlock_irqrestore(&cac->lock, flags); - } while (!item && ((ret = scoutfs_manifest_read_items(sb, key)) == 0)); + } while (ret == -ENODATA && + (ret = scoutfs_manifest_read_items(sb, key, end)) == 0); + scoutfs_kvec_kfree(end); +out: trace_printk("ret %d\n", ret); - return ret; } @@ -256,59 +423,98 @@ int scoutfs_item_lookup_exact(struct super_block *sb, struct kvec *key, } /* - * Return the next cached item starting with the given key. + * Return the next item starting with the given key, returning the last + * key at the most. * - * -ENOENT is returned if there are no cached items past the given key. - * If the last key is specified then -ENOENT is returned if there are no - * cached items up until that last key, inclusive. + * -ENOENT is returned if there are no items between the given and last + * keys. * - * The found key is copied to the caller's key. -ENOBUFS is returned if - * the found key didn't fit in the caller's key. + * The next item's key is copied to the caller's key. -ENOBUFS is + * returned if the item's key didn't fit in the caller's key. * - * The found value is copied into the callers value. The number of - * value bytes copied is returned. The copied value can be truncated by - * the caller's value buffer length. 
+ * The next item's value is copied into the callers value. The number + * of value bytes copied is returned. The copied value can be truncated + * by the caller's value buffer length. */ int scoutfs_item_next(struct super_block *sb, struct kvec *key, struct kvec *last, struct kvec *val) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct item_cache *cac = sbi->item_cache; + SCOUTFS_DECLARE_KVEC(read_start); + SCOUTFS_DECLARE_KVEC(read_end); + SCOUTFS_DECLARE_KVEC(range_end); struct cached_item *item; unsigned long flags; + bool cached; int ret; - /* - * This partial copy and paste of lookup is stubbed out for now. - * we'll want the negative caching fixes to be able to iterate - * without constantly searching the manifest between cached - * items. - */ - return -EINVAL; + /* convenience to avoid searching if caller iterates past their last */ + if (scoutfs_kvec_length(key) > scoutfs_kvec_length(last)) { + ret = -ENOENT; + goto out; + } - do { - spin_lock_irqsave(&cac->lock, flags); + ret = scoutfs_kvec_alloc_key(range_end); + if (ret) + goto out; + + spin_lock_irqsave(&cac->lock, flags); + + for(;;) { + scoutfs_kvec_init_key(range_end); + + /* see if we have a usable item in cache and before last */ + cached = check_range(&cac->ranges, key, range_end); + + if (cached && (item = next_item(&cac->items, key)) && + scoutfs_kvec_memcmp(item->key, range_end) <= 0 && + scoutfs_kvec_memcmp(item->key, last) <= 0) { + + if (scoutfs_kvec_length(item->key) > + scoutfs_kvec_length(key)) { + ret = -ENOBUFS; + break; + } - item = find_item(&cac->root, key); - if (!item) { - ret = -ENOENT; - } else if (scoutfs_kvec_length(item->key) > - scoutfs_kvec_length(key)) { - ret = -ENOBUFS; - } else { scoutfs_kvec_memcpy_truncate(key, item->key); if (val) ret = scoutfs_kvec_memcpy(val, item->val); else ret = 0; + break; + } + + if (!cached) { + /* missing cache starts at key */ + scoutfs_kvec_clone(read_start, key); + scoutfs_kvec_clone(read_end, range_end); + + } else if (scoutfs_kvec_memcmp(range_end, last) < 0) { + /* missing cache starts at range_end */ + scoutfs_kvec_clone(read_start, range_end); + scoutfs_kvec_clone(read_end, last); + + } else { + /* no items and we have cache between key and last */ + ret = -ENOENT; + break; } spin_unlock_irqrestore(&cac->lock, flags); - } while (!item && ((ret = scoutfs_manifest_read_items(sb, key)) == 0)); + ret = scoutfs_manifest_read_items(sb, read_start, read_end); + spin_lock_irqsave(&cac->lock, flags); + if (ret) + break; + } + + spin_unlock_irqrestore(&cac->lock, flags); + + scoutfs_kvec_kfree(range_end); +out: trace_printk("ret %d\n", ret); - return ret; } @@ -396,94 +602,188 @@ static void clear_item_dirty(struct item_cache *cac, update_dirty_parents(item); } -/* - * Add an item with the key and value to the item cache. The new item - * is clean. Any existing item at the key will be removed and freed. 
- */ -static int add_item(struct super_block *sb, struct kvec *key, struct kvec *val, - bool dirty) +static struct cached_item *alloc_item(struct kvec *key, struct kvec *val) { - struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); - struct item_cache *cac = sbi->item_cache; - struct cached_item *existing; struct cached_item *item; - unsigned long flags; - int ret; item = kzalloc(sizeof(struct cached_item), GFP_NOFS); - if (!item) - return -ENOMEM; - - ret = scoutfs_kvec_dup_flatten(item->key, key) ?: - scoutfs_kvec_dup_flatten(item->val, val); - if (ret) { - free_item(item); - return ret; + if (item) { + if (scoutfs_kvec_dup_flatten(item->key, key) || + scoutfs_kvec_dup_flatten(item->val, val)) { + free_item(item); + item = NULL; + } } - spin_lock_irqsave(&cac->lock, flags); - existing = insert_item(&cac->root, item); - if (existing) { - clear_item_dirty(cac, existing); - rb_erase_augmented(&existing->node, &cac->root, - &scoutfs_item_rb_cb); - } - if (dirty) - mark_item_dirty(cac, item); - spin_unlock_irqrestore(&cac->lock, flags); - free_item(existing); - - return 0; + return item; } /* - * Add a clean item to the cache. This is used to populate items while - * reading segments. - */ -int scoutfs_item_insert(struct super_block *sb, struct kvec *key, - struct kvec *val) -{ - return add_item(sb, key, val, false); -} - -/* - * Create a new dirty item in the cache. + * Create a new dirty item in the cache. Returns -EEXIST if an item + * already exists with the given key. + * + * XXX but it doesn't read.. is that weird? Seems weird. */ int scoutfs_item_create(struct super_block *sb, struct kvec *key, struct kvec *val) { - return add_item(sb, key, val, true); + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct item_cache *cac = sbi->item_cache; + struct cached_item *item; + unsigned long flags; + int ret; + + item = alloc_item(key, val); + if (!item) + return -ENOMEM; + + spin_lock_irqsave(&cac->lock, flags); + ret = insert_item(&cac->items, item); + if (!ret) + mark_item_dirty(cac, item); + spin_unlock_irqrestore(&cac->lock, flags); + + if (ret) + free_item(item); + + return ret; } /* - * If the item with the key exists make sure it's cached and dirty. -ENOENT - * will be returned if it doesn't exist. + * Allocate an item with the key and value and add it to the list of + * items to be inserted as a batch later. The caller adds in sort order + * and we add with _tail to maintain that order. + */ +int scoutfs_item_add_batch(struct super_block *sb, struct list_head *list, + struct kvec *key, struct kvec *val) +{ + struct cached_item *item; + int ret; + + item = alloc_item(key, val); + if (item) { + list_add_tail(&item->entry, list); + ret = 0; + } else { + ret = -ENOMEM; + } + + return ret; +} + + +/* + * Insert a batch of clean read items from segments into the item cache. + * + * The caller hasn't been locked so the cached items could have changed + * since they were asked to read. If there are duplicates in the item + * cache they might be newer than what was read so we must drop them on + * the floor. + * + * The batch atomically adds the items and updates the cached range to + * include the callers range that covers the items. + * + * It's safe to re-add items to the batch list after they aren't + * inserted because _safe iteration will always be past the head entry + * that will be inserted. 
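+ *
+ * The reader in scoutfs_manifest_read_items() is the typical caller:
+ * it fills the batch with scoutfs_item_add_batch() as it merges
+ * segment items and then either frees the batch on error or hands it
+ * to us along with the range of keys that it walked:
+ *
+ *	if (ret)
+ *		scoutfs_item_free_batch(&batch);
+ *	else
+ *		ret = scoutfs_item_insert_batch(sb, &batch, key, batch_end);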
+ */ +int scoutfs_item_insert_batch(struct super_block *sb, struct list_head *list, + struct kvec *start, struct kvec *end) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct item_cache *cac = sbi->item_cache; + struct cached_range *rng; + struct cached_item *item; + struct cached_item *tmp; + unsigned long flags; + int ret; + + trace_scoutfs_item_insert_batch(sb, start, end); + + if (WARN_ON_ONCE(scoutfs_kvec_memcmp(start, end) > 0)) + return -EINVAL; + + rng = kzalloc(sizeof(struct cached_range), GFP_NOFS); + if (rng && (scoutfs_kvec_dup_flatten(rng->start, start) || + scoutfs_kvec_dup_flatten(rng->end, end))) { + free_range(rng); + rng = NULL; + } + if (!rng) { + ret = -ENOMEM; + goto out; + } + + spin_lock_irqsave(&cac->lock, flags); + + insert_range(&cac->ranges, rng); + + list_for_each_entry_safe(item, tmp, list, entry) { + list_del(&item->entry); + if (insert_item(&cac->items, item)) + list_add(&item->entry, list); + } + + spin_unlock_irqrestore(&cac->lock, flags); + + ret = 0; +out: + scoutfs_item_free_batch(list); + return ret; +} + +void scoutfs_item_free_batch(struct list_head *list) +{ + struct cached_item *item; + struct cached_item *tmp; + + list_for_each_entry_safe(item, tmp, list, entry) { + list_del_init(&item->entry); + free_item(item); + } +} + + +/* + * If the item exists make sure it's dirty and pinned. It can be read + * if it wasn't cached. -ENOENT is returned if the item doesn't exist. */ int scoutfs_item_dirty(struct super_block *sb, struct kvec *key) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct item_cache *cac = sbi->item_cache; + SCOUTFS_DECLARE_KVEC(end); struct cached_item *item; unsigned long flags; int ret; + ret = scoutfs_kvec_alloc_key(end); + if (ret) + goto out; + do { + scoutfs_kvec_init_key(end); + spin_lock_irqsave(&cac->lock, flags); - item = find_item(&cac->root, key); + item = find_item(&cac->items, key); if (item) { mark_item_dirty(cac, item); ret = 0; - } else { + } else if (check_range(&cac->ranges, key, end)) { ret = -ENOENT; + } else { + ret = -ENODATA; } spin_unlock_irqrestore(&cac->lock, flags); - } while (!item && ((ret = scoutfs_manifest_read_items(sb, key)) == 0)); + } while (ret == -ENODATA && + (ret = scoutfs_manifest_read_items(sb, key, end)) == 0); + scoutfs_kvec_kfree(end); +out: trace_printk("ret %d\n", ret); - return ret; } @@ -499,37 +799,49 @@ int scoutfs_item_update(struct super_block *sb, struct kvec *key, struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct item_cache *cac = sbi->item_cache; SCOUTFS_DECLARE_KVEC(up_val); + SCOUTFS_DECLARE_KVEC(end); struct cached_item *item; unsigned long flags; int ret; + ret = scoutfs_kvec_alloc_key(end); + if (ret) + goto out; + if (val) { ret = scoutfs_kvec_dup_flatten(up_val, val); if (ret) - return -ENOMEM; + goto out; } else { scoutfs_kvec_init_null(up_val); } - spin_lock_irqsave(&cac->lock, flags); + do { + scoutfs_kvec_init_key(end); - /* XXX update seq */ - item = find_item(&cac->root, key); - if (item) { - /* keep dirty counters in sync */ - clear_item_dirty(cac, item); - scoutfs_kvec_swap(up_val, item->val); - mark_item_dirty(cac, item); - } else { - ret = -ENOENT; - } + spin_lock_irqsave(&cac->lock, flags); - spin_unlock_irqrestore(&cac->lock, flags); + item = find_item(&cac->items, key); + if (item) { + clear_item_dirty(cac, item); + scoutfs_kvec_swap(up_val, item->val); + mark_item_dirty(cac, item); + ret = 0; + } else if (check_range(&cac->ranges, key, end)) { + ret = -ENOENT; + } else { + ret = -ENODATA; + } + spin_unlock_irqrestore(&cac->lock, flags); + + } 
while (ret == -ENODATA && + (ret = scoutfs_manifest_read_items(sb, key, end)) == 0); +out: + scoutfs_kvec_kfree(end); scoutfs_kvec_kfree(up_val); trace_printk("ret %d\n", ret); - return ret; } @@ -645,7 +957,7 @@ static void count_seg_items(struct item_cache *cac, u32 *nr_items, *key_bytes = 0; total = sizeof(struct scoutfs_segment_block); - for (item = first_dirty(cac->root.rb_node); item; + for (item = first_dirty(cac->items.rb_node); item; item = next_dirty(item)) { total += sizeof(struct scoutfs_segment_item) + @@ -676,7 +988,7 @@ int scoutfs_item_dirty_seg(struct super_block *sb, struct scoutfs_segment *seg) count_seg_items(cac, &nr_items, &key_bytes); if (nr_items) { - item = first_dirty(cac->root.rb_node); + item = first_dirty(cac->items.rb_node); scoutfs_seg_first_item(sb, seg, item->key, item->val, nr_items, key_bytes); clear_item_dirty(cac, item); @@ -701,7 +1013,8 @@ int scoutfs_item_setup(struct super_block *sb) sbi->item_cache = cac; spin_lock_init(&cac->lock); - cac->root = RB_ROOT; + cac->items = RB_ROOT; + cac->ranges = RB_ROOT; return 0; } @@ -711,16 +1024,24 @@ void scoutfs_item_destroy(struct super_block *sb) struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct item_cache *cac = sbi->item_cache; struct cached_item *item; + struct cached_range *rng; struct rb_node *node; if (cac) { - for (node = rb_first(&cac->root); node; ) { + for (node = rb_first(&cac->items); node; ) { item = container_of(node, struct cached_item, node); node = rb_next(node); - rb_erase(&item->node, &cac->root); + rb_erase(&item->node, &cac->items); free_item(item); } + for (node = rb_first(&cac->ranges); node; ) { + rng = container_of(node, struct cached_range, node); + node = rb_next(node); + rb_erase(&rng->node, &cac->items); + free_range(rng); + } + kfree(cac); } } diff --git a/kmod/src/item.h b/kmod/src/item.h index 62d93815..81746822 100644 --- a/kmod/src/item.h +++ b/kmod/src/item.h @@ -22,6 +22,12 @@ int scoutfs_item_update(struct super_block *sb, struct kvec *key, struct kvec *val); int scoutfs_item_delete(struct super_block *sb, struct kvec *key); +int scoutfs_item_add_batch(struct super_block *sb, struct list_head *list, + struct kvec *key, struct kvec *val); +int scoutfs_item_insert_batch(struct super_block *sb, struct list_head *list, + struct kvec *start, struct kvec *end); +void scoutfs_item_free_batch(struct list_head *list); + long scoutfs_item_dirty_bytes(struct super_block *sb); int scoutfs_item_dirty_seg(struct super_block *sb, struct scoutfs_segment *seg); diff --git a/kmod/src/manifest.c b/kmod/src/manifest.c index cfc46d0b..4980109c 100644 --- a/kmod/src/manifest.c +++ b/kmod/src/manifest.c @@ -22,6 +22,7 @@ #include "item.h" #include "ring.h" #include "manifest.h" +#include "scoutfs_trace.h" struct manifest { spinlock_t lock; @@ -51,15 +52,26 @@ struct manifest_entry { }; /* - * A path tracks all the segments from level 0 to the last level that - * overlap with the search key. + * A reader uses references to segments copied from a walk of the + * manifest. The references are a point in time sample of the manifest. + * The manifest and segments can change while the reader uses their + * references. Locking ensures that the items they're reading will be + * stable while the manifest and segments change, and the segment + * allocator gives readers time to use immutable stale segments before + * their reallocated and reused. 
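+ *
+ * The referenced segment's first and last keys are packed back to back
+ * in keys[]; init_ref_keys() points kvecs at them using the stored
+ * lengths.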
*/ struct manifest_ref { + struct list_head entry; + u64 segno; u64 seq; struct scoutfs_segment *seg; + int found_ctr; int pos; + u16 first_key_len; + u16 last_key_len; u8 level; + u8 keys[SCOUTFS_MAX_KEY_SIZE * 2]; }; static void init_ment_keys(struct manifest_entry *ment, struct kvec *first, @@ -72,20 +84,25 @@ static void init_ment_keys(struct manifest_entry *ment, struct kvec *first, le16_to_cpu(ment->am.last_key_len)); } -/* - * returns: - * < 0 : key < ment->first_key - * > 0 : key > ment->first_key - * == 0 : ment->first_key <= key <= ment->last_key - */ -static bool cmp_key_ment(struct kvec *key, struct manifest_entry *ment) +static void init_ref_keys(struct manifest_ref *ref, struct kvec *first, + struct kvec *last) +{ + if (first) + scoutfs_kvec_init(first, ref->keys, ref->first_key_len); + if (last) + scoutfs_kvec_init(last, ref->keys + ref->first_key_len, + ref->last_key_len); +} + +static bool cmp_range_ment(struct kvec *key, struct kvec *end, + struct manifest_entry *ment) { SCOUTFS_DECLARE_KVEC(first); SCOUTFS_DECLARE_KVEC(last); init_ment_keys(ment, first, last); - return scoutfs_kvec_cmp_overlap(key, key, first, last); + return scoutfs_kvec_cmp_overlap(key, end, first, last); } static struct manifest_entry *find_ment(struct rb_root *root, struct kvec *key) @@ -97,7 +114,7 @@ static struct manifest_entry *find_ment(struct rb_root *root, struct kvec *key) while (node) { ment = container_of(node, struct manifest_entry, node); - cmp = cmp_key_ment(key, ment); + cmp = cmp_range_ment(key, key, ment); if (cmp < 0) node = node->rb_left; else if (cmp > 0) @@ -119,16 +136,16 @@ static int insert_ment(struct rb_root *root, struct manifest_entry *ins) struct rb_node *parent = NULL; struct manifest_entry *ment; SCOUTFS_DECLARE_KVEC(key); + SCOUTFS_DECLARE_KVEC(end); int cmp; - /* either first or last works */ - init_ment_keys(ins, key, key); + init_ment_keys(ins, key, end); while (*node) { parent = *node; ment = container_of(*node, struct manifest_entry, node); - cmp = cmp_key_ment(key, ment); + cmp = cmp_range_ment(key, end, ment); if (cmp < 0) { node = &(*node)->rb_left; } else if (cmp > 0) { @@ -215,6 +232,8 @@ int scoutfs_manifest_add(struct super_block *sb, struct kvec *first, int key_bytes; int ret; + trace_scoutfs_manifest_add(sb, first, last, segno, seq, level, dirty); + key_bytes = scoutfs_kvec_length(first) + scoutfs_kvec_length(last); ment = kmalloc(sizeof(struct manifest_entry) + key_bytes, GFP_NOFS); if (!ment) @@ -249,57 +268,97 @@ int scoutfs_manifest_add(struct super_block *sb, struct kvec *first, return ret; } -static void set_ref(struct manifest_ref *ref, struct manifest_entry *ment) +/* + * Grab an allocated ref from the src list, fill it with the details + * from the ment, and add it to the dst list. The ref is added to the + * tail of the dst list so that we maintain the caller's manifest walk + * order. 
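+ *
+ * The src list is the pile of refs that get_range_refs() allocated
+ * with the manifest lock dropped, so filling refs under the lock
+ * never has to allocate.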
+ */ +static void fill_ref_tail(struct list_head *dst, struct list_head *src, + struct manifest_entry *ment) { + SCOUTFS_DECLARE_KVEC(ment_first); + SCOUTFS_DECLARE_KVEC(ment_last); + SCOUTFS_DECLARE_KVEC(first); + SCOUTFS_DECLARE_KVEC(last); + struct manifest_ref *ref; + + ref = list_first_entry(src, struct manifest_ref, entry); + ref->segno = le64_to_cpu(ment->am.segno); ref->seq = le64_to_cpu(ment->am.seq); ref->level = ment->am.level; + ref->first_key_len = le16_to_cpu(ment->am.first_key_len); + ref->last_key_len = le16_to_cpu(ment->am.last_key_len); + + init_ment_keys(ment, ment_first, ment_last); + init_ref_keys(ref, first, last); + + scoutfs_kvec_memcpy(first, ment_first); + scoutfs_kvec_memcpy(last, ment_last); + + list_move_tail(&ref->entry, dst); } /* - * Returns refs if intersecting segments are found, NULL if none intersect, - * and PTR_ERR on failure. + * Get refs on all the segments in the manifest that we'll need to + * search to populate the cache with the given range. + * + * We have to get all the level 0 segments that intersect with the range + * of items that we want to search because the level 0 segments can + * arbitrarily overlap with each other. + * + * We only need to search for the starting key in all the higher order + * levels. They do not overlap so we can iterate through the key space + * in each segment starting with the key. */ -static struct manifest_ref *get_key_refs(struct manifest *mani, - struct kvec *key, - unsigned int *nr_ret) +static int get_range_refs(struct manifest *mani, struct kvec *key, + struct kvec *end, struct list_head *ref_list) { - struct manifest_ref *refs = NULL; struct manifest_entry *ment; + struct manifest_ref *ref; + struct manifest_ref *tmp; struct rb_root *root; unsigned long flags; unsigned int total; - unsigned int nr; + unsigned int nr = 0; + LIST_HEAD(alloced); + int ret; int i; trace_printk("getting refs\n"); spin_lock_irqsave(&mani->lock, flags); + /* allocate enough refs for the of segments */ total = mani->level0_nr + mani->last_level; - while (nr != total) { - nr = total; + while (nr < total) { spin_unlock_irqrestore(&mani->lock, flags); - kfree(refs); - refs = kcalloc(total, sizeof(struct manifest_ref), GFP_NOFS); - trace_printk("alloc refs %p total %u\n", refs, total); - if (!refs) - return ERR_PTR(-ENOMEM); + for (i = nr; i < total; i++) { + ref = kmalloc(sizeof(struct manifest_ref), GFP_NOFS); + if (!ref) { + ret = -ENOMEM; + goto out; + } + + memset(ref, 0, offsetof(struct manifest_ref, keys)); + list_add(&ref->entry, &alloced); + } + nr = total; spin_lock_irqsave(&mani->lock, flags); } - nr = 0; - + /* find all the overlapping level 0 segments */ list_for_each_entry(ment, &mani->level0_list, level0_entry) { - trace_printk("trying l0 ment %p\n", ment); - if (cmp_key_ment(key, ment)) + if (cmp_range_ment(key, end, ment)) continue; - set_ref(&refs[nr++], ment); + fill_ref_tail(ref_list, &alloced, ment); } + /* find each segment containing the key at the higher orders */ for (i = 1; i <= mani->last_level; i++) { root = &mani->level_roots[i]; if (RB_EMPTY_ROOT(root)) @@ -307,119 +366,151 @@ static struct manifest_ref *get_key_refs(struct manifest *mani, ment = find_ment(root, key); if (ment) - set_ref(&refs[nr++], ment); + fill_ref_tail(ref_list, &alloced, ment); } spin_unlock_irqrestore(&mani->lock, flags); + ret = 0; - *nr_ret = nr; - if (!nr) { - kfree(refs); - refs = NULL; +out: + if (ret) { + list_splice_init(ref_list, &alloced); + list_for_each_entry_safe(ref, tmp, &alloced, entry) { + list_del_init(&ref->entry); 
+ kfree(ref); + } } - - trace_printk("refs %p (err %ld)\n", - refs, IS_ERR(refs) ? PTR_ERR(refs) : 0); - - return refs; + trace_printk("ret %d\n", ret); + return ret; } /* - * The caller didn't find an item for the given key in the item cache - * and wants us to search for it in the lsm segments. We search the - * manifest for all the segments that contain the key. We then read the - * segments and iterate over their items looking for ours. We insert it - * and some number of other surrounding items to amortize the relatively - * expensive multi-segment searches. + * The caller found a hole in the item cache that they'd like populated. + * + * We search the manifest for all the segments we'll need to iterate + * from the key to the end key. We walk the segments and insert as many + * items as we can from the segments, trying to amortize the per-item + * cost of segment searching. + * + * As we insert the batch of items we give the item cache the range of + * keys that contain these items. This lets the cache return negative + * cache lookups for missing items within the range. + * + * Returns 0 if we inserted items with a range covering the starting + * key. The caller should be able to make progress. + * + * Returns -errno if we failed to make any change in the cache. * * This is asking the seg code to read each entire segment. The seg * code could give it it helpers to submit and wait on blocks within the - * segment so that we don't have wild bandwidth amplification in the - * cold random read case. + * segment so that we don't have wild bandwidth amplification for cold + * random reads. * * The segments are immutable at this point so we can use their contents * as long as we hold refs. */ -int scoutfs_manifest_read_items(struct super_block *sb, struct kvec *key) +#define MAX_ITEMS_READ 32 + +int scoutfs_manifest_read_items(struct super_block *sb, struct kvec *key, + struct kvec *end) { DECLARE_MANIFEST(sb, mani); SCOUTFS_DECLARE_KVEC(item_key); SCOUTFS_DECLARE_KVEC(item_val); SCOUTFS_DECLARE_KVEC(found_key); SCOUTFS_DECLARE_KVEC(found_val); + SCOUTFS_DECLARE_KVEC(batch_end); + SCOUTFS_DECLARE_KVEC(seg_end); struct scoutfs_segment *seg; - struct manifest_ref *refs; - unsigned long had_found; + struct manifest_ref *ref; + struct manifest_ref *tmp; + LIST_HEAD(ref_list); + LIST_HEAD(batch); + int found_ctr; bool found; int ret = 0; int err; - int nr_refs; int cmp; - int last; - int i; int n; trace_printk("reading items\n"); - refs = get_key_refs(mani, key, &nr_refs); - if (IS_ERR(refs)) - return PTR_ERR(refs); - if (!refs) - return -ENOENT; + /* get refs on all the segments */ + ret = get_range_refs(mani, key, end, &ref_list); + if (ret) + return ret; /* submit reads for all the segments */ - for (i = 0; i < nr_refs; i++) { - seg = scoutfs_seg_submit_read(sb, refs[i].segno); + list_for_each_entry(ref, &ref_list, entry) { + seg = scoutfs_seg_submit_read(sb, ref->segno); if (IS_ERR(seg)) { ret = PTR_ERR(seg); break; } - refs[i].seg = seg; + ref->seg = seg; } - last = i; - /* wait for submitted segments and search if we haven't seen failure */ - for (i = 0; i < last; i++) { - seg = refs[i].seg; + /* wait for submitted segments and search for starting pos */ + list_for_each_entry(ref, &ref_list, entry) { + if (!ref->seg) + break; - err = scoutfs_seg_wait(sb, seg); + err = scoutfs_seg_wait(sb, ref->seg); if (err && !ret) ret = err; - if (!ret) - refs[i].pos = scoutfs_seg_find_pos(seg, key); + if (ret == 0) + ref->pos = scoutfs_seg_find_pos(ref->seg, key); } - - /* done if we saw errors */ if 
(ret) goto out; - /* walk sorted items, resolving across segments, and insert */ - for (n = 0; n < 16; n++) { + scoutfs_kvec_init_null(batch_end); + scoutfs_kvec_init_null(seg_end); + found_ctr = 0; + + for (n = 0; n < MAX_ITEMS_READ; n++) { found = false; + found_ctr++; - /* find the most recent least key */ - for (i = 0; i < nr_refs; i++) { - seg = refs[i].seg; - if (!seg) - continue; + /* find the next least key from the pos in each segment */ + list_for_each_entry_safe(ref, tmp, &ref_list, entry) { - /* get kvecs, removing if we ran out of items */ - ret = scoutfs_seg_item_kvecs(seg, refs[i].pos, + /* + * Check the next item in the segment. We're + * done with the segment if there are no more + * items or if the next item is past the + * caller's end. We record either the caller's + * end or the segment end if it's a l1+ segment for + * use as the batch end if we don't see more items. + */ + ret = scoutfs_seg_item_kvecs(ref->seg, ref->pos, item_key, item_val); + if (ret < 0) { + if (ref->level > 0) { + init_ref_keys(ref, NULL, item_key); + scoutfs_kvec_clone_less(seg_end, + item_key); + } + } else if (scoutfs_kvec_memcmp(item_key, end) > 0) { + scoutfs_kvec_clone_less(seg_end, end); + ret = -ENOENT; + } if (ret < 0) { - scoutfs_seg_put(seg); - refs[i].seg = NULL; + list_del_init(&ref->entry); + scoutfs_seg_put(ref->seg); + kfree(ref); continue; } + /* see if it's the new least item */ if (found) { cmp = scoutfs_kvec_memcmp(item_key, found_key); if (cmp >= 0) { if (cmp == 0) - set_bit(i, &had_found); + ref->found_ctr = found_ctr; continue; } } @@ -427,37 +518,58 @@ int scoutfs_manifest_read_items(struct super_block *sb, struct kvec *key) /* remember new least key */ scoutfs_kvec_clone(found_key, item_key); scoutfs_kvec_clone(found_val, item_val); + ref->found_ctr = ++found_ctr; found = true; - had_found = 0; - set_bit(i, &had_found); - } - - /* return -ENOENT if we didn't find any or the callers item */ - if (n == 0 && - (!found || scoutfs_kvec_memcmp(key, found_key))) { - ret = -ENOENT; - break; } + /* ran out of keys in segs, range extends to seg end */ if (!found) { + scoutfs_kvec_clone(batch_end, seg_end); ret = 0; break; } - ret = scoutfs_item_insert(sb, item_key, item_val); - if (ret) + /* + * If we fail to add an item we're done. If we already + * have items it's not a failure and the end of the cached + * range is the last successfully added item. 
+ */ + ret = scoutfs_item_add_batch(sb, &batch, found_key, found_val); + if (ret) { + if (n > 0) + ret = 0; break; + } - /* advance all the positions past the found key */ - for_each_set_bit(i, &had_found, BITS_PER_LONG) - refs[i].pos++; + /* the last successful key determines the range */ + scoutfs_kvec_clone(batch_end, found_key); + + /* if we just saw the end key then we're done */ + if (scoutfs_kvec_memcmp(found_key, end) == 0) { + ret = 0; + break; + } + + /* advance all the positions that had the found key */ + list_for_each_entry(ref, &ref_list, entry) { + if (ref->found_ctr == found_ctr) + ref->pos++; + } + + ret = 0; } + if (ret) + scoutfs_item_free_batch(&batch); + else + ret = scoutfs_item_insert_batch(sb, &batch, key, batch_end); out: - for (i = 0; i < nr_refs; i++) - scoutfs_seg_put(refs[i].seg); + list_for_each_entry_safe(ref, tmp, &ref_list, entry) { + list_del_init(&ref->entry); + scoutfs_seg_put(ref->seg); + kfree(ref); + } - kfree(refs); return ret; } diff --git a/kmod/src/manifest.h b/kmod/src/manifest.h index f3bea21a..9f1477b4 100644 --- a/kmod/src/manifest.h +++ b/kmod/src/manifest.h @@ -7,7 +7,8 @@ int scoutfs_manifest_add(struct super_block *sb, struct kvec *first, int scoutfs_manifest_has_dirty(struct super_block *sb); int scoutfs_manifest_dirty_ring(struct super_block *sb); -int scoutfs_manifest_read_items(struct super_block *sb, struct kvec *key); +int scoutfs_manifest_read_items(struct super_block *sb, struct kvec *key, + struct kvec *until); int scoutfs_manifest_setup(struct super_block *sb); void scoutfs_manifest_destroy(struct super_block *sb); diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h index a9a18118..8793a544 100644 --- a/kmod/src/scoutfs_trace.h +++ b/kmod/src/scoutfs_trace.h @@ -27,6 +27,7 @@ #include "key.h" #include "format.h" +#include "kvec.h" struct scoutfs_sb_info; @@ -346,6 +347,57 @@ DEFINE_EVENT(scoutfs_btree_ranged_op, scoutfs_btree_since, TP_ARGS(sb, first, last) ); +TRACE_EVENT(scoutfs_manifest_add, + TP_PROTO(struct super_block *sb, struct kvec *first, + struct kvec *last, u64 segno, u64 seq, u8 level, bool dirty), + TP_ARGS(sb, first, last, segno, seq, level, dirty), + TP_STRUCT__entry( + __dynamic_array(char, first, scoutfs_kvec_key_strlen(first)) + __dynamic_array(char, last, scoutfs_kvec_key_strlen(last)) + __field(u64, segno) + __field(u64, seq) + __field(u8, level) + __field(u8, dirty) + ), + TP_fast_assign( + scoutfs_kvec_key_sprintf(__get_dynamic_array(first), first); + scoutfs_kvec_key_sprintf(__get_dynamic_array(last), last); + __entry->segno = segno; + __entry->seq = seq; + __entry->level = level; + __entry->dirty = dirty; + ), + TP_printk("first %s last %s segno %llu seq %llu level %u dirty %u", + __get_str(first), __get_str(last), __entry->segno, + __entry->seq, __entry->level, __entry->dirty) +); + +TRACE_EVENT(scoutfs_item_lookup, + TP_PROTO(struct super_block *sb, struct kvec *key, struct kvec *val), + TP_ARGS(sb, key, val), + TP_STRUCT__entry( + __dynamic_array(char, key, scoutfs_kvec_key_strlen(key)) + ), + TP_fast_assign( + scoutfs_kvec_key_sprintf(__get_dynamic_array(key), key); + ), + TP_printk("key %s", __get_str(key)) +); + +TRACE_EVENT(scoutfs_item_insert_batch, + TP_PROTO(struct super_block *sb, struct kvec *start, struct kvec *end), + TP_ARGS(sb, start, end), + TP_STRUCT__entry( + __dynamic_array(char, start, scoutfs_kvec_key_strlen(start)) + __dynamic_array(char, end, scoutfs_kvec_key_strlen(end)) + ), + TP_fast_assign( + scoutfs_kvec_key_sprintf(__get_dynamic_array(start), start); + 
scoutfs_kvec_key_sprintf(__get_dynamic_array(end), end); + ), + TP_printk("start %s end %s", __get_str(start), __get_str(end)) +); + #endif /* _TRACE_SCOUTFS_H */ /* This part must be outside protection */
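As a condensed illustration of the read path introduced above (a sketch,
not part of the diff): scoutfs_item_lookup(), scoutfs_item_dirty() and
scoutfs_item_update() now share the same retry pattern, where a hole in
both the item and range trees returns -ENODATA and triggers a ranged
manifest read before the lookup is retried.  The helper name
lookup_with_read() is hypothetical; everything it calls is defined in
item.c or manifest.c in this patch.

/*
 * Hypothetical helper, assumed to live in item.c next to the static
 * find_item() and check_range() that it uses.
 */
static int lookup_with_read(struct super_block *sb, struct item_cache *cac,
			    struct kvec *key, struct kvec *val)
{
	SCOUTFS_DECLARE_KVEC(end);
	struct cached_item *item;
	unsigned long flags;
	int ret;

	ret = scoutfs_kvec_alloc_key(end);
	if (ret)
		return ret;

	do {
		scoutfs_kvec_init_key(end);

		spin_lock_irqsave(&cac->lock, flags);
		item = find_item(&cac->items, key);
		if (item)
			ret = scoutfs_kvec_memcpy(val, item->val);
		else if (check_range(&cac->ranges, key, end))
			ret = -ENOENT;	/* a cached range proves absence */
		else
			ret = -ENODATA;	/* hole in the cache, go read */
		spin_unlock_irqrestore(&cac->lock, flags);

		/*
		 * Read the hole from the manifest; the inserted batch
		 * and range cover at least the starting key, so the
		 * next pass either finds the item or a negative hit.
		 */
	} while (ret == -ENODATA &&
		 (ret = scoutfs_manifest_read_items(sb, key, end)) == 0);

	scoutfs_kvec_kfree(end);
	return ret;
}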