From 5bea29a168de16e0bae07aec0f4a4be1f246727c Mon Sep 17 00:00:00 2001
From: Zach Brown
Date: Thu, 23 Dec 2021 14:58:19 -0800
Subject: [PATCH] Use cwskip for the item cache

The use of pages in the item cache got us pretty far but it
fundamentally couldn't escape the contention around the global or
per-page read locks.  Some workloads became bottlenecked on lock
contention in the item cache.  Worse, we were seeing inconsistency in
the per-cpu cached mappings of key ranges to pages.

All the users of items in the cache are transitioned from searching for
items in locked pages to searching for items in the cwskip list.  It's
fundamentally built around a seqlock-like begin/retry pattern so most of
the item work gets wrapped in search and retry helpers.

Without pages we no longer have a global list of dirty pages.  Instead
we have per-cpu lists of dirty items that are later sorted and handed to
the btree insertion iterator.  We take the opportunity to clean up that
interface now that it's very easy for us to iterate through the stable
list of dirty items.

Rather than a global lru of pages we have an algorithm for maintaining
items in rough groups of ages.  Shrinking randomly walks the cwskip
list looking for regions of sufficiently old items rather than walking a
precise global lru list of pages.

Signed-off-by: Zach Brown
---
 kmod/src/btree.c         |   39 +-
 kmod/src/btree.h         |   23 +-
 kmod/src/counters.h      |   31 +-
 kmod/src/forest.c        |    6 +-
 kmod/src/forest.h        |    4 +-
 kmod/src/item.c          | 2987 ++++++++++++++------------------------
 kmod/src/item.h          |    2 +-
 kmod/src/scoutfs_trace.h |   12 +-
 kmod/src/trans.c         |   14 +-
 9 files changed, 1169 insertions(+), 1949 deletions(-)

diff --git a/kmod/src/btree.c b/kmod/src/btree.c
index 9807c8c5..30e85b10 100644
--- a/kmod/src/btree.c
+++ b/kmod/src/btree.c
@@ -1875,12 +1875,11 @@ out:
  * set in btree items.  They're only used for fs items written through
  * the item cache and forest of log btrees.
*/ -int scoutfs_btree_insert_list(struct super_block *sb, - struct scoutfs_alloc *alloc, - struct scoutfs_block_writer *wri, - struct scoutfs_btree_root *root, - struct scoutfs_btree_item_list *lst) +int scoutfs_btree_insert_list(struct super_block *sb, struct scoutfs_alloc *alloc, + struct scoutfs_block_writer *wri, struct scoutfs_btree_root *root, + scoutfs_btree_item_iter_cb iter_cb, void *pos, void *arg) { + struct scoutfs_btree_item_desc desc; struct scoutfs_btree_item *item; struct btree_walk_key_range kr; struct scoutfs_btree_block *bt; @@ -1889,44 +1888,46 @@ int scoutfs_btree_insert_list(struct super_block *sb, int cmp; int ret = 0; - while (lst) { + pos = iter_cb(sb, &desc, pos, arg); + + while (pos) { ret = btree_walk(sb, alloc, wri, root, BTW_DIRTY | BTW_INSERT, - &lst->key, lst->val_len, &bl, &kr, NULL); + desc.key, desc.val_len, &bl, &kr, NULL); if (ret < 0) goto out; bt = bl->data; do { - item = leaf_item_hash_search(sb, bt, &lst->key); + item = leaf_item_hash_search(sb, bt, desc.key); if (item) { /* try to merge delta values, _NULL not deleted; merge will */ - ret = scoutfs_forest_combine_deltas(&lst->key, + ret = scoutfs_forest_combine_deltas(desc.key, item_val(bt, item), item_val_len(item), - lst->val, lst->val_len); + desc.val, desc.val_len); if (ret < 0) { scoutfs_block_put(sb, bl); goto out; } - item->seq = cpu_to_le64(lst->seq); - item->flags = lst->flags; + item->seq = cpu_to_le64(desc.seq); + item->flags = desc.flags; if (ret == 0) - update_item_value(bt, item, lst->val, lst->val_len); + update_item_value(bt, item, desc.val, desc.val_len); else ret = 0; } else { scoutfs_avl_search(&bt->item_root, - cmp_key_item, &lst->key, + cmp_key_item, desc.key, &cmp, &par, NULL, NULL); - create_item(bt, &lst->key, lst->seq, lst->flags, lst->val, - lst->val_len, par, cmp); + create_item(bt, desc.key, desc.seq, desc.flags, desc.val, + desc.val_len, par, cmp); } - lst = lst->next; - } while (lst && scoutfs_key_compare(&lst->key, &kr.end) <= 0 && - mid_free_item_room(bt, lst->val_len)); + pos = iter_cb(sb, &desc, pos, arg); + } while (pos && scoutfs_key_compare(desc.key, &kr.end) <= 0 && + mid_free_item_room(bt, desc.val_len)); scoutfs_block_put(sb, bl); } diff --git a/kmod/src/btree.h b/kmod/src/btree.h index 057aa779..4233e912 100644 --- a/kmod/src/btree.h +++ b/kmod/src/btree.h @@ -18,11 +18,24 @@ struct scoutfs_btree_item_ref { #define SCOUTFS_BTREE_ITEM_REF(name) \ struct scoutfs_btree_item_ref name = {NULL,} -/* caller gives an item to the callback */ +/* btree gives an item to caller */ typedef int (*scoutfs_btree_item_cb)(struct super_block *sb, struct scoutfs_key *key, u64 seq, u8 flags, void *val, int val_len, void *arg); +struct scoutfs_btree_item_desc { + struct scoutfs_key *key; + void *val; + u64 seq; + u8 flags; + unsigned val_len; +}; + +/* btree iterates through items from caller */ +typedef void *(*scoutfs_btree_item_iter_cb)(struct super_block *sb, + struct scoutfs_btree_item_desc *desc, + void *pos, void *arg); + /* simple singly-linked list of items */ struct scoutfs_btree_item_list { struct scoutfs_btree_item_list *next; @@ -78,11 +91,9 @@ int scoutfs_btree_read_items(struct super_block *sb, struct scoutfs_key *start, struct scoutfs_key *end, scoutfs_btree_item_cb cb, void *arg); -int scoutfs_btree_insert_list(struct super_block *sb, - struct scoutfs_alloc *alloc, - struct scoutfs_block_writer *wri, - struct scoutfs_btree_root *root, - struct scoutfs_btree_item_list *lst); +int scoutfs_btree_insert_list(struct super_block *sb, struct scoutfs_alloc *alloc, + 
struct scoutfs_block_writer *wri, struct scoutfs_btree_root *root, + scoutfs_btree_item_iter_cb iter_cb, void *pos, void *arg); int scoutfs_btree_parent_range(struct super_block *sb, struct scoutfs_btree_root *root, diff --git a/kmod/src/counters.h b/kmod/src/counters.h index b3e68bd4..f8645749 100644 --- a/kmod/src/counters.h +++ b/kmod/src/counters.h @@ -90,36 +90,27 @@ EXPAND_COUNTER(forest_read_items) \ EXPAND_COUNTER(forest_roots_next_hint) \ EXPAND_COUNTER(forest_set_bloom_bits) \ + EXPAND_COUNTER(item_alloc_bytes) \ EXPAND_COUNTER(item_clear_dirty) \ EXPAND_COUNTER(item_create) \ EXPAND_COUNTER(item_delete) \ EXPAND_COUNTER(item_delta) \ EXPAND_COUNTER(item_delta_written) \ EXPAND_COUNTER(item_dirty) \ + EXPAND_COUNTER(item_free_bytes) \ EXPAND_COUNTER(item_invalidate) \ - EXPAND_COUNTER(item_invalidate_page) \ + EXPAND_COUNTER(item_invalidate_item) \ EXPAND_COUNTER(item_lookup) \ EXPAND_COUNTER(item_mark_dirty) \ EXPAND_COUNTER(item_next) \ - EXPAND_COUNTER(item_page_accessed) \ - EXPAND_COUNTER(item_page_alloc) \ - EXPAND_COUNTER(item_page_clear_dirty) \ - EXPAND_COUNTER(item_page_compact) \ - EXPAND_COUNTER(item_page_free) \ - EXPAND_COUNTER(item_page_lru_add) \ - EXPAND_COUNTER(item_page_lru_remove) \ - EXPAND_COUNTER(item_page_mark_dirty) \ - EXPAND_COUNTER(item_page_rbtree_walk) \ - EXPAND_COUNTER(item_page_split) \ - EXPAND_COUNTER(item_pcpu_add_replaced) \ - EXPAND_COUNTER(item_pcpu_page_hit) \ - EXPAND_COUNTER(item_pcpu_page_miss) \ - EXPAND_COUNTER(item_pcpu_page_miss_keys) \ - EXPAND_COUNTER(item_read_pages_split) \ - EXPAND_COUNTER(item_shrink_page) \ - EXPAND_COUNTER(item_shrink_page_dirty) \ - EXPAND_COUNTER(item_shrink_page_reader) \ - EXPAND_COUNTER(item_shrink_page_trylock) \ + EXPAND_COUNTER(item_shrink) \ + EXPAND_COUNTER(item_shrink_all) \ + EXPAND_COUNTER(item_shrink_exhausted) \ + EXPAND_COUNTER(item_shrink_read_search) \ + EXPAND_COUNTER(item_shrink_removed) \ + EXPAND_COUNTER(item_shrink_searched) \ + EXPAND_COUNTER(item_shrink_skipped) \ + EXPAND_COUNTER(item_shrink_write_search) \ EXPAND_COUNTER(item_update) \ EXPAND_COUNTER(item_write_dirty) \ EXPAND_COUNTER(lock_alloc) \ diff --git a/kmod/src/forest.c b/kmod/src/forest.c index 1b4c9c4b..a9b33236 100644 --- a/kmod/src/forest.c +++ b/kmod/src/forest.c @@ -494,13 +494,13 @@ out: return ret; } -int scoutfs_forest_insert_list(struct super_block *sb, - struct scoutfs_btree_item_list *lst) +int scoutfs_forest_insert_list(struct super_block *sb, scoutfs_btree_item_iter_cb cb, + void *pos, void *arg) { DECLARE_FOREST_INFO(sb, finf); return scoutfs_btree_insert_list(sb, finf->alloc, finf->wri, - &finf->our_log.item_root, lst); + &finf->our_log.item_root, cb, pos, arg); } /* diff --git a/kmod/src/forest.h b/kmod/src/forest.h index 30564a11..e699d4bc 100644 --- a/kmod/src/forest.h +++ b/kmod/src/forest.h @@ -29,8 +29,8 @@ void scoutfs_forest_set_max_seq(struct super_block *sb, u64 max_seq); int scoutfs_forest_get_max_seq(struct super_block *sb, struct scoutfs_super_block *super, u64 *seq); -int scoutfs_forest_insert_list(struct super_block *sb, - struct scoutfs_btree_item_list *lst); +int scoutfs_forest_insert_list(struct super_block *sb, scoutfs_btree_item_iter_cb cb, + void *pos, void *arg); int scoutfs_forest_srch_add(struct super_block *sb, u64 hash, u64 ino, u64 id); void scoutfs_forest_inc_inode_count(struct super_block *sb); diff --git a/kmod/src/item.c b/kmod/src/item.c index 7151b380..267ec9cf 100644 --- a/kmod/src/item.c +++ b/kmod/src/item.c @@ -15,16 +15,18 @@ #include #include #include -#include #include 
+#include
 #include
 #include
 #include "super.h"
+#include "cwskip.h"
 #include "item.h"
 #include "forest.h"
 #include "block.h"
 #include "trans.h"
 #include "counters.h"
 #include "scoutfs_trace.h"
@@ -33,1245 +35,330 @@
  * from and written to the forest of btrees under the protection of
  * cluster locks.
  *
- * The cache is built around pages of items.  A page has the range of
- * keys that it caches and the items that are present in that range.
- * Pages are non-overlapping, there is only one page that can contain a
- * given key at a time.  The pages are tracked by an rbtree, and each
- * page has an rbtree of items.
+ * The cache is built around a concurrent skip list of items.  Readers
+ * are protected by per-item seqlocks and retry if their items are
+ * modified while they're being referenced.  Writers use trylock to
+ * acquire locks on adjacent pairs of items and retry if they encounter
+ * contention.
+ *
+ * The item cache has to support negative caches of ranges of keys that
+ * contain no items.  This is done by marking a node as having a "hole"
+ * in the cache following its key.  Searches that hit keys in these
+ * hole regions read items from btree blocks and insert the resulting
+ * key range into the cache.  Searches that end after items without the
+ * following hole marker know that the item doesn't exist and can act
+ * accordingly, say by returning -ENOENT.  Working with this space
+ * between items is why the skip list interface is built around
+ * returning the pair of items that surround a key.
  *
  * The cache is populated by reading items from the forest of btrees
- * into a private set of pages.  The regions of those pages which
- * weren't already cached are then inserted into the cache.
+ * into a private list.  The ranges of keys in the list that weren't
+ * already cached are inserted into the cache, maintaining the negative
+ * cached ranges around the read items.
  *
- * CPUs can concurrently modify items that are in different pages.  The
- * page rbtree can be read locked to find a page, and then the page is
- * locked to work with its items.  We then add per-cpu references to
- * recently used pages so that the global page rbtree can be skipped in
- * the typical case of repeated calls to localized portions of the key
- * space.
+ * Dirty items are kept in per-cpu lists to reduce global contention;
+ * workloads where all cpus are only creating dirty items are common.
+ * The dirty items are only combined and sorted when it comes time to
+ * commit them.
  *
- * Dirty items are kept in a per-page dirty list, and pages with dirty
- * items are kept in a global dirty list.  This reduces contention on
- * the global list by accessing it at page granularity instead of every
- * time an item is dirtied.  The dirty items are not sorted until it
- * comes time to commit them to the btrees.  This reduces the cost of
- * tracking dirty items during the transaction, particularly moving them
- * between pages as pages are split to make room for new items.
- *
- * The size of the cache is only limited by memory reclaim.  Pages are
- * kept in a very coarse lru.  Dirtying doesn't remove pages from the
- * lru, and is operating against lock ordering with trylocks, so
- * shrinking can rarely have to skip pages in the LRU.
- *
- * The locking is built around the fast path of everyone checking the
- * the page rbtree, then locking pages, and then adding or removing
- * pages from the lru or dirty lists.  Writing and the shrinker work
- * work in reverse, starting with the dirty or lru lists and have to use
- * trylock to lock the pages.  When we split we have to lock multiple
- * pages and we use trylock which is guaranteed to succeed because the
- * pages are private.
+ * The size of the cache is only limited by memory reclaim.  We try to
+ * group items into coarse ages by how recently they were accessed.  We
+ * don't precisely order items by access time to avoid contention.
+ * Shrinking randomly walks all items looking for items that weren't
+ * accessed recently.
 */

+struct pcpu_age_counters {
+	atomic64_t age_marked;
+	atomic64_t total;
+};
+
+struct pcpu_dirty_list {
+	struct list_head list;
+	spinlock_t lock;
+};
+
 struct item_cache_info {
 	/* almost always read, barely written */
 	struct super_block *sb;
-	struct item_percpu_pages __percpu *pcpu_pages;
+	struct pcpu_age_counters __percpu *pcpu_age;
+	struct pcpu_dirty_list __percpu *pcpu_dirty;
 	struct shrinker shrinker;
 	struct notifier_block notifier;

-	/* often walked, but per-cpu refs are fast path */
-	rwlock_t rwlock;
-	struct rb_root pg_root;
+	/* read for every op, rarely written by tall or early nodes */
+	____cacheline_aligned_in_smp struct scoutfs_cwskip_root item_root;

-	/* page-granular modification by writers, then exclusive to commit */
-	spinlock_t dirty_lock;
-	struct list_head dirty_list;
-	atomic_t dirty_pages;
+	/* often read, rarely written as ages advance */
+	atomic64_t current_age;
+	atomic64_t age_marked;
+	atomic64_t age_total;

-	/* page-granular modification by readers */
-	spinlock_t lru_lock;
-	struct list_head lru_list;
-	unsigned long lru_pages;
+	/* written by every dirty item change */
+	____cacheline_aligned_in_smp atomic64_t dirty_bytes;

-	/* written by page readers, read by shrink */
-	spinlock_t active_lock;
+	/* written by readers, read by shrink */
+	____cacheline_aligned_in_smp spinlock_t active_lock;
 	struct list_head active_list;
 };

 #define DECLARE_ITEM_CACHE_INFO(sb, name) \
 	struct item_cache_info *name = SCOUTFS_SB(sb)->item_cache_info

-#define PG_PER_CPU 32
-struct item_percpu_pages {
-	struct rb_root root;
-	struct list_head list;
-	struct pcpu_page_ref {
-		struct scoutfs_key start;
-		struct scoutfs_key end;
-		struct cached_page *pg;
-		struct rb_node node;
-		struct list_head head;
-	} refs[PG_PER_CPU];
-};
-
-struct cached_page {
-	/* often read by concurrent rbtree walks */
-	struct rb_node node;
-	struct scoutfs_key start;
-	struct scoutfs_key end;
-
-	/* often modified by page rwlock holder */
-	rwlock_t rwlock;
-	struct rb_root item_root;
-	struct list_head lru_head;
-	unsigned long lru_time;
-	struct list_head dirty_list;
-	struct list_head dirty_head;
-	u64 max_seq;
-	struct page *page;
-	unsigned int page_off;
-	unsigned int erased_bytes;
-	atomic_t refcount;
-};
-
 struct cached_item {
-	struct rb_node node;
+	struct scoutfs_cwskip_node *node;
 	struct list_head dirty_head;
+	struct rcu_head rcu_head;
+	atomic64_t age;
 	unsigned int	dirty:1,	/* needs to be written */
 			persistent:1,	/* in btrees, needs deletion item */
 			deletion:1,	/* negative del item for writing */
-			delta:1;	/* item vales are combined, freed after write */
+			delta:1,	/* item values are combined, freed after write */
+			negative:1,	/* no item, marks hole_after boundary */
+			hole_after:1;	/* no cache until next item */
+	unsigned int alloc_bytes;
 	unsigned int val_len;
+	int dirty_cpu;
 	struct scoutfs_key key;
 	u64 seq;
-	char val[0];
+	char *val;
 };

-#define CACHED_ITEM_ALIGN 8
-
-static int item_val_bytes(int val_len)
+static
int key_item_cmp(void *K, void *C) { - return round_up(offsetof(struct cached_item, val[val_len]), - CACHED_ITEM_ALIGN); + struct scoutfs_key *key = K; + struct cached_item *item = C; + + return scoutfs_key_compare(key, &item->key); +} + +static int item_alloc_bytes(int height, int val_len) +{ + return sizeof(struct cached_item) + + offsetof(struct scoutfs_cwskip_node, links[height]) + + val_len; } /* - * Return if the page has room to allocate an item with the given value - * length at its free page offset. This must be called with the page - * writelock held because it can modify the page to reclaim free space - * to mkae room for the allocation. Today all it does is recognize that - * the page is empty and reset the page_off. + * Allocate and initialize a new item. These can be freed directly + * until they're inserted into the item list. The moment they're + * visible via the list they have to be freed with call_free_item within + * an RCU read lock. */ -static bool page_has_room(struct cached_page *pg, int val_len) -{ - if (RB_EMPTY_ROOT(&pg->item_root)) - pg->page_off = 0; - - return pg->page_off + item_val_bytes(val_len) <= PAGE_SIZE; -} - -static struct cached_page *first_page(struct rb_root *root) -{ - struct rb_node *node; - - if (!root || !(node = rb_first(root))) - return NULL; - - return rb_entry(node, struct cached_page, node); -} - -static struct cached_item *first_item(struct rb_root *root) -{ - struct rb_node *node; - - if (!root || !(node = rb_first(root))) - return NULL; - - return rb_entry(node, struct cached_item, node); -} - -static struct cached_item *last_item(struct rb_root *root) -{ - struct rb_node *node; - - if (!root || !(node = rb_last(root))) - return NULL; - - return rb_entry(node, struct cached_item, node); -} - -static struct cached_item *next_item(struct cached_item *item) -{ - struct rb_node *node; - - if (!item || !(node = rb_next(&item->node))) - return NULL; - - return rb_entry(node, struct cached_item, node); -} - -static struct cached_item *prev_item(struct cached_item *item) -{ - struct rb_node *node; - - if (!item || !(node = rb_prev(&item->node))) - return NULL; - - return rb_entry(node, struct cached_item, node); -} - -static void rbtree_insert(struct rb_node *node, struct rb_node *par, - struct rb_node **pnode, struct rb_root *root) -{ - rb_link_node(node, par, pnode); - rb_insert_color(node, root); -} - -static void rbtree_erase(struct rb_node *node, struct rb_root *root) -{ - rb_erase(node, root); - RB_CLEAR_NODE(node); -} - -static void rbtree_replace_node(struct rb_node *victim, struct rb_node *new, - struct rb_root *root) -{ - rb_replace_node(victim, new, root); - RB_CLEAR_NODE(victim); -} - -/* - * This is far too expensive to use regularly, but it's very helpful for - * discovering corruption after modifications to cached pages. 
- */ -static __attribute__((unused)) void verify_page_rbtree(struct rb_root *root) -{ - struct cached_item *item; - struct cached_page *par; - struct cached_page *pg; - struct cached_page *n; - char *reason = NULL; - struct rb_node *p; - int cmp; - - rbtree_postorder_for_each_entry_safe(pg, n, root, node) { - - item = NULL; - par = NULL; - - if (scoutfs_key_compare(&pg->start, &pg->end) > 0) { - reason = "start > end"; - break; - } - - item = first_item(&pg->item_root); - if (item && scoutfs_key_compare(&item->key, &pg->start) < 0) { - reason = "first item < start"; - break; - } - - item = last_item(&pg->item_root); - if (item && scoutfs_key_compare(&item->key, &pg->end) > 0) { - reason = "last item > end"; - break; - } - - p = rb_parent(&pg->node); - if (!p) - continue; - par = rb_entry(p, struct cached_page, node); - - cmp = scoutfs_key_compare_ranges(&pg->start, &pg->end, - &par->start, &par->end); - if (cmp == 0) { - reason = "parent and child overlap"; - break; - } - - if (par->node.rb_right == &pg->node && cmp < 0) { - reason = "right child < parent"; - break; - } - - if (par->node.rb_left == &pg->node && cmp > 0) { - reason = "left child > parent"; - break; - } - } - - if (!reason) - return; - - printk("bad item page rbtree: %s\n", reason); - printk("pg %p start "SK_FMT" end "SK_FMT"\n", - pg, SK_ARG(&pg->start), SK_ARG(&pg->end)); - if (par) - printk("par %p start "SK_FMT" end "SK_FMT"\n", - par, SK_ARG(&par->start), SK_ARG(&par->end)); - if (item) - printk("item %p key "SK_FMT"\n", item, SK_ARG(&item->key)); - - rbtree_postorder_for_each_entry_safe(pg, n, root, node) { - printk(" pg %p left %p right %p start "SK_FMT" end "SK_FMT"\n", - pg, - pg->node.rb_left ? rb_entry(pg->node.rb_left, - struct cached_page, node) : - NULL, - pg->node.rb_right ? rb_entry(pg->node.rb_right, - struct cached_page, node) : - NULL, - SK_ARG(&pg->start), - SK_ARG(&pg->end)); - } - - BUG(); -} - - -/* - * This lets us lock newly allocated pages without having to add nesting - * annotation. The non-acquired path is never executed. - */ -static void write_trylock_will_succeed(rwlock_t *rwlock) -__acquires(rwlock) -{ - while (!write_trylock(rwlock)) - BUG(); -} - -static struct cached_page *alloc_pg(struct super_block *sb, gfp_t gfp) -{ - struct cached_page *pg; - struct page *page; - - pg = kzalloc(sizeof(struct cached_page), GFP_NOFS | gfp); - page = alloc_page(GFP_NOFS | gfp); - if (!page || !pg) { - kfree(pg); - if (page) - __free_page(page); - return NULL; - } - - scoutfs_inc_counter(sb, item_page_alloc); - - RB_CLEAR_NODE(&pg->node); - rwlock_init(&pg->rwlock); - pg->item_root = RB_ROOT; - INIT_LIST_HEAD(&pg->lru_head); - INIT_LIST_HEAD(&pg->dirty_list); - INIT_LIST_HEAD(&pg->dirty_head); - pg->page = page; - atomic_set(&pg->refcount, 1); - - return pg; -} - -static void get_pg(struct cached_page *pg) -{ - atomic_inc(&pg->refcount); -} - -static void put_pg(struct super_block *sb, struct cached_page *pg) -{ - if (pg && atomic_dec_and_test(&pg->refcount)) { - scoutfs_inc_counter(sb, item_page_free); - - BUG_ON(!RB_EMPTY_NODE(&pg->node)); - BUG_ON(!list_empty(&pg->lru_head)); - BUG_ON(!list_empty(&pg->dirty_list)); - BUG_ON(!list_empty(&pg->dirty_head)); - - __free_page(pg->page); - kfree(pg); - } -} - -static void update_pg_max_seq(struct cached_page *pg, struct cached_item *item) -{ - if (item->seq > pg->max_seq) - pg->max_seq = item->seq; -} - -/* - * Allocate space for a new item from the free offset at the end of a - * cached page. 
This isn't a blocking allocation, and it's likely that - * the caller has ensured it will succeed by allocating from a new empty - * page or checking the free space first. - */ -static struct cached_item *alloc_item(struct cached_page *pg, +static struct cached_item *alloc_item(struct super_block *sb, struct scoutfs_key *key, u64 seq, bool deletion, void *val, int val_len) { struct cached_item *item; + int height; + int bytes; - if (!page_has_room(pg, val_len)) + height = scoutfs_cwskip_rand_height(); + bytes = item_alloc_bytes(height, val_len); + item = kmalloc(bytes, GFP_NOFS); + if (!item) return NULL; - item = page_address(pg->page) + pg->page_off; - pg->page_off += item_val_bytes(val_len); + item->node = (void *)item + sizeof(struct cached_item); + item->val = (void *)&item->node->links[height]; - RB_CLEAR_NODE(&item->node); INIT_LIST_HEAD(&item->dirty_head); + atomic64_set(&item->age, 0); + item->dirty = 0; item->persistent = 0; item->deletion = !!deletion; item->delta = 0; + item->negative = 0; + item->hole_after = 0; + item->alloc_bytes = bytes; item->val_len = val_len; + item->dirty_cpu = -1; item->key = *key; item->seq = seq; + item->node->height = height; + item->node->write_seq = 0; + /* insert initializes all node links */ + if (val_len) memcpy(item->val, val, val_len); - update_pg_max_seq(pg, item); + scoutfs_add_counter(sb, item_alloc_bytes, bytes); return item; } -static void erase_item(struct cached_page *pg, struct cached_item *item) +static void call_free_item(struct super_block *sb, struct cached_item *item) { - rbtree_erase(&item->node, &pg->item_root); - pg->erased_bytes += item_val_bytes(item->val_len); -} - -static void lru_add(struct super_block *sb, struct item_cache_info *cinf, - struct cached_page *pg) -{ - spin_lock(&cinf->lru_lock); - if (list_empty(&pg->lru_head)) { - scoutfs_inc_counter(sb, item_page_lru_add); - list_add_tail(&pg->lru_head, &cinf->lru_list); - cinf->lru_pages++; - } - spin_unlock(&cinf->lru_lock); -} - -static void __lru_remove(struct super_block *sb, struct item_cache_info *cinf, - struct cached_page *pg) -{ - if (!list_empty(&pg->lru_head)) { - scoutfs_inc_counter(sb, item_page_lru_remove); - list_del_init(&pg->lru_head); - cinf->lru_pages--; + if (item) { + scoutfs_add_counter(sb, item_free_bytes, item->alloc_bytes); + kfree_rcu(item, rcu_head); } } -static void lru_remove(struct super_block *sb, struct item_cache_info *cinf, - struct cached_page *pg) -{ - spin_lock(&cinf->lru_lock); - __lru_remove(sb, cinf, pg); - spin_unlock(&cinf->lru_lock); -} +#define ITEM_AGE_NR_SHIFT 3 +#define ITEM_AGE_NR (1 << ITEM_AGE_NR_SHIFT) +#define ITEM_AGE_HALF (ITEM_AGE_NR / 2) +#define ITEM_AGE_MARK_BATCH (256 * 1024) +#define ITEM_AGE_MARK_SHIFT 62 +#define ITEM_AGE_MARK_MASK ((1ULL << ITEM_AGE_MARK_SHIFT) - 1) + /* - * Make sure that the page the caller just accessed is reasonably close - * to the tail of the lru so it will be less likely to be reclaimed by - * the shrinker. - * - * We want to quickly determine that the page is close enough to the - * tail by only looking at the page. We use a coarse clock tick to - * determine if we've already moved the head to the tail sufficiently - * recently. We can't differentiate shrinking priority amongst the - * number of pages that the cpu can access within given chunk of time. - * - * We don't care that the lru_time accessed aren't locked and could see - * rare corruption. It's just a shrink priority heuristic. 
+ * Add the caller's per-cpu marked count for their age to the global + * marked count for the current age. If the current age advances we + * drop the caller's marked count. */ -static void lru_accessed(struct super_block *sb, struct item_cache_info *cinf, - struct cached_page *pg) +static u64 add_global_age_marked(struct item_cache_info *cinf, u64 age_marked) { - unsigned long time = jiffies_to_msecs(jiffies); + u64 old; + u64 new; - scoutfs_inc_counter(sb, item_page_accessed); + do { + old = atomic64_read(&cinf->age_marked); + if ((old & ~ITEM_AGE_MARK_MASK) != + (age_marked & ~ITEM_AGE_MARK_MASK)) + return 0; - if (pg->lru_time != time) { - lru_remove(sb, cinf, pg); - pg->lru_time = time; - lru_add(sb, cinf, pg); - } + new = old + (age_marked & ITEM_AGE_MARK_MASK); + } while (atomic64_cmpxchg(&cinf->age_marked, old, new) != old); + + return new & ITEM_AGE_MARK_MASK; } /* - * Return the pg that contains the key and set the parent nodes for insertion. - * When we find the pg we go right so that the caller can insert a new - * page to the right of the found page if it had to split the page. + * Make sure that a recently accessed item is marked with the current + * age to protect it from shrink. We record the total bytes we've + * marked in per-cpu counters. If the per-cpu marked count crosses a + * threshold we combine it with a global count. If the global count + * exceeds an age's fraction of the total then we increment the current + * age to mark and clear the marking counts. + * + * The result is that recent ages will have roughly a (1/ages) fraction + * of the total bytes of the cache. That gets less and less true over + * time as the old ages have items removed. + * + * This is very far from perfect, but we don't need perfect. We need to + * avoid creating read storms by shrinking active items while also not + * creating global contention by tracking items. + * + * This has to be a little fiddly to avoid a marked batch count on a cpu + * for age N being added to the global total for age N+1. We mark the + * high bits of the marked totals with the low two bits of the current + * age. cmpxchg then stops the total for an old age being added to a + * different age, within a short distance. 
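+ *
+ * To illustrate the packing (using the constants above, with the byte
+ * count held below the two-bit age tag):
+ *
+ *   marked = (age << ITEM_AGE_MARK_SHIFT) + bytes;
+ *   bytes  = marked & ITEM_AGE_MARK_MASK;
+ *   tag    = marked >> ITEM_AGE_MARK_SHIFT;	(equals age & 3)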
 */
-static struct cached_page *page_rbtree_walk(struct super_block *sb,
-					    struct rb_root *root,
-					    struct scoutfs_key *start,
-					    struct scoutfs_key *end,
-					    struct cached_page **prev,
-					    struct cached_page **next,
-					    struct rb_node **par,
-					    struct rb_node ***pnode)
+static void mark_item_age(struct item_cache_info *cinf, struct cached_item *item)
 {
-	struct rb_node **node = &root->rb_node;
-	struct rb_node *parent = NULL;
-	struct cached_page *ret = NULL;
-	struct cached_page *pg;
-	int cmp;
+	struct pcpu_age_counters *pac;
+	u64 old_age;
+	u64 marked;
+	u64 limit;
+	u64 age;
+	u64 old;
+	u64 new;
+	u64 was;
+	int cpu;

-	scoutfs_inc_counter(sb, item_page_rbtree_walk);
+	old_age = atomic64_read(&item->age);
+	age = atomic64_read(&cinf->current_age);
+	if (old_age == age ||
+	    atomic64_cmpxchg(&item->age, old_age, age) != old_age)
+		return;

-	if (next)
-		*next = NULL;
-	if (prev)
-		*prev = NULL;
+	pac = get_cpu_ptr(cinf->pcpu_age);

-	while (*node) {
-		parent = *node;
-		pg = container_of(*node, struct cached_page, node);
+	old = atomic64_read(&pac->age_marked);
+	marked = (old & ITEM_AGE_MARK_MASK) + item->alloc_bytes;
+	new = (age << ITEM_AGE_MARK_SHIFT) + marked;

-		cmp = scoutfs_key_compare_ranges(start, end, &pg->start,
-						 &pg->end);
-		if (cmp < 0) {
-			if (next)
-				*next = pg;
-			node = &(*node)->rb_left;
-		} else if (cmp > 0) {
-			if (prev)
-				*prev = pg;
-			node = &(*node)->rb_right;
-		} else {
-			ret = pg;
-			node = &(*node)->rb_right;
-		}
+	/* bail on the only failure case when the age advances */
+	was = atomic64_cmpxchg(&pac->age_marked, old, new);
+	put_cpu_ptr(cinf->pcpu_age);
+	if (was != old)
+		return;
+
+	if (marked < ITEM_AGE_MARK_BATCH)
+		return;
+
+	/* adding to the global retries unless the age changes */
+	marked = add_global_age_marked(cinf, atomic64_read(&pac->age_marked));
+	limit = atomic64_read(&cinf->age_total) >> ITEM_AGE_NR_SHIFT;
+	if (marked < limit)
+		return;
+
+	age = atomic64_inc_return(&cinf->current_age);
+	atomic64_set(&cinf->age_marked, age << ITEM_AGE_MARK_SHIFT);
+
+	for_each_online_cpu(cpu) {
+		pac = per_cpu_ptr(cinf->pcpu_age, cpu);
+		atomic64_set(&pac->age_marked, age << ITEM_AGE_MARK_SHIFT);
+		atomic64_add(atomic64_xchg(&pac->total, 0), &cinf->age_total);
 	}
-
-	if (par)
-		*par = parent;
-	if (pnode)
-		*pnode = node;
-
-	return ret;
 }

-#define for_each_page_safe(root, pg, tmp)				\
-	for (tmp = rb_first(root);					\
-	     tmp && (pg = container_of(tmp, struct cached_page, node)) && \
-	     ((tmp = rb_next(tmp)), 1); )
-
-static struct cached_item *item_rbtree_walk(struct rb_root *root,
-					    struct scoutfs_key *key,
-					    struct cached_item **next,
-					    struct rb_node **par,
-					    struct rb_node ***pnode)
+static void update_age_total(struct item_cache_info *cinf, int upd)
 {
-	struct rb_node **node = &root->rb_node;
-	struct rb_node *parent = NULL;
-	struct cached_item *ret = NULL;
-	struct cached_item *item;
-	int cmp;
+	struct pcpu_age_counters *pac = get_cpu_ptr(cinf->pcpu_age);

-	if (next)
-		*next = NULL;
-
-	while (*node) {
-		parent = *node;
-		item = container_of(*node, struct cached_item, node);
-
-		cmp = scoutfs_key_compare(key, &item->key);
-		if (cmp < 0) {
-			if (next)
-				*next = item;
-			node = &(*node)->rb_left;
-		} else if (cmp > 0) {
-			node = &(*node)->rb_right;
-		} else {
-			ret = item;
-			node = &(*node)->rb_left;
-		}
-	}
-
-	if (par)
-		*par = parent;
-	if (pnode)
-		*pnode = node;
-
-	return ret;
+	atomic64_add((s64)upd, &pac->total);
+	put_cpu_ptr(cinf->pcpu_age);
 }

-#define for_each_item_from_safe(root, item, tmp, key)			\
-	for (item = item_rbtree_walk(root, key, &tmp, NULL, NULL) ?: tmp; \
-	     item && ((tmp = next_item(item)), 1);			\
-	     item = tmp)
-
-#define for_each_item_safe(root, item, tmp)				\
-	for (tmp = rb_first(root);					\
-	     tmp && (item = container_of(tmp, struct cached_item, node)) && \
-	     ((tmp = rb_next(tmp)), 1); )
-
 /*
- * As we mark the first and clear the last items in a page, we add or
- * delete the page from the dirty list.  The caller can give us a page
- * to add the newly dirtied page after, rather than at the tail of the
- * list.
+ * Dirty items have a particular usage pattern.  Many cpus can be
+ * creating them at full speed, they're almost never removed, their
+ * total number is limited by the size of a commit, and they're
+ * committed while protected from modification.  We track the dirty
+ * items in per-cpu lists to avoid contention.  They're later spliced
+ * and sorted when it's time to write.
+ *
+ * We're still using a global atomic for ease and precision.  If it
+ * becomes a problem we can degrade it to fuzzier use of percpu
+ * counters.
 */
 static void mark_item_dirty(struct super_block *sb,
 			    struct item_cache_info *cinf,
-			    struct cached_page *pg,
-			    struct cached_page *after,
 			    struct cached_item *item)
 {
+	struct pcpu_dirty_list *pdlist;
+	int cpu;
+
 	if (!item->dirty) {
-		if (list_empty(&pg->dirty_list)) {
-			scoutfs_inc_counter(sb, item_page_mark_dirty);
-			spin_lock(&cinf->dirty_lock);
-			if (after)
-				list_add(&pg->dirty_head, &after->dirty_head);
-			else
-				list_add_tail(&pg->dirty_head,
-					      &cinf->dirty_list);
-			atomic_inc(&cinf->dirty_pages);
-			spin_unlock(&cinf->dirty_lock);
-		}
+		cpu = get_cpu();
+		pdlist = per_cpu_ptr(cinf->pcpu_dirty, cpu);
+		spin_lock(&pdlist->lock);
+		list_add_tail(&item->dirty_head, &pdlist->list);
+		item->dirty_cpu = cpu;
+		spin_unlock(&pdlist->lock);
+		put_cpu();

 		scoutfs_inc_counter(sb, item_mark_dirty);
-		list_add_tail(&item->dirty_head, &pg->dirty_list);
+		atomic64_add(item->alloc_bytes, &cinf->dirty_bytes);
 		item->dirty = 1;
 	}
-
-	update_pg_max_seq(pg, item);
 }

 static void clear_item_dirty(struct super_block *sb,
 			     struct item_cache_info *cinf,
-			     struct cached_page *pg,
 			     struct cached_item *item)
 {
+	struct pcpu_dirty_list *pdlist;
+
 	if (item->dirty) {
-		scoutfs_inc_counter(sb, item_clear_dirty);
-		item->dirty = 0;
+		pdlist = per_cpu_ptr(cinf->pcpu_dirty, item->dirty_cpu);
+		spin_lock(&pdlist->lock);
 		list_del_init(&item->dirty_head);
+		item->dirty_cpu = -1;
+		spin_unlock(&pdlist->lock);

-		if (list_empty(&pg->dirty_list)) {
-			scoutfs_inc_counter(sb, item_page_clear_dirty);
-			spin_lock(&cinf->dirty_lock);
-			list_del_init(&pg->dirty_head);
-			atomic_dec(&cinf->dirty_pages);
-			spin_unlock(&cinf->dirty_lock);
-		}
+		scoutfs_inc_counter(sb, item_clear_dirty);
+		atomic64_sub(item->alloc_bytes, &cinf->dirty_bytes);
+		item->dirty = 0;
 	}
 }

-static void erase_page_items(struct cached_page *pg,
-			     struct scoutfs_key *start,
-			     struct scoutfs_key *end)
-{
-	struct cached_item *item;
-	struct cached_item *tmp;
-
-	for_each_item_from_safe(&pg->item_root, item, tmp, start) {
-
-		/* only called in unused read regions or read_pages pages */
-		BUG_ON(item->dirty);
-
-		if (scoutfs_key_compare(&item->key, end) > 0)
-			break;
-
-		erase_item(pg, item);
-	}
-}
-
-/*
- * Move all the items starting from the key and stopping before moving
- * the stop key.  The right destination page must be empty.  Items are
- * copied in tree order which lets us easily insert after each previous
- * item.
- * - * This preserves dirty page and item ordering by adding the right page - * to the dirty list after the left page, and by adding items to the - * tail of right's dirty list in key sort order. - * - * The caller is responsible for page locking and managing the lru. - */ -static void move_page_items(struct super_block *sb, - struct item_cache_info *cinf, - struct cached_page *left, - struct cached_page *right, - struct scoutfs_key *key, - struct scoutfs_key *stop) -{ - struct cached_item *from; - struct cached_item *to; - struct cached_item *tmp; - struct rb_node **pnode; - struct rb_node *par; - - /* really empty right destination? */ - BUG_ON(!RB_EMPTY_ROOT(&right->item_root)); - par = NULL; - pnode = &right->item_root.rb_node; - - for_each_item_from_safe(&left->item_root, from, tmp, key) { - - if (stop && scoutfs_key_compare(&from->key, stop) >= 0) - break; - - to = alloc_item(right, &from->key, from->seq, from->deletion, from->val, - from->val_len); - rbtree_insert(&to->node, par, pnode, &right->item_root); - par = &to->node; - pnode = &to->node.rb_right; - - if (from->dirty) { - mark_item_dirty(sb, cinf, right, left, to); - clear_item_dirty(sb, cinf, left, from); - } - - to->persistent = from->persistent; - to->delta = from->delta; - - erase_item(left, from); - } -} - -enum page_intersection_type { - PGI_DISJOINT, - PGI_INSIDE, - PGI_START_OLAP, - PGI_END_OLAP, - PGI_BISECT_NEEDED, - PGI_BISECT, -}; - -/* - * Remove items from the page with intersect with the range. We return - * a code to indicate which kind of intersection occurred. The caller - * provides the right page to move items to if the page is bisected by - * the range. - * - * This modifies the page keys so it needs to be held with a write page - * rbtree lock if the page is in the page rbtree. 
- */ -static int trim_page_intersection(struct super_block *sb, - struct item_cache_info *cinf, - struct cached_page *pg, - struct cached_page *right, - struct scoutfs_key *start, - struct scoutfs_key *end) -{ - int ps_e = scoutfs_key_compare(&pg->start, end); - int pe_s = scoutfs_key_compare(&pg->end, start); - int ps_s; - int pe_e; - - /* - * page and range don't intersect - * - * ps |----------| pe - * s |----------| e - * (or) - * ps |----------| pe - * s |----------| e - */ - if (ps_e > 0 || pe_s < 0) - return PGI_DISJOINT; - - ps_s = scoutfs_key_compare(&pg->start, start); - pe_e = scoutfs_key_compare(&pg->end, end); - - /* - * page entirely inside range - * - * ps |----------| pe - * s |----------| e - */ - if (ps_s >= 0 && pe_e <= 0) - return PGI_INSIDE; - - /* - * page surrounds range, and is bisected by it - * - * ps |----------| pe - * s |------| e - */ - if (ps_s < 0 && pe_e > 0) { - if (!right) - return PGI_BISECT_NEEDED; - - right->start = *end; - scoutfs_key_inc(&right->start); - right->end = pg->end; - pg->end = *start; - scoutfs_key_dec(&pg->end); - erase_page_items(pg, start, end); - move_page_items(sb, cinf, pg, right, &right->start, NULL); - return PGI_BISECT; - } - - /* - * start of page overlaps with range - * - * ps |----------| pe - * s |----------| e - */ - if (pe_e > 0) { - /* start of page overlaps range */ - pg->start = *end; - scoutfs_key_inc(&pg->start); - erase_page_items(pg, start, end); - return PGI_START_OLAP; - } - - /* - * end of page overlaps with range - * - * ps |----------| pe - * s |----------| e - */ - pg->end = *start; - scoutfs_key_dec(&pg->end); - erase_page_items(pg, start, end); - return PGI_END_OLAP; -} - -/* - * The caller wants to allocate an item in the page but there isn't room - * at the page_off. If erasing items has left sufficient internal free - * space we can pack the existing items to the start of the page to make - * room for the insertion. - * - * The caller's empty pg is only used for its page struct, which we swap - * with our old empty page. We don't touch its pg struct. - * - * This is a coarse bulk way of dealing with free space, as opposed to - * specifically tracking internal free regions and using them to satisfy - * item allocations. 
- */ -static void compact_page_items(struct super_block *sb, - struct cached_page *pg, - struct cached_page *empty) -{ - struct cached_item *from; - struct cached_item *to; - struct rb_root item_root = RB_ROOT; - struct rb_node *par = NULL; - struct rb_node **pnode = &item_root.rb_node; - unsigned int page_off = 0; - LIST_HEAD(dirty_list); - - if (pg->erased_bytes < item_val_bytes(SCOUTFS_MAX_VAL_SIZE)) - return; - - if (WARN_ON_ONCE(empty->page_off != 0) || - WARN_ON_ONCE(!RB_EMPTY_ROOT(&empty->item_root)) || - WARN_ON_ONCE(!list_empty(&empty->dirty_list))) - return; - - scoutfs_inc_counter(sb, item_page_compact); - - for (from = first_item(&pg->item_root); from; from = next_item(from)) { - to = page_address(empty->page) + page_off; - page_off += item_val_bytes(from->val_len); - - /* copy the entire item, struct members and all */ - memcpy(to, from, item_val_bytes(from->val_len)); - - rbtree_insert(&to->node, par, pnode, &item_root); - par = &to->node; - pnode = &to->node.rb_right; - - if (to->dirty) - list_add_tail(&to->dirty_head, &dirty_list); - } - - pg->item_root = item_root; - list_replace(&dirty_list, &pg->dirty_list); - swap(pg->page, empty->page); - pg->page_off = page_off; - pg->erased_bytes = 0; -} - -/* - * This behaves a little differently than the other walks because we - * want to minimize compares and there are only simple searching and - * inserting callers. - */ -static struct pcpu_page_ref *pcpu_page_rbtree_walk(struct rb_root *root, - struct scoutfs_key *key, - struct pcpu_page_ref *ins) -{ - struct rb_node **node = &root->rb_node; - struct rb_node *parent = NULL; - struct pcpu_page_ref *ret = NULL; - struct pcpu_page_ref *ref; - int cmp; - - while (*node) { - parent = *node; - ref = container_of(*node, struct pcpu_page_ref, node); - - cmp = scoutfs_key_compare_ranges(key, key, - &ref->start, &ref->end); - if (cmp < 0) { - node = &(*node)->rb_left; - } else if (cmp > 0) { - node = &(*node)->rb_right; - } else { - ret = ref; - if (!ins) - return ret; - node = &(*node)->rb_right; - } - } - - if (ins) - rbtree_insert(&ins->node, parent, node, root); - - return ret; -} - -/* - * Search the per-cpu page references for a page that contains the key - * the caller needs. These lookups are very frequent and key - * comparisons are relatively expensive, so we use an rbtree to decrease - * the comparison costs, particularly of misses. - * - * All the references in all the cpus go stale as page key boundaries - * are modified by reading, insertion, and invalidation. If we find a - * stale ref we will drop it, but otherwise we let stale refs age out as - * new refs are inserted. 
- */ -static struct cached_page *get_pcpu_page(struct super_block *sb, - struct item_cache_info *cinf, - struct scoutfs_key *key, - bool write) -{ - struct item_percpu_pages *pages = get_cpu_ptr(cinf->pcpu_pages); - struct cached_page *pg = NULL; - struct pcpu_page_ref *ref; - - ref = pcpu_page_rbtree_walk(&pages->root, key, NULL); - if (ref) { - pg = ref->pg; - if (write) - write_lock(&pg->rwlock); - else - read_lock(&pg->rwlock); - - if (scoutfs_key_compare_ranges(key, key, - &pg->start, &pg->end)) { - if (write) - write_unlock(&pg->rwlock); - else - read_unlock(&pg->rwlock); - - scoutfs_inc_counter(sb, item_pcpu_page_miss_keys); - rbtree_erase(&ref->node, &pages->root); - list_move_tail(&ref->head, &pages->list); - put_pg(sb, pg); - ref->pg = NULL; - pg = NULL; - } else { - if (pages->list.next != &ref->head) - list_move(&ref->head, &pages->list); - __release(pg_rwlock); - } - } - - put_cpu_ptr(cinf->pcpu_pages); - - if (pg) - scoutfs_inc_counter(sb, item_pcpu_page_hit); - else - scoutfs_inc_counter(sb, item_pcpu_page_miss); - - return pg; -} - -/* - * The caller has a locked page that it knows is authoritative for its - * range of keys. Add it to this cpu's cache and remove any other page - * in the pool which intersects with its range. - */ -static void add_pcpu_page(struct super_block *sb, struct item_cache_info *cinf, - struct cached_page *pg) -{ - struct item_percpu_pages *pages = get_cpu_ptr(cinf->pcpu_pages); - struct pcpu_page_ref *old; - struct pcpu_page_ref *ref; - - ref = list_last_entry(&pages->list, struct pcpu_page_ref, head); - if (ref->pg) { - rbtree_erase(&ref->node, &pages->root); - put_pg(sb, ref->pg); - } - ref->start = pg->start; - ref->end = pg->end; - ref->pg = pg; - get_pg(pg); - - list_move(&ref->head, &pages->list); - - old = pcpu_page_rbtree_walk(&pages->root, &ref->end, ref); - if (old) { - scoutfs_inc_counter(sb, item_pcpu_add_replaced); - rbtree_erase(&old->node, &pages->root); - list_move_tail(&old->head, &pages->list); - put_pg(sb, old->pg); - old->pg = NULL; - } - - put_cpu_ptr(cinf->pcpu_pages); -} - -/* - * If a page is removed from the page rbtree we clear its keys so that percpu - * references won't use the page and will drop their reference. Must be - * called with a write page rwlock. - */ -static void invalidate_pcpu_page(struct cached_page *pg) -{ - scoutfs_key_set_zeros(&pg->start); - scoutfs_key_set_zeros(&pg->end); -} - -static void init_pcpu_pages(struct item_cache_info *cinf, int cpu) -{ - struct item_percpu_pages *pages = per_cpu_ptr(cinf->pcpu_pages, cpu); - struct pcpu_page_ref *ref; - int i; - - pages->root = RB_ROOT; - INIT_LIST_HEAD(&pages->list); - - for (i = 0; i < ARRAY_SIZE(pages->refs); i++) { - ref = &pages->refs[i]; - - ref->pg = NULL; - list_add_tail(&ref->head, &pages->list); - } -} - -static void drop_pcpu_pages(struct super_block *sb, - struct item_cache_info *cinf, int cpu) -{ - struct item_percpu_pages *pages = per_cpu_ptr(cinf->pcpu_pages, cpu); - struct pcpu_page_ref *ref; - int i; - - for (i = 0; i < ARRAY_SIZE(pages->refs); i++) { - ref = &pages->refs[i]; - - if (ref->pg) - put_pg(sb, ref->pg); - ref->pg = NULL; - } - - pages->root = RB_ROOT; -} - -/* - * Set the keys of the destination pages of a split. We try to find the - * key which balances the space consumed by items in the resulting split - * pages. We move the split key to the right, setting the left end by - * decrementing that key. We bias towards advancing the left item first - * so that we don't use it and possibly decrementing the starting page - * key. 
We can't have a page that covers a single key. Callers of - * split should have tried compacting which ensures that if we split we - * must have multiple items, even if they all have the max value length. - */ -static void set_split_keys(struct cached_page *pg, struct cached_page *left, - struct cached_page *right) -{ - struct cached_item *left_item = first_item(&pg->item_root); - struct cached_item *right_item = last_item(&pg->item_root); - struct cached_item *mid; - int left_tot = 0; - int right_tot = 0; - - BUILD_BUG_ON((PAGE_SIZE / SCOUTFS_MAX_VAL_SIZE) < 4); - BUG_ON(scoutfs_key_compare(&pg->start, &pg->end) > 0); - BUG_ON(left_item == NULL); - BUG_ON(right_item == NULL); - BUG_ON(left_item == right_item); - - while (left_item && right_item && left_item != right_item) { - if (left_tot <= right_tot) { - left_tot += item_val_bytes(left_item->val_len); - left_item = next_item(left_item); - } else { - right_tot += item_val_bytes(right_item->val_len); - right_item = prev_item(right_item); - } - } - - mid = left_item ?: right_item; - - left->start = pg->start; - left->end = mid->key; - scoutfs_key_dec(&left->end); - right->start = mid->key; - right->end = pg->end; -} - -/* - * The caller found a page that didn't have room for the item they - * wanted to allocate. We allocate pages for the split and see if the - * page still needs splitting once we've locked it. - * - * To modify page keys we need a write lock on the page rbtree, which - * globally prevents reads from finding pages. We want to minimize this - * so we add empty pages with the split ranges to the rbtree and then - * perform the item motion only with the page locks held. This will - * exclude any users of the items in the affected range. - */ -static int try_split_page(struct super_block *sb, struct item_cache_info *cinf, - struct scoutfs_key *key, int val_len) -{ - struct cached_page *right; - struct cached_page *left; - struct cached_page *pg; - struct cached_item *item; - struct rb_node **pnode; - struct rb_node *par; - int ret; - - left = alloc_pg(sb, 0); - right = alloc_pg(sb, 0); - if (!left || !right) { - ret = -ENOMEM; - goto out; - } - - write_lock(&cinf->rwlock); - - pg = page_rbtree_walk(sb, &cinf->pg_root, key, key, NULL, NULL, - &par, &pnode); - if (pg == NULL) { - write_unlock(&cinf->rwlock); - ret = 0; - goto out; - } - - write_lock(&pg->rwlock); - - if (!page_has_room(pg, val_len)) - compact_page_items(sb, pg, left); - - if (page_has_room(pg, val_len)) { - write_unlock(&cinf->rwlock); - write_unlock(&pg->rwlock); - ret = 0; - goto out; - } - - /* special case adding an empty page when key is after the last item */ - item = last_item(&pg->item_root); - if (scoutfs_key_compare(key, &item->key) > 0) { - right->start = *key; - right->end = pg->end; - pg->end = *key; - scoutfs_key_dec(&pg->end); - - write_trylock_will_succeed(&right->rwlock); - rbtree_insert(&right->node, par, pnode, &cinf->pg_root); - lru_accessed(sb, cinf, right); - - /* adding right first removes pg */ - add_pcpu_page(sb, cinf, right); - add_pcpu_page(sb, cinf, pg); - - write_unlock(&cinf->rwlock); - write_unlock(&pg->rwlock); - write_unlock(&right->rwlock); - right = NULL; - ret = 0; - goto out; - } - - scoutfs_inc_counter(sb, item_page_split); - - /* pages are still private, tylock will succeed */ - write_trylock_will_succeed(&left->rwlock); - write_trylock_will_succeed(&right->rwlock); - - set_split_keys(pg, left, right); - - rbtree_insert(&right->node, par, pnode, &cinf->pg_root); - rbtree_replace_node(&pg->node, &left->node, &cinf->pg_root); 
-	lru_remove(sb, cinf, pg);
-
-	write_unlock(&cinf->rwlock);
-
-	/* move items while only holding page locks, visible once unlocked */
-	move_page_items(sb, cinf, pg, left, &left->start, &right->start);
-	lru_accessed(sb, cinf, left);
-	add_pcpu_page(sb, cinf, left);
-	write_unlock(&left->rwlock);
-	left = NULL;
-
-	move_page_items(sb, cinf, pg, right, &right->start, NULL);
-	lru_accessed(sb, cinf, right);
-	add_pcpu_page(sb, cinf, right);
-	write_unlock(&right->rwlock);
-	right = NULL;
-
-	/* and drop the source page, it was replaced above */
-	invalidate_pcpu_page(pg);
-	write_unlock(&pg->rwlock);
-	put_pg(sb, pg);
-
-	ret = 0;
-out:
-	put_pg(sb, left);
-	put_pg(sb, right);
-	return ret;
-}
-
-/*
- * The caller has a write-only cluster lock and wants to populate the
- * cache so that it can insert an item without reading.  They found a
- * hole but unlocked so we check again under the lock after allocating.
- * We insert an empty page that covers the key and extends to either the
- * neighbours or the caller's (lock's) range.
- */
-static int cache_empty_page(struct super_block *sb,
-			    struct item_cache_info *cinf,
-			    struct scoutfs_key *key, struct scoutfs_key *start,
-			    struct scoutfs_key *end)
-{
-	struct cached_page *prev;
-	struct cached_page *next;
-	struct cached_page *pg;
-	struct rb_node **pnode;
-	struct rb_node *par;
-
-	pg = alloc_pg(sb, 0);
-	if (!pg)
-		return -ENOMEM;
-
-	write_lock(&cinf->rwlock);
-
-	if (!page_rbtree_walk(sb, &cinf->pg_root, key, key, &prev, &next,
-			      &par, &pnode)) {
-		pg->start = *start;
-		if (prev && scoutfs_key_compare(&prev->end, start) > 0) {
-			pg->start = prev->end;
-			scoutfs_key_inc(&pg->start);
-		}
-
-		pg->end = *end;
-		if (next && scoutfs_key_compare(&next->start, end) < 0) {
-			pg->end = next->start;
-			scoutfs_key_dec(&pg->end);
-		}
-
-		rbtree_insert(&pg->node, par, pnode, &cinf->pg_root);
-		lru_accessed(sb, cinf, pg);
-		pg = NULL;
-	}
-
-	write_unlock(&cinf->rwlock);
-
-	put_pg(sb, pg);
-
-	return 0;
-}
-
 /*
  * Readers operate independently from dirty items and transactions.
  * They read a set of persistent items and insert them into the cache
@@ -1345,316 +432,241 @@ static void del_active_reader(struct item_cache_info *cinf, struct active_reader
 }

 /*
- * Add a newly read item to the pages that we're assembling for
- * insertion into the cache.  These pages are private, they only exist
- * on our root and aren't in dirty or lru lists.
- *
- * We need to store deletion items here as we read items from all the
- * btrees so that they can override older items.  The deletion items
- * will be deleted before we insert the pages into the cache.  We don't
- * insert old versions of items into the tree here so that the trees
- * don't have to compare seqs.
+ * Returns true if a direct item search ends in a cached region.  We're
+ * only searching for one key so if we find it then it's cached and we
+ * can ignore the previous item.
 */
-static int read_page_item(struct super_block *sb, struct scoutfs_key *key, u64 seq, u8 flags,
-			  void *val, int val_len, int fic, void *arg)
+static bool item_lookup_is_cached(int cmp, struct cached_item *prev)
+{
+	return cmp == 0 || (prev && !prev->hole_after);
+}
+
+/*
+ * Returns true if an item search within a range only traversed cached
+ * regions.  Once we're searching a range we can be advancing search
+ * keys and we need to know if the search we just performed was inside
+ * the range, or not.
If we hit an item at the key inside the range + * but prev indicates a hole then we skipped over unknown uncached keys + * and we can't use the next item. + */ +static bool item_next_is_cached(bool first, int cmp, struct cached_item *prev) +{ + return (first && cmp == 0) || (prev && !prev->hole_after); +} + +/* The item is positive and is visible in the cache */ +static bool item_is_positive(struct cached_item *item) +{ + return item && !item->deletion && !item->negative; +} + +/* + * Track read items in a private list. Newer versions of items replace + * older. We keep deletion items here so that they replace older + * non-deletion items. Deletion items and items that are outside of + * the eventual range of keys read from all trees are dropped before + * being inserted. + */ +static int item_reader(struct super_block *sb, struct scoutfs_key *key, u64 seq, u8 flags, + void *val, int val_len, int fic, void *arg) { - DECLARE_ITEM_CACHE_INFO(sb, cinf); const bool deletion = !!(flags & SCOUTFS_ITEM_FLAG_DELETION); - struct rb_root *root = arg; - struct cached_page *right = NULL; - struct cached_page *left = NULL; - struct cached_page *pg; + struct scoutfs_cwskip_root *root = arg; + struct scoutfs_cwskip_writer wr; struct cached_item *found; struct cached_item *item; - struct rb_node *p_par; - struct rb_node *par; - struct rb_node **p_pnode; - struct rb_node **pnode; + int cmp; - pg = page_rbtree_walk(sb, root, key, key, NULL, NULL, &p_par, &p_pnode); - found = item_rbtree_walk(&pg->item_root, key, NULL, &par, &pnode); - if (found && (found->seq >= seq)) - return 0; + item = alloc_item(sb, key, seq, deletion, val, val_len); + if (!item) + return -ENOMEM; - if (!page_has_room(pg, val_len)) { - left = alloc_pg(sb, 0); - /* split needs multiple items, sparse may not have enough */ - if (!left) - return -ENOMEM; + scoutfs_cwskip_write_begin(root, key, item->node->height, + NULL, (void **)&found, &cmp, &wr); - compact_page_items(sb, pg, left); - found = item_rbtree_walk(&pg->item_root, key, NULL, &par, - &pnode); + if (cmp == 0 && (found->seq < seq)) { + /* remove existing if it's older */ + scoutfs_cwskip_write_remove(&wr, found->node); + call_free_item(sb, found); } - item = alloc_item(pg, key, seq, deletion, val, val_len); - if (!item) { - /* simpler split of private pages, no locking/dirty/lru */ - if (!left) - left = alloc_pg(sb, 0); - right = alloc_pg(sb, 0); - if (!left || !right) { - put_pg(sb, left); - put_pg(sb, right); - return -ENOMEM; - } - - scoutfs_inc_counter(sb, item_read_pages_split); - - set_split_keys(pg, left, right); - rbtree_insert(&right->node, p_par, p_pnode, root); - rbtree_replace_node(&pg->node, &left->node, root); - move_page_items(sb, cinf, pg, left, - &left->start, &right->start); - move_page_items(sb, cinf, pg, right, &right->start, NULL); - put_pg(sb, pg); - - pg = scoutfs_key_compare(key, &left->end) <= 0 ? 
left : right; - item = alloc_item(pg, key, seq, deletion, val, val_len); - found = item_rbtree_walk(&pg->item_root, key, NULL, &par, - &pnode); - - left = NULL; - right = NULL; + if (cmp != 0 || (found->seq < seq)) { + /* insert read if first or newer */ + item->persistent = 1; + scoutfs_cwskip_write_insert(&wr, item->node); + item = NULL; } - /* if deleted a deletion item will be required */ - item->persistent = 1; + scoutfs_cwskip_write_end(&wr); - rbtree_insert(&item->node, par, pnode, &pg->item_root); - if (found) - erase_item(pg, found); + kfree(item); + return 0; +} - put_pg(sb, left); - put_pg(sb, right); +static int insert_missing_negative(struct super_block *sb, struct scoutfs_cwskip_root *root, + struct scoutfs_key *key) +{ + struct scoutfs_cwskip_writer wr; + struct cached_item *item; + int cmp; + + item = alloc_item(sb, key, 0, false, NULL, 0); + if (!item) + return -ENOMEM; + + scoutfs_cwskip_write_begin(root, key, item->node->height, NULL, NULL, &cmp, &wr); + if (cmp != 0) { + item->negative = 1; + scoutfs_cwskip_write_insert(&wr, item->node); + item = NULL; + } + scoutfs_cwskip_write_end(&wr); + + kfree(item); return 0; } /* - * The caller couldn't find a page that contains the key we're looking - * for. We combine a block's worth of items around the key in all the - * forest btrees and store them in pages. After filtering out deletions - * and duplicates, we insert any resulting pages which don't overlap - * with existing cached pages. + * Read items from persistent btrees and populate the cache around the + * key. * - * We only insert uncached regions because this is called with cluster - * locks held, but without locking the cache. The regions we read can - * be stale with respect to the current cache, which can be read and - * dirtied by other cluster lock holders on our node, but the cluster - * locks protect the stable items we read. Invalidation is careful not - * to drop pages that have items that we couldn't see because they were - * dirty when we started reading. + * The caller holds cluster locks which ensure that the persistent items + * aren't changing. The currently cached items might be dirty and more + * recent than the persistent items. We only insert read items into + * holes in the cache. * - * The forest item reader is reading stable trees that could be - * overwritten. It can return -ESTALE which we return to the caller who - * will retry the operation and work with a new set of more recent - * btrees. + * We read a single full block of items around the key from each btree. + * The intersection of these read key ranges is the range of consistent + * items that can be cached. Any items read outside of this range might + * be stale because their keys weren't read from all the btrees. We + * drop all the read items outside of the consistent range. + * + * The consistent key range can extend outside of the set of items read + * inside the range. We add negative cached items to mark the + * boundaries of the consistent range if we didn't read items right at + * the edges. + * + * Once we have a set of read items that covers the entire range we try + * to insert them into the cache. For each read item we iterate + * through cached items until we find the two cached items around it. + * If the read item falls in a hole in the cache then we insert it. We + * iterate over all cached items in the range, rather than just + * searching for the position of each read item, because we may need to + * clear hole_after between cached items. 
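+ *
+ * As a concrete sketch: if one btree block covered keys [10,50] and
+ * another covered [20,80], the consistent range is their intersection,
+ * [20,50].  Read items outside of it are dropped, and negative items
+ * are inserted at 20 and 50 if no items were read at those keys.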
+ *
+ * This is racing with all operations on the cache: item api calls,
+ * other readers, memory pressure, and lock invalidation. We are very
+ * careful to only atomically modify the cache one locked item pair at a
+ * time to ensure that the cache is always consistent.
  */
-static int read_pages(struct super_block *sb, struct item_cache_info *cinf,
+static int read_items(struct super_block *sb, struct item_cache_info *cinf,
 		      struct scoutfs_key *key, struct scoutfs_lock *lock)
 {
-	struct rb_root root = RB_ROOT;
+	struct scoutfs_cwskip_root root;
 	INIT_ACTIVE_READER(active);
-	struct cached_page *right = NULL;
-	struct cached_page *pg;
-	struct cached_page *rd;
+	struct scoutfs_cwskip_writer cached_wr;
+	struct scoutfs_cwskip_writer wr;
+	struct cached_item *cached_prev;
+	struct cached_item *cached_item;
 	struct cached_item *item;
 	struct scoutfs_key start;
 	struct scoutfs_key end;
-	struct scoutfs_key inf;
-	struct scoutfs_key edge;
-	struct rb_node **pnode;
-	struct rb_node *par;
-	struct rb_node *pg_tmp;
-	struct rb_node *item_tmp;
-	int pgi;
+	struct scoutfs_key pos;
+	bool drop_before;
+	bool drop_after;
+	bool first;
+	int cmp;
 	int ret;
 
-	/* start with an empty page that covers the whole lock */
-	pg = alloc_pg(sb, 0);
-	if (!pg) {
-		ret = -ENOMEM;
-		goto out;
-	}
-	pg->start = lock->start;
-	pg->end = lock->end;
-	rbtree_insert(&pg->node, NULL, &root.rb_node, &root);
+	/* read into an empty private root */
+	scoutfs_cwskip_init_root(&root, key_item_cmp, sizeof(struct cached_item));
 
 	/* set active reader seq before reading persistent roots */
 	add_active_reader(sb, &active);
 
 	start = lock->start;
 	end = lock->end;
-	ret = scoutfs_forest_read_items(sb, key, &lock->start, &start, &end, read_page_item, &root);
+	ret = scoutfs_forest_read_items(sb, key, &lock->start, &start, &end, item_reader, &root);
 	if (ret < 0)
 		goto out;
 
-	/* clean up our read items and pages before locking */
-	for_each_page_safe(&root, pg, pg_tmp) {
+	/* drop deleted items and items outside of the final consistent read range */
+	drop_before = true;
+	drop_after = false;
+	scoutfs_cwskip_write_begin(&root, &lock->start, SCOUTFS_CWSKIP_MAX_HEIGHT,
+				   NULL, (void **)&item, NULL, &wr);
+	do {
+		if (drop_before && scoutfs_key_compare(&item->key, &start) >= 0)
+			drop_before = false;
+		if (!drop_before && !drop_after && scoutfs_key_compare(&item->key, &end) > 0)
+			drop_after = true;
 
-		/* trim any items we read outside the read range */
-		scoutfs_key_set_zeros(&inf);
-		edge = start;
-		scoutfs_key_dec(&edge);
-		pgi = trim_page_intersection(sb, cinf, pg, NULL, &inf, &edge);
-		if (pgi != PGI_INSIDE) {
-			scoutfs_key_set_ones(&inf);
-			edge = end;
-			scoutfs_key_inc(&edge);
-			pgi = trim_page_intersection(sb, cinf, pg, NULL, &edge,
-						     &inf);
-		}
-		if (pgi == PGI_INSIDE) {
-			rbtree_erase(&pg->node, &root);
-			put_pg(sb, pg);
-			continue;
+		if (drop_before || item->deletion || drop_after) {
+			scoutfs_cwskip_write_remove(&wr, item->node);
+			call_free_item(sb, item);
 		}
 
-		/* drop deletion items, we don't need them in the cache */
-		for_each_item_safe(&pg->item_root, item, item_tmp) {
-			if (item->deletion)
-				erase_item(pg, item);
-		}
-	}
+	} while (scoutfs_cwskip_write_next(&wr, 1, NULL, (void **)&item));
+	scoutfs_cwskip_write_end(&wr);
 
-retry:
-	write_lock(&cinf->rwlock);
+	/* add negative items at the ends of the range if needed */
+	ret = insert_missing_negative(sb, &root, &start) ?:
+	      insert_missing_negative(sb, &root, &end);
+	if (ret < 0)
+		goto out;
 
-	while ((rd = first_page(&root))) {
+	/* lock max height on our private list so _next always succeeds */
+	pos = start;
+	first = true;
+	scoutfs_cwskip_write_begin(&root, &start, SCOUTFS_CWSKIP_MAX_HEIGHT,
+				   NULL, (void **)&item, NULL, &wr);
+	do {
+		scoutfs_cwskip_write_begin(&cinf->item_root, &pos, item->node->height,
+					   (void **)&cached_prev, (void **)&cached_item,
+					   NULL, &cached_wr);
+		do {
+			if (cached_item)
+				cmp = scoutfs_key_compare(&item->key, &cached_item->key);
+			else
+				cmp = -1;
 
-		pg = page_rbtree_walk(sb, &cinf->pg_root, &rd->start, &rd->end,
-				      NULL, NULL, &par, &pnode);
-		if (!pg) {
-			/* insert read pages that don't intersect */
-			rbtree_erase(&rd->node, &root);
-			rbtree_insert(&rd->node, par, pnode, &cinf->pg_root);
-			lru_accessed(sb, cinf, rd);
-			trace_scoutfs_item_read_page(sb, key, &rd->start,
-						     &rd->end);
-			continue;
-		}
+			if (cmp <= 0) {
+				/* check read item once it's between cached items */
+				scoutfs_cwskip_write_remove(&wr, item->node);
 
-		pgi = trim_page_intersection(sb, cinf, rd, right, &pg->start,
-					     &pg->end);
-		if (pgi == PGI_INSIDE) {
-			rbtree_erase(&rd->node, &root);
-			put_pg(sb, rd);
+				/* insert into holes or drop and free */
+				if (cmp < 0 && (!cached_prev || cached_prev->hole_after)) {
+					item->hole_after = 1;
+					scoutfs_cwskip_write_insert(&cached_wr, item->node);
+					update_age_total(cinf, item->alloc_bytes);
+					mark_item_age(cinf, item);
+				} else {
+					call_free_item(sb, item);
+				}
 
-		} else if (pgi == PGI_BISECT_NEEDED) {
-			write_unlock(&cinf->rwlock);
-			right = alloc_pg(sb, 0);
-			if (!right) {
-				ret = -ENOMEM;
-				goto out;
+				/* always succeeds for our private list */
+				scoutfs_cwskip_write_next(&wr, 1, NULL, (void **)&item);
 			}
-			goto retry;
 
-		} else if (pgi == PGI_BISECT) {
-			page_rbtree_walk(sb, &root, &right->start, &right->end,
-					 NULL, NULL, &par, &pnode);
-			rbtree_insert(&right->node, par, pnode, &root);
-			right = NULL;
-		}
-	}
+			/* gaps after all cached prevs except the first are in the read range */
+			if (!first && cached_prev && cached_prev->hole_after)
+				cached_prev->hole_after = 0;
+			first = false;
 
-	write_unlock(&cinf->rwlock);
+			pos = cached_item->key;
+			scoutfs_key_inc(&pos);
+
+		} while (item && scoutfs_cwskip_write_next(&cached_wr, item->node->height,
+							   (void **)&cached_prev,
+							   (void **)&cached_item));
+		scoutfs_cwskip_write_end(&cached_wr);
+
+	} while (item);
+	scoutfs_cwskip_write_end(&wr);
 
 	ret = 0;
 out:
 	del_active_reader(cinf, &active);
-
-	/* free any pages we left dangling on error */
-	for_each_page_safe(&root, rd, pg_tmp) {
-		rbtree_erase(&rd->node, &root);
-		put_pg(sb, rd);
-	}
-
-	put_pg(sb, right);
-
-	return ret;
-}
-
-/*
- * Get a locked cached page for the caller to work with. This populates
- * the cache on misses and can ensure that the locked page has enough
- * room for an item allocation for the caller. Unfortunately, sparse
- * doesn't seem to deal very well with the pattern of conditional lock
- * acquisition. Callers manually add __acquire.
- */
-static int get_cached_page(struct super_block *sb,
-			   struct item_cache_info *cinf,
-			   struct scoutfs_lock *lock, struct scoutfs_key *key,
-			   bool write, bool alloc, int val_len,
-			   struct cached_page **pg_ret)
-{
-	struct cached_page *pg = NULL;
-	struct rb_node **pnode;
-	struct rb_node *par;
-	int ret;
-
-	if (WARN_ON_ONCE(alloc && !write))
-		return -EINVAL;
-
-	pg = get_pcpu_page(sb, cinf, key, write);
-	if (pg) {
-		__acquire(pg->rwlock);
-		if (!alloc || page_has_room(pg, val_len))
-			goto found;
-
-		if (write)
-			write_unlock(&pg->rwlock);
-		else
-			read_unlock(&pg->rwlock);
-		pg = NULL;
-	}
-
-retry:
-	read_lock(&cinf->rwlock);
-
-	pg = page_rbtree_walk(sb, &cinf->pg_root, key, key, NULL, NULL,
-			      &par, &pnode);
-	if (pg == NULL) {
-		read_unlock(&cinf->rwlock);
-		if (lock->mode == SCOUTFS_LOCK_WRITE_ONLY)
-			ret = cache_empty_page(sb, cinf, key, &lock->start,
-					       &lock->end);
-		else
-			ret = read_pages(sb, cinf, key, lock);
-		if (ret < 0 && ret != -ESTALE)
-			goto out;
-		goto retry;
-	}
-
-	if (write)
-		write_lock(&pg->rwlock);
-	else
-		read_lock(&pg->rwlock);
-
-	if (alloc && !page_has_room(pg, val_len)) {
-		read_unlock(&cinf->rwlock);
-		if (write)
-			write_unlock(&pg->rwlock);
-		else
-			read_unlock(&pg->rwlock);
-
-		ret = try_split_page(sb, cinf, key, val_len);
-		if (ret < 0)
-			goto out;
-		goto retry;
-	}
-
-	read_unlock(&cinf->rwlock);
-
-	add_pcpu_page(sb, cinf, pg);
-found:
-	__release(pg_rwlock);
-	lru_accessed(sb, cinf, pg);
-	ret = 0;
-out:
-	if (ret < 0)
-		*pg_ret = NULL;
-	else
-		*pg_ret = pg;
 	return ret;
 }
 
@@ -1692,8 +704,11 @@ int scoutfs_item_lookup(struct super_block *sb, struct scoutfs_key *key,
 			void *val, int val_len, struct scoutfs_lock *lock)
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
+	struct scoutfs_cwskip_reader rd;
+	struct cached_item *prev;
 	struct cached_item *item;
-	struct cached_page *pg;
+	bool valid;
+	int cmp;
 	int ret;
 
 	scoutfs_inc_counter(sb, item_lookup);
@@ -1701,18 +716,24 @@ int scoutfs_item_lookup(struct super_block *sb, struct scoutfs_key *key,
 	if ((ret = lock_safe(lock, key, SCOUTFS_LOCK_READ)))
 		goto out;
 
-	ret = get_cached_page(sb, cinf, lock, key, false, false, 0, &pg);
-	if (ret < 0)
-		goto out;
-	__acquire(&pg->rwlock);
+	do {
+		scoutfs_cwskip_read_begin(&cinf->item_root, key,
+					  (void **)&prev, (void **)&item, &cmp, &rd);
 
-	item = item_rbtree_walk(&pg->item_root, key, NULL, NULL, NULL);
-	if (!item || item->deletion)
-		ret = -ENOENT;
-	else
-		ret = copy_val(val, val_len, item->val, item->val_len);
+		if (!item_lookup_is_cached(cmp, prev))
+			ret = -ERANGE;
+		else if (cmp != 0 || !item_is_positive(item))
+			ret = -ENOENT;
+		else
+			ret = copy_val(val, val_len, item->val, item->val_len);
+
+		valid = scoutfs_cwskip_read_valid(&rd);
+		if (valid && item)
+			mark_item_age(cinf, item);
+
+		scoutfs_cwskip_read_end(&rd);
+	} while (!valid || (ret == -ERANGE &&
+			    (ret = read_items(sb, cinf, key, lock)) == 0));
 
-	read_unlock(&pg->rwlock);
 out:
 	return ret;
 }
@@ -1756,10 +777,13 @@ int scoutfs_item_next(struct super_block *sb, struct scoutfs_key *key,
 		      struct scoutfs_lock *lock)
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
+	struct scoutfs_cwskip_reader rd;
 	struct cached_item *item;
-	struct cached_item *next;
-	struct cached_page *pg = NULL;
+	struct cached_item *prev;
 	struct scoutfs_key pos;
+	struct scoutfs_key tmp;
+	bool first;
+	int cmp;
 	int ret;
 
 	scoutfs_inc_counter(sb, item_next);
@@ -1768,50 +792,56 @@ int scoutfs_item_next(struct super_block *sb, struct scoutfs_key *key,
 	if (scoutfs_key_compare(&lock->end, last) < 0)
 		last = &lock->end;
 
-	if (scoutfs_key_compare(key, last) > 0) {
-		ret = -ENOENT;
-		goto out;
-	}
+	if (scoutfs_key_compare(key, last) > 0)
+		return -ENOENT;
 
 	if ((ret = lock_safe(lock, key, SCOUTFS_LOCK_READ)))
-		goto out;
+		return ret;
 
+	first = true;
 	pos = *key;
 
+	do {
+		scoutfs_cwskip_read_begin(&cinf->item_root, &pos,
+					  (void **)&prev, (void **)&item, &cmp, &rd);
+		do {
+			if (!item_next_is_cached(first, cmp, prev)) {
+				ret = -ERANGE;
 
-	for (;;) {
-		ret = get_cached_page(sb, cinf, lock, &pos, false, false, 0,
-				      &pg);
-		if (ret < 0)
-			goto out;
-		__acquire(&pg->rwlock);
+			} else if (!item || scoutfs_key_compare(&item->key, last) > 0) {
+				ret = -ENOENT;
 
-		item = item_rbtree_walk(&pg->item_root, &pos, &next,
-					NULL, NULL) ?: next;
-		while (item && scoutfs_key_compare(&item->key, last) <= 0) {
-			if (!item->deletion) {
-				*key = item->key;
-				ret = copy_val(val, val_len, item->val,
-					       item->val_len);
-				goto unlock;
+			} else if (item_is_positive(item)) {
+				ret = copy_val(val, val_len, item->val, item->val_len);
+				tmp = item->key;
+
+			} else {
+				tmp = item->key;
+				scoutfs_key_inc(&tmp);
+				ret = -ESRCH;
 			}
 
-			item = next_item(item);
+			if (scoutfs_cwskip_read_valid(&rd)) {
+				pos = tmp;
+				first = false;
+				if (ret != -ESRCH && item)
+					mark_item_age(cinf, item);
+			} else {
+				ret = -ESRCH;
+			}
+		} while (ret == -ESRCH &&
+			 scoutfs_cwskip_read_next(&rd, (void **)&prev, (void **)&item));
+		scoutfs_cwskip_read_end(&rd);
+
+		if (ret == -ERANGE) {
+			ret = read_items(sb, cinf, &pos, lock);
+			if (ret == 0)
+				ret = -ESRCH;
 		}
 
-		if (scoutfs_key_compare(&pg->end, last) >= 0) {
-			ret = -ENOENT;
-			goto unlock;
-		}
+	} while (ret == -ESRCH);
 
-		pos = pg->end;
-		read_unlock(&pg->rwlock);
-
-		scoutfs_key_inc(&pos);
-	}
-
-unlock:
-	read_unlock(&pg->rwlock);
-out:
+	if (ret >= 0)
+		*key = pos;
 	return ret;
 }
 
@@ -1833,16 +863,17 @@ static u64 item_seq(struct super_block *sb, struct scoutfs_lock *lock)
 
 /*
  * Mark the item dirty. Dirtying while holding a transaction pins the
- * page holding the item and guarantees that the item can be deleted or
- * updated (without increasing the value length) during the transaction
- * without errors.
+ * item and guarantees that the item can be deleted or updated (without
+ * increasing the value length) during the transaction without errors.
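+ *
+ * Like the other item calls, this is built around the cwskip
+ * begin/end pattern with a read_items() retry when the search lands
+ * in an uncached hole. A rough sketch of the shared shape, not the
+ * literal code below:
+ *
+ *	do {
+ *		scoutfs_cwskip_write_begin(&cinf->item_root, key, 0,
+ *					   (void **)&prev, (void **)&item,
+ *					   &cmp, &wr);
+ *		if (!item_lookup_is_cached(cmp, prev))
+ *			ret = -ERANGE;
+ *		else
+ *			ret = ...work on the locked item...;
+ *		scoutfs_cwskip_write_end(&wr);
+ *	} while (ret == -ERANGE &&
+ *		 (ret = read_items(sb, cinf, key, lock)) == 0);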
 */
 int scoutfs_item_dirty(struct super_block *sb, struct scoutfs_key *key,
 		       struct scoutfs_lock *lock)
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
+	struct scoutfs_cwskip_writer wr;
 	struct cached_item *item;
-	struct cached_page *pg;
+	struct cached_item *prev;
+	int cmp;
 	int ret;
 
 	scoutfs_inc_counter(sb, item_dirty);
@@ -1854,21 +885,22 @@ int scoutfs_item_dirty(struct super_block *sb, struct scoutfs_key *key,
 	if (ret < 0)
 		goto out;
 
-	ret = get_cached_page(sb, cinf, lock, key, true, false, 0, &pg);
-	if (ret < 0)
-		goto out;
-	__acquire(pg->rwlock);
+	do {
+		scoutfs_cwskip_write_begin(&cinf->item_root, key, 0,
+					   (void **)&prev, (void **)&item, &cmp, &wr);
+		if (!item_lookup_is_cached(cmp, prev)) {
+			ret = -ERANGE;
+		} else if (cmp != 0 || !item_is_positive(item)) {
+			ret = -ENOENT;
+		} else {
+			item->seq = item_seq(sb, lock);
+			mark_item_dirty(sb, cinf, item);
+			mark_item_age(cinf, item);
+			ret = 0;
+		}
+		scoutfs_cwskip_write_end(&wr);
+	} while (ret == -ERANGE && ((ret = read_items(sb, cinf, key, lock)) == 0));
 
-	item = item_rbtree_walk(&pg->item_root, key, NULL, NULL, NULL);
-	if (!item || item->deletion) {
-		ret = -ENOENT;
-	} else {
-		item->seq = item_seq(sb, lock);
-		mark_item_dirty(sb, cinf, pg, NULL, item);
-		ret = 0;
-	}
-
-	write_unlock(&pg->rwlock);
 out:
 	return ret;
 }
@@ -1884,11 +916,11 @@ static int item_create(struct super_block *sb, struct scoutfs_key *key,
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
 	const u64 seq = item_seq(sb, lock);
+	struct scoutfs_cwskip_writer wr;
 	struct cached_item *found;
 	struct cached_item *item;
-	struct cached_page *pg;
-	struct rb_node **pnode;
-	struct rb_node *par;
+	struct cached_item *prev;
+	int cmp;
 	int ret;
 
 	scoutfs_inc_counter(sb, item_create);
@@ -1900,33 +932,43 @@ static int item_create(struct super_block *sb, struct scoutfs_key *key,
 	if (ret < 0)
 		goto out;
 
-	ret = get_cached_page(sb, cinf, lock, key, true, true, val_len, &pg);
-	if (ret < 0)
+	item = alloc_item(sb, key, seq, false, val, val_len);
+	if (!item) {
+		ret = -ENOMEM;
 		goto out;
-	__acquire(pg->rwlock);
-
-	found = item_rbtree_walk(&pg->item_root, key, NULL, &par, &pnode);
-	if (!force && found && !found->deletion) {
-		ret = -EEXIST;
-		goto unlock;
 	}
 
-	item = alloc_item(pg, key, seq, false, val, val_len);
-	rbtree_insert(&item->node, par, pnode, &pg->item_root);
-	mark_item_dirty(sb, cinf, pg, NULL, item);
+	do {
+		scoutfs_cwskip_write_begin(&cinf->item_root, key, item->node->height,
+					   (void **)&prev, (void **)&found, &cmp, &wr);
+		if (!force && !item_lookup_is_cached(cmp, prev)) {
+			ret = -ERANGE;
+		} else if (!force && cmp == 0 && item_is_positive(found)) {
+			ret = -EEXIST;
+		} else {
+			if (cmp == 0) {
+				item->persistent = found->persistent;
+				clear_item_dirty(sb, cinf, found);
+				scoutfs_cwskip_write_remove(&wr, found->node);
+				update_age_total(cinf, -found->alloc_bytes);
+				call_free_item(sb, found);
+			}
 
-	if (found) {
-		item->persistent = found->persistent;
-		clear_item_dirty(sb, cinf, pg, found);
-		erase_item(pg, found);
-	}
+			if (force)
+				item->persistent = 1;
+			scoutfs_cwskip_write_insert(&wr, item->node);
+			update_age_total(cinf, item->alloc_bytes);
+			mark_item_dirty(sb, cinf, item);
+			mark_item_age(cinf, item);
+			item = NULL;
 
-	if (force)
-		item->persistent = 1;
+			ret = 0;
+		}
+		scoutfs_cwskip_write_end(&wr);
 
-	ret = 0;
-unlock:
-	write_unlock(&pg->rwlock);
+	} while (ret == -ERANGE && ((ret = read_items(sb, cinf, key, lock)) == 0));
+
+	kfree(item);
 out:
 	return ret;
 }
@@ -1934,16 +976,14 @@ out:
 int scoutfs_item_create(struct super_block *sb, struct scoutfs_key *key,
 			void *val, int val_len, struct scoutfs_lock *lock)
 {
-	return item_create(sb, key, val, val_len, lock,
-			   SCOUTFS_LOCK_READ, false);
+	return item_create(sb, key, val, val_len, lock, SCOUTFS_LOCK_READ, false);
 }
 
 int scoutfs_item_create_force(struct super_block *sb, struct scoutfs_key *key,
 			      void *val, int val_len, struct scoutfs_lock *lock)
 {
-	return item_create(sb, key, val, val_len, lock,
-			   SCOUTFS_LOCK_WRITE_ONLY, true);
+	return item_create(sb, key, val, val_len, lock, SCOUTFS_LOCK_WRITE_ONLY, true);
 }
 
 /*
@@ -1957,11 +997,13 @@ int scoutfs_item_update(struct super_block *sb, struct scoutfs_key *key,
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
 	const u64 seq = item_seq(sb, lock);
-	struct cached_item *item;
+	struct scoutfs_cwskip_writer wr;
+	struct cached_item *item = NULL;
 	struct cached_item *found;
-	struct cached_page *pg;
-	struct rb_node **pnode;
-	struct rb_node *par;
+	struct cached_item *prev;
+	bool need_alloc = false;
+	int lock_height;
+	int cmp;
 	int ret;
 
 	scoutfs_inc_counter(sb, item_update);
@@ -1973,39 +1015,56 @@ int scoutfs_item_update(struct super_block *sb, struct scoutfs_key *key,
 	if (ret < 0)
 		goto out;
 
-	ret = get_cached_page(sb, cinf, lock, key, true, true, val_len, &pg);
-	if (ret < 0)
-		goto out;
-	__acquire(pg->rwlock);
-
-	found = item_rbtree_walk(&pg->item_root, key, NULL, &par, &pnode);
-	if (!found || found->deletion) {
-		ret = -ENOENT;
-		goto unlock;
-	}
-
-	if (val_len <= found->val_len) {
-		if (val_len)
-			memcpy(found->val, val, val_len);
-		if (val_len < found->val_len)
-			pg->erased_bytes += item_val_bytes(found->val_len) -
-					    item_val_bytes(val_len);
-		found->val_len = val_len;
-		found->seq = seq;
-		mark_item_dirty(sb, cinf, pg, NULL, found);
-	} else {
-		item = alloc_item(pg, key, seq, false, val, val_len);
-		item->persistent = found->persistent;
-		rbtree_insert(&item->node, par, pnode, &pg->item_root);
-		mark_item_dirty(sb, cinf, pg, NULL, item);
-
-		clear_item_dirty(sb, cinf, pg, found);
-		erase_item(pg, found);
-	}
-	ret = 0;
-unlock:
-	write_unlock(&pg->rwlock);
+	do {
+		if (need_alloc && !item) {
+			item = alloc_item(sb, key, seq, false, val, val_len);
+			if (!item) {
+				ret = -ENOMEM;
+				break;
+			}
+		}
+		need_alloc = false;
+		lock_height = item ? item->node->height : 0;
+
+		scoutfs_cwskip_write_begin(&cinf->item_root, key, lock_height,
+					   (void **)&prev, (void **)&found, &cmp, &wr);
+		if (!item_lookup_is_cached(cmp, prev)) {
+			ret = -ERANGE;
+		} else if (cmp != 0 || !item_is_positive(found)) {
+			ret = -ENOENT;
+		} else {
+			if (val_len <= found->val_len) {
+				if (val_len)
+					memcpy(found->val, val, val_len);
+				found->val_len = val_len;
+				found->seq = seq;
+				mark_item_dirty(sb, cinf, found);
+				mark_item_age(cinf, found);
+			} else if (!item) {
+				need_alloc = true;
+			} else {
+				item->persistent = found->persistent;
+
+				clear_item_dirty(sb, cinf, found);
+				scoutfs_cwskip_write_remove(&wr, found->node);
+				update_age_total(cinf, -found->alloc_bytes);
+				call_free_item(sb, found);
+
+				mark_item_dirty(sb, cinf, item);
+				mark_item_age(cinf, item);
+				scoutfs_cwskip_write_insert(&wr, item->node);
+				update_age_total(cinf, item->alloc_bytes);
+				item = NULL;
+			}
+			ret = 0;
+		}
+		scoutfs_cwskip_write_end(&wr);
+
+	} while (need_alloc || (ret == -ERANGE && ((ret = read_items(sb, cinf, key, lock)) == 0)));
+
+	kfree(item);
 out:
 	return ret;
 }
@@ -2022,10 +1081,11 @@ int scoutfs_item_delta(struct super_block *sb, struct scoutfs_key *key,
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
 	const u64 seq = item_seq(sb, lock);
+	struct scoutfs_cwskip_writer wr;
+	struct cached_item *alloc = NULL;
 	struct cached_item *item;
-	struct cached_page *pg;
-	struct rb_node **pnode;
-	struct rb_node *par;
+	struct cached_item *prev;
+	int cmp;
 	int ret;
 
 	scoutfs_inc_counter(sb, item_delta);
@@ -2037,47 +1097,56 @@ int scoutfs_item_delta(struct super_block *sb, struct scoutfs_key *key,
 	if (ret < 0)
 		goto out;
 
-	ret = get_cached_page(sb, cinf, lock, key, true, true, val_len, &pg);
-	if (ret < 0)
+	alloc = alloc_item(sb, key, seq, false, val, val_len);
+	if (!alloc) {
+		ret = -ENOMEM;
 		goto out;
-	__acquire(pg->rwlock);
+	}
 
-	item = item_rbtree_walk(&pg->item_root, key, NULL, &par, &pnode);
-	if (item) {
+	scoutfs_cwskip_write_begin(&cinf->item_root, key, alloc->node->height,
+				   (void **)&prev, (void **)&item, &cmp, &wr);
+	if (cmp == 0) {
 		if (!item->delta) {
 			ret = -EIO;
-			goto unlock;
+			goto end;
 		}
 
 		ret = scoutfs_forest_combine_deltas(key, item->val, item->val_len,
 						    val, val_len);
 		if (ret <= 0) {
 			if (ret == 0)
 				ret = -EIO;
-			goto unlock;
+			goto end;
 		}
 
 		if (ret == SCOUTFS_DELTA_COMBINED) {
 			item->seq = seq;
-			mark_item_dirty(sb, cinf, pg, NULL, item);
+			mark_item_dirty(sb, cinf, item);
+			mark_item_age(cinf, item);
 		} else if (ret == SCOUTFS_DELTA_COMBINED_NULL) {
-			clear_item_dirty(sb, cinf, pg, item);
-			erase_item(pg, item);
+			clear_item_dirty(sb, cinf, item);
+			scoutfs_cwskip_write_remove(&wr, item->node);
+			update_age_total(cinf, -item->alloc_bytes);
+			call_free_item(sb, item);
		} else {
			ret = -EIO;
-			goto unlock;
+			goto end;
		}
		ret = 0;
	} else {
-		item = alloc_item(pg, key, seq, false, val, val_len);
-		rbtree_insert(&item->node, par, pnode, &pg->item_root);
-		mark_item_dirty(sb, cinf, pg, NULL, item);
+		item = alloc;
+		alloc = NULL;
+
+		scoutfs_cwskip_write_insert(&wr, item->node);
+		update_age_total(cinf, item->alloc_bytes);
+		mark_item_dirty(sb, cinf, item);
+		mark_item_age(cinf, item);
		item->delta = 1;
		ret = 0;
	}
-
-unlock:
-	write_unlock(&pg->rwlock);
+end:
+	scoutfs_cwskip_write_end(&wr);
 out:
+	kfree(alloc);
 	return ret;
 }
 
@@ -2094,10 +1163,13 @@ static int item_delete(struct super_block *sb, struct scoutfs_key *key,
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
 	const u64 seq = item_seq(sb, lock);
+	struct scoutfs_cwskip_writer wr;
+	struct cached_item *alloc = NULL;
 	struct cached_item *item;
-	struct cached_page *pg;
-	struct rb_node **pnode;
-	struct rb_node *par;
+	struct cached_item *prev;
+	bool need_alloc = false;
+	int lock_height;
+	int cmp;
 	int ret;
 
 	scoutfs_inc_counter(sb, item_delete);
@@ -2109,43 +1181,65 @@ static int item_delete(struct super_block *sb, struct scoutfs_key *key,
 	if (ret < 0)
 		goto out;
 
-	ret = get_cached_page(sb, cinf, lock, key, true, force, 0, &pg);
-	if (ret < 0)
-		goto out;
-	__acquire(pg->rwlock);
-
-	item = item_rbtree_walk(&pg->item_root, key, NULL, &par, &pnode);
-	if (!force && (!item || item->deletion)) {
-		ret = -ENOENT;
-		goto unlock;
-	}
-
-	if (!item) {
-		item = alloc_item(pg, key, seq, false, NULL, 0);
-		rbtree_insert(&item->node, par, pnode, &pg->item_root);
-	}
-
-	if (force)
-		item->persistent = 1;
-
-	if (!item->persistent) {
-		/* can just forget items that aren't yet persistent */
-		clear_item_dirty(sb, cinf, pg, item);
-		erase_item(pg, item);
-	} else {
-		/* must emit deletion to clobber old persistent item */
-		item->seq = seq;
-		item->deletion = 1;
-		pg->erased_bytes += item_val_bytes(item->val_len) -
-				    item_val_bytes(0);
-		item->val_len = 0;
-		mark_item_dirty(sb, cinf, pg, NULL, item);
-	}
-	ret = 0;
-unlock:
-	write_unlock(&pg->rwlock);
+	do {
+		if (need_alloc) {
+			need_alloc = false;
+			alloc = alloc_item(sb, key, seq, true, NULL, 0);
+			if (!alloc) {
+				ret = -ENOMEM;
+				goto out;
+			}
+			lock_height = alloc->node->height;
+		} else {
+			lock_height = 1;
+		}
+
+		scoutfs_cwskip_write_begin(&cinf->item_root, key, lock_height,
+					   (void **)&prev, (void **)&item, &cmp, &wr);
+		if (!force && !item_lookup_is_cached(cmp, prev)) {
+			ret = -ERANGE;
+			goto end;
+		}
+		if (!force && (cmp != 0 || !item_is_positive(item))) {
+			ret = -ENOENT;
+			goto end;
+		}
+
+		if (cmp != 0) {
+			if (!alloc) {
+				need_alloc = true;
+				goto end;
+			}
+			item = alloc;
+			alloc = NULL;
+			scoutfs_cwskip_write_insert(&wr, item->node);
+			update_age_total(cinf, item->alloc_bytes);
+		}
+
+		if (force)
+			item->persistent = 1;
+
+		if (!item->persistent) {
+			/* can just forget items that aren't yet persistent */
+			clear_item_dirty(sb, cinf, item);
+			scoutfs_cwskip_write_remove(&wr, item->node);
+			update_age_total(cinf, -item->alloc_bytes);
+			call_free_item(sb, item);
+		} else {
+			/* must emit deletion to clobber old persistent item */
+			item->seq = seq;
+			item->deletion = 1;
+			item->val_len = 0;
+			mark_item_dirty(sb, cinf, item);
+			mark_item_age(cinf, item);
+		}
+end:
+		scoutfs_cwskip_write_end(&wr);
+	} while (need_alloc || (ret == -ERANGE && ((ret = read_items(sb, cinf, key, lock)) == 0)));
+
 out:
+	kfree(alloc);
 	return ret;
 }
 
@@ -2161,22 +1255,14 @@ int scoutfs_item_delete_force(struct super_block *sb, struct scoutfs_key *key,
 	return item_delete(sb, key, lock, SCOUTFS_LOCK_WRITE_ONLY, true);
 }
 
-u64 scoutfs_item_dirty_pages(struct super_block *sb)
+u64 scoutfs_item_dirty_bytes(struct super_block *sb)
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
 
-	return (u64)atomic_read(&cinf->dirty_pages);
+	return (u64)atomic64_read(&cinf->dirty_bytes);
 }
 
-static int cmp_pg_start(void *priv, struct list_head *A, struct list_head *B)
-{
-	struct cached_page *a = list_entry(A, struct cached_page, dirty_head);
-	struct cached_page *b = list_entry(B, struct cached_page, dirty_head);
-
-	return scoutfs_key_compare(&a->start, &b->start);
-}
-
-static int cmp_item_key(void *priv, struct list_head *A, struct list_head *B)
+static int cmp_dirty_item_key(void *priv, struct list_head *A, struct list_head *B)
 {
 	struct cached_item *a = list_entry(A, struct cached_item, dirty_head);
 	struct cached_item *b = list_entry(B, struct cached_item, dirty_head);
@@ -2184,6 +1270,48 @@ static int cmp_item_key(void *priv, struct list_head *A, struct list_head *B)
 	return scoutfs_key_compare(&a->key, &b->key);
 }
 
+/*
+ * btree block insertion iterates through the items in write_dirty's
+ * private list. The dirty items won't change. Each call fills the
+ * descriptor from the item at the current position and returns the
+ * advanced position, or NULL once the items run out.
+ */
+static void *item_btree_iter_cb(struct super_block *sb, struct scoutfs_btree_item_desc *desc,
+				void *pos, void *arg)
+{
+	struct list_head *private_list = arg;
+	struct cached_item *item = pos;
+
+	if (item == NULL) {
+		memset(desc, 0, sizeof(struct scoutfs_btree_item_desc));
+		return NULL;
+	}
+
+	desc->key = &item->key;
+	desc->seq = item->seq;
+	desc->flags = item->deletion ? SCOUTFS_ITEM_FLAG_DELETION : 0;
+	desc->val = item->val;
+	desc->val_len = item->val_len;
+
+	if (item->dirty_head.next == private_list)
+		item = NULL;
+	else
+		item = list_next_entry(item, dirty_head);
+
+	return item;
+}
+
+static void splice_all_pcpu_dirty_lists(struct item_cache_info *cinf, struct list_head *list)
+{
+	struct pcpu_dirty_list *pdlist;
+	int cpu;
+
+	for_each_online_cpu(cpu) {
+		pdlist = per_cpu_ptr(cinf->pcpu_dirty, cpu);
+		list_splice_init(&pdlist->list, list);
+	}
+}
+
 /*
  * Write all the dirty items into dirty blocks in the forest of btrees.
  * If this succeeds then the dirty blocks can be submitted to commit
@@ -2193,184 +1321,110 @@ static int cmp_item_key(void *priv, struct list_head *A, struct list_head *B)
  * dirty items have been written.
  *
  * This is called during transaction commit which prevents item writers
- * from entering a transaction and dirtying items. The set of dirty
- * items will be constant.
- *
- * But the pages that contain the dirty items can be changing. A
- * neighbouring read lock can be invalidated and require bisecting a
- * page, moving dirty items to a new page. That new page will be put
- * after the original page on the dirty list. This will be done under
- * the page rwlock and the global dirty_lock.
- *
- * We first sort the pages by their keys, then lock each page and copy
- * its items into a private allocated singly-linked list of the items to
- * dirty. Once we have that we can hand it off to the forest of btrees
- * to write into items without causing any contention with other page
- * users.
+ * from entering a transaction and dirtying more items. The dirty
+ * items will not be modified and no new dirty items will be added.
+ * We're the only user of the dirty lists.
  */
 int scoutfs_item_write_dirty(struct super_block *sb)
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
-	struct scoutfs_btree_item_list *first;
-	struct scoutfs_btree_item_list **prev;
-	struct scoutfs_btree_item_list *lst;
+	struct pcpu_dirty_list *pdlist;
 	struct cached_item *item;
-	struct cached_page *pg;
-	struct page *second = NULL;
-	struct page *page;
-	LIST_HEAD(pages);
-	LIST_HEAD(pos);
-	u64 max_seq = 0;
-	int bytes;
-	int off;
+	LIST_HEAD(list);
+	u64 max_seq;
+	int cpu;
 	int ret;
 
-	if (atomic_read(&cinf->dirty_pages) == 0)
-		return 0;
-
 	scoutfs_inc_counter(sb, item_write_dirty);
 
-	/* sort page dirty list by keys */
-	read_lock(&cinf->rwlock);
-	spin_lock(&cinf->dirty_lock);
+	/* gather all dirty items and sort by their key */
+	splice_all_pcpu_dirty_lists(cinf, &list);
+	list_sort(NULL, &list, cmp_dirty_item_key);
 
-	/* sort cached pages by key, add our pos head */
-	list_sort(NULL, &cinf->dirty_list, cmp_pg_start);
-	list_add(&pos, &cinf->dirty_list);
-
-	read_unlock(&cinf->rwlock);
-	spin_unlock(&cinf->dirty_lock);
-
-	page = alloc_page(GFP_NOFS);
-	if (!page) {
-		ret = -ENOMEM;
-		goto out;
-	}
-	list_add(&page->list, &pages);
-
-	first = NULL;
-	prev = &first;
-	off = 0;
-
-	while (!list_empty_careful(&pos)) {
-		if (!second) {
-			second = alloc_page(GFP_NOFS);
-			if (!second) {
-				ret = -ENOMEM;
-				goto out;
-			}
-			list_add(&second->list, &pages);
-		}
-
-		/* read lock next sorted page, we're only dirty_list user */
-
-		spin_lock(&cinf->dirty_lock);
-		pg = list_entry(pos.next, struct cached_page, dirty_head);
-		if (!read_trylock(&pg->rwlock)) {
-			spin_unlock(&cinf->dirty_lock);
-			cpu_relax();
-			continue;
-		}
-		spin_unlock(&cinf->dirty_lock);
-
-		list_sort(NULL, &pg->dirty_list, cmp_item_key);
-
-		list_for_each_entry(item, &pg->dirty_list, dirty_head) {
-			bytes = offsetof(struct scoutfs_btree_item_list,
-					 val[item->val_len]);
-			max_seq = max(max_seq, item->seq);
-
-			if (off + bytes > PAGE_SIZE) {
-				page = second;
-				second = NULL;
-				off = 0;
-			}
-
-			lst = (void *)page_address(page) + off;
-			off += round_up(bytes, CACHED_ITEM_ALIGN);
-
-			lst->next = NULL;
-			*prev = lst;
-			prev = &lst->next;
-
-			lst->key = item->key;
-			lst->seq = item->seq;
-			lst->flags = item->deletion ? SCOUTFS_ITEM_FLAG_DELETION : 0;
-			lst->val_len = item->val_len;
-			memcpy(lst->val, item->val, item->val_len);
-		}
-
-		spin_lock(&cinf->dirty_lock);
-		if (pg->dirty_head.next == &cinf->dirty_list)
-			list_del_init(&pos);
-		else
-			list_move(&pos, &pg->dirty_head);
-		spin_unlock(&cinf->dirty_lock);
-
-		read_unlock(&pg->rwlock);
-	}
+	/* scan for the max seq, really seems like we could track this :/ */
+	max_seq = 0;
+	list_for_each_entry(item, &list, dirty_head)
+		max_seq = max(max_seq, item->seq);
 
 	/* store max item seq in forest's log_trees */
 	scoutfs_forest_set_max_seq(sb, max_seq);
 
 	/* write all the dirty items into log btree blocks */
-	ret = scoutfs_forest_insert_list(sb, first);
-out:
-	list_for_each_entry_safe(page, second, &pages, list) {
-		list_del_init(&page->list);
-		__free_page(page);
+	item = list_first_entry_or_null(&list, struct cached_item, dirty_head);
+	ret = scoutfs_forest_insert_list(sb, item_btree_iter_cb, item, &list);
+
+	/* return items to a pcpu list, we know ours exists :) */
+	cpu = get_cpu();
+	pdlist = per_cpu_ptr(cinf->pcpu_dirty, cpu);
+	list_splice_init(&list, &pdlist->list);
+	list_for_each_entry(item, &pdlist->list, dirty_head) {
+		item->dirty_cpu = cpu;
 	}
+	put_cpu();
 
 	return ret;
 }
 
 /*
  * The caller has successfully committed all the dirty btree blocks that
- * contained the currently dirty items. Clear all the dirty items and
- * pages.
+ * contained the currently dirty items. Clear all the dirty items.
+ *
+ * Deletion and delta items only existed to emit items into the btree
+ * logs. They aren't read from the item cache so once they're written
+ * we can remove them.
+ *
+ * The items in the private dirty list are still protected by being
+ * dirty and won't be removed from the main item list. For each item in
+ * the private list we search for it in the item list and remove it.
+ * We're likely to encounter runs of dirty items so we try iterating
+ * from our search position and clear as many dirty items as we can
+ * find.
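+ *
+ * For context, items land on these per-cpu dirty lists when they're
+ * first dirtied. A rough sketch of that marking path, assuming
+ * mark_item_dirty() (earlier in this file) follows the usual per-cpu
+ * list pattern:
+ *
+ *	cpu = get_cpu();
+ *	pdlist = per_cpu_ptr(cinf->pcpu_dirty, cpu);
+ *	spin_lock(&pdlist->lock);
+ *	list_add_tail(&item->dirty_head, &pdlist->list);
+ *	item->dirty = 1;
+ *	item->dirty_cpu = cpu;
+ *	spin_unlock(&pdlist->lock);
+ *	put_cpu();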
 */
 int scoutfs_item_write_done(struct super_block *sb)
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
+	struct scoutfs_cwskip_writer wr;
+	struct cached_item *found;
 	struct cached_item *item;
-	struct cached_item *tmp;
-	struct cached_page *pg;
+	LIST_HEAD(list);
+	int cleared = 0;
+	int cmp;
 
-retry:
-	spin_lock(&cinf->dirty_lock);
+	splice_all_pcpu_dirty_lists(cinf, &list);
 
-	while ((pg = list_first_entry_or_null(&cinf->dirty_list,
-					      struct cached_page,
-					      dirty_head))) {
+	while ((item = list_first_entry_or_null(&list, struct cached_item, dirty_head))) {
 
-		if (!write_trylock(&pg->rwlock)) {
-			spin_unlock(&cinf->dirty_lock);
-			cpu_relax();
-			goto retry;
-		}
+		scoutfs_cwskip_write_begin(&cinf->item_root, &item->key, item->node->height,
+					   NULL, (void **)&found, &cmp, &wr);
+		BUG_ON(cmp != 0 || found != item);
+		do {
+			if (!item->dirty)
+				break;
 
-		spin_unlock(&cinf->dirty_lock);
-
-		list_for_each_entry_safe(item, tmp, &pg->dirty_list,
-					 dirty_head) {
-			clear_item_dirty(sb, cinf, pg, item);
+			/* all dirty items are only on our private list */
+			list_del_init(&item->dirty_head);
+			item->dirty = 0;
+			item->dirty_cpu = -1;
+			cleared++;
 
 			if (item->delta)
 				scoutfs_inc_counter(sb, item_delta_written);
 
-			/* free deletion items */
-			if (item->deletion || item->delta)
-				erase_item(pg, item);
-			else
+			if (item->deletion || item->delta) {
+				scoutfs_cwskip_write_remove(&wr, item->node);
+				update_age_total(cinf, -item->alloc_bytes);
+				call_free_item(sb, item);
+			} else {
				item->persistent = 1;
-		}
+			}
 
-		write_unlock(&pg->rwlock);
+		} while (scoutfs_cwskip_write_next(&wr, 1, NULL, (void **)&item));
 
-		spin_lock(&cinf->dirty_lock);
+		scoutfs_cwskip_write_end(&wr);
 	}
 
-	spin_unlock(&cinf->dirty_lock);
+	scoutfs_add_counter(sb, item_clear_dirty, cleared);
+	atomic64_set(&cinf->dirty_bytes, 0);
 
 	return 0;
 }
@@ -2379,202 +1433,358 @@ retry:
 * Return true if the item cache covers the given range and set *dirty
 * to true if any items in the cached range are dirty.
 *
- * This is relatively rarely called as locks are granted to make sure
- * that we *don't* have existing cache covered by the lock which then
- * must be inconsistent. Finding pages is the critical error case,
- * under correct operation this will be a read locked walk of the page
- * rbtree that doesn't find anything.
+ * This is called as locks are granted to make sure that we *don't* have
+ * existing cache covered by the lock which then must be inconsistent.
+ * Finding items is the critical error case. Under correct operation
+ * this will be a read search that doesn't find anything.
+ *
+ * The best way to think about searching for cached items is to see
+ * that the only way for there *not* to be cached items in the range is
+ * if there is a) no previous item before the start key or the previous
+ * item has hole_after set and b) there are no items in the range. If
+ * we see a prev without hole_after set, or any items at or before the
+ * end key, then the range is cached.
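+ *
+ * Expressed as a predicate over a single search at the start key, a
+ * rough condensation of the incremental loop below:
+ *
+ *	uncached = (!prev || prev->hole_after) &&
+ *		   (!item || scoutfs_key_compare(&item->key, end) > 0);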
 */
 bool scoutfs_item_range_cached(struct super_block *sb, struct scoutfs_key *start,
 			       struct scoutfs_key *end, bool *dirty)
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
+	struct scoutfs_cwskip_reader rd;
+	struct scoutfs_key pos = *start;
+	struct scoutfs_key rd_pos;
 	struct cached_item *item;
-	struct cached_page *pg;
-	struct scoutfs_key pos;
-	bool cached;
+	struct cached_item *prev;
+	bool cached = false;
+	bool done = false;
+	bool rd_cached;
+	bool rd_dirty;
+	bool rd_done;
 
-	cached = false;
 	*dirty = false;
-	pos = *start;
 
+	do {
+		scoutfs_cwskip_read_begin(&cinf->item_root, &pos,
+					  (void **)&prev, (void **)&item, NULL, &rd);
+		do {
+			/* catches region starting with cache between items */
+			rd_cached = prev && !prev->hole_after;
 
-	read_lock(&cinf->rwlock);
+			rd_pos = pos;
+			rd_dirty = false;
+			rd_done = false;
+			if (!item || scoutfs_key_compare(&item->key, end) > 0) {
+				rd_done = true;
+			} else {
+				rd_pos = item->key;
+				scoutfs_key_inc(&rd_pos);
 
-	while (!(*dirty) && scoutfs_key_compare(&pos, end) <= 0 &&
-	       (pg = page_rbtree_walk(sb, &cinf->pg_root, &pos, end, NULL, NULL,
-				      NULL, NULL))) {
-		cached = true;
-
-		read_lock(&pg->rwlock);
-		read_unlock(&cinf->rwlock);
-
-		/* the dirty list isn't sorted :/ */
-		list_for_each_entry(item, &pg->dirty_list, dirty_head) {
-			if (!scoutfs_key_compare_ranges(&item->key, &item->key,
-							start, end)) {
-				*dirty = true;
-				break;
+				rd_cached = true;
+				if (item->dirty) {
+					rd_dirty = true;
+					rd_done = true;
+				}
 			}
-		}
 
-		pos = pg->end;
-		scoutfs_key_inc(&pos);
+			if (scoutfs_cwskip_read_valid(&rd)) {
+				pos = rd_pos;
+				cached |= rd_cached;
+				*dirty |= rd_dirty;
+				done |= rd_done;
+			}
+		} while (!done && scoutfs_cwskip_read_next(&rd, (void **)&prev, (void **)&item));
+		scoutfs_cwskip_read_end(&rd);
 
-		read_unlock(&pg->rwlock);
-		read_lock(&cinf->rwlock);
-	}
-
-	read_unlock(&cinf->rwlock);
+	} while (!done);
 
 	return cached;
 }
 
 /*
- * Remove the cached items in the given range. We drop pages that are
- * fully inside the range and trim any pages that intersect it. This is
- * being by locking for a lock that can't be used so there can't be item
- * calls within the range. It can race with all our other page uses.
+ * Remove the cached items in the given range. This is called by lock
+ * invalidation which prevents use of the lock while it's invalidating.
+ * There can be no read or write item calls for the specific key range.
+ * There can be item calls working with the neighbouring items that we
+ * might reference while invalidating the edges of the range. This can
+ * be racing with memory pressure shrinking the cache.
+ *
+ * We have to remove the negative cached space covered by the range as
+ * well as the cached items themselves. This is done by setting
+ * hole_after in the item before items we remove. We might have to
+ * remove only a negative cached region, so we have to do this even when
+ * there isn't a referenced node after the key.
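+ *
+ * For example, if the range covers no item nodes at all but the item
+ * just before start has hole_after clear, the range is still
+ * negatively cached and the only work is, roughly:
+ *
+ *	if (prev && !prev->hole_after)
+ *		prev->hole_after = 1;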
 */
 void scoutfs_item_invalidate(struct super_block *sb, struct scoutfs_key *start,
 			     struct scoutfs_key *end)
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
-	struct cached_page *right = NULL;
-	struct cached_page *pg;
-	struct rb_node **pnode;
-	struct rb_node *par;
-	int pgi;
+	struct scoutfs_cwskip_writer wr;
+	struct scoutfs_key key = *start;
+	struct cached_item *prev;
+	struct cached_item *item;
+	bool first = true;
+	int cmp;
 
 	scoutfs_inc_counter(sb, item_invalidate);
 
-retry:
-	write_lock(&cinf->rwlock);
+	do {
+		scoutfs_cwskip_write_begin(&cinf->item_root, &key, 1,
+					   (void **)&prev, (void **)&item, &cmp, &wr);
+		do {
+			if (!(first && cmp == 0) && prev && !prev->hole_after)
+				prev->hole_after = 1;
+			first = false;
 
-	while ((pg = page_rbtree_walk(sb, &cinf->pg_root, start, end, NULL,
-				      NULL, &par, &pnode))) {
+			if (item) {
+				key = item->key;
+				scoutfs_key_inc(&key);
+			} else {
+				scoutfs_key_set_ones(&key);
+			}
 
-		scoutfs_inc_counter(sb, item_invalidate_page);
+			if (!item || scoutfs_key_compare(&item->key, end) > 0)
+				break;
 
-		write_lock(&pg->rwlock);
+			/* cluster locking must sync before invalidating */
+			WARN_ON_ONCE(item->dirty);
 
-		pgi = trim_page_intersection(sb, cinf, pg, right, start, end);
-		trace_scoutfs_item_invalidate_page(sb, start, end,
-						   &pg->start, &pg->end, pgi);
-		BUG_ON(pgi == PGI_DISJOINT); /* walk wouldn't ret disjoint */
+			scoutfs_inc_counter(sb, item_invalidate_item);
 
-		if (pgi == PGI_INSIDE) {
-			/* free entirely invalidated page */
-			lru_remove(sb, cinf, pg);
-			rbtree_erase(&pg->node, &cinf->pg_root);
-			invalidate_pcpu_page(pg);
-			write_unlock(&pg->rwlock);
-			put_pg(sb, pg);
-			continue;
+			scoutfs_cwskip_write_remove(&wr, item->node);
+			update_age_total(cinf, -item->alloc_bytes);
+			call_free_item(sb, item);
 
-		} else if (pgi == PGI_BISECT_NEEDED) {
-			/* allocate so we can bisect a larger page */
-			write_unlock(&cinf->rwlock);
-			write_unlock(&pg->rwlock);
-			right = alloc_pg(sb, __GFP_NOFAIL);
-			goto retry;
+		} while (scoutfs_cwskip_write_next(&wr, 1, (void **)&prev, (void **)&item));
+		scoutfs_cwskip_write_end(&wr);
 
-		} else if (pgi == PGI_BISECT) {
-			/* inv was entirely inside page, done after bisect */
-			write_trylock_will_succeed(&right->rwlock);
-			rbtree_insert(&right->node, par, pnode, &cinf->pg_root);
-			lru_accessed(sb, cinf, right);
-			write_unlock(&right->rwlock);
-			write_unlock(&pg->rwlock);
-			right = NULL;
-			break;
-		}
+	} while (scoutfs_key_compare(&key, end) <= 0);
+}
 
-		/* OLAP trimmed edge, keep searching */
-		write_unlock(&pg->rwlock);
-	}
-
-	write_unlock(&cinf->rwlock);
-
-	put_pg(sb, right);
+static bool can_shrink_item(struct cached_item *item, u64 shrink_age, u64 first_reader_seq)
+{
+	return item &&
+	       atomic64_read(&item->age) <= shrink_age &&
+	       item->seq < first_reader_seq &&
+	       !item->dirty;
 }
 
 /*
- * Shrink the size the item cache. We're operating against the fast
- * path lock ordering and we skip pages if we can't acquire locks. We
- * can run into dirty pages or pages with items that weren't visible to
- * the earliest active reader which must be skipped.
+ * Shrink the size of the item cache.
+ *
+ * As items were accessed we tried to mark them with coarse age values
+ * that divide them into fractions of the total cached items. We have
+ * no specific indexing of items by age, instead we randomly search the
+ * list looking for items that are old enough to shrink.
+ *
+ * We cast a very wide net when searching for items that are old enough.
+ * If we searched for a precise small age window then the random
+ * searching has to do more work before it finds the ages it's looking
+ * for. Instead we only search for two broad age categories: either
+ * items that are older than the most recently accessed half of the
+ * items, or all items. This ensures that the random search will find
+ * items to shrink reasonably often.
+ *
+ * While we initially search to a random position in the list, we try to
+ * shrink contiguous runs of items. We choose a small size that is
+ * still larger than can be read and inserted in a single operation.
+ * The worst case would be to randomly free individual items leading to
+ * later reads that discard most of their items while inserting into a
+ * single item hole.
+ *
+ * All of this can go wrong. Access patterns can lead to weird age
+ * groupings, the cache can be entirely dirty, invalidation can remove
+ * the entire cache out from under us if the entire system is in one
+ * lock (a handful of enormous files in one inode group). This is all
+ * a best effort that stops when it has too many attempts that don't
+ * make progress.
+ *
+ * Finally, while we work with items the caller really cares about
+ * allocated pages. We track the bytes allocated to items and
+ * translate that to units of pages for the caller. We have no idea if
+ * our frees make up freed contiguous pages, and we're not really
+ * freeing items before returning, we're asking RCU to free later for
+ * us. So while we can return and tell the caller we freed our objects
+ * it's mostly a lie that we hope works out in the end.
 */
-static int item_lru_shrink(struct shrinker *shrink,
-			   struct shrink_control *sc)
+static int item_shrink(struct shrinker *shrink, struct shrink_control *sc)
 {
+#define ITEM_SHRINK_SCAN_LIMIT (2 * SCOUTFS_BLOCK_LG_SIZE)
+#define ITEM_SHRINK_ATTEMPT_LIMIT 64
-	struct item_cache_info *cinf = container_of(shrink,
-						    struct item_cache_info,
-						    shrinker);
+	struct item_cache_info *cinf = container_of(shrink, struct item_cache_info, shrinker);
 	struct super_block *sb = cinf->sb;
-	struct cached_page *tmp;
-	struct cached_page *pg;
+	struct scoutfs_cwskip_reader rd;
+	struct scoutfs_cwskip_writer wr;
+	struct cached_item *item;
+	struct cached_item *prev;
+	struct scoutfs_key key;
 	u64 first_reader_seq;
-	int nr;
+	s64 shrink_bytes;
+	u64 shrink_age;
+	u64 cur_age;
+	int attempts;
+	int scanned;
+	bool found;
 
 	if (sc->nr_to_scan == 0)
 		goto out;
-	nr = sc->nr_to_scan;
+
+	scoutfs_inc_counter(sb, item_shrink);
 
 	/* can't invalidate pages with items that weren't visible to first reader */
 	first_reader_seq = first_active_reader_seq(cinf);
 
-	write_lock(&cinf->rwlock);
-	spin_lock(&cinf->lru_lock);
+	shrink_bytes = (u64)sc->nr_to_scan << PAGE_SHIFT;
 
-	list_for_each_entry_safe(pg, tmp, &cinf->lru_list, lru_head) {
-
-		if (first_reader_seq <= pg->max_seq) {
-			scoutfs_inc_counter(sb, item_shrink_page_reader);
-			continue;
-		}
-
-		if (!write_trylock(&pg->rwlock)) {
-			scoutfs_inc_counter(sb, item_shrink_page_trylock);
-			continue;
-		}
-
-		if (!list_empty(&pg->dirty_list)) {
-			scoutfs_inc_counter(sb, item_shrink_page_dirty);
-			write_unlock(&pg->rwlock);
-			continue;
-		}
-
-		scoutfs_inc_counter(sb, item_shrink_page);
-
-		__lru_remove(sb, cinf, pg);
-		rbtree_erase(&pg->node, &cinf->pg_root);
-		invalidate_pcpu_page(pg);
-		write_unlock(&pg->rwlock);
-
-		put_pg(sb, pg);
-
-		if (--nr == 0)
-			break;
+	/* can shrink oldest half if shrinking less than half, otherwise everything */
+	cur_age = atomic64_read(&cinf->current_age);
+	if ((shrink_bytes < (atomic64_read(&cinf->age_total) >> 1)) && (cur_age > ITEM_AGE_NR)) {
+		shrink_age = cur_age - ITEM_AGE_HALF;
+	} else {
+		scoutfs_inc_counter(sb, item_shrink_all);
+		shrink_age = U64_MAX;
 	}
 
-	write_unlock(&cinf->rwlock);
-	spin_unlock(&cinf->lru_lock);
+	attempts = 0;
+
+	do {
+		attempts++;
+
+		/* find the key of a shrink candidate */
+		scoutfs_inc_counter(sb, item_shrink_read_search);
+		scanned = 0;
+		found = false;
+		scoutfs_cwskip_read_begin(&cinf->item_root, NULL,
+					  (void **)&prev, (void **)&item, NULL, &rd);
+		do {
+			if (!item) {
+				if (!prev)
+					shrink_bytes = 0;
+				break;
+			}
+
+			/* keys don't change */
+			key = item->key;
+
+			if (can_shrink_item(item, shrink_age, first_reader_seq)) {
+				found = true;
+				break;
+			}
+
+			scoutfs_key_inc(&key);
+			scoutfs_inc_counter(sb, item_shrink_searched);
+			scanned += item->alloc_bytes;
+
+		} while (scanned < ITEM_SHRINK_SCAN_LIMIT &&
+			 scoutfs_cwskip_read_next(&rd, (void **)&prev, (void **)&item));
+		scoutfs_cwskip_read_end(&rd);
+
+		if (!found)
+			continue;
+
+		/* try to shrink items in a region after the key */
+		scoutfs_inc_counter(sb, item_shrink_write_search);
+		scanned = 0;
+		scoutfs_cwskip_write_begin(&cinf->item_root, &key, 1,
+					   (void **)&prev, (void **)&item, NULL, &wr);
+		do {
+			if (!item)
+				break;
+
+			key = item->key;
+			scoutfs_key_inc(&key);
+			scanned += item->alloc_bytes;
+
+			if (can_shrink_item(item, shrink_age, first_reader_seq)) {
+				scoutfs_inc_counter(sb, item_shrink_removed);
+				if (prev && !prev->hole_after)
+					prev->hole_after = 1;
+				scoutfs_cwskip_write_remove(&wr, item->node);
+				update_age_total(cinf, -item->alloc_bytes);
+				shrink_bytes -= item->alloc_bytes;
+				call_free_item(sb, item);
+				attempts = 0;
+			} else {
+				scoutfs_inc_counter(sb, item_shrink_skipped);
+			}
+		} while (shrink_bytes > 0 && scanned < ITEM_SHRINK_SCAN_LIMIT &&
+			 scoutfs_cwskip_write_next(&wr, 1, NULL, (void **)&item));
+		scoutfs_cwskip_write_end(&wr);
+
+	} while (shrink_bytes > 0 && attempts < ITEM_SHRINK_ATTEMPT_LIMIT);
+
+	if (attempts >= ITEM_SHRINK_ATTEMPT_LIMIT)
+		scoutfs_inc_counter(sb, item_shrink_exhausted);
+
 out:
-	return min_t(unsigned long, cinf->lru_pages, INT_MAX);
+	return min_t(u64, atomic64_read(&cinf->age_total) >> PAGE_SHIFT, INT_MAX);
+}
+
+/*
+ * Free all the items in batches so as not to overwhelm rcu. Only used
+ * during teardown when there must be no more item use.
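+ *
+ * Removed items aren't freed synchronously; call_free_item()
+ * presumably defers the free to an rcu grace period (a rough sketch,
+ * assuming the usual call_rcu() pattern with a hypothetical
+ * free_item_rcu() callback):
+ *
+ *	call_rcu(&item->rcu, free_item_rcu);
+ *
+ * which is why each batch below is bounded and separated by
+ * synchronize_rcu().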
+ */
+static void free_all_items(struct super_block *sb, struct item_cache_info *cinf)
+{
+	struct scoutfs_cwskip_writer wr;
+	struct cached_item *item;
+	struct scoutfs_key key;
+	int i;
+
+	/* free items in batches of rcu critical sections */
+	scoutfs_key_set_zeros(&key);
+	do {
+		scoutfs_cwskip_write_begin(&cinf->item_root, &key,
+					   SCOUTFS_CWSKIP_MAX_HEIGHT,
+					   NULL, (void **)&item, NULL, &wr);
+		if (!item)
+			break;
+		i = 0;
+		do {
+			clear_item_dirty(sb, cinf, item);
+			scoutfs_cwskip_write_remove(&wr, item->node);
+			call_free_item(sb, item);
+		} while (++i < 1024 && scoutfs_cwskip_write_next(&wr, 1, NULL, (void **)&item));
+		scoutfs_cwskip_write_end(&wr);
+
+		synchronize_rcu();
+	} while (item);
+
+	WARN_ON_ONCE(!scoutfs_cwskip_empty(&cinf->item_root));
 }
 
 static int item_cpu_callback(struct notifier_block *nfb,
 			     unsigned long action, void *hcpu)
 {
-	struct item_cache_info *cinf = container_of(nfb,
-						    struct item_cache_info,
-						    notifier);
-	struct super_block *sb = cinf->sb;
-	unsigned long cpu = (unsigned long)hcpu;
+	struct item_cache_info *cinf = container_of(nfb, struct item_cache_info, notifier);
+	unsigned long dead_cpu = (unsigned long)hcpu;
+	struct pcpu_age_counters *pac;
+	struct pcpu_dirty_list *pdlist;
+	struct cached_item *item;
+	LIST_HEAD(list);
+	int our_cpu;
 
-	if (action == CPU_DEAD)
-		drop_pcpu_pages(sb, cinf, cpu);
+	if (action == CPU_DEAD) {
+		our_cpu = get_cpu();
+
+		/* age tracking */
+		pac = per_cpu_ptr(cinf->pcpu_age, dead_cpu);
+		add_global_age_marked(cinf, atomic64_read(&pac->age_marked));
+		atomic64_set(&pac->age_marked, 0);
+		atomic64_add(atomic64_xchg(&pac->total, 0), &cinf->age_total);
+
+		/* dirty item lists */
+		pdlist = per_cpu_ptr(cinf->pcpu_dirty, dead_cpu);
+		list_splice_init(&pdlist->list, &list);
+
+		list_for_each_entry(item, &list, dirty_head)
+			item->dirty_cpu = our_cpu;
+		pdlist = per_cpu_ptr(cinf->pcpu_dirty, our_cpu);
+		spin_lock(&pdlist->lock);
+		list_splice_init(&list, &pdlist->list);
+		spin_unlock(&pdlist->lock);
+
+		put_cpu();
+	}
 
 	return NOTIFY_OK;
 }
@@ -2582,6 +1792,8 @@ static int item_cpu_callback(struct notifier_block *nfb,
 int scoutfs_item_setup(struct super_block *sb)
 {
 	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
+	struct pcpu_dirty_list *pdlist;
+	struct pcpu_age_counters *pac;
 	struct item_cache_info *cinf;
 	int cpu;
 
@@ -2590,24 +1802,38 @@ int scoutfs_item_setup(struct super_block *sb)
 		return -ENOMEM;
 	cinf->sb = sb;
-	rwlock_init(&cinf->rwlock);
-	cinf->pg_root = RB_ROOT;
-	spin_lock_init(&cinf->dirty_lock);
-	INIT_LIST_HEAD(&cinf->dirty_list);
-	atomic_set(&cinf->dirty_pages, 0);
-	spin_lock_init(&cinf->lru_lock);
-	INIT_LIST_HEAD(&cinf->lru_list);
+	scoutfs_cwskip_init_root(&cinf->item_root, key_item_cmp, sizeof(struct cached_item));
+	atomic64_set(&cinf->current_age, 1);
+	atomic64_set(&cinf->age_marked, 1ULL << ITEM_AGE_MARK_SHIFT);
+	atomic64_set(&cinf->age_total, 0);
+	atomic64_set(&cinf->dirty_bytes, 0);
 	spin_lock_init(&cinf->active_lock);
 	INIT_LIST_HEAD(&cinf->active_list);
 
-	cinf->pcpu_pages = alloc_percpu(struct item_percpu_pages);
-	if (!cinf->pcpu_pages)
+	cinf->pcpu_dirty = alloc_percpu(struct pcpu_dirty_list);
+	if (!cinf->pcpu_dirty) {
+		kfree(cinf);
 		return -ENOMEM;
+	}
 
-	for_each_possible_cpu(cpu)
-		init_pcpu_pages(cinf, cpu);
+	cinf->pcpu_age = alloc_percpu(struct pcpu_age_counters);
+	if (!cinf->pcpu_age) {
+		free_percpu(cinf->pcpu_dirty);
+		kfree(cinf);
+		return -ENOMEM;
+	}
 
-	cinf->shrinker.shrink = item_lru_shrink;
+	for_each_possible_cpu(cpu) {
+		pac = per_cpu_ptr(cinf->pcpu_age, cpu);
+		atomic64_set(&pac->age_marked, atomic64_read(&cinf->age_marked));
+		atomic64_set(&pac->total, 0);
+
+		pdlist = per_cpu_ptr(cinf->pcpu_dirty, cpu);
+		spin_lock_init(&pdlist->lock);
+		INIT_LIST_HEAD(&pdlist->list);
+	}
+
+	cinf->shrinker.shrink = item_shrink;
 	cinf->shrinker.seeks = DEFAULT_SEEKS;
 	register_shrinker(&cinf->shrinker);
 	cinf->notifier.notifier_call = item_cpu_callback;
@@ -2624,9 +1850,6 @@ void scoutfs_item_destroy(struct super_block *sb)
 {
 	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
 	struct item_cache_info *cinf = sbi->item_cache_info;
-	struct cached_page *tmp;
-	struct cached_page *pg;
-	int cpu;
 
 	if (cinf) {
 		BUG_ON(!list_empty(&cinf->active_list));
@@ -2634,18 +1857,10 @@ void scoutfs_item_destroy(struct super_block *sb)
 		unregister_hotcpu_notifier(&cinf->notifier);
 		unregister_shrinker(&cinf->shrinker);
 
-		for_each_possible_cpu(cpu)
-			drop_pcpu_pages(sb, cinf, cpu);
-		free_percpu(cinf->pcpu_pages);
+		free_all_items(sb, cinf);
 
-		rbtree_postorder_for_each_entry_safe(pg, tmp, &cinf->pg_root,
-						     node) {
-			RB_CLEAR_NODE(&pg->node);
-			INIT_LIST_HEAD(&pg->lru_head);
-			INIT_LIST_HEAD(&pg->dirty_list);
-			INIT_LIST_HEAD(&pg->dirty_head);
-			put_pg(sb, pg);
-		}
+		free_percpu(cinf->pcpu_dirty);
+		free_percpu(cinf->pcpu_age);
 
 		kfree(cinf);
 		sbi->item_cache_info = NULL;
diff --git a/kmod/src/item.h b/kmod/src/item.h
index 431866d5..a2332945 100644
--- a/kmod/src/item.h
+++ b/kmod/src/item.h
@@ -26,7 +26,7 @@ int scoutfs_item_delete_force(struct super_block *sb,
 			      struct scoutfs_key *key,
 			      struct scoutfs_lock *lock);
 
-u64 scoutfs_item_dirty_pages(struct super_block *sb);
+u64 scoutfs_item_dirty_bytes(struct super_block *sb);
 int scoutfs_item_write_dirty(struct super_block *sb);
 int scoutfs_item_write_done(struct super_block *sb);
 bool scoutfs_item_range_cached(struct super_block *sb,
diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h
index 80db3247..22524fa8 100644
--- a/kmod/src/scoutfs_trace.h
+++ b/kmod/src/scoutfs_trace.h
@@ -403,24 +403,24 @@ TRACE_EVENT(scoutfs_sync_fs,
 );
 
 TRACE_EVENT(scoutfs_trans_write_func,
-	TP_PROTO(struct super_block *sb, u64 dirty_block_bytes, u64 dirty_item_pages),
+	TP_PROTO(struct super_block *sb, u64 dirty_block_bytes, u64 dirty_item_bytes),
 
-	TP_ARGS(sb, dirty_block_bytes, dirty_item_pages),
+	TP_ARGS(sb, dirty_block_bytes, dirty_item_bytes),
 
 	TP_STRUCT__entry(
 		SCSB_TRACE_FIELDS
 		__field(__u64, dirty_block_bytes)
-		__field(__u64, dirty_item_pages)
+		__field(__u64, dirty_item_bytes)
 	),
 
 	TP_fast_assign(
 		SCSB_TRACE_ASSIGN(sb);
 		__entry->dirty_block_bytes = dirty_block_bytes;
-		__entry->dirty_item_pages = dirty_item_pages;
+		__entry->dirty_item_bytes = dirty_item_bytes;
 	),
 
-	TP_printk(SCSBF" dirty_block_bytes %llu dirty_item_pages %llu",
-		  SCSB_TRACE_ARGS, __entry->dirty_block_bytes, __entry->dirty_item_pages)
+	TP_printk(SCSBF" dirty_block_bytes %llu dirty_item_bytes %llu",
+		  SCSB_TRACE_ARGS, __entry->dirty_block_bytes, __entry->dirty_item_bytes)
 );
 
 DECLARE_EVENT_CLASS(scoutfs_trans_hold_release_class,
diff --git a/kmod/src/trans.c b/kmod/src/trans.c
index 14e45c15..e22a58b6 100644
--- a/kmod/src/trans.c
+++ b/kmod/src/trans.c
@@ -207,7 +207,7 @@ void scoutfs_trans_write_func(struct work_struct *work)
 	}
 
 	trace_scoutfs_trans_write_func(sb, scoutfs_block_writer_dirty_bytes(sb, &tri->wri),
-				       scoutfs_item_dirty_pages(sb));
+				       scoutfs_item_dirty_bytes(sb));
 
 	if (tri->deadline_expired)
 		scoutfs_inc_counter(sb, trans_commit_timer);
@@ -422,16 +422,18 @@ static void release_holders(struct super_block *sb)
 */
 static bool commit_before_hold(struct super_block *sb, struct trans_info *tri)
 {
+	u64 dirty_blocks = (scoutfs_item_dirty_bytes(sb) >> SCOUTFS_BLOCK_LG_SHIFT) + 1;
+
 	/*
-	 * In theory each dirty item page could be straddling two full
-	 * blocks, requiring 4 allocations for each item cache page.
-	 * That's much too conservative, typically many dirty item cache
-	 * pages that are near each other all land in one block. This
+	 * In theory each dirty item could be added to a full block that
+	 * has to split, requiring 2 meta block allocs for each dirty
+	 * item. That's much too conservative, typically many dirty
+	 * items that are near each other all land in one block. This
	 * rough estimate is still so far beyond what typically happens
	 * that it accounts for having to dirty parent blocks and
	 * whatever dirtying is done during the transaction hold.
	 */
-	if (scoutfs_alloc_meta_low(sb, &tri->alloc, scoutfs_item_dirty_pages(sb) * 2)) {
+	if (scoutfs_alloc_meta_low(sb, &tri->alloc, dirty_blocks * 4)) {
		scoutfs_inc_counter(sb, trans_commit_dirty_meta_full);
		return true;
	}