diff --git a/kmod/src/count.h b/kmod/src/count.h new file mode 100644 index 00000000..921dea89 --- /dev/null +++ b/kmod/src/count.h @@ -0,0 +1,169 @@ +#ifndef _SCOUTFS_COUNT_H_ +#define _SCOUTFS_COUNT_H_ + +struct scoutfs_item_count { + signed items; + signed keys; + signed vals; +}; + +#define DECLARE_ITEM_COUNT(name) \ + struct scoutfs_item_count name = { 0, } + +/* + * Allocating an inode creates a new set of indexed items. + */ +static inline void scoutfs_count_alloc_inode(struct scoutfs_item_count *cnt) +{ + const int nr_indices = SCOUTFS_INODE_INDEX_NR; + + cnt->items += 1 + nr_indices; + cnt->keys += sizeof(struct scoutfs_inode_key) + + (nr_indices * sizeof(struct scoutfs_inode_index_key)); + cnt->vals += sizeof(struct scoutfs_inode); +} + +/* + * Dirtying an inode dirties the inode item and can delete and create + * the full set of indexed items. + */ +static inline void scoutfs_count_dirty_inode(struct scoutfs_item_count *cnt) +{ + const int nr_indices = 2 * SCOUTFS_INODE_INDEX_NR; + + cnt->items += 1 + nr_indices; + cnt->keys += sizeof(struct scoutfs_inode_key) + + (nr_indices * sizeof(struct scoutfs_inode_index_key)); + cnt->vals += sizeof(struct scoutfs_inode); +} + +/* + * Adding a dirent adds the entry key, readdir key, and backref. 
+ */ +static inline void scoutfs_count_dirents(struct scoutfs_item_count *cnt, + unsigned name_len) +{ + + cnt->items += 3; + cnt->keys += offsetof(struct scoutfs_dirent_key, name[name_len]) + + sizeof(struct scoutfs_readdir_key) + + offsetof(struct scoutfs_link_backref_key, name[name_len]); + cnt->vals += 2 * offsetof(struct scoutfs_dirent, name[name_len]); +} + +static inline void scoutfs_count_sym_target(struct scoutfs_item_count *cnt, + unsigned size) +{ + + cnt->items += 1; + cnt->keys += sizeof(struct scoutfs_symlink_key); + cnt->vals += size; +} + +static inline void scoutfs_count_orphan(struct scoutfs_item_count *cnt) +{ + + cnt->items += 1; + cnt->keys += sizeof(struct scoutfs_orphan_key); +} + +static inline void scoutfs_count_mknod(struct scoutfs_item_count *cnt, + unsigned name_len) +{ + scoutfs_count_alloc_inode(cnt); + scoutfs_count_dirents(cnt, name_len); + scoutfs_count_dirty_inode(cnt); +} + +static inline void scoutfs_count_link(struct scoutfs_item_count *cnt, + unsigned name_len) +{ + scoutfs_count_dirents(cnt, name_len); + scoutfs_count_dirty_inode(cnt); + scoutfs_count_dirty_inode(cnt); +} + +/* + * Unlink can add orphan items. + */ +static inline void scoutfs_count_unlink(struct scoutfs_item_count *cnt, + unsigned name_len) +{ + scoutfs_count_dirents(cnt, name_len); + scoutfs_count_dirty_inode(cnt); + scoutfs_count_dirty_inode(cnt); + scoutfs_count_orphan(cnt); +} + +static inline void scoutfs_count_symlink(struct scoutfs_item_count *cnt, + unsigned name_len, unsigned size) +{ + scoutfs_count_mknod(cnt, name_len); + scoutfs_count_sym_target(cnt, size); +} + +/* + * Setting an xattr can create a full set of items for an xattr with a + * max name and length. Any existing items will be dirtied rather than + * deleted so we won't have more items than a max xattr's worth. 
+ */ +static inline void scoutfs_count_xattr_set(struct scoutfs_item_count *cnt, + unsigned name_len, unsigned size) +{ + unsigned parts = DIV_ROUND_UP(size, SCOUTFS_XATTR_PART_SIZE); + + scoutfs_count_dirty_inode(cnt); + + cnt->items += parts; + cnt->keys += parts * (offsetof(struct scoutfs_xattr_key, + name[name_len]) + + sizeof(struct scoutfs_xattr_key_footer)); + cnt->vals += parts * (sizeof(struct scoutfs_xattr_val_header) + + SCOUTFS_XATTR_PART_SIZE); +} + +/* + * Both insertion and removal modifications can dirty three extents + * at most: insertion can delete two existing neighbours and create a + * third new extent and removal can delete an existing extent and create + * two new remaining extents. + */ +static inline void scoutfs_count_extents(struct scoutfs_item_count *cnt, + unsigned nr_mod, unsigned sz) +{ + + cnt->items += nr_mod * 3; + cnt->keys += (nr_mod * 3) * sz; +} + +/* + * write_begin can refill local free extents after a bulk alloc rpc, + * alloc a block, delete an offline mapping, and insert the new allocated + * mapping. + */ +static inline void scoutfs_count_write_begin(struct scoutfs_item_count *cnt) +{ + BUILD_BUG_ON(sizeof(struct scoutfs_free_extent_blkno_key) != + sizeof(struct scoutfs_free_extent_blocks_key)); + + scoutfs_count_dirty_inode(cnt); + + scoutfs_count_extents(cnt, 2 * (SCOUTFS_BULK_ALLOC_COUNT + 1), + sizeof(struct scoutfs_free_extent_blkno_key)); + scoutfs_count_extents(cnt, 2, + sizeof(struct scoutfs_file_extent_key)); +} + +/* + * Truncating a block can free an allocated block, delete an online + * mapping, and create an offline mapping. 
+ */ +static inline void scoutfs_count_trunc_block(struct scoutfs_item_count *cnt) +{ + scoutfs_count_extents(cnt, 2 * 1, + sizeof(struct scoutfs_free_extent_blkno_key)); + scoutfs_count_extents(cnt, 2, + sizeof(struct scoutfs_file_extent_key)); +} + +#endif diff --git a/kmod/src/data.c b/kmod/src/data.c index 76cdd2a5..77dc5f98 100644 --- a/kmod/src/data.c +++ b/kmod/src/data.c @@ -510,8 +510,9 @@ out: * If 'offline' is given then blocks are freed but the extent items are * left behind and their _OFFLINE flag is set. * - * This is the low level extent item manipulation code. Callers manage - * higher order locking and transactional consistency. + * This is the low level extent item manipulation code. We hold and + * release the transaction so the caller doesn't have to deal with + * partial progress. */ int scoutfs_data_truncate_items(struct super_block *sb, u64 ino, u64 iblock, u64 len, bool offline) @@ -526,8 +527,10 @@ int scoutfs_data_truncate_items(struct super_block *sb, u64 ino, u64 iblock, struct native_extent ext; struct native_extent ofl; struct native_extent fr; + DECLARE_ITEM_COUNT(cnt); bool rem_fr = false; bool ins_ext = false; + bool holding = false; int ret = 0; int err; @@ -588,6 +591,12 @@ int scoutfs_data_truncate_items(struct super_block *sb, u64 ino, u64 iblock, if (offline && (ext.flags & SCOUTFS_FILE_EXTENT_OFFLINE)) continue; + scoutfs_count_trunc_block(&cnt); + ret = scoutfs_hold_trans(sb, &cnt); + if (ret) + break; + holding = true; + /* free the old extent if it was allocated */ if (ext.blkno) { fr = ext; @@ -618,8 +627,13 @@ int scoutfs_data_truncate_items(struct super_block *sb, u64 ino, u64 iblock, rem_fr = false; ins_ext = false; + scoutfs_release_trans(sb); + holding = false; } + if (holding) + scoutfs_release_trans(sb); + if (ret) { if (ins_ext) { err = insert_extent(sb, &ext, ino, @@ -1034,12 +1048,14 @@ static int scoutfs_write_begin(struct file *file, { struct inode *inode = mapping->host; struct super_block *sb = inode->i_sb; + 
DECLARE_ITEM_COUNT(cnt); int ret; trace_printk("ino %llu pos %llu len %u\n", scoutfs_ino(inode), (u64)pos, len); - ret = scoutfs_hold_trans(sb); + scoutfs_count_write_begin(&cnt); + ret = scoutfs_hold_trans(sb, &cnt); if (ret) goto out; diff --git a/kmod/src/dir.c b/kmod/src/dir.c index ce579e8c..0d77d95a 100644 --- a/kmod/src/dir.c +++ b/kmod/src/dir.c @@ -455,6 +455,7 @@ static int scoutfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode, dev_t rdev) { struct super_block *sb = dir->i_sb; + DECLARE_ITEM_COUNT(cnt); struct inode *inode; int ret; @@ -462,7 +463,8 @@ static int scoutfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode, if (ret) return ret; - ret = scoutfs_hold_trans(sb); + scoutfs_count_mknod(&cnt, dentry->d_name.len); + ret = scoutfs_hold_trans(sb, &cnt); if (ret) return ret; @@ -515,6 +517,7 @@ static int scoutfs_link(struct dentry *old_dentry, { struct inode *inode = old_dentry->d_inode; struct super_block *sb = dir->i_sb; + DECLARE_ITEM_COUNT(cnt); int ret; if (inode->i_nlink >= SCOUTFS_LINK_MAX) @@ -524,7 +527,8 @@ static int scoutfs_link(struct dentry *old_dentry, if (ret) return ret; - ret = scoutfs_hold_trans(sb); + scoutfs_count_link(&cnt, dentry->d_name.len); + ret = scoutfs_hold_trans(sb, &cnt); if (ret) return ret; @@ -559,12 +563,14 @@ static int scoutfs_unlink(struct inode *dir, struct dentry *dentry) struct scoutfs_key_buf *keys[3] = {NULL,}; struct scoutfs_key_buf rdir_key; struct scoutfs_readdir_key rkey; + DECLARE_ITEM_COUNT(cnt); int ret = 0; if (S_ISDIR(inode->i_mode) && i_size_read(inode)) return -ENOTEMPTY; - ret = scoutfs_hold_trans(sb); + scoutfs_count_unlink(&cnt, dentry->d_name.len); + ret = scoutfs_hold_trans(sb, &cnt); if (ret) return ret; @@ -718,6 +724,7 @@ static int scoutfs_symlink(struct inode *dir, struct dentry *dentry, struct scoutfs_key_buf key; struct inode *inode = NULL; SCOUTFS_DECLARE_KVEC(val); + DECLARE_ITEM_COUNT(cnt); int ret; /* path_max includes null as does our value for 
nd_set_link */ @@ -728,7 +735,8 @@ static int scoutfs_symlink(struct inode *dir, struct dentry *dentry, if (ret) return ret; - ret = scoutfs_hold_trans(sb); + scoutfs_count_symlink(&cnt, dentry->d_name.len, name_len); + ret = scoutfs_hold_trans(sb, &cnt); if (ret) return ret; diff --git a/kmod/src/format.h b/kmod/src/format.h index dd991c2d..58adf6ba 100644 --- a/kmod/src/format.h +++ b/kmod/src/format.h @@ -160,7 +160,7 @@ struct scoutfs_segment_block { #define SCOUTFS_ORPHAN_KEY 10 #define SCOUTFS_FREE_EXTENT_BLKNO_KEY 11 #define SCOUTFS_FREE_EXTENT_BLOCKS_KEY 12 -#define SCOUTFS_INODE_INDEX_CTIME_KEY 13 +#define SCOUTFS_INODE_INDEX_CTIME_KEY 13 /* don't forget first and last */ #define SCOUTFS_INODE_INDEX_MTIME_KEY 14 #define SCOUTFS_INODE_INDEX_SIZE_KEY 15 #define SCOUTFS_INODE_INDEX_META_SEQ_KEY 16 @@ -170,6 +170,11 @@ struct scoutfs_segment_block { #define SCOUTFS_NET_ADDR_KEY 254 #define SCOUTFS_NET_LISTEN_KEY 255 +#define SCOUTFS_INODE_INDEX_FIRST SCOUTFS_INODE_INDEX_CTIME_KEY +#define SCOUTFS_INODE_INDEX_LAST SCOUTFS_INODE_INDEX_DATA_SEQ_KEY +#define SCOUTFS_INODE_INDEX_NR \ + (SCOUTFS_INODE_INDEX_LAST - SCOUTFS_INODE_INDEX_FIRST + 1) + /* value is struct scoutfs_inode */ struct scoutfs_inode_key { __u8 type; @@ -433,6 +438,9 @@ struct scoutfs_net_manifest_entries { struct scoutfs_manifest_entry ments[0]; } __packed; +/* XXX I dunno, totally made up */ +#define SCOUTFS_BULK_ALLOC_COUNT 32 + struct scoutfs_net_segnos { __le16 nr; __le64 segnos[0]; diff --git a/kmod/src/inode.c b/kmod/src/inode.c index c1a65eef..b5774682 100644 --- a/kmod/src/inode.c +++ b/kmod/src/inode.c @@ -574,9 +574,11 @@ void scoutfs_update_inode_item(struct inode *inode) void scoutfs_dirty_inode(struct inode *inode, int flags) { struct super_block *sb = inode->i_sb; + DECLARE_ITEM_COUNT(cnt); int ret; - ret = scoutfs_hold_trans(sb); + scoutfs_count_dirty_inode(&cnt); + ret = scoutfs_hold_trans(sb, &cnt); if (ret == 0) { ret = scoutfs_dirty_inode_item(inode); if (ret == 0) @@ -777,12 
+779,15 @@ static int remove_orphan_item(struct super_block *sb, u64 ino) static int __delete_inode(struct super_block *sb, struct scoutfs_key_buf *key, u64 ino, umode_t mode) { + DECLARE_ITEM_COUNT(cnt); bool release = false; int ret; trace_delete_inode(sb, ino, mode); - ret = scoutfs_hold_trans(sb); + /* XXX this is obviously not done yet :) */ + scoutfs_count_dirty_inode(&cnt); + ret = scoutfs_hold_trans(sb, &cnt); if (ret) goto out; release = true; diff --git a/kmod/src/ioctl.c b/kmod/src/ioctl.c index e5109853..e4cdeb7d 100644 --- a/kmod/src/ioctl.c +++ b/kmod/src/ioctl.c @@ -27,7 +27,6 @@ #include "ioctl.h" #include "super.h" #include "inode.h" -#include "trans.h" #include "item.h" #include "data.h" #include "net.h" @@ -307,13 +306,8 @@ static long scoutfs_ioc_release(struct file *file, unsigned long arg) /* drop all clean and dirty cached blocks in the range */ truncate_inode_pages_range(&inode->i_data, start, end_inc); - ret = scoutfs_hold_trans(sb); - if (ret) - goto out; - ret = scoutfs_data_truncate_items(sb, scoutfs_ino(inode), iblock, len, true); - scoutfs_release_trans(sb); out: mutex_unlock(&inode->i_mutex); mnt_drop_write_file(file); diff --git a/kmod/src/item.c b/kmod/src/item.c index 4acf1abd..ba213f8c 100644 --- a/kmod/src/item.c +++ b/kmod/src/item.c @@ -293,7 +293,7 @@ static void update_dirty_parents(struct cached_item *item) scoutfs_item_rb_propagate(rb_parent(&item->node), NULL); } -static void mark_item_dirty(struct item_cache *cac, +static void mark_item_dirty(struct super_block *sb, struct item_cache *cac, struct cached_item *item) { if (WARN_ON_ONCE(RB_EMPTY_NODE(&item->node))) @@ -307,10 +307,13 @@ static void mark_item_dirty(struct item_cache *cac, cac->dirty_key_bytes += item->key->key_len; cac->dirty_val_bytes += scoutfs_kvec_length(item->val); + scoutfs_trans_track_item(sb, 1, item->key->key_len, + scoutfs_kvec_length(item->val)); + update_dirty_parents(item); } -static void clear_item_dirty(struct item_cache *cac, +static void 
clear_item_dirty(struct super_block *sb, struct item_cache *cac, struct cached_item *item) { if (WARN_ON_ONCE(RB_EMPTY_NODE(&item->node))) @@ -324,6 +327,9 @@ static void clear_item_dirty(struct item_cache *cac, cac->dirty_key_bytes -= item->key->key_len; cac->dirty_val_bytes -= scoutfs_kvec_length(item->val); + scoutfs_trans_track_item(sb, -1, -item->key->key_len, + -scoutfs_kvec_length(item->val)); + WARN_ON_ONCE(cac->nr_dirty_items < 0 || cac->dirty_key_bytes < 0 || cac->dirty_val_bytes < 0); @@ -339,7 +345,7 @@ static void erase_item(struct super_block *sb, struct item_cache *cac, { trace_printk("erasing item %p\n", item); - clear_item_dirty(cac, item); + clear_item_dirty(sb, cac, item); rb_erase_augmented(&item->node, &cac->items, &scoutfs_item_rb_cb); free_item(sb, item); } @@ -354,11 +360,11 @@ static void become_deletion_item(struct super_block *sb, struct cached_item *item, struct kvec *del_val) { - clear_item_dirty(cac, item); + clear_item_dirty(sb, cac, item); scoutfs_kvec_clone(del_val, item->val); scoutfs_kvec_init_null(item->val); item->deletion = 1; - mark_item_dirty(cac, item); + mark_item_dirty(sb, cac, item); scoutfs_inc_counter(sb, item_delete); } @@ -905,7 +911,7 @@ int scoutfs_item_create(struct super_block *sb, struct scoutfs_key_buf *key, ret = insert_item(sb, cac, item, false); if (!ret) { scoutfs_inc_counter(sb, item_create); - mark_item_dirty(cac, item); + mark_item_dirty(sb, cac, item); } spin_unlock_irqrestore(&cac->lock, flags); @@ -950,7 +956,7 @@ int scoutfs_item_create_ephemeral(struct super_block *sb, BUG_ON(ret); scoutfs_inc_counter(sb, item_create_ephemeral); - mark_item_dirty(cac, item); + mark_item_dirty(sb, cac, item); spin_unlock_irqrestore(&cac->lock, flags); @@ -975,9 +981,9 @@ void scoutfs_item_update_ephemeral(struct super_block *sb, if (item && item->ephemeral) { trace_printk("updating ephemeral item %p\n", item); scoutfs_inc_counter(sb, item_update_ephemeral); - clear_item_dirty(cac, item); + clear_item_dirty(sb, cac, 
item); scoutfs_kvec_clone(item->val, val); - mark_item_dirty(cac, item); + mark_item_dirty(sb, cac, item); } spin_unlock_irqrestore(&cac->lock, flags); @@ -1173,7 +1179,7 @@ int scoutfs_item_set_batch(struct super_block *sb, struct list_head *list, list_for_each_entry_safe(item, tmp, list, entry) { list_del_init(&item->entry); insert_item(sb, cac, item, true); - mark_item_dirty(cac, item); + mark_item_dirty(sb, cac, item); } ret = 0; @@ -1220,7 +1226,7 @@ int scoutfs_item_dirty(struct super_block *sb, struct scoutfs_key_buf *key) item = find_item(sb, &cac->items, key); if (item) { - mark_item_dirty(cac, item); + mark_item_dirty(sb, cac, item); ret = 0; } else if (check_range(sb, &cac->ranges, key, end)) { ret = -ENOENT; @@ -1275,9 +1281,9 @@ int scoutfs_item_update(struct super_block *sb, struct scoutfs_key_buf *key, item = find_item(sb, &cac->items, key); if (item) { - clear_item_dirty(cac, item); + clear_item_dirty(sb, cac, item); scoutfs_kvec_swap(up_val, item->val); - mark_item_dirty(cac, item); + mark_item_dirty(sb, cac, item); ret = 0; } else if (check_range(sb, &cac->ranges, key, end)) { ret = -ENOENT; @@ -1612,7 +1618,7 @@ int scoutfs_item_dirty_seg(struct super_block *sb, struct scoutfs_segment *seg) key_bytes -= item->key->key_len; - clear_item_dirty(cac, item); + clear_item_dirty(sb, cac, item); del = item; item = next_dirty(item); diff --git a/kmod/src/net.c b/kmod/src/net.c index 1d2662c2..d7b16b8a 100644 --- a/kmod/src/net.c +++ b/kmod/src/net.c @@ -368,9 +368,6 @@ static struct send_buf *alloc_sbuf(unsigned data_len) return sbuf; } -/* XXX I dunno, totally made up */ -#define BULK_COUNT 32 - static struct send_buf *process_bulk_alloc(struct super_block *sb,void *req, int req_len) { @@ -386,16 +383,16 @@ static struct send_buf *process_bulk_alloc(struct super_block *sb,void *req, return ERR_PTR(-EINVAL); sbuf = alloc_sbuf(offsetof(struct scoutfs_net_segnos, - segnos[BULK_COUNT])); + segnos[SCOUTFS_BULK_ALLOC_COUNT])); if (!sbuf) return 
ERR_PTR(-ENOMEM); ns = (void *)sbuf->nh->data; - ns->nr = cpu_to_le16(BULK_COUNT); + ns->nr = cpu_to_le16(SCOUTFS_BULK_ALLOC_COUNT); down_read(&nti->ring_commit_rwsem); - for (i = 0; i < BULK_COUNT; i++) { + for (i = 0; i < SCOUTFS_BULK_ALLOC_COUNT; i++) { ret = scoutfs_alloc_segno(sb, &segno); if (ret) { while (i-- > 0) diff --git a/kmod/src/super.c b/kmod/src/super.c index aa2870dc..90367575 100644 --- a/kmod/src/super.c +++ b/kmod/src/super.c @@ -211,7 +211,6 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent) get_random_bytes_arch(&sbi->node_id, sizeof(sbi->node_id)); spin_lock_init(&sbi->next_ino_lock); - atomic_set(&sbi->trans_holds, 0); init_waitqueue_head(&sbi->trans_hold_wq); spin_lock_init(&sbi->trans_write_lock); INIT_DELAYED_WORK(&sbi->trans_write_work, scoutfs_trans_write_func); diff --git a/kmod/src/super.h b/kmod/src/super.h index 39cc354a..350dbca9 100644 --- a/kmod/src/super.h +++ b/kmod/src/super.h @@ -12,6 +12,7 @@ struct manifest; struct segment_cache; struct compact_info; struct data_info; +struct trans_info; struct lock_info; struct net_info; struct inode_sb_info; @@ -33,7 +34,6 @@ struct scoutfs_sb_info { struct data_info *data_info; struct inode_sb_info *inode_sb_info; - atomic_t trans_holds; wait_queue_head_t trans_hold_wq; struct task_struct *trans_task; @@ -46,6 +46,7 @@ struct scoutfs_sb_info { struct workqueue_struct *trans_write_workq; bool trans_deadline_expired; + struct trans_info *trans_info; struct lock_info *lock_info; struct net_info *net_info; diff --git a/kmod/src/trans.c b/kmod/src/trans.c index c55da90b..4c41de88 100644 --- a/kmod/src/trans.c +++ b/kmod/src/trans.c @@ -16,6 +16,7 @@ #include #include #include +#include #include "super.h" #include "trans.h" @@ -53,6 +54,33 @@ /* sync dirty data at least this often */ #define TRANS_SYNC_DELAY (HZ * 10) +/* + * XXX move the rest of the super trans_ fields here. 
+ */ +struct trans_info { + spinlock_t lock; + unsigned reserved_items; + unsigned reserved_keys; + unsigned reserved_vals; + unsigned holders; + bool writing; +}; + +#define DECLARE_TRANS_INFO(sb, name) \ + struct trans_info *name = SCOUTFS_SB(sb)->trans_info + +static bool drained_holders(struct trans_info *tri) +{ + bool drained; + + spin_lock(&tri->lock); + tri->writing = true; + drained = tri->holders == 0; + spin_unlock(&tri->lock); + + return drained; +} + /* * This work func is responsible for writing out all the dirty blocks * that make up the current dirty transaction. It prevents writers from @@ -82,6 +110,7 @@ void scoutfs_trans_write_func(struct work_struct *work) struct scoutfs_sb_info *sbi = container_of(work, struct scoutfs_sb_info, trans_write_work.work); struct super_block *sb = sbi->sb; + DECLARE_TRANS_INFO(sb, tri); struct scoutfs_bio_completion comp; struct scoutfs_segment *seg; u64 segno; @@ -90,8 +119,7 @@ void scoutfs_trans_write_func(struct work_struct *work) scoutfs_bio_init_comp(&comp); sbi->trans_task = current; - wait_event(sbi->trans_hold_wq, - atomic_cmpxchg(&sbi->trans_holds, 0, -1) == 0); + wait_event(sbi->trans_hold_wq, drained_holders(tri)); trace_printk("items dirty %d\n", scoutfs_item_has_dirty(sb)); @@ -108,7 +136,8 @@ void scoutfs_trans_write_func(struct work_struct *work) scoutfs_seg_submit_write(sb, seg, &comp) ?: scoutfs_inode_walk_writeback(sb, false) ?: scoutfs_bio_wait_comp(sb, &comp) ?: - scoutfs_net_record_segment(sb, seg, 0); + scoutfs_net_record_segment(sb, seg, 0) ?: + scoutfs_net_advance_seq(sb, &sbi->trans_seq); if (ret) goto out; @@ -135,7 +164,10 @@ out: spin_unlock(&sbi->trans_write_lock); wake_up(&sbi->trans_write_wq); - atomic_set(&sbi->trans_holds, 0); + spin_lock(&tri->lock); + tri->writing = false; + spin_unlock(&tri->lock); + wake_up(&sbi->trans_hold_wq); sbi->trans_task = NULL; @@ -226,99 +258,184 @@ void scoutfs_trans_restart_sync_deadline(struct super_block *sb) } /* - * The holder that creates the 
most dirty item data is adding a full - * size xattr. The largest xattr can have a 255 byte name and 64KB - * value. - * - * XXX Assuming the worst case here too aggressively limits the number - * of concurrent holders that can work without being blocked when they - * know they'll dirty much less. We may want to have callers pass in - * their item, key, and val budgets if that's not too fragile. + * Each thread reserves space in the segment for their dirty items while + * they hold the transaction. This is calculated before the first + * transaction hold is acquired. It includes all the potential nested + * item manipulation that could happen with the transaction held. + * Including nested holds avoids having to deal with writing out partial + * transactions while a caller still holds the transaction. */ -#define HOLD_WORST_ITEMS \ - SCOUTFS_XATTR_MAX_PARTS - -#define HOLD_WORST_KEYS \ - (SCOUTFS_XATTR_MAX_PARTS * \ - (sizeof(struct scoutfs_xattr_key) + \ - SCOUTFS_XATTR_MAX_NAME_LEN + \ - sizeof(struct scoutfs_xattr_key_footer))) - -#define HOLD_WORST_VALS \ - (sizeof(struct scoutfs_xattr_val_header) + \ - SCOUTFS_XATTR_MAX_SIZE) +#define SCOUTFS_RESERVATION_MAGIC 0xd57cd13b +struct scoutfs_reservation { + unsigned magic; + unsigned holders; + struct scoutfs_item_count reserved; + struct scoutfs_item_count actual; +}; /* - * We're able to hold the transaction if the current dirty item bytes - * and the presumed worst case item dirtying of all the holders, - * including us, all fit in a segment. + * Try to hold the transaction. If a caller already holds the trans then + * we piggy back on their hold. We wait if the writer is trying to + * write out the transaction. And if our items won't fit then we kick off + * a write. 
*/ -static bool hold_acquired(struct super_block *sb) +static bool acquired_hold(struct super_block *sb, + struct scoutfs_reservation *rsv, + struct scoutfs_item_count *cnt) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); - int with_us; - int holds; - int before; - u32 items; - u32 keys; - u32 vals; + DECLARE_TRANS_INFO(sb, tri); + bool acquired = false; + unsigned items; + unsigned keys; + unsigned vals; + bool fits; - holds = atomic_read(&sbi->trans_holds); - for (;;) { - /* transaction is being committed */ - if (holds < 0) - return false; + spin_lock(&tri->lock); -#if 0 /* XXX where will we do this in the shared universe? */ - /* only hold when there's no level 0 segments, XXX for now */ - if (scoutfs_manifest_level_count(sb, 0) > 0) { - scoutfs_compact_kick(sb); - return false; - } -#endif + trace_printk("cnt %u.%u.%u, rsv %p holders %u reserved %u.%u.%u actual %d.%d.%d, trans holders %u writing %u reserved %u.%u.%u\n", + cnt->items, cnt->keys, cnt->vals, rsv, rsv->holders, + rsv->reserved.items, rsv->reserved.keys, + rsv->reserved.vals, rsv->actual.items, rsv->actual.keys, + rsv->actual.vals, tri->holders, tri->writing, + tri->reserved_items, tri->reserved_keys, + tri->reserved_vals); - /* see if we all would fill the segment */ - with_us = holds + 1; - items = with_us * HOLD_WORST_ITEMS; - keys = with_us * HOLD_WORST_KEYS; - vals = with_us * HOLD_WORST_VALS; - if (!scoutfs_item_dirty_fits_single(sb, items, keys, vals)) { - scoutfs_sync_fs(sb, 0); - return false; - } + /* use a caller's existing reservation */ + if (rsv->holders) + goto hold; - before = atomic_cmpxchg(&sbi->trans_holds, holds, with_us); - if (before == holds) - return true; - holds = before; + /* wait until the writing thread is finished */ + if (tri->writing) + goto out; + + /* see if we can reserve space for our item count */ + items = tri->reserved_items + cnt->items; + keys = tri->reserved_keys + cnt->keys; + vals = tri->reserved_vals + cnt->vals; + fits = 
scoutfs_item_dirty_fits_single(sb, items, keys, vals); + if (!fits) { + queue_trans_work(sbi); + goto out; } + + tri->reserved_items = items; + tri->reserved_keys = keys; + tri->reserved_vals = vals; + + rsv->reserved.items = cnt->items; + rsv->reserved.keys = cnt->keys; + rsv->reserved.vals = cnt->vals; + +hold: + rsv->holders++; + tri->holders++; + acquired = true; + +out: + + spin_unlock(&tri->lock); + + return acquired; } -int scoutfs_hold_trans(struct super_block *sb) +int scoutfs_hold_trans(struct super_block *sb, struct scoutfs_item_count *cnt) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct scoutfs_reservation *rsv; + int ret; if (current == sbi->trans_task) return 0; - return wait_event_interruptible(sbi->trans_hold_wq, hold_acquired(sb)); + rsv = current->journal_info; + if (rsv == NULL) { + rsv = kzalloc(sizeof(struct scoutfs_reservation), GFP_NOFS); + if (!rsv) + return -ENOMEM; + + rsv->magic = SCOUTFS_RESERVATION_MAGIC; + current->journal_info = rsv; + } + + BUG_ON(rsv->magic != SCOUTFS_RESERVATION_MAGIC); + + ret = wait_event_interruptible(sbi->trans_hold_wq, + acquired_hold(sb, rsv, cnt)); + if (ret && rsv->holders == 0) { + current->journal_info = NULL; + kfree(rsv); + } + return ret; } -/* - * As we release we'll almost certainly have dirtied less than the - * worst case dirty assumption that holders might be throttled waiting - * for. We always try and wake blocked holders in case they now have - * room to dirty. 
- */ -void scoutfs_release_trans(struct super_block *sb) +void scoutfs_trans_track_item(struct super_block *sb, signed items, + signed keys, signed vals) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct scoutfs_reservation *rsv = current->journal_info; if (current == sbi->trans_task) return; - atomic_dec(&sbi->trans_holds); - wake_up(&sbi->trans_hold_wq); + BUG_ON(!rsv || rsv->magic != SCOUTFS_RESERVATION_MAGIC); + + rsv->actual.items += items; + rsv->actual.keys += keys; + rsv->actual.vals += vals; + + WARN_ON_ONCE(rsv->actual.items > rsv->reserved.items); + WARN_ON_ONCE(rsv->actual.keys > rsv->reserved.keys); + WARN_ON_ONCE(rsv->actual.vals > rsv->reserved.vals); +} + +/* + * As we drop the last hold in the reservation we try and wake other + * hold attempts that were waiting for space. As we drop the last trans + * holder we try to wake a writing thread that was waiting for us to + * finish. + */ +void scoutfs_release_trans(struct super_block *sb) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct scoutfs_reservation *rsv; + DECLARE_TRANS_INFO(sb, tri); + bool wake = false; + + if (current == sbi->trans_task) + return; + + rsv = current->journal_info; + BUG_ON(!rsv || rsv->magic != SCOUTFS_RESERVATION_MAGIC); + + spin_lock(&tri->lock); + + trace_printk("rsv %p holders %u reserved %u.%u.%u actual %d.%d.%d, trans holders %u writing %u reserved %u.%u.%u\n", + rsv, rsv->holders, rsv->reserved.items, + rsv->reserved.keys, rsv->reserved.vals, + rsv->actual.items, rsv->actual.keys, rsv->actual.vals, + tri->holders, tri->writing, tri->reserved_items, + tri->reserved_keys, tri->reserved_vals); + + BUG_ON(rsv->holders <= 0); + BUG_ON(tri->holders <= 0); + + if (--rsv->holders == 0) { + tri->reserved_items -= rsv->reserved.items; + tri->reserved_keys -= rsv->reserved.keys; + tri->reserved_vals -= rsv->reserved.vals; + current->journal_info = NULL; + kfree(rsv); + wake = true; + } + + if (--tri->holders == 0) + wake = true; + + spin_unlock(&tri->lock); + + 
if (wake) + wake_up(&sbi->trans_hold_wq); } /* @@ -336,10 +453,21 @@ void scoutfs_trans_wake_holders(struct super_block *sb) int scoutfs_setup_trans(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct trans_info *tri; + + tri = kzalloc(sizeof(struct trans_info), GFP_KERNEL); + if (!tri) + return -ENOMEM; + + spin_lock_init(&tri->lock); sbi->trans_write_workq = alloc_workqueue("scoutfs_trans", 0, 1); - if (!sbi->trans_write_workq) + if (!sbi->trans_write_workq) { + kfree(tri); return -ENOMEM; + } + + sbi->trans_info = tri; return 0; } @@ -351,9 +479,12 @@ int scoutfs_setup_trans(struct super_block *sb) void scoutfs_shutdown_trans(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + DECLARE_TRANS_INFO(sb, tri); if (sbi->trans_write_workq) { cancel_delayed_work_sync(&sbi->trans_write_work); destroy_workqueue(sbi->trans_write_workq); } + + kfree(tri); } diff --git a/kmod/src/trans.h b/kmod/src/trans.h index 396ad6be..6f52553e 100644 --- a/kmod/src/trans.h +++ b/kmod/src/trans.h @@ -1,15 +1,20 @@ #ifndef _SCOUTFS_TRANS_H_ #define _SCOUTFS_TRANS_H_ +#include "net.h" +#include "count.h" + void scoutfs_trans_write_func(struct work_struct *work); int scoutfs_sync_fs(struct super_block *sb, int wait); int scoutfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync); void scoutfs_trans_restart_sync_deadline(struct super_block *sb); -int scoutfs_hold_trans(struct super_block *sb); +int scoutfs_hold_trans(struct super_block *sb, struct scoutfs_item_count *cnt); void scoutfs_release_trans(struct super_block *sb); void scoutfs_trans_wake_holders(struct super_block *sb); +void scoutfs_trans_track_item(struct super_block *sb, signed items, + signed keys, signed vals); int scoutfs_setup_trans(struct super_block *sb); void scoutfs_shutdown_trans(struct super_block *sb); diff --git a/kmod/src/xattr.c b/kmod/src/xattr.c index a427763d..2bd066d4 100644 --- a/kmod/src/xattr.c +++ b/kmod/src/xattr.c @@ -262,6 +262,7 @@ 
static int scoutfs_xattr_set(struct dentry *dentry, const char *name, struct scoutfs_xattr_val_header vh; size_t name_len = strlen(name); SCOUTFS_DECLARE_KVEC(val); + DECLARE_ITEM_COUNT(cnt); struct scoutfs_lock lck; unsigned int bytes; unsigned int off; @@ -314,7 +315,8 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name, else sif = 0; - ret = scoutfs_hold_trans(sb); + scoutfs_count_xattr_set(&cnt, name_len, size); + ret = scoutfs_hold_trans(sb, &cnt); if (ret) goto unlock;