From 6bacd95aeac1a02f3f4e1f7b34386ac41997875e Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Wed, 19 Aug 2020 09:28:56 -0700 Subject: [PATCH] scoutfs: fs uses item cache instead of forest Use the new item cache for all the item work in the fs instead of calling into the forest of btrees. Most of this is a mechanical conversion from the _forest calls to the _item calls. The item cache no longer supports the kvec argument for describing values, so all the callers pass in the value pointer and length directly. The item cache doesn't support saving items as they're deleted and later restoring them from an error unwinding path. There were only two users of this. Directory entries can easily guarantee that deletion won't fail by dirtying the items first in the item cache. Xattr updates were a little trickier. They can combine dirtying, creating, updating, and deleting to atomically switch between items that describe different versions of a multi-item value. This also fixes a bug in the srch xattrs where replacing an xattr would create a new id for the xattr and leave existing srch items referencing a now-deleted id. Replacing now reuses the old id. And finally we add back in the locking and transaction item cache integration. 
Signed-off-by: Zach Brown --- kmod/src/data.c | 23 ++++--- kmod/src/dir.c | 64 ++++++++++---------- kmod/src/inode.c | 41 +++++-------- kmod/src/ioctl.c | 3 +- kmod/src/lock.c | 50 ++++++++++++++++ kmod/src/trans.c | 9 ++- kmod/src/xattr.c | 152 +++++++++++++++++++++++++++++++++++++---------- 7 files changed, 236 insertions(+), 106 deletions(-) diff --git a/kmod/src/data.c b/kmod/src/data.c index ed42bb8a..9360b481 100644 --- a/kmod/src/data.c +++ b/kmod/src/data.c @@ -27,11 +27,10 @@ #include "inode.h" #include "key.h" #include "data.h" -#include "kvec.h" #include "trans.h" #include "counters.h" #include "scoutfs_trace.h" -#include "forest.h" +#include "item.h" #include "ioctl.h" #include "btree.h" #include "lock.h" @@ -323,7 +322,6 @@ static int load_unpacked_extents(struct super_block *sb, u64 ino, struct rb_node *parent; struct rb_node **node; void *buf = NULL; - struct kvec val; u64 prev_blkno; bool saw_final; int size; @@ -359,13 +357,16 @@ static int load_unpacked_extents(struct super_block *sb, u64 ino, for (p = 0; !saw_final; p++) { init_packed_extent_key(&key, ino, iblock, p); - kvec_init(&val, buf, SCOUTFS_PACKEXT_MAX_BYTES); /* maybe search for next initial item, lookup more parts */ if (p == 0 && last > iblock) - ret = scoutfs_forest_next(sb, &key, &end, &val, lock); + ret = scoutfs_item_next(sb, &key, &end, buf, + SCOUTFS_PACKEXT_MAX_BYTES, + lock); else - ret = scoutfs_forest_lookup(sb, &key, &val, lock); + ret = scoutfs_item_lookup(sb, &key, buf, + SCOUTFS_PACKEXT_MAX_BYTES, + lock); if (ret < 0) { if (p == 0 && ret == -ENOENT && empty_enoent) ret = 0; @@ -475,7 +476,6 @@ static int store_packed_extents(struct super_block *sb, u64 ino, struct unpacked_extent *final; struct unpacked_extent *ext; struct scoutfs_key key; - struct kvec val; void *buf = NULL; u64 prev_blkno; u64 iblock; @@ -491,7 +491,7 @@ static int store_packed_extents(struct super_block *sb, u64 ino, if (RB_EMPTY_ROOT(&unpe->extents)) { for (p = 0; p < unpe->existing_parts; p++) { 
init_packed_extent_key(&key, ino, unpe->iblock, p); - ret = scoutfs_forest_delete(sb, &key, lock); + ret = scoutfs_item_delete(sb, &key, lock); BUG_ON(ret); /* XXX inconsistent between parts */ } unpe->existing_parts = 0; @@ -544,11 +544,10 @@ static int store_packed_extents(struct super_block *sb, u64 ino, /* store full item or after packing final extent */ init_packed_extent_key(&key, ino, unpe->iblock, p); - kvec_init(&val, buf, size); if (p < unpe->existing_parts) - ret = scoutfs_forest_update(sb, &key, &val, lock); + ret = scoutfs_item_update(sb, &key, buf, size, lock); else - ret = scoutfs_forest_create(sb, &key, &val, lock); + ret = scoutfs_item_create(sb, &key, buf, size, lock); BUG_ON(ret); /* XXX inconsistent between parts */ pe = buf; @@ -560,7 +559,7 @@ static int store_packed_extents(struct super_block *sb, u64 ino, /* delete any remaining previous part items */ for (i = p; i < unpe->existing_parts; i++) { init_packed_extent_key(&key, ino, unpe->iblock, i); - ret = scoutfs_forest_delete(sb, &key, lock); + ret = scoutfs_item_delete(sb, &key, lock); BUG_ON(ret); /* XXX inconsistent between parts */ } diff --git a/kmod/src/dir.c b/kmod/src/dir.c index 89fda146..83ab48c2 100644 --- a/kmod/src/dir.c +++ b/kmod/src/dir.c @@ -27,8 +27,7 @@ #include "super.h" #include "trans.h" #include "xattr.h" -#include "kvec.h" -#include "forest.h" +#include "item.h" #include "lock.h" #include "hash.h" #include "counters.h" @@ -271,7 +270,6 @@ static int lookup_dirent(struct super_block *sb, u64 dir_ino, const char *name, struct scoutfs_key last_key; struct scoutfs_key key; struct scoutfs_dirent *dent = NULL; - struct kvec val; int ret; dent = alloc_dirent(SCOUTFS_NAME_LEN); @@ -282,10 +280,10 @@ static int lookup_dirent(struct super_block *sb, u64 dir_ino, const char *name, init_dirent_key(&key, SCOUTFS_DIRENT_TYPE, dir_ino, hash, 0); init_dirent_key(&last_key, SCOUTFS_DIRENT_TYPE, dir_ino, hash, U64_MAX); - kvec_init(&val, dent, dirent_bytes(SCOUTFS_NAME_LEN)); for (;;) 
{ - ret = scoutfs_forest_next(sb, &key, &last_key, &val, lock); + ret = scoutfs_item_next(sb, &key, &last_key, dent, + dirent_bytes(SCOUTFS_NAME_LEN), lock); if (ret < 0) break; @@ -484,7 +482,6 @@ static int KC_DECLARE_READDIR(scoutfs_readdir, struct file *file, struct scoutfs_key key; struct scoutfs_key last_key; struct scoutfs_lock *dir_lock; - struct kvec val; int name_len; u64 pos; int ret; @@ -500,7 +497,6 @@ static int KC_DECLARE_READDIR(scoutfs_readdir, struct file *file, init_dirent_key(&last_key, SCOUTFS_READDIR_TYPE, scoutfs_ino(inode), SCOUTFS_DIRENT_LAST_POS, 0); - kvec_init(&val, dent, dirent_bytes(SCOUTFS_NAME_LEN)); ret = scoutfs_lock_inode(sb, SCOUTFS_LOCK_READ, 0, inode, &dir_lock); if (ret) @@ -510,7 +506,9 @@ static int KC_DECLARE_READDIR(scoutfs_readdir, struct file *file, init_dirent_key(&key, SCOUTFS_READDIR_TYPE, scoutfs_ino(inode), kc_readdir_pos(file, ctx), 0); - ret = scoutfs_forest_next(sb, &key, &last_key, &val, dir_lock); + ret = scoutfs_item_next(sb, &key, &last_key, dent, + dirent_bytes(SCOUTFS_NAME_LEN), + dir_lock); if (ret < 0) { if (ret == -ENOENT) ret = 0; @@ -567,7 +565,6 @@ static int add_entry_items(struct super_block *sb, u64 dir_ino, u64 hash, struct scoutfs_dirent *dent; bool del_ent = false; bool del_rdir = false; - struct kvec val; int ret; dent = alloc_dirent(name_len); @@ -586,25 +583,27 @@ static int add_entry_items(struct super_block *sb, u64 dir_ino, u64 hash, init_dirent_key(&ent_key, SCOUTFS_DIRENT_TYPE, dir_ino, hash, pos); init_dirent_key(&rdir_key, SCOUTFS_READDIR_TYPE, dir_ino, pos, 0); init_dirent_key(&lb_key, SCOUTFS_LINK_BACKREF_TYPE, ino, dir_ino, pos); - kvec_init(&val, dent, dirent_bytes(name_len)); - ret = scoutfs_forest_create(sb, &ent_key, &val, dir_lock); + ret = scoutfs_item_create(sb, &ent_key, dent, dirent_bytes(name_len), + dir_lock); if (ret) goto out; del_ent = true; - ret = scoutfs_forest_create(sb, &rdir_key, &val, dir_lock); + ret = scoutfs_item_create(sb, &rdir_key, dent, 
dirent_bytes(name_len), + dir_lock); if (ret) goto out; del_rdir = true; - ret = scoutfs_forest_create(sb, &lb_key, &val, inode_lock); + ret = scoutfs_item_create(sb, &lb_key, dent, dirent_bytes(name_len), + inode_lock); out: if (ret < 0) { if (del_ent) - scoutfs_forest_delete_dirty(sb, &ent_key); + scoutfs_item_delete(sb, &ent_key, dir_lock); if (del_rdir) - scoutfs_forest_delete_dirty(sb, &rdir_key); + scoutfs_item_delete(sb, &rdir_key, dir_lock); } kfree(dent); @@ -626,23 +625,20 @@ static int del_entry_items(struct super_block *sb, u64 dir_ino, u64 hash, struct scoutfs_key rdir_key; struct scoutfs_key ent_key; struct scoutfs_key lb_key; - LIST_HEAD(dir_saved); - LIST_HEAD(inode_saved); int ret; init_dirent_key(&ent_key, SCOUTFS_DIRENT_TYPE, dir_ino, hash, pos); init_dirent_key(&rdir_key, SCOUTFS_READDIR_TYPE, dir_ino, pos, 0); init_dirent_key(&lb_key, SCOUTFS_LINK_BACKREF_TYPE, ino, dir_ino, pos); - ret = scoutfs_forest_delete_save(sb, &ent_key, &dir_saved, dir_lock) ?: - scoutfs_forest_delete_save(sb, &rdir_key, &dir_saved, dir_lock) ?: - scoutfs_forest_delete_save(sb, &lb_key, &inode_saved, inode_lock); - if (ret < 0) { - scoutfs_forest_restore(sb, &dir_saved, dir_lock); - scoutfs_forest_restore(sb, &inode_saved, inode_lock); - } else { - scoutfs_forest_free_batch(sb, &dir_saved); - scoutfs_forest_free_batch(sb, &inode_saved); + ret = scoutfs_item_dirty(sb, &ent_key, dir_lock) ?: + scoutfs_item_dirty(sb, &rdir_key, dir_lock) ?: + scoutfs_item_dirty(sb, &lb_key, inode_lock); + if (ret == 0) { + ret = scoutfs_item_delete(sb, &ent_key, dir_lock) ?: + scoutfs_item_delete(sb, &rdir_key, dir_lock) ?: + scoutfs_item_delete(sb, &lb_key, inode_lock); + BUG_ON(ret); /* _dirty should have guaranteed success */ } return ret; @@ -1002,7 +998,6 @@ static int symlink_item_ops(struct super_block *sb, int op, u64 ino, size_t size) { struct scoutfs_key key; - struct kvec val; unsigned bytes; unsigned nr; int ret; @@ -1017,14 +1012,16 @@ static int symlink_item_ops(struct 
super_block *sb, int op, u64 ino, init_symlink_key(&key, ino, i); bytes = min_t(u64, size, SCOUTFS_MAX_VAL_SIZE); - kvec_init(&val, (void *)target, bytes); if (op == SYM_CREATE) - ret = scoutfs_forest_create(sb, &key, &val, lock); + ret = scoutfs_item_create(sb, &key, (void *)target, + bytes, lock); else if (op == SYM_LOOKUP) - ret = scoutfs_forest_lookup_exact(sb, &key, &val, lock); + ret = scoutfs_item_lookup_exact(sb, &key, + (void *)target, bytes, + lock); else if (op == SYM_DELETE) - ret = scoutfs_forest_delete(sb, &key, lock); + ret = scoutfs_item_delete(sb, &key, lock); if (ret) break; @@ -1239,7 +1236,6 @@ int scoutfs_dir_add_next_linkref(struct super_block *sb, u64 ino, struct scoutfs_key last_key; struct scoutfs_key key; struct scoutfs_lock *lock = NULL; - struct kvec val; int len; int ret; @@ -1255,13 +1251,13 @@ int scoutfs_dir_add_next_linkref(struct super_block *sb, u64 ino, init_dirent_key(&key, SCOUTFS_LINK_BACKREF_TYPE, ino, dir_ino, dir_pos); init_dirent_key(&last_key, SCOUTFS_LINK_BACKREF_TYPE, ino, U64_MAX, U64_MAX); - kvec_init(&val, &ent->dent, dirent_bytes(SCOUTFS_NAME_LEN)); ret = scoutfs_lock_ino(sb, SCOUTFS_LOCK_READ, 0, ino, &lock); if (ret) goto out; - ret = scoutfs_forest_next(sb, &key, &last_key, &val, lock); + ret = scoutfs_item_next(sb, &key, &last_key, &ent->dent, + dirent_bytes(SCOUTFS_NAME_LEN), lock); scoutfs_unlock(sb, lock, SCOUTFS_LOCK_READ); lock = NULL; if (ret < 0) diff --git a/kmod/src/inode.c b/kmod/src/inode.c index efab16fd..5d914159 100644 --- a/kmod/src/inode.c +++ b/kmod/src/inode.c @@ -30,8 +30,7 @@ #include "xattr.h" #include "trans.h" #include "msg.h" -#include "kvec.h" -#include "forest.h" +#include "item.h" #include "client.h" #include "cmp.h" @@ -283,7 +282,6 @@ int scoutfs_inode_refresh(struct inode *inode, struct scoutfs_lock *lock, struct super_block *sb = inode->i_sb; struct scoutfs_key key; struct scoutfs_inode sinode; - struct kvec val; const u64 refresh_gen = lock->refresh_gen; int ret; @@ -299,11 
+297,11 @@ int scoutfs_inode_refresh(struct inode *inode, struct scoutfs_lock *lock, return 0; init_inode_key(&key, scoutfs_ino(inode)); - kvec_init(&val, &sinode, sizeof(sinode)); mutex_lock(&si->item_mutex); if (atomic64_read(&si->last_refreshed) < refresh_gen) { - ret = scoutfs_forest_lookup_exact(sb, &key, &val, lock); + ret = scoutfs_item_lookup_exact(sb, &key, &sinode, + sizeof(sinode), lock); if (ret == 0) { load_inode(inode, &sinode); atomic64_set(&si->last_refreshed, refresh_gen); @@ -759,15 +757,13 @@ int scoutfs_dirty_inode_item(struct inode *inode, struct scoutfs_lock *lock) struct super_block *sb = inode->i_sb; struct scoutfs_inode sinode; struct scoutfs_key key; - struct kvec val; int ret; store_inode(&sinode, inode); - kvec_init(&val, &sinode, sizeof(sinode)); init_inode_key(&key, scoutfs_ino(inode)); - ret = scoutfs_forest_update(sb, &key, &val, lock); + ret = scoutfs_item_update(sb, &key, &sinode, sizeof(sinode), lock); if (!ret) trace_scoutfs_dirty_inode(inode); return ret; @@ -899,7 +895,7 @@ static int update_index_items(struct super_block *sb, scoutfs_inode_init_index_key(&ins, type, major, minor, ino); ins_lock = find_index_lock(lock_list, type, major, minor, ino); - ret = scoutfs_forest_create_force(sb, &ins, NULL, ins_lock); + ret = scoutfs_item_create_force(sb, &ins, NULL, 0, ins_lock); if (ret || !will_del_index(si, type, major, minor)) return ret; @@ -911,9 +907,9 @@ static int update_index_items(struct super_block *sb, del_lock = find_index_lock(lock_list, type, si->item_majors[type], si->item_minors[type], ino); - ret = scoutfs_forest_delete_force(sb, &del, del_lock); + ret = scoutfs_item_delete_force(sb, &del, del_lock); if (ret) { - err = scoutfs_forest_delete(sb, &ins, ins_lock); + err = scoutfs_item_delete(sb, &ins, ins_lock); BUG_ON(err); } @@ -972,7 +968,6 @@ void scoutfs_update_inode_item(struct inode *inode, struct scoutfs_lock *lock, const u64 ino = scoutfs_ino(inode); struct scoutfs_key key; struct scoutfs_inode sinode; - 
struct kvec val; int ret; int err; @@ -988,9 +983,8 @@ void scoutfs_update_inode_item(struct inode *inode, struct scoutfs_lock *lock, BUG_ON(ret); init_inode_key(&key, ino); - kvec_init(&val, &sinode, sizeof(sinode)); - err = scoutfs_forest_update(sb, &key, &val, lock); + err = scoutfs_item_update(sb, &key, &sinode, sizeof(sinode), lock); if (err) { scoutfs_err(sb, "inode %llu update err %d", ino, err); BUG_ON(err); @@ -1265,7 +1259,7 @@ static int remove_index(struct super_block *sb, u64 ino, u8 type, u64 major, scoutfs_inode_init_index_key(&key, type, major, minor, ino); lock = find_index_lock(ind_locks, type, major, minor, ino); - ret = scoutfs_forest_delete_force(sb, &key, lock); + ret = scoutfs_item_delete_force(sb, &key, lock); if (ret == -ENOENT) ret = 0; return ret; @@ -1375,7 +1369,6 @@ struct inode *scoutfs_new_inode(struct super_block *sb, struct inode *dir, struct scoutfs_key key; struct scoutfs_inode sinode; struct inode *inode; - struct kvec val; int ret; inode = new_inode(sb); @@ -1405,9 +1398,8 @@ struct inode *scoutfs_new_inode(struct super_block *sb, struct inode *dir, store_inode(&sinode, inode); init_inode_key(&key, scoutfs_ino(inode)); - kvec_init(&val, &sinode, sizeof(sinode)); - ret = scoutfs_forest_create(sb, &key, &val, lock); + ret = scoutfs_item_create(sb, &key, &sinode, sizeof(sinode), lock); if (ret) { iput(inode); return ERR_PTR(ret); @@ -1435,7 +1427,7 @@ static int remove_orphan_item(struct super_block *sb, u64 ino) init_orphan_key(&key, sbi->rid, ino); - ret = scoutfs_forest_delete(sb, &key, lock); + ret = scoutfs_item_delete(sb, &key, lock); if (ret == -ENOENT) ret = 0; @@ -1457,7 +1449,6 @@ static int delete_inode_items(struct super_block *sb, u64 ino) struct scoutfs_key key; LIST_HEAD(ind_locks); bool release = false; - struct kvec val; umode_t mode; u64 ind_seq; u64 size; @@ -1468,9 +1459,9 @@ static int delete_inode_items(struct super_block *sb, u64 ino) return ret; init_inode_key(&key, ino); - kvec_init(&val, &sinode, 
sizeof(sinode)); - ret = scoutfs_forest_lookup_exact(sb, &key, &val, lock); + ret = scoutfs_item_lookup_exact(sb, &key, &sinode, sizeof(sinode), + lock); if (ret < 0) { if (ret == -ENOENT) ret = 0; @@ -1523,7 +1514,7 @@ retry: goto out; } - ret = scoutfs_forest_delete(sb, &key, lock); + ret = scoutfs_item_delete(sb, &key, lock); if (ret) goto out; @@ -1592,7 +1583,7 @@ int scoutfs_scan_orphans(struct super_block *sb) init_orphan_key(&last, sbi->rid, ~0ULL); while (1) { - ret = scoutfs_forest_next(sb, &key, &last, NULL, lock); + ret = scoutfs_item_next(sb, &key, &last, NULL, 0, lock); if (ret == -ENOENT) /* No more orphan items */ break; if (ret < 0) @@ -1626,7 +1617,7 @@ int scoutfs_orphan_inode(struct inode *inode) init_orphan_key(&key, sbi->rid, scoutfs_ino(inode)); - ret = scoutfs_forest_create(sb, &key, NULL, lock); + ret = scoutfs_item_create(sb, &key, NULL, 0, lock); return ret; } diff --git a/kmod/src/ioctl.c b/kmod/src/ioctl.c index 2bff5c48..932c4ce3 100644 --- a/kmod/src/ioctl.c +++ b/kmod/src/ioctl.c @@ -27,6 +27,7 @@ #include "ioctl.h" #include "super.h" #include "inode.h" +#include "item.h" #include "forest.h" #include "data.h" #include "client.h" @@ -110,7 +111,7 @@ static long scoutfs_ioc_walk_inodes(struct file *file, unsigned long arg) for (nr = 0; nr < walk.nr_entries; ) { - ret = scoutfs_forest_next(sb, &key, &last_key, NULL, lock); + ret = scoutfs_item_next(sb, &key, &last_key, NULL, 0, lock); if (ret < 0 && ret != -ENOENT) break; diff --git a/kmod/src/lock.c b/kmod/src/lock.c index fba8fef5..19413458 100644 --- a/kmod/src/lock.c +++ b/kmod/src/lock.c @@ -34,6 +34,7 @@ #include "client.h" #include "data.h" #include "xattr.h" +#include "item.h" /* * scoutfs uses a lock service to manage item cache consistency between @@ -195,6 +196,8 @@ retry: ino++; } } + + scoutfs_item_invalidate(sb, &lock->start, &lock->end); } return ret; @@ -570,6 +573,50 @@ static void queue_inv_work(struct lock_info *linfo) mod_delayed_work(linfo->workq, &linfo->inv_dwork, 
0); } +/* + * The given lock is processing a received grant response. Trigger a + * bug if the cache is inconsistent. + * + * We only have two modes that can create dirty items. We can't have + * dirty items when transitioning from write_only to write because the + * writer can't trust the cached items in the cache for reading. And we + * don't currently transition directly from write to write_only, we + * first go through null. So if we have dirty items as we're granted a + * mode it's always incorrect. + * + * And we can't have cached items that we're going to use for reading if + * the previous mode didn't allow reading. + * + * Inconsistencies have come from all sorts of bugs: invalidation missed + * items, the cache was populated outside of locking coverage, lock + * holders performed the wrong item operations under their lock, + * overlapping locks, out of order granting or invalidating, etc. + */ +static void bug_on_inconsistent_grant_cache(struct super_block *sb, + struct scoutfs_lock *lock, + int old_mode, int new_mode) +{ + bool cached; + bool dirty; + + cached = scoutfs_item_range_cached(sb, &lock->start, &lock->end, + &dirty); + if (dirty || + (cached && (!lock_mode_can_read(old_mode) || + !lock_mode_can_read(new_mode)))) { + scoutfs_err(sb, "granted lock item cache inconsistency, cached %u dirty %u old_mode %d new_mode %d: start "SK_FMT" end "SK_FMT" refresh_gen %llu mode %u waiters: rd %u wr %u wo %u users: rd %u wr %u wo %u", + cached, dirty, old_mode, new_mode, SK_ARG(&lock->start), + SK_ARG(&lock->end), lock->refresh_gen, lock->mode, + lock->waiters[SCOUTFS_LOCK_READ], + lock->waiters[SCOUTFS_LOCK_WRITE], + lock->waiters[SCOUTFS_LOCK_WRITE_ONLY], + lock->users[SCOUTFS_LOCK_READ], + lock->users[SCOUTFS_LOCK_WRITE], + lock->users[SCOUTFS_LOCK_WRITE_ONLY]); + BUG(); + } +} + /* * Each lock has received a grant response message from the server. 
* @@ -608,6 +655,9 @@ static void lock_grant_worker(struct work_struct *work) if (lock->mode != nl->old_mode) continue; + bug_on_inconsistent_grant_cache(sb, lock, nl->old_mode, + nl->new_mode); + if (!lock_mode_can_read(nl->old_mode) && lock_mode_can_read(nl->new_mode)) { lock->refresh_gen = diff --git a/kmod/src/trans.c b/kmod/src/trans.c index bd06503a..af659bd9 100644 --- a/kmod/src/trans.c +++ b/kmod/src/trans.c @@ -28,6 +28,7 @@ #include "radix.h" #include "block.h" #include "msg.h" +#include "item.h" #include "scoutfs_trace.h" /* @@ -169,7 +170,8 @@ void scoutfs_trans_write_func(struct work_struct *work) trace_scoutfs_trans_write_func(sb, scoutfs_block_writer_dirty_bytes(sb, &tri->wri)); - if (!scoutfs_block_writer_has_dirty(sb, &tri->wri)) { + if (!scoutfs_block_writer_has_dirty(sb, &tri->wri) && + !scoutfs_item_dirty_bytes(sb)) { if (sbi->trans_deadline_expired) { /* * If we're not writing data then we only advance the @@ -192,9 +194,11 @@ void scoutfs_trans_write_func(struct work_struct *work) /* XXX this all needs serious work for dealing with errors */ ret = (s = "data submit", scoutfs_inode_walk_writeback(sb, true)) ?: + (s = "item dirty", scoutfs_item_write_dirty(sb)) ?: (s = "meta write", scoutfs_block_writer_write(sb, &tri->wri)) ?: (s = "data wait", scoutfs_inode_walk_writeback(sb, false)) ?: (s = "commit log trees", commit_btrees(sb)) ?: + scoutfs_item_write_done(sb) ?: (s = "advance seq", scoutfs_client_advance_seq(sb, &trans_seq)) ?: (s = "get log trees", scoutfs_trans_get_log_trees(sb)); out: @@ -364,8 +368,7 @@ static bool acquired_hold(struct super_block *sb, vals = tri->reserved_vals + cnt->vals; /* XXX arbitrarily limit to 8 meg transactions */ - if (scoutfs_block_writer_dirty_bytes(sb, &tri->wri) >= - (8 * 1024 * 1024)) { + if (scoutfs_item_dirty_bytes(sb) >= (8 * 1024 * 1024)) { scoutfs_inc_counter(sb, trans_commit_full); queue_trans_work(sbi); goto out; diff --git a/kmod/src/xattr.c b/kmod/src/xattr.c index d7c9d112..4666eecd 100644 --- 
a/kmod/src/xattr.c +++ b/kmod/src/xattr.c @@ -20,7 +20,7 @@ #include "inode.h" #include "key.h" #include "super.h" -#include "kvec.h" +#include "item.h" #include "forest.h" #include "trans.h" #include "xattr.h" @@ -160,7 +160,6 @@ static int get_next_xattr(struct inode *inode, struct scoutfs_key *key, { struct super_block *sb = inode->i_sb; struct scoutfs_key last; - struct kvec val; u8 last_part; int total; u8 part; @@ -183,8 +182,9 @@ static int get_next_xattr(struct inode *inode, struct scoutfs_key *key, for (;;) { key->skx_part = part; - kvec_init(&val, (void *)xat + total, bytes - total); - ret = scoutfs_forest_next(sb, key, &last, &val, lock); + ret = scoutfs_item_next(sb, key, &last, + (void *)xat + total, bytes - total, + lock); if (ret < 0) { /* XXX corruption, ran out of parts */ if (ret == -ENOENT && part > 0) @@ -260,7 +260,6 @@ static int create_xattr_items(struct inode *inode, u64 id, struct scoutfs_key key; unsigned int part_bytes; unsigned int total; - struct kvec val; int ret; init_xattr_key(&key, scoutfs_ino(inode), @@ -271,12 +270,13 @@ static int create_xattr_items(struct inode *inode, u64 id, while (total < bytes) { part_bytes = min_t(unsigned int, bytes - total, SCOUTFS_XATTR_MAX_PART_SIZE); - kvec_init(&val, (void *)xat + total, part_bytes); - ret = scoutfs_forest_create(sb, &key, &val, lock); + ret = scoutfs_item_create(sb, &key, + (void *)xat + total, part_bytes, + lock); if (ret) { while (key.skx_part-- > 0) - scoutfs_forest_delete_dirty(sb, &key); + scoutfs_item_delete(sb, &key, lock); break; } @@ -288,24 +288,114 @@ static int create_xattr_items(struct inode *inode, u64 id, } /* - * Delete and save the items that make up the given xattr. If this - * returns an error then the deleted and saved items are left on the - * list for the caller to restore. + * Delete the items that make up the given xattr. If this returns an + * error then no items have been deleted. 
*/ static int delete_xattr_items(struct inode *inode, u32 name_hash, u64 id, - u8 nr_parts, struct list_head *list, - struct scoutfs_lock *lock) + u8 nr_parts, struct scoutfs_lock *lock) { struct super_block *sb = inode->i_sb; struct scoutfs_key key; - int ret; + int ret = 0; + int i; init_xattr_key(&key, scoutfs_ino(inode), name_hash, id); - do { - ret = scoutfs_forest_delete_save(sb, &key, list, lock); - } while (ret == 0 && ++key.skx_part < nr_parts); + /* dirty additional existing old items */ + for (i = 1; i < nr_parts; i++) { + key.skx_part = i; + ret = scoutfs_item_dirty(sb, &key, lock); + if (ret) + goto out; + } + for (i = 0; i < nr_parts; i++) { + key.skx_part = i; + ret = scoutfs_item_delete(sb, &key, lock); + if (ret) + break; + } +out: + return ret; +} + +/* + * The caller needs to overwrite existing old xattr items with new + * items. We carefully stage the changes so that we can always unwind + * to the original items if we return an error. Both items have at + * least one part. Either the old or new can have more parts. We dirty + * and create first because we can always unwind those. We delete last + * after dirtying so that it can't fail and we don't have to restore the + * deleted items. 
+ */ +static int change_xattr_items(struct inode *inode, u64 id, + struct scoutfs_xattr *new_xat, + unsigned int new_bytes, u8 new_parts, + u8 old_parts, struct scoutfs_lock *lock) +{ + struct super_block *sb = inode->i_sb; + struct scoutfs_key key; + int last_created = -1; + int bytes; + int off; + int i; + int ret; + + init_xattr_key(&key, scoutfs_ino(inode), + xattr_name_hash(new_xat->name, new_xat->name_len), id); + + /* dirty existing old items */ + for (i = 0; i < old_parts; i++) { + key.skx_part = i; + ret = scoutfs_item_dirty(sb, &key, lock); + if (ret) + goto out; + } + + /* create any new items past the old */ + for (i = old_parts; i < new_parts; i++) { + off = i * SCOUTFS_XATTR_MAX_PART_SIZE; + bytes = min_t(unsigned int, new_bytes - off, + SCOUTFS_XATTR_MAX_PART_SIZE); + + key.skx_part = i; + ret = scoutfs_item_create(sb, &key, (void *)new_xat + off, + bytes, lock); + if (ret) + goto out; + + last_created = i; + } + + /* update dirtied overlapping existing items, last partial first */ + for (i = old_parts - 1; i >= 0; i--) { + off = i * SCOUTFS_XATTR_MAX_PART_SIZE; + bytes = min_t(unsigned int, new_bytes - off, + SCOUTFS_XATTR_MAX_PART_SIZE); + + key.skx_part = i; + ret = scoutfs_item_update(sb, &key, (void *)new_xat + off, + bytes, lock); + /* only last partial can fail, then we unwind created */ + if (ret < 0) + goto out; + } + + /* delete any dirtied old items past new */ + for (i = new_parts; i < old_parts; i++) { + key.skx_part = i; + scoutfs_item_delete(sb, &key, lock); + } + + ret = 0; +out: + if (ret < 0) { + /* delete any newly created items */ + for (i = old_parts; i <= last_created; i++) { + key.skx_part = i; + scoutfs_item_delete(sb, &key, lock); + } + } return ret; } @@ -407,7 +497,6 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name, struct prefix_tags tgs; bool undo_srch = false; LIST_HEAD(ind_locks); - LIST_HEAD(saved); u8 found_parts; unsigned int bytes; u64 ind_seq; @@ -478,7 +567,10 @@ static int 
scoutfs_xattr_set(struct dentry *dentry, const char *name, /* prepare our xattr */ if (value) { - id = si->next_xattr_id++; + if (found_parts) + id = le64_to_cpu(key.skx_id); + else + id = si->next_xattr_id++; xat->name_len = name_len; xat->val_len = cpu_to_le16(size); memcpy(xat->name, name, name_len); @@ -511,18 +603,17 @@ retry: undo_srch = true; } - ret = 0; - if (found_parts) + if (found_parts && value) + ret = change_xattr_items(inode, id, xat, bytes, + xattr_nr_parts(xat), found_parts, lck); + else if (found_parts) ret = delete_xattr_items(inode, le64_to_cpu(key.skx_name_hash), le64_to_cpu(key.skx_id), found_parts, - &saved, lck); - if (value && ret == 0) + lck); + else ret = create_xattr_items(inode, id, xat, bytes, lck); - if (ret < 0) { - scoutfs_forest_restore(sb, &saved, lck); + if (ret < 0) goto release; - } - scoutfs_forest_free_batch(sb, &saved); /* XXX do these want i_mutex or anything? */ inode_inc_iversion(inode); @@ -665,7 +756,6 @@ int scoutfs_xattr_drop(struct super_block *sb, u64 ino, struct prefix_tags tgs; bool release = false; unsigned int bytes; - struct kvec val; u64 hash; int ret; @@ -681,8 +771,8 @@ int scoutfs_xattr_drop(struct super_block *sb, u64 ino, init_xattr_key(&last, ino, U32_MAX, U64_MAX); for (;;) { - kvec_init(&val, (void *)xat, bytes); - ret = scoutfs_forest_next(sb, &key, &last, &val, lock); + ret = scoutfs_item_next(sb, &key, &last, (void *)xat, bytes, + lock); if (ret < 0) { if (ret == -ENOENT) ret = 0; @@ -698,7 +788,7 @@ int scoutfs_xattr_drop(struct super_block *sb, u64 ino, break; release = true; - ret = scoutfs_forest_delete(sb, &key, lock); + ret = scoutfs_item_delete(sb, &key, lock); if (ret < 0) break;