diff --git a/kmod/src/btree.c b/kmod/src/btree.c index 9807c8c5..30e85b10 100644 --- a/kmod/src/btree.c +++ b/kmod/src/btree.c @@ -1875,12 +1875,11 @@ out: * set in btree items. They're only used for fs items written through * the item cache and forest of log btrees. */ -int scoutfs_btree_insert_list(struct super_block *sb, - struct scoutfs_alloc *alloc, - struct scoutfs_block_writer *wri, - struct scoutfs_btree_root *root, - struct scoutfs_btree_item_list *lst) +int scoutfs_btree_insert_list(struct super_block *sb, struct scoutfs_alloc *alloc, + struct scoutfs_block_writer *wri, struct scoutfs_btree_root *root, + scoutfs_btree_item_iter_cb iter_cb, void *pos, void *arg) { + struct scoutfs_btree_item_desc desc; struct scoutfs_btree_item *item; struct btree_walk_key_range kr; struct scoutfs_btree_block *bt; @@ -1889,44 +1888,46 @@ int scoutfs_btree_insert_list(struct super_block *sb, int cmp; int ret = 0; - while (lst) { + pos = iter_cb(sb, &desc, pos, arg); + + while (pos) { ret = btree_walk(sb, alloc, wri, root, BTW_DIRTY | BTW_INSERT, - &lst->key, lst->val_len, &bl, &kr, NULL); + desc.key, desc.val_len, &bl, &kr, NULL); if (ret < 0) goto out; bt = bl->data; do { - item = leaf_item_hash_search(sb, bt, &lst->key); + item = leaf_item_hash_search(sb, bt, desc.key); if (item) { /* try to merge delta values, _NULL not deleted; merge will */ - ret = scoutfs_forest_combine_deltas(&lst->key, + ret = scoutfs_forest_combine_deltas(desc.key, item_val(bt, item), item_val_len(item), - lst->val, lst->val_len); + desc.val, desc.val_len); if (ret < 0) { scoutfs_block_put(sb, bl); goto out; } - item->seq = cpu_to_le64(lst->seq); - item->flags = lst->flags; + item->seq = cpu_to_le64(desc.seq); + item->flags = desc.flags; if (ret == 0) - update_item_value(bt, item, lst->val, lst->val_len); + update_item_value(bt, item, desc.val, desc.val_len); else ret = 0; } else { scoutfs_avl_search(&bt->item_root, - cmp_key_item, &lst->key, + cmp_key_item, desc.key, &cmp, &par, NULL, NULL); - create_item(bt, &lst->key, lst->seq, lst->flags, lst->val, - lst->val_len, par, cmp); + create_item(bt, desc.key, desc.seq, desc.flags, desc.val, + desc.val_len, par, cmp); } - lst = lst->next; - } while (lst && scoutfs_key_compare(&lst->key, &kr.end) <= 0 && - mid_free_item_room(bt, lst->val_len)); + pos = iter_cb(sb, &desc, pos, arg); + } while (pos && scoutfs_key_compare(desc.key, &kr.end) <= 0 && + mid_free_item_room(bt, desc.val_len)); scoutfs_block_put(sb, bl); } diff --git a/kmod/src/btree.h b/kmod/src/btree.h index 057aa779..4233e912 100644 --- a/kmod/src/btree.h +++ b/kmod/src/btree.h @@ -18,11 +18,24 @@ struct scoutfs_btree_item_ref { #define SCOUTFS_BTREE_ITEM_REF(name) \ struct scoutfs_btree_item_ref name = {NULL,} -/* caller gives an item to the callback */ +/* btree gives an item to caller */ typedef int (*scoutfs_btree_item_cb)(struct super_block *sb, struct scoutfs_key *key, u64 seq, u8 flags, void *val, int val_len, void *arg); +struct scoutfs_btree_item_desc { + struct scoutfs_key *key; + void *val; + u64 seq; + u8 flags; + unsigned val_len; +}; + +/* btree iterates through items from caller */ +typedef void *(*scoutfs_btree_item_iter_cb)(struct super_block *sb, + struct scoutfs_btree_item_desc *desc, + void *pos, void *arg); + /* simple singly-linked list of items */ struct scoutfs_btree_item_list { struct scoutfs_btree_item_list *next; @@ -78,11 +91,9 @@ int scoutfs_btree_read_items(struct super_block *sb, struct scoutfs_key *start, struct scoutfs_key *end, scoutfs_btree_item_cb cb, void *arg); -int 
scoutfs_btree_insert_list(struct super_block *sb, - struct scoutfs_alloc *alloc, - struct scoutfs_block_writer *wri, - struct scoutfs_btree_root *root, - struct scoutfs_btree_item_list *lst); +int scoutfs_btree_insert_list(struct super_block *sb, struct scoutfs_alloc *alloc, + struct scoutfs_block_writer *wri, struct scoutfs_btree_root *root, + scoutfs_btree_item_iter_cb iter_cb, void *pos, void *arg); int scoutfs_btree_parent_range(struct super_block *sb, struct scoutfs_btree_root *root, diff --git a/kmod/src/counters.h b/kmod/src/counters.h index b3e68bd4..f8645749 100644 --- a/kmod/src/counters.h +++ b/kmod/src/counters.h @@ -90,36 +90,27 @@ EXPAND_COUNTER(forest_read_items) \ EXPAND_COUNTER(forest_roots_next_hint) \ EXPAND_COUNTER(forest_set_bloom_bits) \ + EXPAND_COUNTER(item_alloc_bytes) \ EXPAND_COUNTER(item_clear_dirty) \ EXPAND_COUNTER(item_create) \ EXPAND_COUNTER(item_delete) \ EXPAND_COUNTER(item_delta) \ EXPAND_COUNTER(item_delta_written) \ EXPAND_COUNTER(item_dirty) \ + EXPAND_COUNTER(item_free_bytes) \ EXPAND_COUNTER(item_invalidate) \ - EXPAND_COUNTER(item_invalidate_page) \ + EXPAND_COUNTER(item_invalidate_item) \ EXPAND_COUNTER(item_lookup) \ EXPAND_COUNTER(item_mark_dirty) \ EXPAND_COUNTER(item_next) \ - EXPAND_COUNTER(item_page_accessed) \ - EXPAND_COUNTER(item_page_alloc) \ - EXPAND_COUNTER(item_page_clear_dirty) \ - EXPAND_COUNTER(item_page_compact) \ - EXPAND_COUNTER(item_page_free) \ - EXPAND_COUNTER(item_page_lru_add) \ - EXPAND_COUNTER(item_page_lru_remove) \ - EXPAND_COUNTER(item_page_mark_dirty) \ - EXPAND_COUNTER(item_page_rbtree_walk) \ - EXPAND_COUNTER(item_page_split) \ - EXPAND_COUNTER(item_pcpu_add_replaced) \ - EXPAND_COUNTER(item_pcpu_page_hit) \ - EXPAND_COUNTER(item_pcpu_page_miss) \ - EXPAND_COUNTER(item_pcpu_page_miss_keys) \ - EXPAND_COUNTER(item_read_pages_split) \ - EXPAND_COUNTER(item_shrink_page) \ - EXPAND_COUNTER(item_shrink_page_dirty) \ - EXPAND_COUNTER(item_shrink_page_reader) \ - EXPAND_COUNTER(item_shrink_page_trylock) \ + EXPAND_COUNTER(item_shrink) \ + EXPAND_COUNTER(item_shrink_all) \ + EXPAND_COUNTER(item_shrink_exhausted) \ + EXPAND_COUNTER(item_shrink_read_search) \ + EXPAND_COUNTER(item_shrink_removed) \ + EXPAND_COUNTER(item_shrink_searched) \ + EXPAND_COUNTER(item_shrink_skipped) \ + EXPAND_COUNTER(item_shrink_write_search) \ EXPAND_COUNTER(item_update) \ EXPAND_COUNTER(item_write_dirty) \ EXPAND_COUNTER(lock_alloc) \ diff --git a/kmod/src/forest.c b/kmod/src/forest.c index 1b4c9c4b..a9b33236 100644 --- a/kmod/src/forest.c +++ b/kmod/src/forest.c @@ -494,13 +494,13 @@ out: return ret; } -int scoutfs_forest_insert_list(struct super_block *sb, - struct scoutfs_btree_item_list *lst) +int scoutfs_forest_insert_list(struct super_block *sb, scoutfs_btree_item_iter_cb cb, + void *pos, void *arg) { DECLARE_FOREST_INFO(sb, finf); return scoutfs_btree_insert_list(sb, finf->alloc, finf->wri, - &finf->our_log.item_root, lst); + &finf->our_log.item_root, cb, pos, arg); } /* diff --git a/kmod/src/forest.h b/kmod/src/forest.h index 30564a11..e699d4bc 100644 --- a/kmod/src/forest.h +++ b/kmod/src/forest.h @@ -29,8 +29,8 @@ void scoutfs_forest_set_max_seq(struct super_block *sb, u64 max_seq); int scoutfs_forest_get_max_seq(struct super_block *sb, struct scoutfs_super_block *super, u64 *seq); -int scoutfs_forest_insert_list(struct super_block *sb, - struct scoutfs_btree_item_list *lst); +int scoutfs_forest_insert_list(struct super_block *sb, scoutfs_btree_item_iter_cb cb, + void *pos, void *arg); int scoutfs_forest_srch_add(struct super_block *sb, 
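The interface change above replaces the pre-built scoutfs_btree_item_list with an iterator callback that fills a scoutfs_btree_item_desc and hands back an opaque position cursor, presumably so callers can feed items straight from whatever structure they already track them in instead of first assembling a sorted singly-linked list. A minimal user-space sketch of just that calling convention, with toy types and names standing in for the scoutfs structures:

    #include <stdio.h>
    #include <stddef.h>

    /* toy stand-ins for the kernel types; the names are illustrative only */
    struct toy_desc {
            int key;
            int val;
    };

    struct toy_array {
            int (*pairs)[2];
            size_t nr;
    };

    /*
     * Iterator callback: fill *desc from the current position and return the
     * cursor to pass back in next time, or NULL when nothing is left.
     */
    typedef void *(*toy_iter_cb)(struct toy_desc *desc, void *pos, void *arg);

    static void *array_iter(struct toy_desc *desc, void *pos, void *arg)
    {
            struct toy_array *arr = arg;
            size_t i = (int (*)[2])pos - arr->pairs;

            if (i >= arr->nr)
                    return NULL;

            desc->key = arr->pairs[i][0];
            desc->val = arr->pairs[i][1];
            return &arr->pairs[i + 1];
    }

    /* consumer loop shaped like the new insert path: pull items via the callback */
    static void insert_all(toy_iter_cb cb, void *pos, void *arg)
    {
            struct toy_desc desc;

            while ((pos = cb(&desc, pos, arg)) != NULL)
                    printf("insert key %d val %d\n", desc.key, desc.val);
    }

    int main(void)
    {
            int pairs[][2] = { { 1, 10 }, { 2, 20 }, { 3, 30 } };
            struct toy_array arr = { pairs, 3 };

            insert_all(array_iter, pairs, &arr);
            return 0;
    }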
u64 hash, u64 ino, u64 id); void scoutfs_forest_inc_inode_count(struct super_block *sb); diff --git a/kmod/src/item.c b/kmod/src/item.c index 7151b380..267ec9cf 100644 --- a/kmod/src/item.c +++ b/kmod/src/item.c @@ -15,16 +15,18 @@ #include #include #include -#include #include +#include #include #include #include "super.h" +#include "cwskip.h" #include "item.h" #include "forest.h" #include "block.h" #include "trans.h" +#include "cwskip.h" #include "counters.h" #include "scoutfs_trace.h" @@ -33,1245 +35,330 @@ * from and written to the forest of btrees under the protection of * cluster locks. * - * The cache is built around pages of items. A page has the range of - * keys that it caches and the items that are present in that range. - * Pages are non-overlapping, there is only one page that can contain a - * given key at a time. The pages are tracked by an rbtree, and each - * page has an rbtree of items. + * The cache is built around a concurrent skip list of items. Readers + * are protected by per-item seqlocks and retry if their items are + * modified while they're being referenced. Writers use trylock to + * acquire locks on adjacent pairs of items and retry if they encounter + * contention. + * + * The item cache has to support negative caches of ranges of keys that + * contain no items. This is done by marking a node as having a "hole" + * in the cache following its key. Searches that hit keys in these + * hole regions read items from btree blocks and insert the resulting + * key range into the cache. Searches that end after items without the + * following hole marker know that the item doesn't exist and can act + * accordingly, say by returning -ENOENT. Working with this space + * between items is why the skip list interface is built around + * returning the pair of items that surround a key. * * The cache is populated by reading items from the forest of btrees - * into a private set of pages. The regions of those pages which - * weren't already cached are then inserted into the cache. + * into a private list. The ranges of keys in the list that weren't + * already cached are inserted into the cache, maintaining the negative + * cached regions around the read items. * - * CPUs can concurrently modify items that are in different pages. The - * page rbtree can be read locked to find a page, and then the page is - * locked to work with its items. We then add per-cpu references to - * recently used pages so that the global page rbtree can be skipped in - * the typical case of repeated calls to localized portions of the key - * space. + * Dirty items are kept in per-cpu lists to reduce global contention; + * loads where all cpus are only creating dirty items are common. The + * dirty items are only combined and sorted when it comes time to commit + * them. * - * Dirty items are kept in a per-page dirty list, and pages with dirty - * items are kept in a global dirty list. This reduces contention on - * the global list by accessing it at page granularity instead of every - * time an item is dirtied. The dirty items are not sorted until it - * comes time to commit them to the btrees. This reduces the cost of - * tracking dirty items during the transaction, particularly moving them - * between pages as pages are split to make room for new items. - * - * The size of the cache is only limited by memory reclaim. Pages are - * kept in a very coarse lru. 
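The negative-caching scheme described above can be modeled with a small user-space toy: a sorted set of cached keys where each entry carries a hole_after flag, and a lookup that lands between two entries is only a definitive miss when the preceding entry has hole_after clear. This is only a sketch of the idea, with illustrative names and no skip list or locking:

    #include <stdbool.h>
    #include <stdio.h>

    /* toy model of a cached item: just a key and the hole_after marker */
    struct toy_item {
            int key;
            bool hole_after;        /* keys after this one are not cached */
    };

    /*
     * Return true when the cache can answer a lookup of @key by itself:
     * either the key is present, or the nearest previous item says the
     * region after it is fully cached (no hole), so the key truly doesn't
     * exist.  Otherwise the caller has to read from the btrees.
     */
    static bool lookup_is_decided(struct toy_item *items, int nr, int key, bool *exists)
    {
            struct toy_item *prev = NULL;
            int i;

            for (i = 0; i < nr && items[i].key <= key; i++)
                    prev = &items[i];

            if (prev && prev->key == key) {
                    *exists = true;
                    return true;
            }
            *exists = false;
            return prev && !prev->hole_after;       /* cached negative region */
    }

    int main(void)
    {
            struct toy_item items[] = {
                    { 10, false }, { 20, true }, { 40, false },
            };
            bool exists;

            /* 15 falls after 10, which has no hole: a definitive miss */
            printf("15 decided %d\n", lookup_is_decided(items, 3, 15, &exists));
            /* 25 falls after 20, which marks a hole: must read the btrees */
            printf("25 decided %d\n", lookup_is_decided(items, 3, 25, &exists));
            return 0;
    }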
Dirtying doesn't remove pages from the - * lru, and is operating against lock ordering with trylocks, so - * shrinking can rarely have to skip pages in the LRU. - * - * The locking is built around the fast path of everyone checking the - * the page rbtree, then locking pages, and then adding or removing - * pages from the lru or dirty lists. Writing and the shrinker work - * work in reverse, starting with the dirty or lru lists and have to use - * trylock to lock the pages. When we split we have to lock multiple - * pages and we use trylock which is guaranteed to succeed because the - * pages are private. + * The size of the cache is only limited by memory reclaim. We try to + * group items into coarse ages by how recently they were accessed. We + * don't precisely order items by access time to avoid contention. + * Shrinking randomly walks all items looking for items that weren't + * accessed recently. */ +struct pcpu_age_counters { + atomic64_t age_marked; + atomic64_t total; +}; + +struct pcpu_dirty_list { + struct list_head list; + spinlock_t lock; +}; + struct item_cache_info { /* almost always read, barely written */ struct super_block *sb; - struct item_percpu_pages __percpu *pcpu_pages; + struct pcpu_age_counters __percpu *pcpu_age; + struct pcpu_dirty_list __percpu *pcpu_dirty; struct shrinker shrinker; struct notifier_block notifier; - /* often walked, but per-cpu refs are fast path */ - rwlock_t rwlock; - struct rb_root pg_root; + /* read for every op, rarely written by tall or early nodes */ + ____cacheline_aligned_in_smp struct scoutfs_cwskip_root item_root; - /* page-granular modification by writers, then exclusive to commit */ - spinlock_t dirty_lock; - struct list_head dirty_list; - atomic_t dirty_pages; + /* often read, rarely written as ages advance */ + atomic64_t current_age; + atomic64_t age_marked; + atomic64_t age_total; - /* page-granular modification by readers */ - spinlock_t lru_lock; - struct list_head lru_list; - unsigned long lru_pages; + /* written by every dirty item change */ + ____cacheline_aligned_in_smp atomic64_t dirty_bytes; - /* written by page readers, read by shrink */ - spinlock_t active_lock; + /* written by readers, read by shrink */ + ____cacheline_aligned_in_smp spinlock_t active_lock; struct list_head active_list; }; #define DECLARE_ITEM_CACHE_INFO(sb, name) \ struct item_cache_info *name = SCOUTFS_SB(sb)->item_cache_info -#define PG_PER_CPU 32 -struct item_percpu_pages { - struct rb_root root; - struct list_head list; - struct pcpu_page_ref { - struct scoutfs_key start; - struct scoutfs_key end; - struct cached_page *pg; - struct rb_node node; - struct list_head head; - } refs[PG_PER_CPU]; -}; - -struct cached_page { - /* often read by concurrent rbtree walks */ - struct rb_node node; - struct scoutfs_key start; - struct scoutfs_key end; - - /* often modified by page rwlock holder */ - rwlock_t rwlock; - struct rb_root item_root; - struct list_head lru_head; - unsigned long lru_time; - struct list_head dirty_list; - struct list_head dirty_head; - u64 max_seq; - struct page *page; - unsigned int page_off; - unsigned int erased_bytes; - atomic_t refcount; -}; - struct cached_item { - struct rb_node node; + struct scoutfs_cwskip_node *node; struct list_head dirty_head; + struct rcu_head rcu_head; + atomic64_t age; unsigned int dirty:1, /* needs to be written */ persistent:1, /* in btrees, needs deletion item */ deletion:1, /* negative del item for writing */ - delta:1; /* item vales are combined, freed after write */ + delta:1, /* item vales are 
combined, freed after write */ + negative:1, /* no item, marks hole_after boundary */ + hole_after:1; /* no cache until next item */ + unsigned int alloc_bytes; unsigned int val_len; + int dirty_cpu; struct scoutfs_key key; u64 seq; - char val[0]; + char *val; }; -#define CACHED_ITEM_ALIGN 8 - -static int item_val_bytes(int val_len) +static int key_item_cmp(void *K, void *C) { - return round_up(offsetof(struct cached_item, val[val_len]), - CACHED_ITEM_ALIGN); + struct scoutfs_key *key = K; + struct cached_item *item = C; + + return scoutfs_key_compare(key, &item->key); +} + +static int item_alloc_bytes(int height, int val_len) +{ + return sizeof(struct cached_item) + + offsetof(struct scoutfs_cwskip_node, links[height]) + + val_len; } /* - * Return if the page has room to allocate an item with the given value - * length at its free page offset. This must be called with the page - * writelock held because it can modify the page to reclaim free space - * to mkae room for the allocation. Today all it does is recognize that - * the page is empty and reset the page_off. + * Allocate and initialize a new item. These can be freed directly + * until they're inserted into the item list. The moment they're + * visible via the list they have to be freed with call_free_item within + * an RCU read lock. */ -static bool page_has_room(struct cached_page *pg, int val_len) -{ - if (RB_EMPTY_ROOT(&pg->item_root)) - pg->page_off = 0; - - return pg->page_off + item_val_bytes(val_len) <= PAGE_SIZE; -} - -static struct cached_page *first_page(struct rb_root *root) -{ - struct rb_node *node; - - if (!root || !(node = rb_first(root))) - return NULL; - - return rb_entry(node, struct cached_page, node); -} - -static struct cached_item *first_item(struct rb_root *root) -{ - struct rb_node *node; - - if (!root || !(node = rb_first(root))) - return NULL; - - return rb_entry(node, struct cached_item, node); -} - -static struct cached_item *last_item(struct rb_root *root) -{ - struct rb_node *node; - - if (!root || !(node = rb_last(root))) - return NULL; - - return rb_entry(node, struct cached_item, node); -} - -static struct cached_item *next_item(struct cached_item *item) -{ - struct rb_node *node; - - if (!item || !(node = rb_next(&item->node))) - return NULL; - - return rb_entry(node, struct cached_item, node); -} - -static struct cached_item *prev_item(struct cached_item *item) -{ - struct rb_node *node; - - if (!item || !(node = rb_prev(&item->node))) - return NULL; - - return rb_entry(node, struct cached_item, node); -} - -static void rbtree_insert(struct rb_node *node, struct rb_node *par, - struct rb_node **pnode, struct rb_root *root) -{ - rb_link_node(node, par, pnode); - rb_insert_color(node, root); -} - -static void rbtree_erase(struct rb_node *node, struct rb_root *root) -{ - rb_erase(node, root); - RB_CLEAR_NODE(node); -} - -static void rbtree_replace_node(struct rb_node *victim, struct rb_node *new, - struct rb_root *root) -{ - rb_replace_node(victim, new, root); - RB_CLEAR_NODE(victim); -} - -/* - * This is far too expensive to use regularly, but it's very helpful for - * discovering corruption after modifications to cached pages. 
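item_alloc_bytes() above sizes a single allocation that holds the cached_item struct, a skip list node whose link array length depends on its randomized height, and the value bytes; alloc_item() below then points item->val just past links[height]. A hedged user-space sketch of the same layout arithmetic, with toy types standing in for the scoutfs ones:

    #include <stdio.h>
    #include <stdlib.h>
    #include <stddef.h>
    #include <string.h>

    /* toy skip list node with a flexible array of per-level links */
    struct toy_node {
            int height;
            struct toy_node *links[];
    };

    struct toy_item {
            struct toy_node *node;
            char *val;
            unsigned int val_len;
    };

    /* one allocation covers the item, the node's variable links, and the value */
    static size_t toy_alloc_bytes(int height, int val_len)
    {
            return sizeof(struct toy_item) +
                   offsetof(struct toy_node, links[height]) +
                   val_len;
    }

    static struct toy_item *toy_alloc_item(int height, const void *val, int val_len)
    {
            struct toy_item *item = malloc(toy_alloc_bytes(height, val_len));

            if (!item)
                    return NULL;

            /* node follows the item struct, value follows the node's links */
            item->node = (void *)(item + 1);
            item->val = (void *)&item->node->links[height];
            item->val_len = val_len;
            item->node->height = height;
            memcpy(item->val, val, val_len);
            return item;
    }

    int main(void)
    {
            struct toy_item *item = toy_alloc_item(3, "hello", 6);

            if (item) {
                    printf("%zu bytes, val \"%s\"\n", toy_alloc_bytes(3, 6), item->val);
                    free(item);
            }
            return 0;
    }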
- */ -static __attribute__((unused)) void verify_page_rbtree(struct rb_root *root) -{ - struct cached_item *item; - struct cached_page *par; - struct cached_page *pg; - struct cached_page *n; - char *reason = NULL; - struct rb_node *p; - int cmp; - - rbtree_postorder_for_each_entry_safe(pg, n, root, node) { - - item = NULL; - par = NULL; - - if (scoutfs_key_compare(&pg->start, &pg->end) > 0) { - reason = "start > end"; - break; - } - - item = first_item(&pg->item_root); - if (item && scoutfs_key_compare(&item->key, &pg->start) < 0) { - reason = "first item < start"; - break; - } - - item = last_item(&pg->item_root); - if (item && scoutfs_key_compare(&item->key, &pg->end) > 0) { - reason = "last item > end"; - break; - } - - p = rb_parent(&pg->node); - if (!p) - continue; - par = rb_entry(p, struct cached_page, node); - - cmp = scoutfs_key_compare_ranges(&pg->start, &pg->end, - &par->start, &par->end); - if (cmp == 0) { - reason = "parent and child overlap"; - break; - } - - if (par->node.rb_right == &pg->node && cmp < 0) { - reason = "right child < parent"; - break; - } - - if (par->node.rb_left == &pg->node && cmp > 0) { - reason = "left child > parent"; - break; - } - } - - if (!reason) - return; - - printk("bad item page rbtree: %s\n", reason); - printk("pg %p start "SK_FMT" end "SK_FMT"\n", - pg, SK_ARG(&pg->start), SK_ARG(&pg->end)); - if (par) - printk("par %p start "SK_FMT" end "SK_FMT"\n", - par, SK_ARG(&par->start), SK_ARG(&par->end)); - if (item) - printk("item %p key "SK_FMT"\n", item, SK_ARG(&item->key)); - - rbtree_postorder_for_each_entry_safe(pg, n, root, node) { - printk(" pg %p left %p right %p start "SK_FMT" end "SK_FMT"\n", - pg, - pg->node.rb_left ? rb_entry(pg->node.rb_left, - struct cached_page, node) : - NULL, - pg->node.rb_right ? rb_entry(pg->node.rb_right, - struct cached_page, node) : - NULL, - SK_ARG(&pg->start), - SK_ARG(&pg->end)); - } - - BUG(); -} - - -/* - * This lets us lock newly allocated pages without having to add nesting - * annotation. The non-acquired path is never executed. - */ -static void write_trylock_will_succeed(rwlock_t *rwlock) -__acquires(rwlock) -{ - while (!write_trylock(rwlock)) - BUG(); -} - -static struct cached_page *alloc_pg(struct super_block *sb, gfp_t gfp) -{ - struct cached_page *pg; - struct page *page; - - pg = kzalloc(sizeof(struct cached_page), GFP_NOFS | gfp); - page = alloc_page(GFP_NOFS | gfp); - if (!page || !pg) { - kfree(pg); - if (page) - __free_page(page); - return NULL; - } - - scoutfs_inc_counter(sb, item_page_alloc); - - RB_CLEAR_NODE(&pg->node); - rwlock_init(&pg->rwlock); - pg->item_root = RB_ROOT; - INIT_LIST_HEAD(&pg->lru_head); - INIT_LIST_HEAD(&pg->dirty_list); - INIT_LIST_HEAD(&pg->dirty_head); - pg->page = page; - atomic_set(&pg->refcount, 1); - - return pg; -} - -static void get_pg(struct cached_page *pg) -{ - atomic_inc(&pg->refcount); -} - -static void put_pg(struct super_block *sb, struct cached_page *pg) -{ - if (pg && atomic_dec_and_test(&pg->refcount)) { - scoutfs_inc_counter(sb, item_page_free); - - BUG_ON(!RB_EMPTY_NODE(&pg->node)); - BUG_ON(!list_empty(&pg->lru_head)); - BUG_ON(!list_empty(&pg->dirty_list)); - BUG_ON(!list_empty(&pg->dirty_head)); - - __free_page(pg->page); - kfree(pg); - } -} - -static void update_pg_max_seq(struct cached_page *pg, struct cached_item *item) -{ - if (item->seq > pg->max_seq) - pg->max_seq = item->seq; -} - -/* - * Allocate space for a new item from the free offset at the end of a - * cached page. 
This isn't a blocking allocation, and it's likely that - * the caller has ensured it will succeed by allocating from a new empty - * page or checking the free space first. - */ -static struct cached_item *alloc_item(struct cached_page *pg, +static struct cached_item *alloc_item(struct super_block *sb, struct scoutfs_key *key, u64 seq, bool deletion, void *val, int val_len) { struct cached_item *item; + int height; + int bytes; - if (!page_has_room(pg, val_len)) + height = scoutfs_cwskip_rand_height(); + bytes = item_alloc_bytes(height, val_len); + item = kmalloc(bytes, GFP_NOFS); + if (!item) return NULL; - item = page_address(pg->page) + pg->page_off; - pg->page_off += item_val_bytes(val_len); + item->node = (void *)item + sizeof(struct cached_item); + item->val = (void *)&item->node->links[height]; - RB_CLEAR_NODE(&item->node); INIT_LIST_HEAD(&item->dirty_head); + atomic64_set(&item->age, 0); + item->dirty = 0; item->persistent = 0; item->deletion = !!deletion; item->delta = 0; + item->negative = 0; + item->hole_after = 0; + item->alloc_bytes = bytes; item->val_len = val_len; + item->dirty_cpu = -1; item->key = *key; item->seq = seq; + item->node->height = height; + item->node->write_seq = 0; + /* insert initializes all node links */ + if (val_len) memcpy(item->val, val, val_len); - update_pg_max_seq(pg, item); + scoutfs_add_counter(sb, item_alloc_bytes, bytes); return item; } -static void erase_item(struct cached_page *pg, struct cached_item *item) +static void call_free_item(struct super_block *sb, struct cached_item *item) { - rbtree_erase(&item->node, &pg->item_root); - pg->erased_bytes += item_val_bytes(item->val_len); -} - -static void lru_add(struct super_block *sb, struct item_cache_info *cinf, - struct cached_page *pg) -{ - spin_lock(&cinf->lru_lock); - if (list_empty(&pg->lru_head)) { - scoutfs_inc_counter(sb, item_page_lru_add); - list_add_tail(&pg->lru_head, &cinf->lru_list); - cinf->lru_pages++; - } - spin_unlock(&cinf->lru_lock); -} - -static void __lru_remove(struct super_block *sb, struct item_cache_info *cinf, - struct cached_page *pg) -{ - if (!list_empty(&pg->lru_head)) { - scoutfs_inc_counter(sb, item_page_lru_remove); - list_del_init(&pg->lru_head); - cinf->lru_pages--; + if (item) { + scoutfs_add_counter(sb, item_free_bytes, item->alloc_bytes); + kfree_rcu(item, rcu_head); } } -static void lru_remove(struct super_block *sb, struct item_cache_info *cinf, - struct cached_page *pg) -{ - spin_lock(&cinf->lru_lock); - __lru_remove(sb, cinf, pg); - spin_unlock(&cinf->lru_lock); -} +#define ITEM_AGE_NR_SHIFT 3 +#define ITEM_AGE_NR (1 << ITEM_AGE_NR_SHIFT) +#define ITEM_AGE_HALF (ITEM_AGE_NR / 2) +#define ITEM_AGE_MARK_BATCH (256 * 1024) +#define ITEM_AGE_MARK_SHIFT 62 +#define ITEM_AGE_MARK_MASK ((1ULL << ITEM_AGE_MARK_SHIFT) - 1) + /* - * Make sure that the page the caller just accessed is reasonably close - * to the tail of the lru so it will be less likely to be reclaimed by - * the shrinker. - * - * We want to quickly determine that the page is close enough to the - * tail by only looking at the page. We use a coarse clock tick to - * determine if we've already moved the head to the tail sufficiently - * recently. We can't differentiate shrinking priority amongst the - * number of pages that the cpu can access within given chunk of time. - * - * We don't care that the lru_time accessed aren't locked and could see - * rare corruption. It's just a shrink priority heuristic. 
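The ITEM_AGE_MARK_SHIFT/ITEM_AGE_MARK_MASK constants defined above tag each 64-bit marked-byte counter with the low bits of the age it was accumulated under, so a compare-and-swap can refuse to fold a stale batch into the current age's total; the comments below walk through the batching. A hedged user-space sketch of just the tagging arithmetic, with plain variables instead of atomics and illustrative names:

    #include <stdio.h>
    #include <stdint.h>

    #define MARK_SHIFT      62
    #define MARK_MASK       ((1ULL << MARK_SHIFT) - 1)

    /*
     * Tag a byte count with the low bits of the age it was counted under.
     * Only the low two bits of the age survive the shift, so this only
     * guards against a nearby stale age, as the comments below note.
     */
    static uint64_t tag_counter(uint64_t age, uint64_t bytes)
    {
            return (age << MARK_SHIFT) | (bytes & MARK_MASK);
    }

    /* only fold @counter into @total if both were tagged with the same age */
    static int fold_if_same_age(uint64_t *total, uint64_t counter)
    {
            if ((*total & ~MARK_MASK) != (counter & ~MARK_MASK))
                    return 0;                       /* age advanced, drop it */
            *total += counter & MARK_MASK;
            return 1;
    }

    int main(void)
    {
            uint64_t total = tag_counter(5, 0);     /* global total for age 5 */
            uint64_t cpu = tag_counter(5, 4096);    /* a cpu's batch for age 5 */
            uint64_t stale = tag_counter(4, 8192);  /* batch left over from age 4 */

            printf("folded %d, total now %llu\n", fold_if_same_age(&total, cpu),
                   (unsigned long long)(total & MARK_MASK));
            printf("folded %d, total now %llu\n", fold_if_same_age(&total, stale),
                   (unsigned long long)(total & MARK_MASK));
            return 0;
    }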
+ * Add the caller's per-cpu marked count for their age to the global + * marked count for the current age. If the current age advances we + * drop the caller's marked count. */ -static void lru_accessed(struct super_block *sb, struct item_cache_info *cinf, - struct cached_page *pg) +static u64 add_global_age_marked(struct item_cache_info *cinf, u64 age_marked) { - unsigned long time = jiffies_to_msecs(jiffies); + u64 old; + u64 new; - scoutfs_inc_counter(sb, item_page_accessed); + do { + old = atomic64_read(&cinf->age_marked); + if ((old & ~ITEM_AGE_MARK_MASK) != + (age_marked & ~ITEM_AGE_MARK_MASK)) + return 0; - if (pg->lru_time != time) { - lru_remove(sb, cinf, pg); - pg->lru_time = time; - lru_add(sb, cinf, pg); - } + new = old + (age_marked & ITEM_AGE_MARK_MASK); + } while (atomic64_cmpxchg(&cinf->age_marked, old, new) != old); + + return new & ITEM_AGE_MARK_MASK; } /* - * Return the pg that contains the key and set the parent nodes for insertion. - * When we find the pg we go right so that the caller can insert a new - * page to the right of the found page if it had to split the page. + * Make sure that a recently accessed item is marked with the current + * age to protect it from shrink. We record the total bytes we've + * marked in per-cpu counters. If the per-cpu marked count crosses a + * threshold we combine it with a global count. If the global count + * exceeds an age's fraction of the total then we increment the current + * age to mark and clear the marking counts. + * + * The result is that recent ages will have roughly a (1/ages) fraction + * of the total bytes of the cache. That gets less and less true over + * time as the old ages have items removed. + * + * This is very far from perfect, but we don't need perfect. We need to + * avoid creating read storms by shrinking active items while also not + * creating global contention by tracking items. + * + * This has to be a little fiddly to avoid a marked batch count on a cpu + * for age N being added to the global total for age N+1. We mark the + * high bits of the marked totals with the low two bits of the current + * age. cmpxchg then stops the total for an old age being added to a + * different age, within a short distance. 
*/ -static struct cached_page *page_rbtree_walk(struct super_block *sb, - struct rb_root *root, - struct scoutfs_key *start, - struct scoutfs_key *end, - struct cached_page **prev, - struct cached_page **next, - struct rb_node **par, - struct rb_node ***pnode) +static void mark_item_age(struct item_cache_info *cinf, struct cached_item *item) { - struct rb_node **node = &root->rb_node; - struct rb_node *parent = NULL; - struct cached_page *ret = NULL; - struct cached_page *pg; - int cmp; + struct pcpu_age_counters *pac; + u64 old_age; + u64 marked; + u64 limit; + u64 age; + u64 old; + u64 new; + u64 was; + int cpu; - scoutfs_inc_counter(sb, item_page_rbtree_walk); + old_age = atomic64_read(&item->age); + age = atomic64_read(&cinf->current_age); + if (old_age == age || + atomic64_cmpxchg(&item->age, old_age, age) != old_age) + return; - if (next) - *next = NULL; - if (prev) - *prev = NULL; + pac = get_cpu_ptr(cinf->pcpu_age); - while (*node) { - parent = *node; - pg = container_of(*node, struct cached_page, node); + old = atomic64_read(&pac->age_marked); + marked = (old & ITEM_AGE_MARK_MASK) + item->alloc_bytes; + new = (age << ITEM_AGE_MARK_SHIFT) + marked; - cmp = scoutfs_key_compare_ranges(start, end, &pg->start, - &pg->end); - if (cmp < 0) { - if (next) - *next = pg; - node = &(*node)->rb_left; - } else if (cmp > 0) { - if (prev) - *prev = pg; - node = &(*node)->rb_right; - } else { - ret = pg; - node = &(*node)->rb_right; - } + /* bail on the only failure case when the age advances */ + was = atomic64_cmpxchg(&pac->age_marked, old, new); + put_cpu_ptr(cinf->pcpu_age); + if (was != old) + return; + + if (marked < ITEM_AGE_MARK_BATCH) + return; + + /* adding to the global retries unless the age changes */ + marked = add_global_age_marked(cinf, atomic64_read(&pac->age_marked)); + limit = atomic64_read(&cinf->age_total) >> ITEM_AGE_NR_SHIFT; + if (marked < limit) + return; + + age = atomic64_inc_return(&cinf->current_age); + atomic64_set(&cinf->age_marked, age << ITEM_AGE_MARK_SHIFT); + + for_each_online_cpu(cpu) { + atomic64_set(&pac->age_marked, age << ITEM_AGE_MARK_SHIFT); + atomic64_add(atomic64_xchg(&pac->total, 0), &cinf->age_total); } - - if (par) - *par = parent; - if (pnode) - *pnode = node; - - return ret; } -#define for_each_page_safe(root, pg, tmp) \ - for (tmp = rb_first(root); \ - tmp && (pg = container_of(tmp, struct cached_page, node)) && \ - ((tmp = rb_next(tmp)), 1); ) - -static struct cached_item *item_rbtree_walk(struct rb_root *root, - struct scoutfs_key *key, - struct cached_item **next, - struct rb_node **par, - struct rb_node ***pnode) +static void update_age_total(struct item_cache_info *cinf, int upd) { - struct rb_node **node = &root->rb_node; - struct rb_node *parent = NULL; - struct cached_item *ret = NULL; - struct cached_item *item; - int cmp; + struct pcpu_age_counters *pac = get_cpu_ptr(cinf->pcpu_age); - if (next) - *next = NULL; - - while (*node) { - parent = *node; - item = container_of(*node, struct cached_item, node); - - cmp = scoutfs_key_compare(key, &item->key); - if (cmp < 0) { - if (next) - *next = item; - node = &(*node)->rb_left; - } else if (cmp > 0) { - node = &(*node)->rb_right; - } else { - ret = item; - node = &(*node)->rb_left; - } - } - - if (par) - *par = parent; - if (pnode) - *pnode = node; - - return ret; + atomic64_add((s64)upd, &pac->total); + put_cpu_ptr(cinf->pcpu_age); } -#define for_each_item_from_safe(root, item, tmp, key) \ - for (item = item_rbtree_walk(root, key, &tmp, NULL, NULL) ?: tmp; \ - item && ((tmp = next_item(item)), 
1); \ - item = tmp) - -#define for_each_item_safe(root, item, tmp) \ - for (tmp = rb_first(root); \ - tmp && (item = container_of(tmp, struct cached_item, node)) && \ - ((tmp = rb_next(tmp)), 1); ) - /* - * As we mark the first and clear the last items in a page, we add or - * delete the page from the dirty list. The caller can give us a page - * to add the newly dirtied page after, rather than at the tail of the - * list. + * Dirty items have a particular usage pattern. Many cpus can be + * creating them at full speed, they're almost never removed, their + * total number is limited by the size of a commit, and they're + * committed while protected from modification. We track the dirty + * items in per-cpu lists to avoid contention. They're later spliced + * and sorted when it's time to write. + * + * We're still using a global atomic for the ease and precision. If it + * becomes a problem we can degrade it to fuzzier use of percpu + * counters. */ static void mark_item_dirty(struct super_block *sb, struct item_cache_info *cinf, - struct cached_page *pg, - struct cached_page *after, struct cached_item *item) { + struct pcpu_dirty_list *pdlist; + int cpu; + if (!item->dirty) { - if (list_empty(&pg->dirty_list)) { - scoutfs_inc_counter(sb, item_page_mark_dirty); - spin_lock(&cinf->dirty_lock); - if (after) - list_add(&pg->dirty_head, &after->dirty_head); - else - list_add_tail(&pg->dirty_head, - &cinf->dirty_list); - atomic_inc(&cinf->dirty_pages); - spin_unlock(&cinf->dirty_lock); - } + cpu = get_cpu(); + pdlist = per_cpu_ptr(cinf->pcpu_dirty, cpu); + spin_lock(&pdlist->lock); + list_add_tail(&item->dirty_head, &pdlist->list); + item->dirty_cpu = cpu; + spin_unlock(&pdlist->lock); + put_cpu(); scoutfs_inc_counter(sb, item_mark_dirty); - list_add_tail(&item->dirty_head, &pg->dirty_list); + atomic64_add(item->alloc_bytes, &cinf->dirty_bytes); item->dirty = 1; } - - update_pg_max_seq(pg, item); } static void clear_item_dirty(struct super_block *sb, struct item_cache_info *cinf, - struct cached_page *pg, struct cached_item *item) { + struct pcpu_dirty_list *pdlist; + if (item->dirty) { - scoutfs_inc_counter(sb, item_clear_dirty); - item->dirty = 0; + pdlist = get_cpu_ptr(cinf->pcpu_dirty); + spin_lock(&pdlist->lock); list_del_init(&item->dirty_head); + item->dirty_cpu = -1; + spin_unlock(&pdlist->lock); + put_cpu_ptr(cinf->pcpu_dirty); - if (list_empty(&pg->dirty_list)) { - scoutfs_inc_counter(sb, item_page_clear_dirty); - spin_lock(&cinf->dirty_lock); - list_del_init(&pg->dirty_head); - atomic_dec(&cinf->dirty_pages); - spin_unlock(&cinf->dirty_lock); - } + scoutfs_inc_counter(sb, item_clear_dirty); + atomic64_sub(item->alloc_bytes, &cinf->dirty_bytes); + item->dirty = 0; } } -static void erase_page_items(struct cached_page *pg, - struct scoutfs_key *start, - struct scoutfs_key *end) -{ - struct cached_item *item; - struct cached_item *tmp; - - for_each_item_from_safe(&pg->item_root, item, tmp, start) { - - /* only called in unused read regions or read_pages pages */ - BUG_ON(item->dirty); - - if (scoutfs_key_compare(&item->key, end) > 0) - break; - - erase_item(pg, item); - } -} - -/* - * Move all the items starting from the key and stopping before moving - * the stop key. The right destination page must be empty. Items are - * copied in tree order which lets us easily insert after each previous - * item. 
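The per-cpu dirty tracking above defers all ordering to commit time: items are appended to unsorted per-cpu lists as they're dirtied and only spliced together and sorted when they're written. A toy user-space model of that splice-then-sort shape, using arrays and qsort instead of list_head and the kernel list machinery:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    #define NR_CPUS 2

    /* each "cpu" appends dirty keys to its own unsorted list */
    struct cpu_dirty {
            int keys[16];
            int nr;
    };

    static int cmp_int(const void *a, const void *b)
    {
            return *(const int *)a - *(const int *)b;
    }

    /* commit: splice every per-cpu list into one array, then sort once */
    static int splice_and_sort(struct cpu_dirty *cpus, int nr_cpus, int *out)
    {
            int total = 0;
            int cpu;

            for (cpu = 0; cpu < nr_cpus; cpu++) {
                    memcpy(out + total, cpus[cpu].keys, cpus[cpu].nr * sizeof(int));
                    total += cpus[cpu].nr;
                    cpus[cpu].nr = 0;
            }
            qsort(out, total, sizeof(int), cmp_int);
            return total;
    }

    int main(void)
    {
            struct cpu_dirty cpus[NR_CPUS] = {
                    { .keys = { 30, 10 }, .nr = 2 },
                    { .keys = { 20, 40 }, .nr = 2 },
            };
            int sorted[32];
            int n = splice_and_sort(cpus, NR_CPUS, sorted);
            int i;

            for (i = 0; i < n; i++)
                    printf("%d ", sorted[i]);
            printf("\n");
            return 0;
    }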
- * - * This preserves dirty page and item ordering by adding the right page - * to the dirty list after the left page, and by adding items to the - * tail of right's dirty list in key sort order. - * - * The caller is responsible for page locking and managing the lru. - */ -static void move_page_items(struct super_block *sb, - struct item_cache_info *cinf, - struct cached_page *left, - struct cached_page *right, - struct scoutfs_key *key, - struct scoutfs_key *stop) -{ - struct cached_item *from; - struct cached_item *to; - struct cached_item *tmp; - struct rb_node **pnode; - struct rb_node *par; - - /* really empty right destination? */ - BUG_ON(!RB_EMPTY_ROOT(&right->item_root)); - par = NULL; - pnode = &right->item_root.rb_node; - - for_each_item_from_safe(&left->item_root, from, tmp, key) { - - if (stop && scoutfs_key_compare(&from->key, stop) >= 0) - break; - - to = alloc_item(right, &from->key, from->seq, from->deletion, from->val, - from->val_len); - rbtree_insert(&to->node, par, pnode, &right->item_root); - par = &to->node; - pnode = &to->node.rb_right; - - if (from->dirty) { - mark_item_dirty(sb, cinf, right, left, to); - clear_item_dirty(sb, cinf, left, from); - } - - to->persistent = from->persistent; - to->delta = from->delta; - - erase_item(left, from); - } -} - -enum page_intersection_type { - PGI_DISJOINT, - PGI_INSIDE, - PGI_START_OLAP, - PGI_END_OLAP, - PGI_BISECT_NEEDED, - PGI_BISECT, -}; - -/* - * Remove items from the page with intersect with the range. We return - * a code to indicate which kind of intersection occurred. The caller - * provides the right page to move items to if the page is bisected by - * the range. - * - * This modifies the page keys so it needs to be held with a write page - * rbtree lock if the page is in the page rbtree. 
- */ -static int trim_page_intersection(struct super_block *sb, - struct item_cache_info *cinf, - struct cached_page *pg, - struct cached_page *right, - struct scoutfs_key *start, - struct scoutfs_key *end) -{ - int ps_e = scoutfs_key_compare(&pg->start, end); - int pe_s = scoutfs_key_compare(&pg->end, start); - int ps_s; - int pe_e; - - /* - * page and range don't intersect - * - * ps |----------| pe - * s |----------| e - * (or) - * ps |----------| pe - * s |----------| e - */ - if (ps_e > 0 || pe_s < 0) - return PGI_DISJOINT; - - ps_s = scoutfs_key_compare(&pg->start, start); - pe_e = scoutfs_key_compare(&pg->end, end); - - /* - * page entirely inside range - * - * ps |----------| pe - * s |----------| e - */ - if (ps_s >= 0 && pe_e <= 0) - return PGI_INSIDE; - - /* - * page surrounds range, and is bisected by it - * - * ps |----------| pe - * s |------| e - */ - if (ps_s < 0 && pe_e > 0) { - if (!right) - return PGI_BISECT_NEEDED; - - right->start = *end; - scoutfs_key_inc(&right->start); - right->end = pg->end; - pg->end = *start; - scoutfs_key_dec(&pg->end); - erase_page_items(pg, start, end); - move_page_items(sb, cinf, pg, right, &right->start, NULL); - return PGI_BISECT; - } - - /* - * start of page overlaps with range - * - * ps |----------| pe - * s |----------| e - */ - if (pe_e > 0) { - /* start of page overlaps range */ - pg->start = *end; - scoutfs_key_inc(&pg->start); - erase_page_items(pg, start, end); - return PGI_START_OLAP; - } - - /* - * end of page overlaps with range - * - * ps |----------| pe - * s |----------| e - */ - pg->end = *start; - scoutfs_key_dec(&pg->end); - erase_page_items(pg, start, end); - return PGI_END_OLAP; -} - -/* - * The caller wants to allocate an item in the page but there isn't room - * at the page_off. If erasing items has left sufficient internal free - * space we can pack the existing items to the start of the page to make - * room for the insertion. - * - * The caller's empty pg is only used for its page struct, which we swap - * with our old empty page. We don't touch its pg struct. - * - * This is a coarse bulk way of dealing with free space, as opposed to - * specifically tracking internal free regions and using them to satisfy - * item allocations. 
- */ -static void compact_page_items(struct super_block *sb, - struct cached_page *pg, - struct cached_page *empty) -{ - struct cached_item *from; - struct cached_item *to; - struct rb_root item_root = RB_ROOT; - struct rb_node *par = NULL; - struct rb_node **pnode = &item_root.rb_node; - unsigned int page_off = 0; - LIST_HEAD(dirty_list); - - if (pg->erased_bytes < item_val_bytes(SCOUTFS_MAX_VAL_SIZE)) - return; - - if (WARN_ON_ONCE(empty->page_off != 0) || - WARN_ON_ONCE(!RB_EMPTY_ROOT(&empty->item_root)) || - WARN_ON_ONCE(!list_empty(&empty->dirty_list))) - return; - - scoutfs_inc_counter(sb, item_page_compact); - - for (from = first_item(&pg->item_root); from; from = next_item(from)) { - to = page_address(empty->page) + page_off; - page_off += item_val_bytes(from->val_len); - - /* copy the entire item, struct members and all */ - memcpy(to, from, item_val_bytes(from->val_len)); - - rbtree_insert(&to->node, par, pnode, &item_root); - par = &to->node; - pnode = &to->node.rb_right; - - if (to->dirty) - list_add_tail(&to->dirty_head, &dirty_list); - } - - pg->item_root = item_root; - list_replace(&dirty_list, &pg->dirty_list); - swap(pg->page, empty->page); - pg->page_off = page_off; - pg->erased_bytes = 0; -} - -/* - * This behaves a little differently than the other walks because we - * want to minimize compares and there are only simple searching and - * inserting callers. - */ -static struct pcpu_page_ref *pcpu_page_rbtree_walk(struct rb_root *root, - struct scoutfs_key *key, - struct pcpu_page_ref *ins) -{ - struct rb_node **node = &root->rb_node; - struct rb_node *parent = NULL; - struct pcpu_page_ref *ret = NULL; - struct pcpu_page_ref *ref; - int cmp; - - while (*node) { - parent = *node; - ref = container_of(*node, struct pcpu_page_ref, node); - - cmp = scoutfs_key_compare_ranges(key, key, - &ref->start, &ref->end); - if (cmp < 0) { - node = &(*node)->rb_left; - } else if (cmp > 0) { - node = &(*node)->rb_right; - } else { - ret = ref; - if (!ins) - return ret; - node = &(*node)->rb_right; - } - } - - if (ins) - rbtree_insert(&ins->node, parent, node, root); - - return ret; -} - -/* - * Search the per-cpu page references for a page that contains the key - * the caller needs. These lookups are very frequent and key - * comparisons are relatively expensive, so we use an rbtree to decrease - * the comparison costs, particularly of misses. - * - * All the references in all the cpus go stale as page key boundaries - * are modified by reading, insertion, and invalidation. If we find a - * stale ref we will drop it, but otherwise we let stale refs age out as - * new refs are inserted. 
- */ -static struct cached_page *get_pcpu_page(struct super_block *sb, - struct item_cache_info *cinf, - struct scoutfs_key *key, - bool write) -{ - struct item_percpu_pages *pages = get_cpu_ptr(cinf->pcpu_pages); - struct cached_page *pg = NULL; - struct pcpu_page_ref *ref; - - ref = pcpu_page_rbtree_walk(&pages->root, key, NULL); - if (ref) { - pg = ref->pg; - if (write) - write_lock(&pg->rwlock); - else - read_lock(&pg->rwlock); - - if (scoutfs_key_compare_ranges(key, key, - &pg->start, &pg->end)) { - if (write) - write_unlock(&pg->rwlock); - else - read_unlock(&pg->rwlock); - - scoutfs_inc_counter(sb, item_pcpu_page_miss_keys); - rbtree_erase(&ref->node, &pages->root); - list_move_tail(&ref->head, &pages->list); - put_pg(sb, pg); - ref->pg = NULL; - pg = NULL; - } else { - if (pages->list.next != &ref->head) - list_move(&ref->head, &pages->list); - __release(pg_rwlock); - } - } - - put_cpu_ptr(cinf->pcpu_pages); - - if (pg) - scoutfs_inc_counter(sb, item_pcpu_page_hit); - else - scoutfs_inc_counter(sb, item_pcpu_page_miss); - - return pg; -} - -/* - * The caller has a locked page that it knows is authoritative for its - * range of keys. Add it to this cpu's cache and remove any other page - * in the pool which intersects with its range. - */ -static void add_pcpu_page(struct super_block *sb, struct item_cache_info *cinf, - struct cached_page *pg) -{ - struct item_percpu_pages *pages = get_cpu_ptr(cinf->pcpu_pages); - struct pcpu_page_ref *old; - struct pcpu_page_ref *ref; - - ref = list_last_entry(&pages->list, struct pcpu_page_ref, head); - if (ref->pg) { - rbtree_erase(&ref->node, &pages->root); - put_pg(sb, ref->pg); - } - ref->start = pg->start; - ref->end = pg->end; - ref->pg = pg; - get_pg(pg); - - list_move(&ref->head, &pages->list); - - old = pcpu_page_rbtree_walk(&pages->root, &ref->end, ref); - if (old) { - scoutfs_inc_counter(sb, item_pcpu_add_replaced); - rbtree_erase(&old->node, &pages->root); - list_move_tail(&old->head, &pages->list); - put_pg(sb, old->pg); - old->pg = NULL; - } - - put_cpu_ptr(cinf->pcpu_pages); -} - -/* - * If a page is removed from the page rbtree we clear its keys so that percpu - * references won't use the page and will drop their reference. Must be - * called with a write page rwlock. - */ -static void invalidate_pcpu_page(struct cached_page *pg) -{ - scoutfs_key_set_zeros(&pg->start); - scoutfs_key_set_zeros(&pg->end); -} - -static void init_pcpu_pages(struct item_cache_info *cinf, int cpu) -{ - struct item_percpu_pages *pages = per_cpu_ptr(cinf->pcpu_pages, cpu); - struct pcpu_page_ref *ref; - int i; - - pages->root = RB_ROOT; - INIT_LIST_HEAD(&pages->list); - - for (i = 0; i < ARRAY_SIZE(pages->refs); i++) { - ref = &pages->refs[i]; - - ref->pg = NULL; - list_add_tail(&ref->head, &pages->list); - } -} - -static void drop_pcpu_pages(struct super_block *sb, - struct item_cache_info *cinf, int cpu) -{ - struct item_percpu_pages *pages = per_cpu_ptr(cinf->pcpu_pages, cpu); - struct pcpu_page_ref *ref; - int i; - - for (i = 0; i < ARRAY_SIZE(pages->refs); i++) { - ref = &pages->refs[i]; - - if (ref->pg) - put_pg(sb, ref->pg); - ref->pg = NULL; - } - - pages->root = RB_ROOT; -} - -/* - * Set the keys of the destination pages of a split. We try to find the - * key which balances the space consumed by items in the resulting split - * pages. We move the split key to the right, setting the left end by - * decrementing that key. We bias towards advancing the left item first - * so that we don't use it and possibly decrementing the starting page - * key. 
We can't have a page that covers a single key. Callers of - * split should have tried compacting which ensures that if we split we - * must have multiple items, even if they all have the max value length. - */ -static void set_split_keys(struct cached_page *pg, struct cached_page *left, - struct cached_page *right) -{ - struct cached_item *left_item = first_item(&pg->item_root); - struct cached_item *right_item = last_item(&pg->item_root); - struct cached_item *mid; - int left_tot = 0; - int right_tot = 0; - - BUILD_BUG_ON((PAGE_SIZE / SCOUTFS_MAX_VAL_SIZE) < 4); - BUG_ON(scoutfs_key_compare(&pg->start, &pg->end) > 0); - BUG_ON(left_item == NULL); - BUG_ON(right_item == NULL); - BUG_ON(left_item == right_item); - - while (left_item && right_item && left_item != right_item) { - if (left_tot <= right_tot) { - left_tot += item_val_bytes(left_item->val_len); - left_item = next_item(left_item); - } else { - right_tot += item_val_bytes(right_item->val_len); - right_item = prev_item(right_item); - } - } - - mid = left_item ?: right_item; - - left->start = pg->start; - left->end = mid->key; - scoutfs_key_dec(&left->end); - right->start = mid->key; - right->end = pg->end; -} - -/* - * The caller found a page that didn't have room for the item they - * wanted to allocate. We allocate pages for the split and see if the - * page still needs splitting once we've locked it. - * - * To modify page keys we need a write lock on the page rbtree, which - * globally prevents reads from finding pages. We want to minimize this - * so we add empty pages with the split ranges to the rbtree and then - * perform the item motion only with the page locks held. This will - * exclude any users of the items in the affected range. - */ -static int try_split_page(struct super_block *sb, struct item_cache_info *cinf, - struct scoutfs_key *key, int val_len) -{ - struct cached_page *right; - struct cached_page *left; - struct cached_page *pg; - struct cached_item *item; - struct rb_node **pnode; - struct rb_node *par; - int ret; - - left = alloc_pg(sb, 0); - right = alloc_pg(sb, 0); - if (!left || !right) { - ret = -ENOMEM; - goto out; - } - - write_lock(&cinf->rwlock); - - pg = page_rbtree_walk(sb, &cinf->pg_root, key, key, NULL, NULL, - &par, &pnode); - if (pg == NULL) { - write_unlock(&cinf->rwlock); - ret = 0; - goto out; - } - - write_lock(&pg->rwlock); - - if (!page_has_room(pg, val_len)) - compact_page_items(sb, pg, left); - - if (page_has_room(pg, val_len)) { - write_unlock(&cinf->rwlock); - write_unlock(&pg->rwlock); - ret = 0; - goto out; - } - - /* special case adding an empty page when key is after the last item */ - item = last_item(&pg->item_root); - if (scoutfs_key_compare(key, &item->key) > 0) { - right->start = *key; - right->end = pg->end; - pg->end = *key; - scoutfs_key_dec(&pg->end); - - write_trylock_will_succeed(&right->rwlock); - rbtree_insert(&right->node, par, pnode, &cinf->pg_root); - lru_accessed(sb, cinf, right); - - /* adding right first removes pg */ - add_pcpu_page(sb, cinf, right); - add_pcpu_page(sb, cinf, pg); - - write_unlock(&cinf->rwlock); - write_unlock(&pg->rwlock); - write_unlock(&right->rwlock); - right = NULL; - ret = 0; - goto out; - } - - scoutfs_inc_counter(sb, item_page_split); - - /* pages are still private, tylock will succeed */ - write_trylock_will_succeed(&left->rwlock); - write_trylock_will_succeed(&right->rwlock); - - set_split_keys(pg, left, right); - - rbtree_insert(&right->node, par, pnode, &cinf->pg_root); - rbtree_replace_node(&pg->node, &left->node, &cinf->pg_root); 
- lru_remove(sb, cinf, pg); - - write_unlock(&cinf->rwlock); - - /* move items while only holding page locks, visible once unlocked */ - move_page_items(sb, cinf, pg, left, &left->start, &right->start); - lru_accessed(sb, cinf, left); - add_pcpu_page(sb, cinf, left); - write_unlock(&left->rwlock); - left = NULL; - - move_page_items(sb, cinf, pg, right, &right->start, NULL); - lru_accessed(sb, cinf, right); - add_pcpu_page(sb, cinf, right); - write_unlock(&right->rwlock); - right = NULL; - - /* and drop the source page, it was replaced above */ - invalidate_pcpu_page(pg); - write_unlock(&pg->rwlock); - put_pg(sb, pg); - - ret = 0; -out: - put_pg(sb, left); - put_pg(sb, right); - return ret; -} - -/* - * The caller has a write-only cluster lock and wants to populate the - * cache so that it can insert an item without reading. They found a - * hole but unlocked so we check again under the lock after allocating. - * We insert an empty page that covers the key and extends to either the - * neighbours or the caller's (lock's) range. - */ -static int cache_empty_page(struct super_block *sb, - struct item_cache_info *cinf, - struct scoutfs_key *key, struct scoutfs_key *start, - struct scoutfs_key *end) -{ - struct cached_page *prev; - struct cached_page *next; - struct cached_page *pg; - struct rb_node **pnode; - struct rb_node *par; - - pg = alloc_pg(sb, 0); - if (!pg) - return -ENOMEM; - - write_lock(&cinf->rwlock); - - if (!page_rbtree_walk(sb, &cinf->pg_root, key, key, &prev, &next, - &par, &pnode)) { - pg->start = *start; - if (prev && scoutfs_key_compare(&prev->end, start) > 0) { - pg->start = prev->end; - scoutfs_key_inc(&pg->start); - } - - pg->end = *end; - if (next && scoutfs_key_compare(&next->start, end) < 0) { - pg->end = next->start; - scoutfs_key_dec(&pg->end); - } - - rbtree_insert(&pg->node, par, pnode, &cinf->pg_root); - lru_accessed(sb, cinf, pg); - pg = NULL; - } - - write_unlock(&cinf->rwlock); - - put_pg(sb, pg); - - return 0; -} - /* * Readers operate independently from dirty items and transactions. * They read a set of persistent items and insert them into the cache @@ -1345,316 +432,241 @@ static void del_active_reader(struct item_cache_info *cinf, struct active_reader } /* - * Add a newly read item to the pages that we're assembling for - * insertion into the cache. These pages are private, they only exist - * on our root and aren't in dirty or lru lists. - * - * We need to store deletion items here as we read items from all the - * btrees so that they can override older items. The deletion items - * will be deleted before we insert the pages into the cache. We don't - * insert old versions of items into the tree here so that the trees - * don't have to compare seqs. + * Returns true if a direct item search ends in a cached region. We're + * only searching for one key so if we find it then its cached and can + * ignore the previous item. */ -static int read_page_item(struct super_block *sb, struct scoutfs_key *key, u64 seq, u8 flags, - void *val, int val_len, int fic, void *arg) +static bool item_lookup_is_cached(int cmp, struct cached_item *prev) +{ + return cmp == 0 || (prev && !prev->hole_after); +} + +/* + * Returns true if an item search within a range only traversed cached + * regions. Once we're searching a range we can be advancing search + * keys and we need to know if the search we just performed was inside + * the range, or not. 
If we hit an item at the key inside the range + * but prev indicates a hole then we skipped over unknown uncached keys + * and we can't use the next item. + */ +static bool item_next_is_cached(bool first, int cmp, struct cached_item *prev) +{ + return (first && cmp == 0) || (prev && !prev->hole_after); +} + +/* The item is positive and is visible in the cache */ +static bool item_is_positive(struct cached_item *item) +{ + return item && !item->deletion && !item->negative; +} + +/* + * Track read items in a private list. Newer versions of items replace + * older. We keep deletion items here so that they replace older + * non-deletion items. Deletion items and items that are outside of + * the eventual range of keys read from all trees are dropped before + * being inserted. + */ +static int item_reader(struct super_block *sb, struct scoutfs_key *key, u64 seq, u8 flags, + void *val, int val_len, int fic, void *arg) { - DECLARE_ITEM_CACHE_INFO(sb, cinf); const bool deletion = !!(flags & SCOUTFS_ITEM_FLAG_DELETION); - struct rb_root *root = arg; - struct cached_page *right = NULL; - struct cached_page *left = NULL; - struct cached_page *pg; + struct scoutfs_cwskip_root *root = arg; + struct scoutfs_cwskip_writer wr; struct cached_item *found; struct cached_item *item; - struct rb_node *p_par; - struct rb_node *par; - struct rb_node **p_pnode; - struct rb_node **pnode; + int cmp; - pg = page_rbtree_walk(sb, root, key, key, NULL, NULL, &p_par, &p_pnode); - found = item_rbtree_walk(&pg->item_root, key, NULL, &par, &pnode); - if (found && (found->seq >= seq)) - return 0; + item = alloc_item(sb, key, seq, deletion, val, val_len); + if (!item) + return -ENOMEM; - if (!page_has_room(pg, val_len)) { - left = alloc_pg(sb, 0); - /* split needs multiple items, sparse may not have enough */ - if (!left) - return -ENOMEM; + scoutfs_cwskip_write_begin(root, key, item->node->height, + NULL, (void **)&found, &cmp, &wr); - compact_page_items(sb, pg, left); - found = item_rbtree_walk(&pg->item_root, key, NULL, &par, - &pnode); + if (cmp == 0 && (found->seq < seq)) { + /* remove existing if it's older */ + scoutfs_cwskip_write_remove(&wr, found->node); + call_free_item(sb, found); } - item = alloc_item(pg, key, seq, deletion, val, val_len); - if (!item) { - /* simpler split of private pages, no locking/dirty/lru */ - if (!left) - left = alloc_pg(sb, 0); - right = alloc_pg(sb, 0); - if (!left || !right) { - put_pg(sb, left); - put_pg(sb, right); - return -ENOMEM; - } - - scoutfs_inc_counter(sb, item_read_pages_split); - - set_split_keys(pg, left, right); - rbtree_insert(&right->node, p_par, p_pnode, root); - rbtree_replace_node(&pg->node, &left->node, root); - move_page_items(sb, cinf, pg, left, - &left->start, &right->start); - move_page_items(sb, cinf, pg, right, &right->start, NULL); - put_pg(sb, pg); - - pg = scoutfs_key_compare(key, &left->end) <= 0 ? 
left : right; - item = alloc_item(pg, key, seq, deletion, val, val_len); - found = item_rbtree_walk(&pg->item_root, key, NULL, &par, - &pnode); - - left = NULL; - right = NULL; + if (cmp != 0 || (found->seq < seq)) { + /* insert read if first or newer */ + item->persistent = 1; + scoutfs_cwskip_write_insert(&wr, item->node); + item = NULL; } - /* if deleted a deletion item will be required */ - item->persistent = 1; + scoutfs_cwskip_write_end(&wr); - rbtree_insert(&item->node, par, pnode, &pg->item_root); - if (found) - erase_item(pg, found); + kfree(item); + return 0; +} - put_pg(sb, left); - put_pg(sb, right); +static int insert_missing_negative(struct super_block *sb, struct scoutfs_cwskip_root *root, + struct scoutfs_key *key) +{ + struct scoutfs_cwskip_writer wr; + struct cached_item *item; + int cmp; + + item = alloc_item(sb, key, 0, false, NULL, 0); + if (!item) + return -ENOMEM; + + scoutfs_cwskip_write_begin(root, key, item->node->height, NULL, NULL, &cmp, &wr); + if (cmp != 0) { + item->negative = 1; + scoutfs_cwskip_write_insert(&wr, item->node); + item = NULL; + } + scoutfs_cwskip_write_end(&wr); + + kfree(item); return 0; } /* - * The caller couldn't find a page that contains the key we're looking - * for. We combine a block's worth of items around the key in all the - * forest btrees and store them in pages. After filtering out deletions - * and duplicates, we insert any resulting pages which don't overlap - * with existing cached pages. + * Read items from persistent btrees and populate the cache around the + * key. * - * We only insert uncached regions because this is called with cluster - * locks held, but without locking the cache. The regions we read can - * be stale with respect to the current cache, which can be read and - * dirtied by other cluster lock holders on our node, but the cluster - * locks protect the stable items we read. Invalidation is careful not - * to drop pages that have items that we couldn't see because they were - * dirty when we started reading. + * The caller holds cluster locks which ensure that the persistent items + * aren't changing. The currently cached items might be dirty and more + * recent than the persistent items. We only insert read items into + * holes in the cache. * - * The forest item reader is reading stable trees that could be - * overwritten. It can return -ESTALE which we return to the caller who - * will retry the operation and work with a new set of more recent - * btrees. + * We read a single full block of items around the key from each btree. + * The intersection of these read key ranges is the range of consistent + * items that can be cached. Any items read outside of this range might + * be stale because their keys weren't read from all the btrees. We + * drop all the read items outside of the consistent range. + * + * The consistent key range can extend outside of the set of items read + * inside the range. We add negative cached items to mark the + * boundaries of the consistent range if we didn't read items right at + * the edges. + * + * Once we have a set of read items that covers the entire range we try + * to insert them into the cache. For each read item we iterate + * through cached items until we find the two cached items around it. + * If the read item falls in a hole in the cache then we insert it. We + * iterate over all cached items in the range, rather than just + * searching for the position of each read item, because we may need to + * clear hole_after between cached items. 
+ * + * This is racing with all operations on the cache: item api calls, + * other readers, memory pressure, and lock invalidation. We are very + * careful to only atomically modify the cache one locked item pair at a + * time to ensure that cache is always consistent. */ -static int read_pages(struct super_block *sb, struct item_cache_info *cinf, +static int read_items(struct super_block *sb, struct item_cache_info *cinf, struct scoutfs_key *key, struct scoutfs_lock *lock) { - struct rb_root root = RB_ROOT; + struct scoutfs_cwskip_root root; INIT_ACTIVE_READER(active); - struct cached_page *right = NULL; - struct cached_page *pg; - struct cached_page *rd; + struct scoutfs_cwskip_writer cached_wr; + struct scoutfs_cwskip_writer wr; + struct cached_item *cached_prev; + struct cached_item *cached_item; struct cached_item *item; struct scoutfs_key start; struct scoutfs_key end; - struct scoutfs_key inf; - struct scoutfs_key edge; - struct rb_node **pnode; - struct rb_node *par; - struct rb_node *pg_tmp; - struct rb_node *item_tmp; - int pgi; + struct scoutfs_key pos; + bool drop_before; + bool drop_after; + bool first; + int cmp; int ret; - /* start with an empty page that covers the whole lock */ - pg = alloc_pg(sb, 0); - if (!pg) { - ret = -ENOMEM; - goto out; - } - pg->start = lock->start; - pg->end = lock->end; - rbtree_insert(&pg->node, NULL, &root.rb_node, &root); + /* read into an empty private root */ + scoutfs_cwskip_init_root(&root, key_item_cmp, sizeof(struct cached_item)); /* set active reader seq before reading persistent roots */ add_active_reader(sb, &active); start = lock->start; end = lock->end; - ret = scoutfs_forest_read_items(sb, key, &lock->start, &start, &end, read_page_item, &root); + ret = scoutfs_forest_read_items(sb, key, &lock->start, &start, &end, item_reader, &root); if (ret < 0) goto out; - /* clean up our read items and pages before locking */ - for_each_page_safe(&root, pg, pg_tmp) { + /* drop deleted items and items outside of the final consistent read range */ + drop_before = true; + drop_after = false; + scoutfs_cwskip_write_begin(&root, &lock->start, SCOUTFS_CWSKIP_MAX_HEIGHT, + NULL, (void **)&item, NULL, &wr); + do { + if (drop_before && scoutfs_key_compare(&item->key, &start) >= 0) + drop_before = false; + if (!drop_before && !drop_after && scoutfs_key_compare(&item->key, &end) > 0) + drop_after = true; - /* trim any items we read outside the read range */ - scoutfs_key_set_zeros(&inf); - edge = start; - scoutfs_key_dec(&edge); - pgi = trim_page_intersection(sb, cinf, pg, NULL, &inf, &edge); - if (pgi != PGI_INSIDE) { - scoutfs_key_set_ones(&inf); - edge = end; - scoutfs_key_inc(&edge); - pgi = trim_page_intersection(sb, cinf, pg, NULL, &edge, - &inf); - } - if (pgi == PGI_INSIDE) { - rbtree_erase(&pg->node, &root); - put_pg(sb, pg); - continue; + if (drop_before || item->deletion || drop_after) { + scoutfs_cwskip_write_remove(&wr, item->node); + call_free_item(sb, item); } - /* drop deletion items, we don't need them in the cache */ - for_each_item_safe(&pg->item_root, item, item_tmp) { - if (item->deletion) - erase_item(pg, item); - } - } + } while (scoutfs_cwskip_write_next(&wr, 1, NULL, (void **)&item)); + scoutfs_cwskip_write_end(&wr); -retry: - write_lock(&cinf->rwlock); + /* add negative items at the ends of the range if needed */ + ret = insert_missing_negative(sb, &root, &start) ?: + insert_missing_negative(sb, &root, &end); + if (ret < 0) + goto out; - while ((rd = first_page(&root))) { + /* lock max height on our private list so _next always 
succeeds */ + pos = start; + first = true; + scoutfs_cwskip_write_begin(&root, &start, SCOUTFS_CWSKIP_MAX_HEIGHT, + NULL, (void **)&item, NULL, &wr); + do { + scoutfs_cwskip_write_begin(&cinf->item_root, &pos, item->node->height, + (void **)&cached_prev, (void **)&cached_item, + NULL, &cached_wr); + do { + if (cached_item) + cmp = scoutfs_key_compare(&item->key, &cached_item->key); + else + cmp = -1; - pg = page_rbtree_walk(sb, &cinf->pg_root, &rd->start, &rd->end, - NULL, NULL, &par, &pnode); - if (!pg) { - /* insert read pages that don't intersect */ - rbtree_erase(&rd->node, &root); - rbtree_insert(&rd->node, par, pnode, &cinf->pg_root); - lru_accessed(sb, cinf, rd); - trace_scoutfs_item_read_page(sb, key, &rd->start, - &rd->end); - continue; - } + if (cmp <= 0) { + /* check read item once its between cached items */ + scoutfs_cwskip_write_remove(&wr, item->node); - pgi = trim_page_intersection(sb, cinf, rd, right, &pg->start, - &pg->end); - if (pgi == PGI_INSIDE) { - rbtree_erase(&rd->node, &root); - put_pg(sb, rd); + /* insert into holes or drop and free */ + if (cmp < 0 && (!cached_prev || cached_prev->hole_after)) { + item->hole_after = 1; + scoutfs_cwskip_write_insert(&cached_wr, item->node); + update_age_total(cinf, item->alloc_bytes); + mark_item_age(cinf, item); + } else { + call_free_item(sb, item); + } - } else if (pgi == PGI_BISECT_NEEDED) { - write_unlock(&cinf->rwlock); - right = alloc_pg(sb, 0); - if (!right) { - ret = -ENOMEM; - goto out; + /* always succeeds for our private list */ + scoutfs_cwskip_write_next(&wr, 1, NULL, (void **)&item); } - goto retry; - } else if (pgi == PGI_BISECT) { - page_rbtree_walk(sb, &root, &right->start, &right->end, - NULL, NULL, &par, &pnode); - rbtree_insert(&right->node, par, pnode, &root); - right = NULL; - } - } + /* gaps after all cached prevs except the first are in the read range */ + if (!first && cached_prev && cached_prev->hole_after) + cached_prev->hole_after = 0; + first = false; - write_unlock(&cinf->rwlock); + pos = cached_item->key; + scoutfs_key_inc(&pos); + + } while (item && scoutfs_cwskip_write_next(&cached_wr, item->node->height, + (void **)&cached_prev, + (void **)&cached_item)); + scoutfs_cwskip_write_end(&cached_wr); + + } while (item); + scoutfs_cwskip_write_end(&wr); ret = 0; out: del_active_reader(cinf, &active); - - /* free any pages we left dangling on error */ - for_each_page_safe(&root, rd, pg_tmp) { - rbtree_erase(&rd->node, &root); - put_pg(sb, rd); - } - - put_pg(sb, right); - - return ret; -} - -/* - * Get a locked cached page for the caller to work with. This populates - * the cache on misses and can ensure that the locked page has enough - * room for an item allocation for the caller. Unfortunately, sparse - * doesn't seem to deal very well with the pattern of conditional lock - * acquisition. Callers manually add __acquire. 
- */
-static int get_cached_page(struct super_block *sb,
-			   struct item_cache_info *cinf,
-			   struct scoutfs_lock *lock, struct scoutfs_key *key,
-			   bool write, bool alloc, int val_len,
-			   struct cached_page **pg_ret)
-{
-	struct cached_page *pg = NULL;
-	struct rb_node **pnode;
-	struct rb_node *par;
-	int ret;
-
-	if (WARN_ON_ONCE(alloc && !write))
-		return -EINVAL;
-
-	pg = get_pcpu_page(sb, cinf, key, write);
-	if (pg) {
-		__acquire(pg->rwlock);
-		if (!alloc || page_has_room(pg, val_len))
-			goto found;
-
-		if (write)
-			write_unlock(&pg->rwlock);
-		else
-			read_unlock(&pg->rwlock);
-		pg = NULL;
-	}
-
-retry:
-	read_lock(&cinf->rwlock);
-
-	pg = page_rbtree_walk(sb, &cinf->pg_root, key, key, NULL, NULL,
-			      &par, &pnode);
-	if (pg == NULL) {
-		read_unlock(&cinf->rwlock);
-		if (lock->mode == SCOUTFS_LOCK_WRITE_ONLY)
-			ret = cache_empty_page(sb, cinf, key, &lock->start,
-					       &lock->end);
-		else
-			ret = read_pages(sb, cinf, key, lock);
-		if (ret < 0 && ret != -ESTALE)
-			goto out;
-		goto retry;
-	}
-
-	if (write)
-		write_lock(&pg->rwlock);
-	else
-		read_lock(&pg->rwlock);
-
-	if (alloc && !page_has_room(pg, val_len)) {
-		read_unlock(&cinf->rwlock);
-		if (write)
-			write_unlock(&pg->rwlock);
-		else
-			read_unlock(&pg->rwlock);
-
-		ret = try_split_page(sb, cinf, key, val_len);
-		if (ret < 0)
-			goto out;
-		goto retry;
-	}
-
-	read_unlock(&cinf->rwlock);
-
-	add_pcpu_page(sb, cinf, pg);
-found:
-	__release(pg_rwlock);
-	lru_accessed(sb, cinf, pg);
-	ret = 0;
-out:
-	if (ret < 0)
-		*pg_ret = NULL;
-	else
-		*pg_ret = pg;
 	return ret;
 }
 
@@ -1692,8 +704,11 @@ int scoutfs_item_lookup(struct super_block *sb, struct scoutfs_key *key,
 		       void *val, int val_len, struct scoutfs_lock *lock)
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
+	struct scoutfs_cwskip_reader rd;
+	struct cached_item *prev;
 	struct cached_item *item;
-	struct cached_page *pg;
+	bool valid;
+	int cmp;
 	int ret;
 
 	scoutfs_inc_counter(sb, item_lookup);
@@ -1701,18 +716,24 @@ int scoutfs_item_lookup(struct super_block *sb, struct scoutfs_key *key,
 	if ((ret = lock_safe(lock, key, SCOUTFS_LOCK_READ)))
 		goto out;
 
-	ret = get_cached_page(sb, cinf, lock, key, false, false, 0, &pg);
-	if (ret < 0)
-		goto out;
-	__acquire(&pg->rwlock);
+	do {
+		scoutfs_cwskip_read_begin(&cinf->item_root, key,
+					  (void **)&prev, (void **)&item, &cmp, &rd);
 
-	item = item_rbtree_walk(&pg->item_root, key, NULL, NULL, NULL);
-	if (!item || item->deletion)
-		ret = -ENOENT;
-	else
-		ret = copy_val(val, val_len, item->val, item->val_len);
+		if (!item_lookup_is_cached(cmp, prev))
+			ret = -ERANGE;
+		else if (cmp != 0 || !item_is_positive(item))
+			ret = -ENOENT;
+		else
+			ret = copy_val(val, val_len, item->val, item->val_len);
+
+		valid = scoutfs_cwskip_read_valid(&rd);
+		if (valid && item)
+			mark_item_age(cinf, item);
+
+		scoutfs_cwskip_read_end(&rd);
+	} while (!valid || (ret == -ERANGE && (ret = read_items(sb, cinf, key, lock)) == 0));
 
-	read_unlock(&pg->rwlock);
 out:
 	return ret;
 }
@@ -1756,10 +777,13 @@ int scoutfs_item_next(struct super_block *sb, struct scoutfs_key *key,
 		      struct scoutfs_lock *lock)
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
+	struct scoutfs_cwskip_reader rd;
 	struct cached_item *item;
-	struct cached_item *next;
-	struct cached_page *pg = NULL;
+	struct cached_item *prev;
 	struct scoutfs_key pos;
+	struct scoutfs_key tmp;
+	bool first;
+	int cmp;
 	int ret;
 
 	scoutfs_inc_counter(sb, item_next);
@@ -1768,50 +792,57 @@ int scoutfs_item_next(struct super_block *sb, struct scoutfs_key *key,
 	if (scoutfs_key_compare(&lock->end, last) < 0)
 		last = &lock->end;
 
-	if (scoutfs_key_compare(key, last) > 0) {
-		ret = -ENOENT;
-		goto out;
-	}
+	if (scoutfs_key_compare(key, last) > 0)
+		return -ENOENT;
 
 	if ((ret = lock_safe(lock, key, SCOUTFS_LOCK_READ)))
-		goto out;
+		return ret;
 
+	first = true;
 	pos = *key;
+	do {
+		scoutfs_cwskip_read_begin(&cinf->item_root, &pos,
+					  (void **)&prev, (void **)&item, &cmp, &rd);
+		do {
+			tmp = pos;
+			if (!item_next_is_cached(first, cmp, prev)) {
+				ret = -ERANGE;
 
-	for (;;) {
-		ret = get_cached_page(sb, cinf, lock, &pos, false, false, 0,
-				      &pg);
-		if (ret < 0)
-			goto out;
-		__acquire(&pg->rwlock);
+			} else if (!item || scoutfs_key_compare(&item->key, last) > 0) {
+				ret = -ENOENT;
 
-		item = item_rbtree_walk(&pg->item_root, &pos, &next,
-					NULL, NULL) ?: next;
-		while (item && scoutfs_key_compare(&item->key, last) <= 0) {
-			if (!item->deletion) {
-				*key = item->key;
-				ret = copy_val(val, val_len, item->val,
-					       item->val_len);
-				goto unlock;
+			} else if (item_is_positive(item)) {
+				ret = copy_val(val, val_len, item->val, item->val_len);
+				tmp = item->key;
+
+			} else {
+				tmp = item->key;
+				scoutfs_key_inc(&tmp);
+				ret = -ESRCH;
 			}
 
-			item = next_item(item);
+			if (scoutfs_cwskip_read_valid(&rd)) {
+				pos = tmp;
+				first = false;
+				if (ret != -ESRCH && item)
+					mark_item_age(cinf, item);
+			} else {
+				ret = -ESRCH;
+			}
+		} while (ret == -ESRCH &&
+			 scoutfs_cwskip_read_next(&rd, (void **)&prev, (void **)&item));
+		scoutfs_cwskip_read_end(&rd);
+
+		if (ret == -ERANGE) {
+			ret = read_items(sb, cinf, &pos, lock);
+			if (ret == 0)
+				ret = -ESRCH;
 		}
 
-		if (scoutfs_key_compare(&pg->end, last) >= 0) {
-			ret = -ENOENT;
-			goto unlock;
-		}
+	} while (ret == -ESRCH);
 
-		pos = pg->end;
-		read_unlock(&pg->rwlock);
-
-		scoutfs_key_inc(&pos);
-	}
-
-unlock:
-	read_unlock(&pg->rwlock);
-out:
+	if (ret >= 0)
+		*key = pos;
 	return ret;
 }
 
@@ -1833,16 +863,17 @@ static u64 item_seq(struct super_block *sb, struct scoutfs_lock *lock)
 
 /*
  * Mark the item dirty.  Dirtying while holding a transaction pins the
- * page holding the item and guarantees that the item can be deleted or
- * updated (without increasing the value length) during the transaction
- * without errors.
+ * item and guarantees that the item can be deleted or updated (without
+ * increasing the value length) during the transaction without errors.
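
The lookup and next paths above share one retry shape: sample the structure under an optimistic reader, only trust the result if it validates, and fall back to reading items when the key falls in a hole. The sketch below models that loop in userspace with stand-in helpers (read_begin, read_valid, populate are not scoutfs calls); note that the populate step is gated on the "not cached" case only, so a successful or failed lookup leaves the loop.

```c
#include <errno.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static _Atomic unsigned long gen;	/* bumped by writers */
static _Atomic long cached;		/* <0 means "not cached" */

static unsigned long read_begin(void)		{ return atomic_load(&gen); }
static bool read_valid(unsigned long g)		{ return atomic_load(&gen) == g; }

static int populate(int key)			/* models reading persistent items */
{
	atomic_store(&cached, (long)key * 10);
	atomic_fetch_add(&gen, 1);
	return 0;
}

static int lookup(int key, long *val)
{
	unsigned long g;
	bool valid;
	long v;
	int ret;

	do {
		g = read_begin();
		v = atomic_load(&cached);
		if (v < 0) {
			ret = -ERANGE;		/* hole: must populate and retry */
		} else {
			*val = v;
			ret = 0;
		}
		valid = read_valid(g);
		/* only populate on -ERANGE; success and -ENOENT exit the loop */
	} while (!valid || (ret == -ERANGE && (ret = populate(key)) == 0));

	return ret;
}

int main(void)
{
	long val = 0;
	int ret;

	atomic_store(&cached, -1);
	ret = lookup(7, &val);
	printf("ret %d val %ld\n", ret, val);
	return 0;
}
```
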
*/ int scoutfs_item_dirty(struct super_block *sb, struct scoutfs_key *key, struct scoutfs_lock *lock) { DECLARE_ITEM_CACHE_INFO(sb, cinf); + struct scoutfs_cwskip_writer wr; struct cached_item *item; - struct cached_page *pg; + struct cached_item *prev; + int cmp; int ret; scoutfs_inc_counter(sb, item_dirty); @@ -1854,21 +885,22 @@ int scoutfs_item_dirty(struct super_block *sb, struct scoutfs_key *key, if (ret < 0) goto out; - ret = get_cached_page(sb, cinf, lock, key, true, false, 0, &pg); - if (ret < 0) - goto out; - __acquire(pg->rwlock); + do { + scoutfs_cwskip_write_begin(&cinf->item_root, key, 0, + (void **)&prev, (void **)&item, &cmp, &wr); + if (!item_lookup_is_cached(cmp, prev)) { + ret = -ERANGE; + } else if (cmp != 0 || !item_is_positive(item)) { + ret = -ENOENT; + } else { + item->seq = item_seq(sb, lock); + mark_item_dirty(sb, cinf, item); + mark_item_age(cinf, item); + ret = 0; + } + scoutfs_cwskip_write_end(&wr); + } while (ret == -ERANGE && ((ret = read_items(sb, cinf, key, lock)) == 0)); - item = item_rbtree_walk(&pg->item_root, key, NULL, NULL, NULL); - if (!item || item->deletion) { - ret = -ENOENT; - } else { - item->seq = item_seq(sb, lock); - mark_item_dirty(sb, cinf, pg, NULL, item); - ret = 0; - } - - write_unlock(&pg->rwlock); out: return ret; } @@ -1884,11 +916,11 @@ static int item_create(struct super_block *sb, struct scoutfs_key *key, { DECLARE_ITEM_CACHE_INFO(sb, cinf); const u64 seq = item_seq(sb, lock); + struct scoutfs_cwskip_writer wr; struct cached_item *found; struct cached_item *item; - struct cached_page *pg; - struct rb_node **pnode; - struct rb_node *par; + struct cached_item *prev; + int cmp; int ret; scoutfs_inc_counter(sb, item_create); @@ -1900,33 +932,43 @@ static int item_create(struct super_block *sb, struct scoutfs_key *key, if (ret < 0) goto out; - ret = get_cached_page(sb, cinf, lock, key, true, true, val_len, &pg); - if (ret < 0) + item = alloc_item(sb, key, seq, false, val, val_len); + if (!item) { + ret = -ENOMEM; goto out; - __acquire(pg->rwlock); - - found = item_rbtree_walk(&pg->item_root, key, NULL, &par, &pnode); - if (!force && found && !found->deletion) { - ret = -EEXIST; - goto unlock; } - item = alloc_item(pg, key, seq, false, val, val_len); - rbtree_insert(&item->node, par, pnode, &pg->item_root); - mark_item_dirty(sb, cinf, pg, NULL, item); + do { + scoutfs_cwskip_write_begin(&cinf->item_root, key, item->node->height, + (void **)&prev, (void **)&found, &cmp, &wr); + if (!force && !item_lookup_is_cached(cmp, prev)) { + ret = -ERANGE; + } else if (!force && cmp == 0 && item_is_positive(found)) { + ret = -EEXIST; + } else { + if (found) { + item->persistent = found->persistent; + clear_item_dirty(sb, cinf, found); + scoutfs_cwskip_write_remove(&wr, found->node); + update_age_total(cinf, -found->alloc_bytes); + call_free_item(sb, found); + } - if (found) { - item->persistent = found->persistent; - clear_item_dirty(sb, cinf, pg, found); - erase_item(pg, found); - } + if (force) + item->persistent = 1; + scoutfs_cwskip_write_insert(&wr, item->node); + update_age_total(cinf, item->alloc_bytes); + mark_item_dirty(sb, cinf, item); + mark_item_age(cinf, item); + item = NULL; - if (force) - item->persistent = 1; + ret = 0; + } + scoutfs_cwskip_write_end(&wr); - ret = 0; -unlock: - write_unlock(&pg->rwlock); + } while (ret == -ERANGE && ((ret = read_items(sb, cinf, key, lock)) == 0)); + + kfree(item); out: return ret; } @@ -1934,16 +976,14 @@ out: int scoutfs_item_create(struct super_block *sb, struct scoutfs_key *key, void *val, int 
val_len, struct scoutfs_lock *lock) { - return item_create(sb, key, val, val_len, lock, - SCOUTFS_LOCK_READ, false); + return item_create(sb, key, val, val_len, lock, SCOUTFS_LOCK_READ, false); } int scoutfs_item_create_force(struct super_block *sb, struct scoutfs_key *key, void *val, int val_len, struct scoutfs_lock *lock) { - return item_create(sb, key, val, val_len, lock, - SCOUTFS_LOCK_WRITE_ONLY, true); + return item_create(sb, key, val, val_len, lock, SCOUTFS_LOCK_WRITE_ONLY, true); } /* @@ -1957,11 +997,13 @@ int scoutfs_item_update(struct super_block *sb, struct scoutfs_key *key, { DECLARE_ITEM_CACHE_INFO(sb, cinf); const u64 seq = item_seq(sb, lock); - struct cached_item *item; + struct scoutfs_cwskip_writer wr; + struct cached_item *item = NULL; struct cached_item *found; - struct cached_page *pg; - struct rb_node **pnode; - struct rb_node *par; + struct cached_item *prev; + bool need_alloc = false; + int lock_height; + int cmp; int ret; scoutfs_inc_counter(sb, item_update); @@ -1973,39 +1015,56 @@ int scoutfs_item_update(struct super_block *sb, struct scoutfs_key *key, if (ret < 0) goto out; - ret = get_cached_page(sb, cinf, lock, key, true, true, val_len, &pg); - if (ret < 0) - goto out; - __acquire(pg->rwlock); - - found = item_rbtree_walk(&pg->item_root, key, NULL, &par, &pnode); - if (!found || found->deletion) { - ret = -ENOENT; - goto unlock; - } - - if (val_len <= found->val_len) { - if (val_len) - memcpy(found->val, val, val_len); - if (val_len < found->val_len) - pg->erased_bytes += item_val_bytes(found->val_len) - - item_val_bytes(val_len); - found->val_len = val_len; - found->seq = seq; - mark_item_dirty(sb, cinf, pg, NULL, found); - } else { - item = alloc_item(pg, key, seq, false, val, val_len); - item->persistent = found->persistent; - rbtree_insert(&item->node, par, pnode, &pg->item_root); - mark_item_dirty(sb, cinf, pg, NULL, item); - - clear_item_dirty(sb, cinf, pg, found); - erase_item(pg, found); - } - ret = 0; -unlock: - write_unlock(&pg->rwlock); + do { + if (need_alloc && !item) { + item = alloc_item(sb, key, seq, false, val, val_len); + if (!item) { + ret = -ENOMEM; + break; + } + lock_height = item->node->height; + } else { + lock_height = 0; + } + + scoutfs_cwskip_write_begin(&cinf->item_root, key, lock_height, + (void **)&prev, (void **)&found, &cmp, &wr); + if (!item_lookup_is_cached(cmp, prev)) { + ret = -ERANGE; + } else if (cmp != 0 || !item_is_positive(found)) { + ret = -ENOENT; + } else { + if (val_len <= found->val_len) { + if (val_len) + memcpy(found->val, val, val_len); + found->val_len = val_len; + found->seq = seq; + mark_item_dirty(sb, cinf, found); + mark_item_age(cinf, item); + } else if (!item) { + need_alloc = true; + } else { + item->persistent = found->persistent; + + clear_item_dirty(sb, cinf, found); + scoutfs_cwskip_write_remove(&wr, found->node); + update_age_total(cinf, -found->alloc_bytes); + call_free_item(sb, found); + + mark_item_dirty(sb, cinf, item); + mark_item_age(cinf, item); + scoutfs_cwskip_write_insert(&wr, item->node); + update_age_total(cinf, item->alloc_bytes); + item = NULL; + } + ret = 0; + } + scoutfs_cwskip_write_end(&wr); + + } while (need_alloc || (ret == -ERANGE && ((ret = read_items(sb, cinf, key, lock)) == 0))); + + kfree(item); out: return ret; } @@ -2022,10 +1081,11 @@ int scoutfs_item_delta(struct super_block *sb, struct scoutfs_key *key, { DECLARE_ITEM_CACHE_INFO(sb, cinf); const u64 seq = item_seq(sb, lock); + struct scoutfs_cwskip_writer wr; + struct cached_item *alloc = NULL; struct cached_item 
*item;
-	struct cached_page *pg;
-	struct rb_node **pnode;
-	struct rb_node *par;
+	struct cached_item *prev;
+	int cmp;
 	int ret;
 
 	scoutfs_inc_counter(sb, item_delta);
@@ -2037,47 +1097,56 @@ int scoutfs_item_delta(struct super_block *sb, struct scoutfs_key *key,
 	if (ret < 0)
 		goto out;
 
-	ret = get_cached_page(sb, cinf, lock, key, true, true, val_len, &pg);
-	if (ret < 0)
+	alloc = alloc_item(sb, key, seq, false, val, val_len);
+	if (!alloc) {
+		ret = -ENOMEM;
 		goto out;
-	__acquire(pg->rwlock);
+	}
 
-	item = item_rbtree_walk(&pg->item_root, key, NULL, &par, &pnode);
-	if (item) {
+	scoutfs_cwskip_write_begin(&cinf->item_root, key, alloc->node->height,
+				   (void **)&prev, (void **)&item, &cmp, &wr);
+	if (cmp == 0) {
 		if (!item->delta) {
 			ret = -EIO;
-			goto unlock;
+			goto end;
 		}
 
 		ret = scoutfs_forest_combine_deltas(key, item->val, item->val_len,
						    val, val_len);
 		if (ret <= 0) {
 			if (ret == 0)
				ret = -EIO;
-			goto unlock;
+			goto end;
 		}
 
 		if (ret == SCOUTFS_DELTA_COMBINED) {
 			item->seq = seq;
-			mark_item_dirty(sb, cinf, pg, NULL, item);
+			mark_item_dirty(sb, cinf, item);
+			mark_item_age(cinf, item);
 
 		} else if (ret == SCOUTFS_DELTA_COMBINED_NULL) {
-			clear_item_dirty(sb, cinf, pg, item);
-			erase_item(pg, item);
+			clear_item_dirty(sb, cinf, item);
+			scoutfs_cwskip_write_remove(&wr, item->node);
+			update_age_total(cinf, -item->alloc_bytes);
+			call_free_item(sb, item);
 		} else {
 			ret = -EIO;
-			goto unlock;
+			goto end;
 		}
 		ret = 0;
 
 	} else {
-		item = alloc_item(pg, key, seq, false, val, val_len);
-		rbtree_insert(&item->node, par, pnode, &pg->item_root);
-		mark_item_dirty(sb, cinf, pg, NULL, item);
+		item = alloc;
+		alloc = NULL;
+
+		scoutfs_cwskip_write_insert(&wr, item->node);
+		update_age_total(cinf, item->alloc_bytes);
+		mark_item_dirty(sb, cinf, item);
+		mark_item_age(cinf, item);
 		item->delta = 1;
 		ret = 0;
 	}
-
-unlock:
-	write_unlock(&pg->rwlock);
+end:
+	scoutfs_cwskip_write_end(&wr);
 out:
+	kfree(alloc);
 	return ret;
 }
 
@@ -2094,10 +1163,13 @@ static int item_delete(struct super_block *sb, struct scoutfs_key *key,
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
 	const u64 seq = item_seq(sb, lock);
+	struct scoutfs_cwskip_writer wr;
+	struct cached_item *alloc = NULL;
 	struct cached_item *item;
-	struct cached_page *pg;
-	struct rb_node **pnode;
-	struct rb_node *par;
+	struct cached_item *prev;
+	bool need_alloc = false;
+	int lock_height;
+	int cmp;
 	int ret;
 
 	scoutfs_inc_counter(sb, item_delete);
@@ -2109,43 +1181,65 @@ static int item_delete(struct super_block *sb, struct scoutfs_key *key,
 	if (ret < 0)
 		goto out;
 
-	ret = get_cached_page(sb, cinf, lock, key, true, force, 0, &pg);
-	if (ret < 0)
-		goto out;
-	__acquire(pg->rwlock);
-
-	item = item_rbtree_walk(&pg->item_root, key, NULL, &par, &pnode);
-	if (!force && (!item || item->deletion)) {
-		ret = -ENOENT;
-		goto unlock;
-	}
-
-	if (!item) {
-		item = alloc_item(pg, key, seq, false, NULL, 0);
-		rbtree_insert(&item->node, par, pnode, &pg->item_root);
-	}
-
-	if (force)
-		item->persistent = 1;
-
-	if (!item->persistent) {
-		/* can just forget items that aren't yet persistent */
-		clear_item_dirty(sb, cinf, pg, item);
-		erase_item(pg, item);
-	} else {
-		/* must emit deletion to clobber old persistent item */
-		item->seq = seq;
-		item->deletion = 1;
-		pg->erased_bytes += item_val_bytes(item->val_len) -
-				    item_val_bytes(0);
-		item->val_len = 0;
-		mark_item_dirty(sb, cinf, pg, NULL, item);
-	}
-	ret = 0;
-unlock:
-	write_unlock(&pg->rwlock);
+	do {
+		if (need_alloc) {
+			need_alloc = false;
+			alloc = alloc_item(sb, key, seq, true, NULL, 0);
+			if (!alloc) {
+				ret = -ENOMEM;
+				goto
out; + } + lock_height = alloc->node->height; + } else { + lock_height = 1; + } + + scoutfs_cwskip_write_begin(&cinf->item_root, key, lock_height, + (void **)&prev, (void **)&item, &cmp, &wr); + if (!force && !item_lookup_is_cached(cmp, prev)) { + ret = -ERANGE; + goto end; + } + if (!force && !item_is_positive(item)) { + ret = -ENOENT; + goto end; + } + + if (!item) { + if (!alloc) { + need_alloc = true; + goto end; + } + item = alloc; + alloc = NULL; + scoutfs_cwskip_write_insert(&wr, item->node); + update_age_total(cinf, item->alloc_bytes); + } + + if (force) + item->persistent = 1; + + if (!item->persistent) { + /* can just forget items that aren't yet persistent */ + clear_item_dirty(sb, cinf, item); + scoutfs_cwskip_write_remove(&wr, item->node); + update_age_total(cinf, -item->alloc_bytes); + call_free_item(sb, item); + } else { + /* must emit deletion to clobber old persistent item */ + item->seq = seq; + item->deletion = 1; + item->val_len = 0; + mark_item_dirty(sb, cinf, item); + mark_item_age(cinf, item); + } +end: + scoutfs_cwskip_write_end(&wr); + } while (need_alloc || (ret == -ERANGE && ((ret = read_items(sb, cinf, key, lock)) == 0))); + out: + kfree(alloc); return ret; } @@ -2161,22 +1255,14 @@ int scoutfs_item_delete_force(struct super_block *sb, struct scoutfs_key *key, return item_delete(sb, key, lock, SCOUTFS_LOCK_WRITE_ONLY, true); } -u64 scoutfs_item_dirty_pages(struct super_block *sb) +u64 scoutfs_item_dirty_bytes(struct super_block *sb) { DECLARE_ITEM_CACHE_INFO(sb, cinf); - return (u64)atomic_read(&cinf->dirty_pages); + return (u64)atomic64_read(&cinf->dirty_bytes); } -static int cmp_pg_start(void *priv, struct list_head *A, struct list_head *B) -{ - struct cached_page *a = list_entry(A, struct cached_page, dirty_head); - struct cached_page *b = list_entry(B, struct cached_page, dirty_head); - - return scoutfs_key_compare(&a->start, &b->start); -} - -static int cmp_item_key(void *priv, struct list_head *A, struct list_head *B) +static int cmp_dirty_item_key(void *priv, struct list_head *A, struct list_head *B) { struct cached_item *a = list_entry(A, struct cached_item, dirty_head); struct cached_item *b = list_entry(B, struct cached_item, dirty_head); @@ -2184,6 +1270,48 @@ static int cmp_item_key(void *priv, struct list_head *A, struct list_head *B) return scoutfs_key_compare(&a->key, &b->key); } +/* + * btree block insertion is iterating through the items in write_dirty's + * private list. The dirty items won't change. Each time we're called + * we return if we filled the descriptor with the current position and + * advance. + */ +static void *item_btree_iter_cb(struct super_block *sb, struct scoutfs_btree_item_desc *desc, + void *pos, void *arg) +{ + struct list_head *private_list = arg; + struct cached_item *item = pos; + + if (item == NULL) { + memset(desc, 0, sizeof(struct scoutfs_btree_item_desc)); + return NULL; + } + + desc->key = &item->key; + desc->seq = item->seq; + desc->flags = item->deletion ? 
SCOUTFS_ITEM_FLAG_DELETION : 0; + desc->val = item->val; + desc->val_len = item->val_len; + + if (item->dirty_head.next == private_list) + item = NULL; + else + item = list_next_entry(item, dirty_head); + + return item; +} + +static void splice_all_pcpu_dirty_lists(struct item_cache_info *cinf, struct list_head *list) +{ + struct pcpu_dirty_list *pdlist; + int cpu; + + for_each_online_cpu(cpu) { + pdlist = per_cpu_ptr(cinf->pcpu_dirty, cpu); + list_splice_init(&pdlist->list, list); + } +} + /* * Write all the dirty items into dirty blocks in the forest of btrees. * If this succeeds then the dirty blocks can be submitted to commit @@ -2193,184 +1321,110 @@ static int cmp_item_key(void *priv, struct list_head *A, struct list_head *B) * dirty items have been written. * * This is called during transaction commit which prevents item writers - * from entering a transaction and dirtying items. The set of dirty - * items will be constant. - * - * But the pages that contain the dirty items can be changing. A - * neighbouring read lock can be invalidated and require bisecting a - * page, moving dirty items to a new page. That new page will be put - * after the original page on the dirty list. This will be done under - * the page rwlock and the global dirty_lock. - * - * We first sort the pages by their keys, then lock each page and copy - * its items into a private allocated singly-linked list of the items to - * dirty. Once we have that we can hand it off to the forest of btrees - * to write into items without causing any contention with other page - * users. + * from entering a transaction and modifying dirtying items. The dirty + * items will not be modified and no new dirty items will be added. + * We're the only user of the dirty lists. */ int scoutfs_item_write_dirty(struct super_block *sb) { DECLARE_ITEM_CACHE_INFO(sb, cinf); - struct scoutfs_btree_item_list *first; - struct scoutfs_btree_item_list **prev; - struct scoutfs_btree_item_list *lst; + struct pcpu_dirty_list *pdlist; struct cached_item *item; - struct cached_page *pg; - struct page *second = NULL; - struct page *page; - LIST_HEAD(pages); - LIST_HEAD(pos); - u64 max_seq = 0; - int bytes; - int off; + LIST_HEAD(list); + u64 max_seq; + int cpu; int ret; - if (atomic_read(&cinf->dirty_pages) == 0) - return 0; - scoutfs_inc_counter(sb, item_write_dirty); - /* sort page dirty list by keys */ - read_lock(&cinf->rwlock); - spin_lock(&cinf->dirty_lock); + /* gather all dirty items and sort by their key */ + splice_all_pcpu_dirty_lists(cinf, &list); + list_sort(NULL, &list, cmp_dirty_item_key); - /* sort cached pages by key, add our pos head */ - list_sort(NULL, &cinf->dirty_list, cmp_pg_start); - list_add(&pos, &cinf->dirty_list); - - read_unlock(&cinf->rwlock); - spin_unlock(&cinf->dirty_lock); - - page = alloc_page(GFP_NOFS); - if (!page) { - ret = -ENOMEM; - goto out; - } - list_add(&page->list, &pages); - - first = NULL; - prev = &first; - off = 0; - - while (!list_empty_careful(&pos)) { - if (!second) { - second = alloc_page(GFP_NOFS); - if (!second) { - ret = -ENOMEM; - goto out; - } - list_add(&second->list, &pages); - } - - /* read lock next sorted page, we're only dirty_list user */ - - spin_lock(&cinf->dirty_lock); - pg = list_entry(pos.next, struct cached_page, dirty_head); - if (!read_trylock(&pg->rwlock)) { - spin_unlock(&cinf->dirty_lock); - cpu_relax(); - continue; - } - spin_unlock(&cinf->dirty_lock); - - list_sort(NULL, &pg->dirty_list, cmp_item_key); - - list_for_each_entry(item, &pg->dirty_list, dirty_head) { - bytes = 
offsetof(struct scoutfs_btree_item_list, - val[item->val_len]); - max_seq = max(max_seq, item->seq); - - if (off + bytes > PAGE_SIZE) { - page = second; - second = NULL; - off = 0; - } - - lst = (void *)page_address(page) + off; - off += round_up(bytes, CACHED_ITEM_ALIGN); - - lst->next = NULL; - *prev = lst; - prev = &lst->next; - - lst->key = item->key; - lst->seq = item->seq; - lst->flags = item->deletion ? SCOUTFS_ITEM_FLAG_DELETION : 0; - lst->val_len = item->val_len; - memcpy(lst->val, item->val, item->val_len); - } - - spin_lock(&cinf->dirty_lock); - if (pg->dirty_head.next == &cinf->dirty_list) - list_del_init(&pos); - else - list_move(&pos, &pg->dirty_head); - spin_unlock(&cinf->dirty_lock); - - read_unlock(&pg->rwlock); - } + /* scan for the max seq, really seems like we could track this :/ */ + max_seq = 0; + list_for_each_entry(item, &list, dirty_head) + max_seq = max(max_seq, item->seq); /* store max item seq in forest's log_trees */ scoutfs_forest_set_max_seq(sb, max_seq); /* write all the dirty items into log btree blocks */ - ret = scoutfs_forest_insert_list(sb, first); -out: - list_for_each_entry_safe(page, second, &pages, list) { - list_del_init(&page->list); - __free_page(page); + item = list_first_entry_or_null(&list, struct cached_item, dirty_head); + ret = scoutfs_forest_insert_list(sb, item_btree_iter_cb, item, &list); + + /* return items to a pcpu list, we know ours exists :) */ + cpu = get_cpu(); + pdlist = per_cpu_ptr(cinf->pcpu_dirty, cpu); + list_splice_init(&list, &pdlist->list); + list_for_each_entry(item, &pdlist->list, dirty_head) { + item->dirty_cpu = cpu; } + put_cpu(); return ret; } /* * The caller has successfully committed all the dirty btree blocks that - * contained the currently dirty items. Clear all the dirty items and - * pages. + * contained the currently dirty items. Clear all the dirty items. + * + * Deletion and delta items only existed to emit items into the btree + * logs. They aren't read from the item cache so once they're written + * we can remove them. + * + * The items in the private dirty list are still protected by being + * dirty and won't be removed from the main item list. For each item in + * the private list we search for it in the item list and remove it. + * We're likely to encounter runs of dirty items so we try iterating + * from our search position and clear as many dirty items as we can + * find. 
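
The run-clearing idea described above can be shown with a small userspace sketch. The types and helper below are stand-ins, assuming only that items appear in key order in a global structure and that the private dirty list is sorted by the same keys: each private entry is located once and consecutive dirty items after it are cleared, so later private entries are usually clean already when we reach them.

```c
#include <stdbool.h>
#include <stdio.h>

struct item { int key; bool dirty; };

/*
 * For each entry on the sorted private dirty list, find it in the
 * global key order and clear as many consecutive dirty items as
 * possible.  The "search" here is trivial pointer arithmetic.
 */
static int clear_dirty_runs(struct item *items, int nr,
			    struct item **dirty, int nr_dirty)
{
	int cleared = 0;
	int i, j;

	for (i = 0; i < nr_dirty; i++) {
		if (!dirty[i]->dirty)
			continue;		/* cleared by an earlier run */
		j = (int)(dirty[i] - items);
		while (j < nr && items[j].dirty) {
			items[j].dirty = false;
			cleared++;
			j++;
		}
	}
	return cleared;
}

int main(void)
{
	struct item items[] = { {1, true}, {2, true}, {3, false}, {4, true} };
	struct item *dirty[] = { &items[0], &items[1], &items[3] };

	printf("cleared %d\n", clear_dirty_runs(items, 4, dirty, 3)); /* 3 */
	return 0;
}
```
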
*/ int scoutfs_item_write_done(struct super_block *sb) { DECLARE_ITEM_CACHE_INFO(sb, cinf); + struct scoutfs_cwskip_writer wr; + struct cached_item *found; struct cached_item *item; - struct cached_item *tmp; - struct cached_page *pg; + LIST_HEAD(list); + int cleared = 0; + int cmp; -retry: - spin_lock(&cinf->dirty_lock); + splice_all_pcpu_dirty_lists(cinf, &list); - while ((pg = list_first_entry_or_null(&cinf->dirty_list, - struct cached_page, - dirty_head))) { + while ((item = list_first_entry_or_null(&list, struct cached_item, dirty_head))) { - if (!write_trylock(&pg->rwlock)) { - spin_unlock(&cinf->dirty_lock); - cpu_relax(); - goto retry; - } + scoutfs_cwskip_write_begin(&cinf->item_root, &item->key, item->node->height, + NULL, (void **)&found, &cmp, &wr); + BUG_ON(cmp != 0 || found != item); + do { + if (!item->dirty) + break; - spin_unlock(&cinf->dirty_lock); - - list_for_each_entry_safe(item, tmp, &pg->dirty_list, - dirty_head) { - clear_item_dirty(sb, cinf, pg, item); + /* all dirty items are only on our private list */ + list_del_init(&item->dirty_head); + item->dirty = 0; + item->dirty_cpu = -1; + cleared++; if (item->delta) scoutfs_inc_counter(sb, item_delta_written); - /* free deletion items */ - if (item->deletion || item->delta) - erase_item(pg, item); - else + if (item->deletion || item->delta) { + scoutfs_cwskip_write_remove(&wr, item->node); + update_age_total(cinf, -item->alloc_bytes); + call_free_item(sb, item); + } else { item->persistent = 1; - } + } - write_unlock(&pg->rwlock); + } while (scoutfs_cwskip_write_next(&wr, 1, NULL, (void **)&item)); - spin_lock(&cinf->dirty_lock); + scoutfs_cwskip_write_end(&wr); } - spin_unlock(&cinf->dirty_lock); + scoutfs_add_counter(sb, item_clear_dirty, cleared); + atomic64_set(&cinf->dirty_bytes, 0); return 0; } @@ -2379,202 +1433,358 @@ retry: * Return true if the item cache covers the given range and set *dirty * to true if any items in the cached range are dirty. * - * This is relatively rarely called as locks are granted to make sure - * that we *don't* have existing cache covered by the lock which then - * must be inconsistent. Finding pages is the critical error case, - * under correct operation this will be a read locked walk of the page - * rbtree that doesn't find anything. + * This is called as locks are granted to make sure that we *don't* have + * existing cache covered by the lock which then must be inconsistent. + * Finding items is the critical error case. Under correct operation + * this will be a read search that doesn't find anything. + * + * The best way to think about searching for cached items is to see + * that the only way for there *not* to be cached items in the range is + * if there is a) no previous item before the start key or the previous + * item has hole_after set and b) there are no items in the range. If + * we see a prev with hole after, or any items within the end key, then + * the range is cached. 
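
A compact way to state the rule above is the predicate below, a small userspace sketch over a sorted array rather than the cwskip list: the range is cached if the item before the start covers forward (hole_after clear) or if any item falls inside the range, and dirtiness is only reported for items inside the range.

```c
#include <stdbool.h>
#include <stdio.h>

struct citem { int key; bool hole_after; bool dirty; };

/* returns true if any of [start, end] is covered by cache, sets *dirty */
static bool range_cached(const struct citem *c, int nr,
			 int start, int end, bool *dirty)
{
	bool cached = false;
	int i;

	*dirty = false;

	/* a previous item without hole_after covers space up to the next item */
	for (i = 0; i < nr && c[i].key < start; i++)
		;
	if (i > 0 && !c[i - 1].hole_after)
		cached = true;

	/* any item inside the range means the range is (partly) cached */
	for (; i < nr && c[i].key <= end; i++) {
		cached = true;
		if (c[i].dirty)
			*dirty = true;
	}
	return cached;
}

int main(void)
{
	struct citem c[] = { {5, false, false}, {20, true, true} };
	bool cached, dirty;

	cached = range_cached(c, 2, 8, 15, &dirty);
	printf("cached %d dirty %d\n", cached, dirty);	/* 1 0: item 5 covers up to 20 */
	cached = range_cached(c, 2, 21, 30, &dirty);
	printf("cached %d dirty %d\n", cached, dirty);	/* 0 0: hole after 20 */
	return 0;
}
```
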
*/ bool scoutfs_item_range_cached(struct super_block *sb, struct scoutfs_key *start, struct scoutfs_key *end, bool *dirty) { DECLARE_ITEM_CACHE_INFO(sb, cinf); + struct scoutfs_cwskip_reader rd; + struct scoutfs_key pos = *start; + struct scoutfs_key rd_pos; struct cached_item *item; - struct cached_page *pg; - struct scoutfs_key pos; - bool cached; + struct cached_item *prev; + bool cached = false; + bool done = false; + bool rd_cached; + bool rd_dirty; + bool rd_done; - cached = false; *dirty = false; - pos = *start; + do { + scoutfs_cwskip_read_begin(&cinf->item_root, &pos, + (void **)&prev, (void **)&item, NULL, &rd); + do { + /* catches region starting with cache between items */ + rd_cached = prev && !prev->hole_after; - read_lock(&cinf->rwlock); + rd_dirty = false; + rd_done = false; + if (!item || scoutfs_key_compare(&item->key, end) > 0) { + rd_done = true; + } else { + rd_pos = item->key; + scoutfs_key_inc(&rd_pos); - while (!(*dirty) && scoutfs_key_compare(&pos, end) <= 0 && - (pg = page_rbtree_walk(sb, &cinf->pg_root, &pos, end, NULL, NULL, - NULL, NULL))) { - cached = true; - - read_lock(&pg->rwlock); - read_unlock(&cinf->rwlock); - - /* the dirty list isn't sorted :/ */ - list_for_each_entry(item, &pg->dirty_list, dirty_head) { - if (!scoutfs_key_compare_ranges(&item->key, &item->key, - start, end)) { - *dirty = true; - break; + rd_cached = true; + if (item->dirty) { + rd_dirty = true; + rd_done = true; + } } - } - pos = pg->end; - scoutfs_key_inc(&pos); + if (scoutfs_cwskip_read_valid(&rd)) { + pos = rd_pos; + cached |= rd_cached; + *dirty |= rd_dirty; + done |= rd_done; + } + } while (!done && scoutfs_cwskip_read_next(&rd, (void **)&prev, (void **)&item)); + scoutfs_cwskip_read_end(&rd); - read_unlock(&pg->rwlock); - read_lock(&cinf->rwlock); - } - - read_unlock(&cinf->rwlock); + } while (!done); return cached; } /* - * Remove the cached items in the given range. We drop pages that are - * fully inside the range and trim any pages that intersect it. This is - * being by locking for a lock that can't be used so there can't be item - * calls within the range. It can race with all our other page uses. + * Remove the cached items in the given range. This is called by lock + * invalidation which is preventing use of the lock while its + * invalidating. There can be no read or write item calls for the + * specific key range. There can be item calls working with the + * neighbouring items that we might reference while invalidating the + * edges of the range. This can be racing with memory pressure + * shrinking the cache. + * + * We have to remove the negative cached space covered by the range as + * well as the cached items themselves. This is done by setting + * hole_after in the item before items we remove. We can have to + * remove only a negative cached region so we have to do this when there + * isn't a referenced node after the key. 
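
The sketch below models that edge handling with the same toy array used in the earlier examples, not the kernel structure: the predecessor of the invalidated range gets hole_after set so the negative cached space is dropped along with the items themselves.

```c
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

struct citem { int key; bool hole_after; };

/* drop every item in [start, end] and mark the space after the predecessor */
static int invalidate(struct citem *c, int *nr, int start, int end)
{
	int removed = 0;
	int i;

	for (i = 0; i < *nr && c[i].key < start; i++)
		;
	if (i > 0)
		c[i - 1].hole_after = true;	/* negative space in the range goes too */

	while (i < *nr && c[i].key <= end) {
		memmove(&c[i], &c[i + 1], (*nr - i - 1) * sizeof(c[0]));
		(*nr)--;
		removed++;
	}
	return removed;
}

int main(void)
{
	struct citem c[] = { {1, false}, {5, false}, {9, false} };
	int nr = 3;
	int removed;

	removed = invalidate(c, &nr, 4, 7);
	printf("removed %d, first hole_after %d\n", removed, c[0].hole_after); /* 1, 1 */
	return 0;
}
```
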
*/ void scoutfs_item_invalidate(struct super_block *sb, struct scoutfs_key *start, struct scoutfs_key *end) { DECLARE_ITEM_CACHE_INFO(sb, cinf); - struct cached_page *right = NULL; - struct cached_page *pg; - struct rb_node **pnode; - struct rb_node *par; - int pgi; + struct scoutfs_cwskip_writer wr; + struct scoutfs_key key = *start; + struct cached_item *prev; + struct cached_item *item; + bool first = true; + int cmp; scoutfs_inc_counter(sb, item_invalidate); -retry: - write_lock(&cinf->rwlock); + do { + scoutfs_cwskip_write_begin(&cinf->item_root, &key, 1, + (void **)&prev, (void **)&item, &cmp, &wr); + do { + if (!(first && cmp == 0) && prev && !prev->hole_after) + prev->hole_after = 1; + first = false; - while ((pg = page_rbtree_walk(sb, &cinf->pg_root, start, end, NULL, - NULL, &par, &pnode))) { + if (item) { + key = item->key; + scoutfs_key_inc(&key); + } else { + scoutfs_key_set_ones(&key); + } - scoutfs_inc_counter(sb, item_invalidate_page); + if (!item || scoutfs_key_compare(&item->key, end) > 0) + break; - write_lock(&pg->rwlock); + /* cluster locking must sync before invalidating */ + WARN_ON_ONCE(item->dirty); - pgi = trim_page_intersection(sb, cinf, pg, right, start, end); - trace_scoutfs_item_invalidate_page(sb, start, end, - &pg->start, &pg->end, pgi); - BUG_ON(pgi == PGI_DISJOINT); /* walk wouldn't ret disjoint */ + scoutfs_inc_counter(sb, item_invalidate_item); - if (pgi == PGI_INSIDE) { - /* free entirely invalidated page */ - lru_remove(sb, cinf, pg); - rbtree_erase(&pg->node, &cinf->pg_root); - invalidate_pcpu_page(pg); - write_unlock(&pg->rwlock); - put_pg(sb, pg); - continue; + scoutfs_cwskip_write_remove(&wr, item->node); + update_age_total(cinf, -item->alloc_bytes); + call_free_item(sb, item); - } else if (pgi == PGI_BISECT_NEEDED) { - /* allocate so we can bisect a larger page */ - write_unlock(&cinf->rwlock); - write_unlock(&pg->rwlock); - right = alloc_pg(sb, __GFP_NOFAIL); - goto retry; + } while (scoutfs_cwskip_write_next(&wr, 1, (void **)&prev, (void **)&item)); + scoutfs_cwskip_write_end(&wr); - } else if (pgi == PGI_BISECT) { - /* inv was entirely inside page, done after bisect */ - write_trylock_will_succeed(&right->rwlock); - rbtree_insert(&right->node, par, pnode, &cinf->pg_root); - lru_accessed(sb, cinf, right); - write_unlock(&right->rwlock); - write_unlock(&pg->rwlock); - right = NULL; - break; - } + } while (scoutfs_key_compare(&key, end) <= 0); +} - /* OLAP trimmed edge, keep searching */ - write_unlock(&pg->rwlock); - } - - write_unlock(&cinf->rwlock); - - put_pg(sb, right); +static bool can_shrink_item(struct cached_item *item, u64 shrink_age, u64 first_reader_seq) +{ + return item && + atomic64_read(&item->age) <= shrink_age && + item->seq < first_reader_seq && + !item->dirty; } /* - * Shrink the size the item cache. We're operating against the fast - * path lock ordering and we skip pages if we can't acquire locks. We - * can run into dirty pages or pages with items that weren't visible to - * the earliest active reader which must be skipped. + * Shrink the size the item cache. + * + * As items were accessed we tried to mark them with coarse age values + * that divide them into fractions of the total cached items. We have + * no specific indexing of items by age, instead we randomly search the + * list looking for items that are old enough to shrink. + * + * We cast a very wide net when searching for items that are old enough. 
+ * If we searched for a precise small age window then the random + * searching has to do more work before it finds the ages it's looking + * for. Instead we only search for two broad age categories: either + * items that are older than the most recently accessed half of the + * items, or all items. This ensures that the random search will find + * items to shrink reasonably often. + * + * While we initially search to a random position in the list, we try to + * shrink contiguous runs of items. We choose a small size that is + * still larger than can be read and inserted in a single operation. + * The worst case would be to randomly free individual items leading to + * later reads that discard most of their items while inserting into a + * single item hole. + * + * All of this can go wrong. Access patterns can lead to weird age + * groupings, the cache can be entirely dirty, invalidation can remove + * the entire cache out from under us if the entire system is in one + * lock (a handful of enormous files in one inode group). This is all + * a best effort that stops when it has too many attempts that don't + * make progress. + * + * Finally, while we work with items the caller really cares about + * allocated pages. We track the bytes allocated to items and + * translate that to units of pages for the caller. We have no idea if + * our frees make up freed contiguous pages, and we're not really + * freeing items before returning, we're asking RCU to free later for + * us. So while we can return and tell the caller we freed our objects + * it's mostly a lie that we hope works out in the end. */ -static int item_lru_shrink(struct shrinker *shrink, - struct shrink_control *sc) +static int item_shrink(struct shrinker *shrink, struct shrink_control *sc) { - struct item_cache_info *cinf = container_of(shrink, - struct item_cache_info, - shrinker); +#define ITEM_SHRINK_SCAN_LIMIT (2 * SCOUTFS_BLOCK_LG_SIZE) +#define ITEM_SHRINK_ATTEMPT_LIMIT 64 + struct item_cache_info *cinf = container_of(shrink, struct item_cache_info, shrinker); struct super_block *sb = cinf->sb; - struct cached_page *tmp; - struct cached_page *pg; + struct scoutfs_cwskip_reader rd; + struct scoutfs_cwskip_writer wr; + struct cached_item *item; + struct cached_item *prev; + struct scoutfs_key key; u64 first_reader_seq; - int nr; + s64 shrink_bytes; + u64 shrink_age; + u64 cur_age; + int attempts; + int scanned; + bool found; if (sc->nr_to_scan == 0) goto out; - nr = sc->nr_to_scan; + + scoutfs_inc_counter(sb, item_shrink); /* can't invalidate pages with items that weren't visible to first reader */ first_reader_seq = first_active_reader_seq(cinf); - write_lock(&cinf->rwlock); - spin_lock(&cinf->lru_lock); + shrink_bytes = (u64)sc->nr_to_scan << PAGE_SHIFT; - list_for_each_entry_safe(pg, tmp, &cinf->lru_list, lru_head) { - - if (first_reader_seq <= pg->max_seq) { - scoutfs_inc_counter(sb, item_shrink_page_reader); - continue; - } - - if (!write_trylock(&pg->rwlock)) { - scoutfs_inc_counter(sb, item_shrink_page_trylock); - continue; - } - - if (!list_empty(&pg->dirty_list)) { - scoutfs_inc_counter(sb, item_shrink_page_dirty); - write_unlock(&pg->rwlock); - continue; - } - - scoutfs_inc_counter(sb, item_shrink_page); - - __lru_remove(sb, cinf, pg); - rbtree_erase(&pg->node, &cinf->pg_root); - invalidate_pcpu_page(pg); - write_unlock(&pg->rwlock); - - put_pg(sb, pg); - - if (--nr == 0) - break; + /* can shrink oldest half if shrinking less than half, otherwise everything */ + cur_age = atomic64_read(&cinf->current_age); + if 
((shrink_bytes < (atomic64_read(&cinf->age_total) >> 1)) && (cur_age > ITEM_AGE_NR)) { + shrink_age = cur_age - ITEM_AGE_HALF; + } else { + scoutfs_inc_counter(sb, item_shrink_all); + shrink_age = U64_MAX; } - write_unlock(&cinf->rwlock); - spin_unlock(&cinf->lru_lock); + attempts = 0; + + do { + attempts++; + + /* find the key of a shrink candidate */ + scoutfs_inc_counter(sb, item_shrink_read_search); + scanned = 0; + found = false; + scoutfs_cwskip_read_begin(&cinf->item_root, NULL, + (void **)&prev, (void **)&item, NULL, &rd); + do { + if (!item) { + if (!prev) + shrink_bytes = 0; + break; + } + + /* keys don't change */ + key = item->key; + + if (can_shrink_item(item, shrink_age, first_reader_seq)) { + found = true; + break; + } + + scoutfs_key_inc(&key); + scoutfs_inc_counter(sb, item_shrink_searched); + scanned += item->alloc_bytes; + + } while (scanned < ITEM_SHRINK_SCAN_LIMIT && + scoutfs_cwskip_read_next(&rd, (void **)&prev, (void **)&item)); + scoutfs_cwskip_read_end(&rd); + + if (!found) + continue; + + /* try to shrink items in a region after the key */ + scoutfs_inc_counter(sb, item_shrink_write_search); + scanned = 0; + scoutfs_cwskip_write_begin(&cinf->item_root, &key, 1, + (void **)&prev, (void **)&item, NULL, &wr); + do { + if (!item) + break; + + key = item->key; + scoutfs_key_inc(&key); + scanned += item->alloc_bytes; + + if (can_shrink_item(item, shrink_age, first_reader_seq)) { + scoutfs_inc_counter(sb, item_shrink_removed); + if (prev && !prev->hole_after) + prev->hole_after = 1; + scoutfs_cwskip_write_remove(&wr, item->node); + update_age_total(cinf, -item->alloc_bytes); + call_free_item(sb, item); + shrink_bytes -= item->alloc_bytes; + attempts = 0; + } else { + scoutfs_inc_counter(sb, item_shrink_skipped); + } + } while (shrink_bytes > 0 && scanned < ITEM_SHRINK_SCAN_LIMIT && + scoutfs_cwskip_write_next(&wr, 1, NULL, (void **)&item)); + scoutfs_cwskip_write_end(&wr); + + } while (shrink_bytes > 0 && attempts < ITEM_SHRINK_ATTEMPT_LIMIT); + + if (attempts >= ITEM_SHRINK_ATTEMPT_LIMIT) + scoutfs_inc_counter(sb, item_shrink_exhausted); + out: - return min_t(unsigned long, cinf->lru_pages, INT_MAX); + return min_t(u64, atomic64_read(&cinf->age_total) >> PAGE_SHIFT, INT_MAX); +} + +/* + * Free all the items in batches so as not to overwhelm rcu. Only used + * during teardown when there must be no more item use. 
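
The batched teardown described above can be sketched in userspace as follows. This is a simplified model under stated assumptions: a plain singly linked list stands in for the cwskip structure, and wait_for_readers() is an empty placeholder for synchronize_rcu(); only the shape of the loop, bounded batches with a quiescent wait between them, is being illustrated.

```c
#include <stdio.h>
#include <stdlib.h>

#define BATCH	1024

/* placeholder for waiting out concurrent readers (synchronize_rcu in the patch) */
static void wait_for_readers(void)
{
}

struct node { struct node *next; };

/* free an entire list in bounded batches so no single wait covers everything */
static unsigned long free_all(struct node **head)
{
	unsigned long freed = 0;
	struct node *n;
	int i;

	while (*head) {
		for (i = 0; i < BATCH && (n = *head); i++) {
			*head = n->next;
			free(n);
			freed++;
		}
		wait_for_readers();
	}
	return freed;
}

int main(void)
{
	struct node *head = NULL;
	struct node *n;
	int i;

	for (i = 0; i < 3000; i++) {
		n = malloc(sizeof(*n));
		if (!n)
			return 1;
		n->next = head;
		head = n;
	}
	printf("freed %lu\n", free_all(&head));	/* 3000, in three batches */
	return 0;
}
```
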
+ */
+static void free_all_items(struct super_block *sb, struct item_cache_info *cinf)
+{
+	struct scoutfs_cwskip_writer wr;
+	struct cached_item *item;
+	struct scoutfs_key key;
+	int i;
+
+	/* free items in batches of rcu critical sections */
+	scoutfs_key_set_zeros(&key);
+	do {
+		scoutfs_cwskip_write_begin(&cinf->item_root, &key,
+					   SCOUTFS_CWSKIP_MAX_HEIGHT,
+					   NULL, (void **)&item, NULL, &wr);
+		if (!item)
+			break;
+		i = 0;
+		do {
+			clear_item_dirty(sb, cinf, item);
+			scoutfs_cwskip_write_remove(&wr, item->node);
+			call_free_item(sb, item);
+		} while (++i < 1024 && scoutfs_cwskip_write_next(&wr, 1, NULL, (void **)&item));
+		scoutfs_cwskip_write_end(&wr);
+
+		synchronize_rcu();
+	} while (item);
+
+	WARN_ON_ONCE(!scoutfs_cwskip_empty(&cinf->item_root));
+}
+
 static int item_cpu_callback(struct notifier_block *nfb,
			      unsigned long action, void *hcpu)
 {
-	struct item_cache_info *cinf = container_of(nfb,
-						    struct item_cache_info,
-						    notifier);
-	struct super_block *sb = cinf->sb;
-	unsigned long cpu = (unsigned long)hcpu;
+	struct item_cache_info *cinf = container_of(nfb, struct item_cache_info, notifier);
+	unsigned long dead_cpu = (unsigned long)hcpu;
+	struct pcpu_age_counters *pac;
+	struct pcpu_dirty_list *pdlist;
+	struct cached_item *item;
+	LIST_HEAD(list);
+	int our_cpu;
 
-	if (action == CPU_DEAD)
-		drop_pcpu_pages(sb, cinf, cpu);
+	if (action == CPU_DEAD) {
+		our_cpu = get_cpu();
+
+		/* age tracking */
+		pac = per_cpu_ptr(cinf->pcpu_age, dead_cpu);
+		add_global_age_marked(cinf, atomic64_read(&pac->age_marked));
+		atomic64_set(&pac->age_marked, 0);
+		atomic64_add(atomic64_xchg(&pac->total, 0), &cinf->age_total);
+
+		/* dirty item lists */
+		pdlist = per_cpu_ptr(cinf->pcpu_dirty, dead_cpu);
+		list_splice_init(&pdlist->list, &list);
+
+		list_for_each_entry(item, &list, dirty_head)
+			item->dirty_cpu = our_cpu;
+		pdlist = per_cpu_ptr(cinf->pcpu_dirty, our_cpu);
+		spin_lock(&pdlist->lock);
+		list_splice_init(&list, &pdlist->list);
+		spin_unlock(&pdlist->lock);
+
+		put_cpu();
+	}
 
 	return NOTIFY_OK;
 }
@@ -2582,6 +1792,8 @@ static int item_cpu_callback(struct notifier_block *nfb,
 int scoutfs_item_setup(struct super_block *sb)
 {
 	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
+	struct pcpu_dirty_list *pdlist;
+	struct pcpu_age_counters *pac;
 	struct item_cache_info *cinf;
 	int cpu;
 
@@ -2590,24 +1802,38 @@ int scoutfs_item_setup(struct super_block *sb)
 		return -ENOMEM;
 
 	cinf->sb = sb;
-	rwlock_init(&cinf->rwlock);
-	cinf->pg_root = RB_ROOT;
-	spin_lock_init(&cinf->dirty_lock);
-	INIT_LIST_HEAD(&cinf->dirty_list);
-	atomic_set(&cinf->dirty_pages, 0);
-	spin_lock_init(&cinf->lru_lock);
-	INIT_LIST_HEAD(&cinf->lru_list);
+	scoutfs_cwskip_init_root(&cinf->item_root, key_item_cmp, sizeof(struct cached_item));
+	atomic64_set(&cinf->current_age, 1);
+	atomic64_set(&cinf->age_marked, 1ULL << ITEM_AGE_MARK_SHIFT);
+	atomic64_set(&cinf->age_total, 0);
+	atomic64_set(&cinf->dirty_bytes, 0);
 	spin_lock_init(&cinf->active_lock);
 	INIT_LIST_HEAD(&cinf->active_list);
 
-	cinf->pcpu_pages = alloc_percpu(struct item_percpu_pages);
-	if (!cinf->pcpu_pages)
+	cinf->pcpu_dirty = alloc_percpu(struct pcpu_dirty_list);
+	if (!cinf->pcpu_dirty) {
+		kfree(cinf);
 		return -ENOMEM;
+	}
 
-	for_each_possible_cpu(cpu)
-		init_pcpu_pages(cinf, cpu);
+	cinf->pcpu_age = alloc_percpu(struct pcpu_age_counters);
+	if (!cinf->pcpu_age) {
+		free_percpu(cinf->pcpu_dirty);
+		kfree(cinf);
+		return -ENOMEM;
+	}
 
-	cinf->shrinker.shrink = item_lru_shrink;
+	for_each_possible_cpu(cpu) {
+		pac = per_cpu_ptr(cinf->pcpu_age, cpu);
+
pac->age_marked = cinf->age_marked; + atomic64_set(&pac->total, 0); + + pdlist = per_cpu_ptr(cinf->pcpu_dirty, cpu); + spin_lock_init(&pdlist->lock); + INIT_LIST_HEAD(&pdlist->list); + } + + cinf->shrinker.shrink = item_shrink; cinf->shrinker.seeks = DEFAULT_SEEKS; register_shrinker(&cinf->shrinker); cinf->notifier.notifier_call = item_cpu_callback; @@ -2624,9 +1850,6 @@ void scoutfs_item_destroy(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct item_cache_info *cinf = sbi->item_cache_info; - struct cached_page *tmp; - struct cached_page *pg; - int cpu; if (cinf) { BUG_ON(!list_empty(&cinf->active_list)); @@ -2634,18 +1857,10 @@ void scoutfs_item_destroy(struct super_block *sb) unregister_hotcpu_notifier(&cinf->notifier); unregister_shrinker(&cinf->shrinker); - for_each_possible_cpu(cpu) - drop_pcpu_pages(sb, cinf, cpu); - free_percpu(cinf->pcpu_pages); + free_all_items(sb, cinf); - rbtree_postorder_for_each_entry_safe(pg, tmp, &cinf->pg_root, - node) { - RB_CLEAR_NODE(&pg->node); - INIT_LIST_HEAD(&pg->lru_head); - INIT_LIST_HEAD(&pg->dirty_list); - INIT_LIST_HEAD(&pg->dirty_head); - put_pg(sb, pg); - } + free_percpu(cinf->pcpu_dirty); + free_percpu(cinf->pcpu_age); kfree(cinf); sbi->item_cache_info = NULL; diff --git a/kmod/src/item.h b/kmod/src/item.h index 431866d5..a2332945 100644 --- a/kmod/src/item.h +++ b/kmod/src/item.h @@ -26,7 +26,7 @@ int scoutfs_item_delete_force(struct super_block *sb, struct scoutfs_key *key, struct scoutfs_lock *lock); -u64 scoutfs_item_dirty_pages(struct super_block *sb); +u64 scoutfs_item_dirty_bytes(struct super_block *sb); int scoutfs_item_write_dirty(struct super_block *sb); int scoutfs_item_write_done(struct super_block *sb); bool scoutfs_item_range_cached(struct super_block *sb, diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h index 80db3247..22524fa8 100644 --- a/kmod/src/scoutfs_trace.h +++ b/kmod/src/scoutfs_trace.h @@ -403,24 +403,24 @@ TRACE_EVENT(scoutfs_sync_fs, ); TRACE_EVENT(scoutfs_trans_write_func, - TP_PROTO(struct super_block *sb, u64 dirty_block_bytes, u64 dirty_item_pages), + TP_PROTO(struct super_block *sb, u64 dirty_block_bytes, u64 dirty_item_bytes), - TP_ARGS(sb, dirty_block_bytes, dirty_item_pages), + TP_ARGS(sb, dirty_block_bytes, dirty_item_bytes), TP_STRUCT__entry( SCSB_TRACE_FIELDS __field(__u64, dirty_block_bytes) - __field(__u64, dirty_item_pages) + __field(__u64, dirty_item_bytes) ), TP_fast_assign( SCSB_TRACE_ASSIGN(sb); __entry->dirty_block_bytes = dirty_block_bytes; - __entry->dirty_item_pages = dirty_item_pages; + __entry->dirty_item_bytes = dirty_item_bytes; ), - TP_printk(SCSBF" dirty_block_bytes %llu dirty_item_pages %llu", - SCSB_TRACE_ARGS, __entry->dirty_block_bytes, __entry->dirty_item_pages) + TP_printk(SCSBF" dirty_block_bytes %llu dirty_item_bytes %llu", + SCSB_TRACE_ARGS, __entry->dirty_block_bytes, __entry->dirty_item_bytes) ); DECLARE_EVENT_CLASS(scoutfs_trans_hold_release_class, diff --git a/kmod/src/trans.c b/kmod/src/trans.c index 14e45c15..e22a58b6 100644 --- a/kmod/src/trans.c +++ b/kmod/src/trans.c @@ -207,7 +207,7 @@ void scoutfs_trans_write_func(struct work_struct *work) } trace_scoutfs_trans_write_func(sb, scoutfs_block_writer_dirty_bytes(sb, &tri->wri), - scoutfs_item_dirty_pages(sb)); + scoutfs_item_dirty_bytes(sb)); if (tri->deadline_expired) scoutfs_inc_counter(sb, trans_commit_timer); @@ -422,16 +422,18 @@ static void release_holders(struct super_block *sb) */ static bool commit_before_hold(struct super_block *sb, struct trans_info *tri) { + u64 
dirty_blocks = (scoutfs_item_dirty_bytes(sb) >> SCOUTFS_BLOCK_LG_SHIFT) + 1; + /* - * In theory each dirty item page could be straddling two full - * blocks, requiring 4 allocations for each item cache page. - * That's much too conservative, typically many dirty item cache - * pages that are near each other all land in one block. This + * In theory each dirty item could be added to a full block that + * has to split, requiring 2 meta block allocs for each dirty + * item. That's much too conservative, typically many dirty + * items that are near each other all land in one block. This * rough estimate is still so far beyond what typically happens * that it accounts for having to dirty parent blocks and * whatever dirtying is done during the transaction hold. */ - if (scoutfs_alloc_meta_low(sb, &tri->alloc, scoutfs_item_dirty_pages(sb) * 2)) { + if (scoutfs_alloc_meta_low(sb, &tri->alloc, dirty_blocks * 4)) { scoutfs_inc_counter(sb, trans_commit_dirty_meta_full); return true; }
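
The reservation estimate in commit_before_hold reduces to the arithmetic below. This is a userspace sketch for illustration only; the 64KiB large block shift is an assumption about SCOUTFS_BLOCK_LG_SHIFT, and the factor of four mirrors the rough per-block allocation margin described in the comment above.

```c
#include <stdint.h>
#include <stdio.h>

#define BLOCK_LG_SHIFT	16	/* stand-in for SCOUTFS_BLOCK_LG_SHIFT (64KiB) */

/* metadata allocations to keep free before letting another hold start */
static uint64_t meta_reservation(uint64_t dirty_item_bytes)
{
	uint64_t dirty_blocks = (dirty_item_bytes >> BLOCK_LG_SHIFT) + 1;

	return dirty_blocks * 4;
}

int main(void)
{
	/* ~1MiB of dirty items -> 16+1 blocks -> 68 reserved metadata blocks */
	printf("%llu\n", (unsigned long long)meta_reservation(1 << 20));
	return 0;
}
```
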