diff --git a/kmod/src/Makefile b/kmod/src/Makefile index d808143b..bffe9ec6 100644 --- a/kmod/src/Makefile +++ b/kmod/src/Makefile @@ -2,6 +2,6 @@ obj-$(CONFIG_SCOUTFS_FS) := scoutfs.o CFLAGS_scoutfs_trace.o = -I$(src) # define_trace.h double include -scoutfs-y += bio.o block.o btree.o buddy.o counters.o crc.o dir.o filerw.o \ - kvec.o inode.o ioctl.o item.o manifest.o msg.o name.o ring.o \ - seg.o scoutfs_trace.o super.o trans.o xattr.o +scoutfs-y += alloc.o bio.o block.o btree.o buddy.o counters.o crc.o dir.o \ + filerw.o kvec.o inode.o ioctl.o item.o manifest.o msg.o name.o \ + ring.o seg.o scoutfs_trace.o super.o trans.o xattr.o diff --git a/kmod/src/alloc.c b/kmod/src/alloc.c new file mode 100644 index 00000000..dbacf288 --- /dev/null +++ b/kmod/src/alloc.c @@ -0,0 +1,334 @@ +/* + * Copyright (C) 2016 Versity Software, Inc. All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + */ +#include +#include +#include +#include + +#include "super.h" +#include "format.h" +#include "ring.h" +#include "alloc.h" + +/* + * scoutfs allocates segments by storing regions of a bitmap in a radix. + * As the regions are modified their index in the radix is marked dirty + * for writeout. + * + * Frees are tracked in a separate radix. They're only applied to the + * free regions as a transaction is written. The frees can't satisfy + * allocation until they're committed so that we don't overwrite stable + * referenced data. + * + * The allocated segments are large enough to be effectively + * independent. We allocate by sweeping a cursor through the volume. + * This gives racing unlocked readers more time to try to sample a stale + * freed segment, when its safe to do so, before it is reallocated and + * rewritten and they're forced to retry their racey read. 
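+ *
+ * As a rough sketch of the mapping (it mirrors the shift and mask used
+ * by the allocation paths below), a segment number splits into a radix
+ * index and a bit within that region:
+ *
+ *	ind = segno >> SCOUTFS_ALLOC_REGION_SHIFT;
+ *	nr  = segno & SCOUTFS_ALLOC_REGION_MASK;
+ *
+ * so each radix slot holds one struct scoutfs_ring_alloc_region that
+ * covers SCOUTFS_ALLOC_REGION_BITS consecutive segments.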
+ * + * XXX + * - make sure seg fits in long index + * - frees can delete region, leave non-NULL nul behind for logging + */ + +struct seg_alloc { + spinlock_t lock; + struct radix_tree_root regs; + struct radix_tree_root pending; + u64 next_segno; +}; + +#define DECLARE_SEG_ALLOC(sb, name) \ + struct seg_alloc *name = SCOUTFS_SB(sb)->seg_alloc + +enum { + DIRTY_RADIX_TAG = 0, +}; + +int scoutfs_alloc_segno(struct super_block *sb, u64 *segno) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct scoutfs_super_block *super = &sbi->super; + struct scoutfs_ring_alloc_region *reg; + DECLARE_SEG_ALLOC(sb, sal); + unsigned long flags; + unsigned long ind; + int ret; + int nr; + + spin_lock_irqsave(&sal->lock, flags); + + /* start by sweeping through the device for the first time */ + if (sal->next_segno == le64_to_cpu(super->alloc_uninit)) { + le64_add_cpu(&super->alloc_uninit, 1); + *segno = sal->next_segno++; + if (sal->next_segno == le64_to_cpu(super->total_segs)) + sal->next_segno = 0; + ret = 0; + goto out; + } + + /* then fall back to the allocator */ + ind = sal->next_segno >> SCOUTFS_ALLOC_REGION_SHIFT; + nr = sal->next_segno & SCOUTFS_ALLOC_REGION_MASK; + + do { + ret = radix_tree_gang_lookup(&sal->regs, (void **)®, ind, 1); + } while (ret == 0 && ind && (ind = 0, nr = 0, 1)); + + if (ret == 0) { + ret = -ENOSPC; + goto out; + } + + nr = find_next_bit_le(reg->bits, SCOUTFS_ALLOC_REGION_BITS, nr); + if (nr >= SCOUTFS_ALLOC_REGION_BITS) { + /* XXX corruption? shouldn't find empty regions */ + ret = -EIO; + goto out; + } + + clear_bit_le(nr, reg->bits); + radix_tree_tag_set(&sal->regs, ind, DIRTY_RADIX_TAG); + + *segno = (ind << SCOUTFS_ALLOC_REGION_SHIFT) + nr; + + /* once this wraps it will never equal alloc_uninit */ + sal->next_segno = *segno + 1; + if (sal->next_segno == le64_to_cpu(super->total_segs)) + sal->next_segno = 0; + + ret = 0; +out: + spin_unlock_irqrestore(&sal->lock, flags); + + trace_printk("segno %llu ret %d\n", *segno, ret); + return ret; +} + +/* + * Record newly freed sgements in pending regions. These can't be + * applied to the main allocator regions until the next commit so that + * they're not still referenced by the stable tree in event of a crash. + * + * The pending regions are merged into dirty regions for the next commit. + */ +int scoutfs_alloc_free(struct super_block *sb, u64 segno) +{ + struct scoutfs_ring_alloc_region *reg; + struct scoutfs_ring_alloc_region *ins; + DECLARE_SEG_ALLOC(sb, sal); + unsigned long flags; + unsigned long ind; + int ret; + int nr; + + ind = segno >> SCOUTFS_ALLOC_REGION_SHIFT; + nr = segno & SCOUTFS_ALLOC_REGION_MASK; + + ins = kzalloc(sizeof(struct scoutfs_ring_alloc_region), GFP_NOFS); + if (!ins) { + ret = -ENOMEM; + goto out; + } + + ins->eh.type = SCOUTFS_RING_ADD_ALLOC; + ins->eh.len = cpu_to_le16(sizeof(struct scoutfs_ring_alloc_region)); + ins->index = cpu_to_le64(ind); + + ret = radix_tree_preload(GFP_NOFS); + if (ret) { + goto out; + } + + spin_lock_irqsave(&sal->lock, flags); + + reg = radix_tree_lookup(&sal->pending, ind); + if (!reg) { + reg = ins; + ins = NULL; + radix_tree_insert(&sal->pending, ind, reg); + } + + set_bit_le(nr, reg->bits); + + spin_unlock_irqrestore(&sal->lock, flags); + radix_tree_preload_end(); +out: + kfree(ins); + trace_printk("freeing segno %llu ind %lu nr %d ret %d\n", + segno, ind, nr, ret); + return ret; +} + +/* + * Add a new clean region from the ring. It can be replacing existing + * clean stale entries during replay as we make our way through the + * ring. 
+ */
+int scoutfs_alloc_add(struct super_block *sb,
+		      struct scoutfs_ring_alloc_region *ins)
+{
+	struct scoutfs_ring_alloc_region *existing;
+	struct scoutfs_ring_alloc_region *reg;
+	DECLARE_SEG_ALLOC(sb, sal);
+	unsigned long flags;
+	int ret;
+
+	reg = kmalloc(sizeof(struct scoutfs_ring_alloc_region), GFP_NOFS);
+	if (!reg) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	memcpy(reg, ins, sizeof(struct scoutfs_ring_alloc_region));
+
+	ret = radix_tree_preload(GFP_NOFS);
+	if (ret) {
+		kfree(reg);
+		goto out;
+	}
+
+	spin_lock_irqsave(&sal->lock, flags);
+
+	existing = radix_tree_lookup(&sal->regs, le64_to_cpu(reg->index));
+	if (existing)
+		radix_tree_delete(&sal->regs, le64_to_cpu(reg->index));
+	radix_tree_insert(&sal->regs, le64_to_cpu(reg->index), reg);
+
+	spin_unlock_irqrestore(&sal->lock, flags);
+	radix_tree_preload_end();
+
+	if (existing)
+		kfree(existing);
+
+	ret = 0;
+out:
+	trace_printk("inserted reg ind %llu ret %d\n",
+		     le64_to_cpu(ins->index), ret);
+	return ret;
+}
+
+/*
+ * Append all the dirty alloc regions to the end of the ring. First we
+ * apply the pending frees to create the final set of dirty regions.
+ *
+ * This can't fail and always returns 0.
+ */
+int scoutfs_alloc_dirty_ring(struct super_block *sb)
+{
+	struct scoutfs_ring_alloc_region *regs[16];
+	struct scoutfs_ring_alloc_region *reg;
+	DECLARE_SEG_ALLOC(sb, sal);
+	unsigned long start;
+	unsigned long ind;
+	int nr;
+	int i;
+	int b;
+
+	/*
+	 * Merge pending free regions into dirty regions. If the dirty
+	 * region doesn't exist we can just move the pending region over.
+	 * If it does we OR the pending bits in the region.
+	 */
+	start = 0;
+	do {
+		nr = radix_tree_gang_lookup(&sal->pending, (void **)regs,
+					    start, ARRAY_SIZE(regs));
+		for (i = 0; i < nr; i++) {
+			ind = le64_to_cpu(regs[i]->index);
+
+			reg = radix_tree_lookup(&sal->regs, ind);
+			if (!reg) {
+				radix_tree_insert(&sal->regs, ind, regs[i]);
+			} else {
+				for (b = 0; b < ARRAY_SIZE(reg->bits); b++)
+					reg->bits[b] |= regs[i]->bits[b];
+				kfree(regs[i]);
+			}
+
+			radix_tree_delete(&sal->pending, ind);
+			radix_tree_tag_set(&sal->regs, ind, DIRTY_RADIX_TAG);
+			start = ind + 1;
+		}
+	} while (nr);
+
+	/* and append all the dirty regions to the ring */
+	start = 0;
+	do {
+		nr = radix_tree_gang_lookup_tag(&sal->regs, (void **)regs,
+						start, ARRAY_SIZE(regs),
+						DIRTY_RADIX_TAG);
+		for (i = 0; i < nr; i++) {
+			reg = regs[i];
+			ind = le64_to_cpu(reg->index);
+
+			scoutfs_ring_append(sb, &reg->eh);
+			radix_tree_tag_clear(&sal->regs, ind, DIRTY_RADIX_TAG);
+			start = ind + 1;
+		}
+	} while (nr);
+
+	return 0;
+}
+
+int scoutfs_alloc_setup(struct super_block *sb)
+{
+	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
+	struct seg_alloc *sal;
+
+	/* bits need to be aligned so hosts can use native bitops */
+	BUILD_BUG_ON(offsetof(struct scoutfs_ring_alloc_region, bits) &
+		     (sizeof(long) - 1));
+
+	sal = kzalloc(sizeof(struct seg_alloc), GFP_KERNEL);
+	if (!sal)
+		return -ENOMEM;
+	sbi->seg_alloc = sal;
+
+	spin_lock_init(&sal->lock);
+	/* inserts preload with _NOFS */
+	INIT_RADIX_TREE(&sal->pending, GFP_ATOMIC);
+	INIT_RADIX_TREE(&sal->regs, GFP_ATOMIC);
+	/* XXX read next_segno from super?
*/ + + return 0; +} + +static void destroy_radix_regs(struct radix_tree_root *radix) +{ + struct scoutfs_ring_alloc_region *regs[16]; + int nr; + int i; + + + do { + nr = radix_tree_gang_lookup(radix, (void **)regs, + 0, ARRAY_SIZE(regs)); + for (i = 0; i < nr; i++) { + radix_tree_delete(radix, le64_to_cpu(regs[i]->index)); + kfree(regs[i]); + } + } while (nr); +} + +void scoutfs_alloc_destroy(struct super_block *sb) +{ + DECLARE_SEG_ALLOC(sb, sal); + + if (sal) { + destroy_radix_regs(&sal->pending); + destroy_radix_regs(&sal->regs); + kfree(sal); + } +} diff --git a/kmod/src/alloc.h b/kmod/src/alloc.h new file mode 100644 index 00000000..4d3d398b --- /dev/null +++ b/kmod/src/alloc.h @@ -0,0 +1,16 @@ +#ifndef _SCOUTFS_ALLOC_H_ +#define _SCOUTFS_ALLOC_H_ + +struct scoutfs_alloc_region; + +int scoutfs_alloc_segno(struct super_block *sb, u64 *segno); +int scoutfs_alloc_free(struct super_block *sb, u64 segno); + +int scoutfs_alloc_add(struct super_block *sb, + struct scoutfs_ring_alloc_region *ins); +int scoutfs_alloc_dirty_ring(struct super_block *sb); + +int scoutfs_alloc_setup(struct super_block *sb); +void scoutfs_alloc_destroy(struct super_block *sb); + +#endif diff --git a/kmod/src/bio.c b/kmod/src/bio.c index fe41a689..119cd13e 100644 --- a/kmod/src/bio.c +++ b/kmod/src/bio.c @@ -131,17 +131,40 @@ void scoutfs_bio_submit(struct super_block *sb, int rw, struct page **pages, dec_end_io(args, 1, ret); } -struct end_io_completion { - struct completion comp; - int err; -}; - -static void end_io_complete(struct super_block *sb, void *data, int err) +void scoutfs_bio_init_comp(struct scoutfs_bio_completion *comp) { - struct end_io_completion *comp = data; + /* this initial pending is dropped by wait */ + atomic_set(&comp->pending, 1); + init_completion(&comp->comp); + comp->err = 0; +} - comp->err = err; - complete(&comp->comp); +static void comp_end_io(struct super_block *sb, void *data, int err) +{ + struct scoutfs_bio_completion *comp = data; + + if (err && !comp->err) + comp->err = err; + + if (atomic_dec_and_test(&comp->pending)) + complete(&comp->comp); +} + +void scoutfs_bio_submit_comp(struct super_block *sb, int rw, + struct page **pages, u64 blkno, + unsigned int nr_blocks, + struct scoutfs_bio_completion *comp) +{ + atomic_inc(&comp->pending); + scoutfs_bio_submit(sb, rw, pages, blkno, nr_blocks, comp_end_io, comp); +} + +int scoutfs_bio_wait_comp(struct super_block *sb, + struct scoutfs_bio_completion *comp) +{ + comp_end_io(sb, comp, 0); + wait_for_completion(&comp->comp); + return comp->err; } /* @@ -152,13 +175,11 @@ static void end_io_complete(struct super_block *sb, void *data, int err) int scoutfs_bio_read(struct super_block *sb, struct page **pages, u64 blkno, unsigned int nr_blocks) { - struct end_io_completion comp; + struct scoutfs_bio_completion comp; - init_completion(&comp.comp); - scoutfs_bio_submit(sb, READ, pages, blkno, nr_blocks, - end_io_complete, &comp); - wait_for_completion(&comp.comp); - return comp.err; + scoutfs_bio_init_comp(&comp); + scoutfs_bio_submit_comp(sb, READ, pages, blkno, nr_blocks, &comp); + return scoutfs_bio_wait_comp(sb, &comp); } /* return pointer to the blk 4k block offset amongst the pages */ diff --git a/kmod/src/bio.h b/kmod/src/bio.h index 094f6038..d2e3390a 100644 --- a/kmod/src/bio.h +++ b/kmod/src/bio.h @@ -9,12 +9,30 @@ * BIO_MAX_PAGES then this would just use a single bio directly. */ +/* + * Track aggregate IO completion for multiple multi-bio submissions. 
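+ *
+ * A caller pattern would look roughly like this (scoutfs_bio_read() in
+ * bio.c does exactly this for a single submission; multiple submissions
+ * against one completion are the point of the interface):
+ *
+ *	struct scoutfs_bio_completion comp;
+ *
+ *	scoutfs_bio_init_comp(&comp);
+ *	scoutfs_bio_submit_comp(sb, rw, pages, blkno, nr_blocks, &comp);
+ *	... more scoutfs_bio_submit_comp() calls ...
+ *	err = scoutfs_bio_wait_comp(sb, &comp);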
+ */ +struct scoutfs_bio_completion { + atomic_t pending; + struct completion comp; + long err; +}; + typedef void (*scoutfs_bio_end_io_t)(struct super_block *sb, void *data, int err); void scoutfs_bio_submit(struct super_block *sb, int rw, struct page **pages, u64 blkno, unsigned int nr_blocks, scoutfs_bio_end_io_t end_io, void *data); + +void scoutfs_bio_init_comp(struct scoutfs_bio_completion *comp); +void scoutfs_bio_submit_comp(struct super_block *sb, int rw, + struct page **pages, u64 blkno, + unsigned int nr_blocks, + struct scoutfs_bio_completion *comp); +int scoutfs_bio_wait_comp(struct super_block *sb, + struct scoutfs_bio_completion *comp); + int scoutfs_bio_read(struct super_block *sb, struct page **pages, u64 blkno, unsigned int nr_blocks); diff --git a/kmod/src/dir.c b/kmod/src/dir.c index 88a78610..f979fec6 100644 --- a/kmod/src/dir.c +++ b/kmod/src/dir.c @@ -27,6 +27,8 @@ #include "trans.h" #include "name.h" #include "xattr.h" +#include "kvec.h" +#include "item.h" /* * Directory entries are stored in entries with offsets calculated from @@ -95,167 +97,39 @@ static unsigned int dentry_type(unsigned int type) return DT_UNKNOWN; } - -/* - * XXX This crc nonsense is a quick hack. We'll want something a - * lot stronger like siphash. - */ -static u32 name_hash(const char *name, unsigned int len, u32 salt) -{ - u32 h = crc32c(salt, name, len) & SCOUTFS_DIRENT_OFF_MASK; - - return max_t(u32, 2, min_t(u32, h, SCOUTFS_DIRENT_LAST_POS)); -} - -static unsigned int dent_bytes(unsigned int name_len) -{ - return sizeof(struct scoutfs_dirent) + name_len; -} - -/* - * Each dirent stores the values that are needed to build the keys of - * the items that are removed on unlink so that we don't to search through - * items on unlink. - */ -struct dentry_info { - u64 lref_counter; - u32 hash; -}; - -static struct kmem_cache *scoutfs_dentry_cachep; - -static void scoutfs_d_release(struct dentry *dentry) -{ - struct dentry_info *di = dentry->d_fsdata; - - if (di) { - kmem_cache_free(scoutfs_dentry_cachep, di); - dentry->d_fsdata = NULL; - } -} - -static const struct dentry_operations scoutfs_dentry_ops = { - .d_release = scoutfs_d_release, -}; - -static struct dentry_info *alloc_dentry_info(struct dentry *dentry) -{ - struct dentry_info *di; - - /* XXX read mb? */ - if (dentry->d_fsdata) - return dentry->d_fsdata; - - di = kmem_cache_zalloc(scoutfs_dentry_cachep, GFP_NOFS); - if (!di) - return ERR_PTR(-ENOMEM); - - spin_lock(&dentry->d_lock); - if (!dentry->d_fsdata) { - dentry->d_fsdata = di; - d_set_d_op(dentry, &scoutfs_dentry_ops); - } - - spin_unlock(&dentry->d_lock); - - if (di != dentry->d_fsdata) - kmem_cache_free(scoutfs_dentry_cachep, di); - - return dentry->d_fsdata; -} - -static void update_dentry_info(struct dentry_info *di, struct scoutfs_key *key, - struct scoutfs_dirent *dent) -{ - di->lref_counter = le64_to_cpu(dent->counter); - di->hash = scoutfs_key_offset(key); -} - -static u64 last_dirent_key_offset(u32 h) -{ - return min_t(u64, (u64)h + SCOUTFS_DIRENT_COLL_NR - 1, - SCOUTFS_DIRENT_LAST_POS); -} - -/* - * Lookup searches for an entry for the given name amongst the entries - * stored in the item at the name's hash. 
- */ static struct dentry *scoutfs_lookup(struct inode *dir, struct dentry *dentry, unsigned int flags) { - struct scoutfs_inode_info *si = SCOUTFS_I(dir); struct super_block *sb = dir->i_sb; - struct scoutfs_btree_root *meta = SCOUTFS_META(sb); - struct scoutfs_dirent *dent = NULL; - struct scoutfs_btree_val val; - struct dentry_info *di; - struct scoutfs_key last; - struct scoutfs_key key; - unsigned int item_len; - unsigned int name_len; + struct scoutfs_dirent_key dkey; + struct scoutfs_dirent dent; + SCOUTFS_DECLARE_KVEC(key); + SCOUTFS_DECLARE_KVEC(val); struct inode *inode; u64 ino = 0; - u32 h = 0; int ret; - di = alloc_dentry_info(dentry); - if (IS_ERR(di)) { - ret = PTR_ERR(di); - goto out; - } - if (dentry->d_name.len > SCOUTFS_NAME_LEN) { ret = -ENAMETOOLONG; goto out; } - item_len = offsetof(struct scoutfs_dirent, name[dentry->d_name.len]); - dent = kmalloc(item_len, GFP_KERNEL); - if (!dent) { - ret = -ENOMEM; - goto out; - } + dkey.type = SCOUTFS_DIRENT_KEY; + dkey.ino = cpu_to_be64(scoutfs_ino(dir)); + scoutfs_kvec_init(key, &dkey, sizeof(dkey), + (void *)dentry->d_name.name, dentry->d_name.len); - h = name_hash(dentry->d_name.name, dentry->d_name.len, si->salt); + scoutfs_kvec_init(val, &dent, sizeof(dent)); - scoutfs_set_key(&key, scoutfs_ino(dir), SCOUTFS_DIRENT_KEY, h); - scoutfs_set_key(&last, scoutfs_ino(dir), SCOUTFS_DIRENT_KEY, - last_dirent_key_offset(h)); - - scoutfs_btree_init_val(&val, dent, item_len); - - for (;;) { - ret = scoutfs_btree_next(sb, meta, &key, &last, &key, &val); - if (ret < 0) { - if (ret == -ENOENT) - ret = 0; - break; - } - - /* XXX more verification */ - /* XXX corruption */ - if (ret <= sizeof(struct scoutfs_dirent)) { - ret = -EIO; - break; - } - - - name_len = ret - sizeof(struct scoutfs_dirent); - if (scoutfs_names_equal(dentry->d_name.name, dentry->d_name.len, - dent->name, name_len)) { - ino = le64_to_cpu(dent->ino); - update_dentry_info(di, &key, dent); - ret = 0; - break; - } - - scoutfs_inc_key(&key); + ret = scoutfs_item_lookup_exact(sb, key, val, sizeof(dent)); + if (ret == -ENOENT) { + ino = 0; + ret = 0; + } else if (ret == 0) { + ino = le64_to_cpu(dent.ino); } out: - kfree(dent); - if (ret < 0) inode = ERR_PTR(ret); else if (ino == 0) @@ -299,47 +173,48 @@ static int scoutfs_readdir(struct file *file, void *dirent, filldir_t filldir) { struct inode *inode = file_inode(file); struct super_block *sb = inode->i_sb; - struct scoutfs_btree_root *meta = SCOUTFS_META(sb); - struct scoutfs_btree_val val; struct scoutfs_dirent *dent; - struct scoutfs_key key; - struct scoutfs_key last; + struct scoutfs_readdir_key rkey; + struct scoutfs_readdir_key last_rkey; + SCOUTFS_DECLARE_KVEC(key); + SCOUTFS_DECLARE_KVEC(last_key); + SCOUTFS_DECLARE_KVEC(val); unsigned int item_len; unsigned int name_len; - u32 pos; + u64 pos; int ret; if (!dir_emit_dots(file, dirent, filldir)) return 0; + rkey.type = SCOUTFS_READDIR_KEY; + rkey.ino = cpu_to_be64(scoutfs_ino(inode)); + /* pos set in each loop */ + scoutfs_kvec_init(key, &rkey, sizeof(rkey)); + + last_rkey.type = SCOUTFS_READDIR_KEY; + last_rkey.ino = cpu_to_be64(scoutfs_ino(inode)); + last_rkey.pos = cpu_to_be64(SCOUTFS_DIRENT_LAST_POS); + scoutfs_kvec_init(last_key, &last_rkey, sizeof(last_rkey)); + item_len = offsetof(struct scoutfs_dirent, name[SCOUTFS_NAME_LEN]); dent = kmalloc(item_len, GFP_KERNEL); if (!dent) return -ENOMEM; - scoutfs_set_key(&key, scoutfs_ino(inode), SCOUTFS_DIRENT_KEY, - file->f_pos); - scoutfs_set_key(&last, scoutfs_ino(inode), SCOUTFS_DIRENT_KEY, - 
SCOUTFS_DIRENT_LAST_POS); - - scoutfs_btree_init_val(&val, dent, item_len); - for (;;) { - ret = scoutfs_btree_next(sb, meta, &key, &last, &key, &val); + rkey.pos = cpu_to_be64(file->f_pos); + scoutfs_kvec_init(val, dent, item_len); + ret = scoutfs_item_next_same_min(sb, key, last_key, val, + offsetof(struct scoutfs_dirent, name[1])); if (ret < 0) { if (ret == -ENOENT) ret = 0; break; } - /* XXX corruption */ - if (ret <= sizeof(dent)) { - ret = -EIO; - break; - } - name_len = ret - sizeof(struct scoutfs_dirent); - pos = scoutfs_key_offset(&key); + pos = be64_to_cpu(rkey.pos); if (filldir(dirent, dent->name, name_len, pos, le64_to_cpu(dent->ino), dentry_type(dent->type))) { @@ -348,13 +223,13 @@ static int scoutfs_readdir(struct file *file, void *dirent, filldir_t filldir) } file->f_pos = pos + 1; - scoutfs_inc_key(&key); } kfree(dent); return ret; } +#if 0 static void set_lref_key(struct scoutfs_key *key, u64 ino, u64 ctr) { scoutfs_set_key(key, ino, SCOUTFS_LINK_BACKREF_KEY, ctr); @@ -380,66 +255,74 @@ static int update_lref_item(struct super_block *sb, struct scoutfs_key *key, return ret; } +#endif static int add_entry_items(struct inode *dir, struct dentry *dentry, struct inode *inode) { - struct dentry_info *di = dentry->d_fsdata; struct super_block *sb = dir->i_sb; - struct scoutfs_btree_root *meta = SCOUTFS_META(sb); - struct scoutfs_inode_info *si = SCOUTFS_I(dir); - struct scoutfs_btree_val val; + struct scoutfs_dirent_key dkey; struct scoutfs_dirent dent; - struct scoutfs_key first; - struct scoutfs_key last; - struct scoutfs_key key; - struct scoutfs_key lref_key; - int bytes; + SCOUTFS_DECLARE_KVEC(key); + SCOUTFS_DECLARE_KVEC(val); int ret; - u64 h; - - /* caller should have allocated the dentry info */ - if (WARN_ON_ONCE(di == NULL)) - return -EINVAL; if (dentry->d_name.len > SCOUTFS_NAME_LEN) return -ENAMETOOLONG; ret = scoutfs_dirty_inode_item(dir); if (ret) - goto out; + return ret; - bytes = dent_bytes(dentry->d_name.len); - h = name_hash(dentry->d_name.name, dentry->d_name.len, si->salt); - scoutfs_set_key(&first, scoutfs_ino(dir), SCOUTFS_DIRENT_KEY, h); - scoutfs_set_key(&last, scoutfs_ino(dir), SCOUTFS_DIRENT_KEY, - last_dirent_key_offset(h)); - - ret = scoutfs_btree_hole(sb, meta, &first, &last, &key); - if (ret) - goto out; - - set_lref_key(&lref_key, scoutfs_ino(inode), - atomic64_inc_return(&SCOUTFS_I(inode)->link_counter)); - ret = update_lref_item(sb, &lref_key, scoutfs_ino(dir), - scoutfs_key_offset(&key), false); - if (ret) - goto out; + /* dirent item for lookup */ + dkey.type = SCOUTFS_DIRENT_KEY; + dkey.ino = cpu_to_be64(scoutfs_ino(dir)); + scoutfs_kvec_init(key, &dkey, sizeof(dkey), + (void *)dentry->d_name.name, dentry->d_name.len); dent.ino = cpu_to_le64(scoutfs_ino(inode)); - dent.counter = lref_key.offset; dent.type = mode_to_type(inode->i_mode); + scoutfs_kvec_init(val, &dent, sizeof(dent)); - scoutfs_btree_init_val(&val, &dent, sizeof(dent), - (void *)dentry->d_name.name, - dentry->d_name.len); - - ret = scoutfs_btree_insert(sb, meta, &key, &val); + ret = scoutfs_item_create(sb, key, val); if (ret) - scoutfs_btree_delete(sb, meta, &lref_key); - else - update_dentry_info(di, &key, &dent); -out: + return ret; + +#if 0 + struct scoutfs_inode_info *si = SCOUTFS_I(dir); + + /* readdir item for .. 
readdir */ + si->readdir_pos++; + rkey.type = SCOUTFS_READDIR_KEY; + rkey.ino = cpu_to_le64(scoutfs_ino(dir)); + rkey.pos = cpu_to_le64(si->readdir_pos); + scoutfs_kvec_init(key, &rkey, sizeof(rkey)); + + scoutfs_kvec_init(val, &dent, sizeof(dent), + dentry->d_name.name, dentry->d_name.len); + + ret = scoutfs_item_create(sb, key, val); + if (ret) + goto out_dent; + + /* backref item for inode to path resolution */ + lrkey.type = SCOUTFS_LINK_BACKREF_KEY; + lrey.ino = cpu_to_le64(scoutfs_ino(inode)); + lrey.dir = cpu_to_le64(scoutfs_ino(dir)); + scoutfs_kvec_init(key, &lrkey, sizeof(lrkey), + dentry->d_name.name, dentry->d_name.len); + + ret = scoutfs_item_create(sb, key, NULL); + if (ret) { + scoutfs_kvec_init(key, &rkey, sizeof(rkey)); + scoutfs_item_delete(sb, key); +out_dent: + scoutfs_kvec_init(key, &dkey, sizeof(dkey), + dentry->d_name.name, dentry->d_name.len); + scoutfs_item_delete(sb, key); + } +#endif + return ret; } @@ -448,13 +331,8 @@ static int scoutfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode, { struct super_block *sb = dir->i_sb; struct inode *inode; - struct dentry_info *di; int ret; - di = alloc_dentry_info(dentry); - if (IS_ERR(di)) - return PTR_ERR(di); - ret = scoutfs_hold_trans(sb); if (ret) return ret; @@ -508,16 +386,11 @@ static int scoutfs_link(struct dentry *old_dentry, { struct inode *inode = old_dentry->d_inode; struct super_block *sb = dir->i_sb; - struct dentry_info *di; int ret; if (inode->i_nlink >= SCOUTFS_LINK_MAX) return -EMLINK; - di = alloc_dentry_info(dentry); - if (IS_ERR(di)) - return PTR_ERR(di); - ret = scoutfs_hold_trans(sb); if (ret) return ret; @@ -548,17 +421,14 @@ out: static int scoutfs_unlink(struct inode *dir, struct dentry *dentry) { struct super_block *sb = dir->i_sb; - struct scoutfs_btree_root *meta = SCOUTFS_META(sb); struct inode *inode = dentry->d_inode; struct timespec ts = current_kernel_time(); - struct dentry_info *di; - struct scoutfs_key key; - struct scoutfs_key lref_key; + struct scoutfs_dirent_key dkey; + SCOUTFS_DECLARE_KVEC(key); int ret = 0; - if (WARN_ON_ONCE(!dentry->d_fsdata)) - return -EINVAL; - di = dentry->d_fsdata; + /* will need to add deletion items */ + return -EINVAL; if (S_ISDIR(inode->i_mode) && i_size_read(inode)) return -ENOTEMPTY; @@ -567,17 +437,18 @@ static int scoutfs_unlink(struct inode *dir, struct dentry *dentry) if (ret) return ret; - set_lref_key(&lref_key, scoutfs_ino(inode), di->lref_counter); - scoutfs_set_key(&key, scoutfs_ino(dir), SCOUTFS_DIRENT_KEY, di->hash); - - /* - * Dirty most of the metadata up front so that later btree - * operations can't fail. 
- */ ret = scoutfs_dirty_inode_item(dir) ?: - scoutfs_dirty_inode_item(inode) ?: - scoutfs_btree_dirty(sb, meta, &lref_key) ?: - scoutfs_btree_dirty(sb, meta, &key); + scoutfs_dirty_inode_item(inode); + if (ret) + goto out; + + /* XXX same items as add_entry_items */ + dkey.type = SCOUTFS_DIRENT_KEY; + dkey.ino = cpu_to_be64(scoutfs_ino(dir)); + scoutfs_kvec_init(key, &dkey, sizeof(dkey), + (void *)dentry->d_name.name, dentry->d_name.len); + + ret = scoutfs_item_delete(sb, key); if (ret) goto out; @@ -593,10 +464,6 @@ static int scoutfs_unlink(struct inode *dir, struct dentry *dentry) goto out; } - /* XXX: In thoery this can't fail but we should trap errors anyway */ - scoutfs_btree_delete(sb, meta, &key); - scoutfs_btree_delete(sb, meta, &lref_key); - dir->i_ctime = ts; dir->i_mtime = ts; i_size_write(dir, i_size_read(dir) - dentry->d_name.len); @@ -637,6 +504,9 @@ static void *scoutfs_follow_link(struct dentry *dentry, struct nameidata *nd) int ret; int k; + /* update for kvec items */ + return ERR_PTR(-EINVAL); + /* XXX corruption */ if (size == 0 || size > SCOUTFS_SYMLINK_MAX_SIZE) return ERR_PTR(-EIO); @@ -712,21 +582,19 @@ static int scoutfs_symlink(struct inode *dir, struct dentry *dentry, struct scoutfs_btree_val val; struct inode *inode = NULL; struct scoutfs_key key; - struct dentry_info *di; const int name_len = strlen(symname) + 1; int off; int bytes; int ret; int k = 0; + /* update for kvec items */ + return -EINVAL; + /* path_max includes null as does our value for nd_set_link */ if (name_len > PATH_MAX || name_len > SCOUTFS_SYMLINK_MAX_SIZE) return -ENAMETOOLONG; - di = alloc_dentry_info(dentry); - if (IS_ERR(di)) - return PTR_ERR(di); - ret = scoutfs_hold_trans(sb); if (ret) return ret; @@ -961,6 +829,9 @@ int scoutfs_dir_get_ino_path(struct super_block *sb, u64 ino, u64 *ctr, int ret; int nr; + /* update for kvec items */ + return -EINVAL; + if (*ctr == U64_MAX) return 0; @@ -1017,22 +888,3 @@ const struct inode_operations scoutfs_dir_iops = { .removexattr = scoutfs_removexattr, .symlink = scoutfs_symlink, }; - -void scoutfs_dir_exit(void) -{ - if (scoutfs_dentry_cachep) { - kmem_cache_destroy(scoutfs_dentry_cachep); - scoutfs_dentry_cachep = NULL; - } -} - -int scoutfs_dir_init(void) -{ - scoutfs_dentry_cachep = kmem_cache_create("scoutfs_dentry_info", - sizeof(struct dentry_info), 0, - SLAB_RECLAIM_ACCOUNT, NULL); - if (!scoutfs_dentry_cachep) - return -ENOMEM; - - return 0; -} diff --git a/kmod/src/dir.h b/kmod/src/dir.h index 4953af9e..2327518b 100644 --- a/kmod/src/dir.h +++ b/kmod/src/dir.h @@ -7,9 +7,6 @@ extern const struct file_operations scoutfs_dir_fops; extern const struct inode_operations scoutfs_dir_iops; extern const struct inode_operations scoutfs_symlink_iops; -int scoutfs_dir_init(void); -void scoutfs_dir_exit(void); - struct scoutfs_path_component { struct list_head head; unsigned int len; diff --git a/kmod/src/format.h b/kmod/src/format.h index ff1b69dd..2f126610 100644 --- a/kmod/src/format.h +++ b/kmod/src/format.h @@ -56,7 +56,8 @@ struct scoutfs_ring_entry_header { __le16 len; } __packed; -#define SCOUTFS_RING_ADD_MANIFEST 1 +#define SCOUTFS_RING_ADD_MANIFEST 1 +#define SCOUTFS_RING_ADD_ALLOC 2 struct scoutfs_ring_add_manifest { struct scoutfs_ring_entry_header eh; @@ -68,26 +69,55 @@ struct scoutfs_ring_add_manifest { /* first and last key bytes */ } __packed; +#define SCOUTFS_ALLOC_REGION_SHIFT 8 +#define SCOUTFS_ALLOC_REGION_BITS (1 << SCOUTFS_ALLOC_REGION_SHIFT) +#define SCOUTFS_ALLOC_REGION_MASK (SCOUTFS_ALLOC_REGION_BITS - 1) + +/* + * The 
bits need to be aligned so that the host can use native long + * bitops on the bits in memory. + */ +struct scoutfs_ring_alloc_region { + struct scoutfs_ring_entry_header eh; + __le64 index; + __u8 pad[5]; + __le64 bits[SCOUTFS_ALLOC_REGION_BITS / 64]; +} __packed; + /* * This is absurdly huge. If there was only ever 1 item per segment and * 2^64 items the tree could get this deep. */ #define SCOUTFS_MANIFEST_MAX_LEVEL 20 +/* + * The packed entries in the block are terminated by a header with a 0 length. + */ struct scoutfs_ring_block { struct scoutfs_block_header hdr; - __le32 nr_entries; struct scoutfs_ring_entry_header entries[0]; } __packed; +/* + * We really want these to be a power of two size so that they're naturally + * aligned. This ensures that they won't cross page boundaries and we + * can use pointers to them in the page vecs that make up segments without + * funny business. + * + * We limit segment sizes to 8 megs (23 bits) and value lengths to 512 bytes + * (9 bits). The item offsets and lengths then take up 64 bits. + * + * We then operate on the items in on-stack nice native structs. + */ struct scoutfs_segment_item { __le64 seq; - __le32 key_off; - __le32 val_off; - __le16 key_len; - __le16 val_len; + __le32 key_off_len; + __le32 val_off_len; } __packed; +#define SCOUTFS_SEGMENT_ITEM_OFF_SHIFT 9 +#define SCOUTFS_SEGMENT_ITEM_LEN_MASK ((1 << SCOUTFS_SEGMENT_ITEM_OFF_SHIFT)-1) + /* * Each large segment starts with a segment block that describes the * rest of the blocks that make up the segment. @@ -98,20 +128,12 @@ struct scoutfs_segment_block { __le64 segno; __le64 max_seq; __le32 nr_items; - /* item array with gaps so they don't cross 4k blocks */ + __le32 _moar_pads; + struct scoutfs_segment_item items[0]; /* packed keys */ /* packed vals */ } __packed; -/* the first block in the segment has the header and items */ -#define SCOUTFS_SEGMENT_FIRST_BLOCK_ITEMS \ - ((SCOUTFS_BLOCK_SIZE - sizeof(struct scoutfs_segment_block)) / \ - sizeof(struct scoutfs_segment_item)) - -/* the rest of the header blocks are full of items */ -#define SCOUTFS_SEGMENT_ITEMS_PER_BLOCK \ - (SCOUTFS_BLOCK_SIZE / sizeof(struct scoutfs_segment_item)) - /* * Block references include the sequence number so that we can detect * readers racing with writers and so that we can tell that we don't @@ -186,18 +208,34 @@ struct scoutfs_key { #define SCOUTFS_XATTR_NAME_HASH_KEY 3 #define SCOUTFS_XATTR_VAL_HASH_KEY 4 #define SCOUTFS_DIRENT_KEY 5 -#define SCOUTFS_LINK_BACKREF_KEY 6 -#define SCOUTFS_SYMLINK_KEY 7 -#define SCOUTFS_EXTENT_KEY 8 -#define SCOUTFS_ORPHAN_KEY 9 +#define SCOUTFS_READDIR_KEY 6 +#define SCOUTFS_LINK_BACKREF_KEY 7 +#define SCOUTFS_SYMLINK_KEY 8 +#define SCOUTFS_EXTENT_KEY 9 +#define SCOUTFS_ORPHAN_KEY 10 #define SCOUTFS_MAX_ITEM_LEN 512 +/* value is struct scoutfs_inode */ struct scoutfs_inode_key { __u8 type; __be64 ino; } __packed; +/* value is struct scoutfs_dirent without the name */ +struct scoutfs_dirent_key { + __u8 type; + __be64 ino; + __u8 name[0]; +} __packed; + +/* value is struct scoutfs_dirent with the name */ +struct scoutfs_readdir_key { + __u8 type; + __be64 ino; + __be64 pos; +} __packed; + struct scoutfs_btree_root { u8 height; struct scoutfs_block_ref ref; @@ -270,6 +308,8 @@ struct scoutfs_super_block { __le64 id; __u8 uuid[SCOUTFS_UUID_BYTES]; __le64 next_ino; + __le64 alloc_uninit; + __le64 total_segs; __le64 total_blocks; __le64 free_blocks; __le64 ring_blkno; diff --git a/kmod/src/inode.c b/kmod/src/inode.c index 0ccfe006..c34babf7 100644 --- a/kmod/src/inode.c +++ 
b/kmod/src/inode.c @@ -27,7 +27,6 @@ #include "scoutfs_trace.h" #include "xattr.h" #include "trans.h" -#include "btree.h" #include "msg.h" #include "kvec.h" #include "item.h" @@ -269,13 +268,17 @@ static void store_inode(struct scoutfs_inode *cinode, struct inode *inode) int scoutfs_dirty_inode_item(struct inode *inode) { struct super_block *sb = inode->i_sb; - struct scoutfs_btree_root *meta = SCOUTFS_META(sb); - struct scoutfs_key key; + struct scoutfs_inode_key ikey; + struct scoutfs_inode sinode; + SCOUTFS_DECLARE_KVEC(key); int ret; - scoutfs_set_key(&key, scoutfs_ino(inode), SCOUTFS_INODE_KEY, 0); + store_inode(&sinode, inode); - ret = scoutfs_btree_dirty(sb, meta, &key); + set_inode_key(&ikey, scoutfs_ino(inode)); + scoutfs_kvec_init(key, &ikey, sizeof(ikey)); + + ret = scoutfs_item_dirty(sb, key); if (!ret) trace_scoutfs_dirty_inode(inode); return ret; @@ -283,8 +286,8 @@ int scoutfs_dirty_inode_item(struct inode *inode) /* * Every time we modify the inode in memory we copy it to its inode - * item. This lets us write out blocks of items without having to track - * down dirty vfs inodes and safely copy them into items before writing. + * item. This lets us write out items without having to track down + * dirty vfs inodes. * * The caller makes sure that the item is dirty and pinned so they don't * have to deal with errors and unwinding after they've modified the @@ -293,17 +296,19 @@ int scoutfs_dirty_inode_item(struct inode *inode) void scoutfs_update_inode_item(struct inode *inode) { struct super_block *sb = inode->i_sb; - struct scoutfs_btree_root *meta = SCOUTFS_META(sb); - struct scoutfs_btree_val val; + struct scoutfs_inode_key ikey; struct scoutfs_inode sinode; - struct scoutfs_key key; + SCOUTFS_DECLARE_KVEC(key); + SCOUTFS_DECLARE_KVEC(val); int err; - scoutfs_set_key(&key, scoutfs_ino(inode), SCOUTFS_INODE_KEY, 0); - scoutfs_btree_init_val(&val, &sinode, sizeof(sinode)); store_inode(&sinode, inode); - err = scoutfs_btree_update(sb, meta, &key, &val); + set_inode_key(&ikey, scoutfs_ino(inode)); + scoutfs_kvec_init(key, &ikey, sizeof(ikey)); + scoutfs_kvec_init(val, &sinode, sizeof(sinode)); + + err = scoutfs_item_update(sb, key, val); BUG_ON(err); trace_scoutfs_update_inode(inode); @@ -381,11 +386,11 @@ static int alloc_ino(struct super_block *sb, u64 *ino) struct inode *scoutfs_new_inode(struct super_block *sb, struct inode *dir, umode_t mode, dev_t rdev) { - struct scoutfs_btree_root *meta = SCOUTFS_META(sb); struct scoutfs_inode_info *ci; - struct scoutfs_btree_val val; + struct scoutfs_inode_key ikey; struct scoutfs_inode sinode; - struct scoutfs_key key; + SCOUTFS_DECLARE_KVEC(key); + SCOUTFS_DECLARE_KVEC(val); struct inode *inode; u64 ino; int ret; @@ -413,11 +418,12 @@ struct inode *scoutfs_new_inode(struct super_block *sb, struct inode *dir, inode->i_rdev = rdev; set_inode_ops(inode); - scoutfs_set_key(&key, scoutfs_ino(inode), SCOUTFS_INODE_KEY, 0); - scoutfs_btree_init_val(&val, &sinode, sizeof(sinode)); store_inode(&sinode, inode); + set_inode_key(&ikey, scoutfs_ino(inode)); + scoutfs_kvec_init(key, &ikey, sizeof(ikey)); + scoutfs_kvec_init(val, &sinode, sizeof(sinode)); - ret = scoutfs_btree_insert(inode->i_sb, meta, &key, &val); + ret = scoutfs_item_create(sb, key, val); if (ret) { iput(inode); return ERR_PTR(ret); diff --git a/kmod/src/item.c b/kmod/src/item.c index f65388ec..df2d187a 100644 --- a/kmod/src/item.c +++ b/kmod/src/item.c @@ -14,20 +14,31 @@ #include #include #include +#include #include "super.h" #include "format.h" #include "kvec.h" #include 
"manifest.h" #include "item.h" +#include "seg.h" struct item_cache { spinlock_t lock; struct rb_root root; + + unsigned long nr_dirty_items; + unsigned long dirty_key_bytes; + unsigned long dirty_val_bytes; }; +/* + * The dirty bits track if the given item is dirty and if its child + * subtrees contain any dirty items. + */ struct cached_item { struct rb_node node; + long dirty; SCOUTFS_DECLARE_KVEC(key); SCOUTFS_DECLARE_KVEC(val); @@ -56,12 +67,53 @@ static struct cached_item *find_item(struct rb_root *root, struct kvec *key) return NULL; } +/* + * We store the dirty bits in a single value so that the simple + * augmented rbtree implementation gets a single scalar value to compare + * and store. + */ +#define ITEM_DIRTY 0x1 +#define LEFT_DIRTY 0x2 +#define RIGHT_DIRTY 0x4 + +/* + * Return the given dirty bit if the item with the given node is dirty + * or has dirty children. + */ +static long node_dirty_bit(struct rb_node *node, long dirty) +{ + struct cached_item *item; + + if (node) { + item = container_of(node, struct cached_item, node); + if (item->dirty) + return dirty; + } + + return 0; +} + +static long compute_item_dirty(struct cached_item *item) +{ + return (item->dirty & ITEM_DIRTY) | + node_dirty_bit(item->node.rb_left, LEFT_DIRTY) | + node_dirty_bit(item->node.rb_right, RIGHT_DIRTY); +} + +RB_DECLARE_CALLBACKS(static, scoutfs_item_rb_cb, struct cached_item, node, + long, dirty, compute_item_dirty); + +/* + * Always insert the given item. If there's an existing item it is + * returned. This can briefly leave duplicate items in the tree until + * the caller removes the existing item. + */ static struct cached_item *insert_item(struct rb_root *root, struct cached_item *ins) { struct rb_node **node = &root->rb_node; struct rb_node *parent = NULL; - struct cached_item *found = NULL; + struct cached_item *existing = NULL; struct cached_item *item; int cmp; @@ -71,22 +123,23 @@ static struct cached_item *insert_item(struct rb_root *root, cmp = scoutfs_kvec_memcmp(ins->key, item->key); if (cmp < 0) { + if (ins->dirty) + item->dirty |= LEFT_DIRTY; node = &(*node)->rb_left; } else if (cmp > 0) { + if (ins->dirty) + item->dirty |= RIGHT_DIRTY; node = &(*node)->rb_right; } else { - rb_replace_node(&item->node, &ins->node, root); - found = item; + existing = item; break; } } - if (!found) { - rb_link_node(&ins->node, parent, node); - rb_insert_color(&ins->node, root); - } + rb_link_node(&ins->node, parent, node); + rb_insert_augmented(&ins->node, root, &scoutfs_item_rb_cb); - return found; + return existing; } /* @@ -139,12 +192,96 @@ int scoutfs_item_lookup_exact(struct super_block *sb, struct kvec *key, ret = scoutfs_item_lookup(sb, key, val); if (ret == size) ret = 0; - else if (ret >= 0 && ret != size) + else if (ret >= 0) ret = -EIO; return ret; } +/* + * Return the next cached item starting with the given key. + * + * -ENOENT is returned if there are no cached items past the given key. + * If the last key is specified then -ENOENT is returned if there are no + * cached items up until that last key, inclusive. + * + * The found key is copied to the caller's key. -ENOBUFS is returned if + * the found key didn't fit in the caller's key. + * + * The found value is copied into the callers value. The number of + * value bytes copied is returned. The copied value can be truncated by + * the caller's value buffer length. 
+ */ +int scoutfs_item_next(struct super_block *sb, struct kvec *key, + struct kvec *last, struct kvec *val) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct item_cache *cac = sbi->item_cache; + struct cached_item *item; + unsigned long flags; + int ret; + + /* + * This partial copy and paste of lookup is stubbed out for now. + * we'll want the negative caching fixes to be able to iterate + * without constantly searching the manifest between cached + * items. + */ + return -EINVAL; + + do { + spin_lock_irqsave(&cac->lock, flags); + + item = find_item(&cac->root, key); + if (!item) { + ret = -ENOENT; + } else if (scoutfs_kvec_length(item->key) > + scoutfs_kvec_length(key)) { + ret = -ENOBUFS; + } else { + scoutfs_kvec_memcpy_truncate(key, item->key); + if (val) + ret = scoutfs_kvec_memcpy(val, item->val); + else + ret = 0; + } + + spin_unlock_irqrestore(&cac->lock, flags); + + } while (!item && ((ret = scoutfs_manifest_read_items(sb, key)) == 0)); + + trace_printk("ret %d\n", ret); + + return ret; +} + +/* + * Like _next but requires that the found keys be the same length as the + * search key and that values be of at least a minimum size. It treats + * size mismatches as a sign of corruption. A found key larger than the + * found key buffer gives -ENOBUFS and is a sign of corruption. + */ +int scoutfs_item_next_same_min(struct super_block *sb, struct kvec *key, + struct kvec *last, struct kvec *val, int len) +{ + int key_len = scoutfs_kvec_length(key); + int ret; + + trace_printk("key len %u min val len %d\n", key_len, len); + + if (WARN_ON_ONCE(!val || scoutfs_kvec_length(val) < len)) + return -EINVAL; + + ret = scoutfs_item_next(sb, key, last, val); + if (ret == -ENOBUFS || + (ret >= 0 && (scoutfs_kvec_length(key) != key_len || ret < len))) + ret = -EIO; + + trace_printk("ret %d\n", ret); + + return ret; +} + static void free_item(struct cached_item *item) { if (!IS_ERR_OR_NULL(item)) { @@ -154,21 +291,77 @@ static void free_item(struct cached_item *item) } } +/* + * The caller might have modified the item's dirty flags. Ascend + * through parents updating their dirty flags until there's no change. + */ +static void update_dirty_parents(struct cached_item *item) +{ + struct cached_item *parent; + struct rb_node *node; + long dirty; + + while ((node = rb_parent(&item->node))) { + parent = container_of(node, struct cached_item, node); + dirty = compute_item_dirty(parent); + + if (parent->dirty == dirty) + break; + + parent->dirty = dirty; + item = parent; + } +} + +static void mark_item_dirty(struct item_cache *cac, + struct cached_item *item) +{ + if (WARN_ON_ONCE(RB_EMPTY_NODE(&item->node))) + return; + + if (item->dirty & ITEM_DIRTY) + return; + + item->dirty |= ITEM_DIRTY; + cac->nr_dirty_items++; + cac->dirty_key_bytes += scoutfs_kvec_length(item->key); + cac->dirty_val_bytes += scoutfs_kvec_length(item->val); + + update_dirty_parents(item); +} + +static void clear_item_dirty(struct item_cache *cac, + struct cached_item *item) +{ + if (WARN_ON_ONCE(RB_EMPTY_NODE(&item->node))) + return; + + if (!(item->dirty & ITEM_DIRTY)) + return; + + item->dirty &= ~ITEM_DIRTY; + cac->nr_dirty_items--; + cac->dirty_key_bytes -= scoutfs_kvec_length(item->key); + cac->dirty_val_bytes -= scoutfs_kvec_length(item->val); + + update_dirty_parents(item); +} + /* * Add an item with the key and value to the item cache. The new item * is clean. Any existing item at the key will be removed and freed. 
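+ *
+ * Both scoutfs_item_insert() (clean items populated from segment reads)
+ * and scoutfs_item_create() (newly dirtied items) funnel through here;
+ * only the dirty argument differs.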
*/ -int scoutfs_item_insert(struct super_block *sb, struct kvec *key, - struct kvec *val) +static int add_item(struct super_block *sb, struct kvec *key, struct kvec *val, + bool dirty) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct item_cache *cac = sbi->item_cache; - struct cached_item *found; + struct cached_item *existing; struct cached_item *item; unsigned long flags; int ret; - item = kmalloc(sizeof(struct cached_item), GFP_NOFS); + item = kzalloc(sizeof(struct cached_item), GFP_NOFS); if (!item) return -ENOMEM; @@ -180,9 +373,265 @@ int scoutfs_item_insert(struct super_block *sb, struct kvec *key, } spin_lock_irqsave(&cac->lock, flags); - found = insert_item(&cac->root, item); + existing = insert_item(&cac->root, item); + if (existing) { + clear_item_dirty(cac, existing); + rb_erase_augmented(&item->node, &cac->root, + &scoutfs_item_rb_cb); + } + mark_item_dirty(cac, item); spin_unlock_irqrestore(&cac->lock, flags); - free_item(found); + free_item(existing); + + return 0; +} + +/* + * Add a clean item to the cache. This is used to populate items while + * reading segments. + */ +int scoutfs_item_insert(struct super_block *sb, struct kvec *key, + struct kvec *val) +{ + return add_item(sb, key, val, false); +} + +/* + * Create a new dirty item in the cache. + */ +int scoutfs_item_create(struct super_block *sb, struct kvec *key, + struct kvec *val) +{ + return add_item(sb, key, val, true); +} + +/* + * If the item with the key exists make sure it's cached and dirty. -ENOENT + * will be returned if it doesn't exist. + */ +int scoutfs_item_dirty(struct super_block *sb, struct kvec *key) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct item_cache *cac = sbi->item_cache; + struct cached_item *item; + unsigned long flags; + int ret; + + do { + spin_lock_irqsave(&cac->lock, flags); + + item = find_item(&cac->root, key); + if (item) { + mark_item_dirty(cac, item); + ret = 0; + } else { + ret = -ENOENT; + } + + spin_unlock_irqrestore(&cac->lock, flags); + + } while (!item && ((ret = scoutfs_manifest_read_items(sb, key)) == 0)); + + trace_printk("ret %d\n", ret); + + return ret; +} + +/* + * Set the value of an existing item in the tree. The item is marked dirty + * and the previous value is freed. The provided value may be null. + * + * Returns -ENOENT if the item doesn't exist. + */ +int scoutfs_item_update(struct super_block *sb, struct kvec *key, + struct kvec *val) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct item_cache *cac = sbi->item_cache; + SCOUTFS_DECLARE_KVEC(up_val); + struct cached_item *item; + unsigned long flags; + int ret; + + if (val) { + ret = scoutfs_kvec_dup_flatten(up_val, val); + if (ret) + return -ENOMEM; + } else { + scoutfs_kvec_init_null(up_val); + } + + spin_lock_irqsave(&cac->lock, flags); + + /* XXX update seq */ + item = find_item(&cac->root, key); + if (item) { + scoutfs_kvec_swap(up_val, item->val); + mark_item_dirty(cac, item); + } else { + ret = -ENOENT; + } + + spin_unlock_irqrestore(&cac->lock, flags); + + scoutfs_kvec_kfree(up_val); + + trace_printk("ret %d\n", ret); + + return ret; +} + +/* + * XXX how nice, it'd just creates a cached deletion item. It doesn't + * have to read. + */ +int scoutfs_item_delete(struct super_block *sb, struct kvec *key) +{ + return WARN_ON_ONCE(-EINVAL); +} + +/* + * Return the first dirty node in the subtree starting at the given node. 
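+ *
+ * The augmented dirty bits steer the walk: descend LEFT_DIRTY subtrees
+ * first, stop at the first ITEM_DIRTY node, and otherwise fall through
+ * to the RIGHT_DIRTY subtree, which yields the leftmost dirty item.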
+ */ +static struct cached_item *first_dirty(struct rb_node *node) +{ + struct cached_item *ret = NULL; + struct cached_item *item; + + while (node) { + item = container_of(node, struct cached_item, node); + + if (item->dirty & LEFT_DIRTY) { + node = item->node.rb_left; + } else if (item->dirty & ITEM_DIRTY) { + ret = item; + break; + } else if (item->dirty & RIGHT_DIRTY) { + node = item->node.rb_right; + } + } + + return ret; +} + +/* + * Find the next dirty item after a given item. First we see if we have + * a dirty item in our right subtree. If not we ascend through parents + * skipping those that are less than us. If we find a parent that's + * greater than us then we see if it's dirty, if not we start the search + * all over again by checking its right subtree then ascending. + */ +static struct cached_item *next_dirty(struct cached_item *item) +{ + struct rb_node *parent; + struct rb_node *node; + + while (item) { + if (item->dirty & RIGHT_DIRTY) + return first_dirty(item->node.rb_right); + + /* find next greatest parent */ + node = &item->node; + while ((parent = rb_parent(node)) && parent->rb_right == node) + node = parent; + if (!parent) + break; + + /* done if our next greatest parent itself is dirty */ + item = container_of(parent, struct cached_item, node); + if (item->dirty & ITEM_DIRTY) + return item; + + /* continue to check right subtree */ + } + + return NULL; +} + +/* + * The total number of bytes that will be stored in segments if we were + * to write out all the currently dirty items. + * + * XXX this isn't strictly correct because item's aren't of a uniform + * size. We might need more segments when large items leave gaps at the + * tail of each segment as it is filled with sorted items. It's close + * enough for now. + */ +long scoutfs_item_dirty_bytes(struct super_block *sb) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct item_cache *cac = sbi->item_cache; + unsigned long flags; + long bytes; + + spin_lock_irqsave(&cac->lock, flags); + + bytes = (cac->nr_dirty_items * sizeof(struct scoutfs_segment_item)) + + cac->dirty_key_bytes + cac->dirty_val_bytes; + + spin_unlock_irqrestore(&cac->lock, flags); + + bytes += DIV_ROUND_UP(bytes, sizeof(struct scoutfs_segment_block)) * + sizeof(struct scoutfs_segment_block); + + return bytes; +} + +/* + * Find the initial sorted dirty items that will fit in a segment. Give + * the caller the number of items and the total bytes of their keys. + */ +static void count_seg_items(struct item_cache *cac, u32 *nr_items, + u32 *key_bytes) +{ + struct cached_item *item; + u32 total; + + *nr_items = 0; + *key_bytes = 0; + total = sizeof(struct scoutfs_segment_block); + + for (item = first_dirty(cac->root.rb_node); item; + item = next_dirty(item)) { + + total += sizeof(struct scoutfs_segment_item) + + scoutfs_kvec_length(item->key) + + scoutfs_kvec_length(item->val); + + if (total > SCOUTFS_SEGMENT_SIZE) + break; + + (*nr_items)++; + (*key_bytes) += scoutfs_kvec_length(item->key); + } +} + +/* + * Fill the given segment with sorted dirty items. + * + * The caller is responsible for the consistency of the dirty items once + * they're in its seg. We can consider them clean once we store them. 
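+ *
+ * The expected flow, given the accounting above, is that a writer sizes
+ * its transaction with scoutfs_item_dirty_bytes() and then calls this
+ * once per segment until the dirty items are drained; each call stores
+ * and cleans only as many items as count_seg_items() says will fit.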
+ */ +int scoutfs_item_dirty_seg(struct super_block *sb, struct scoutfs_segment *seg) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct item_cache *cac = sbi->item_cache; + struct cached_item *item; + u32 key_bytes; + u32 nr_items; + + count_seg_items(cac, &nr_items, &key_bytes); + if (nr_items) { + item = first_dirty(cac->root.rb_node); + scoutfs_seg_first_item(sb, seg, item->key, item->val, + nr_items, key_bytes); + clear_item_dirty(cac, item); + + while ((item = next_dirty(item))) { + scoutfs_seg_append_item(sb, seg, item->key, item->val); + clear_item_dirty(cac, item); + } + } return 0; } @@ -207,8 +656,8 @@ void scoutfs_item_destroy(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct item_cache *cac = sbi->item_cache; - struct rb_node *node; struct cached_item *item; + struct rb_node *node; if (cac) { for (node = rb_first(&cac->root); node; ) { @@ -219,5 +668,4 @@ void scoutfs_item_destroy(struct super_block *sb) kfree(cac); } - } diff --git a/kmod/src/item.h b/kmod/src/item.h index bfaae9db..62d93815 100644 --- a/kmod/src/item.h +++ b/kmod/src/item.h @@ -3,12 +3,27 @@ #include +struct scoutfs_segment; + int scoutfs_item_lookup(struct super_block *sb, struct kvec *key, struct kvec *val); int scoutfs_item_lookup_exact(struct super_block *sb, struct kvec *key, struct kvec *val, int size); +int scoutfs_item_next(struct super_block *sb, struct kvec *key, + struct kvec *last, struct kvec *val); +int scoutfs_item_next_same_min(struct super_block *sb, struct kvec *key, + struct kvec *last, struct kvec *val, int len); int scoutfs_item_insert(struct super_block *sb, struct kvec *key, struct kvec *val); +int scoutfs_item_create(struct super_block *sb, struct kvec *key, + struct kvec *val); +int scoutfs_item_dirty(struct super_block *sb, struct kvec *key); +int scoutfs_item_update(struct super_block *sb, struct kvec *key, + struct kvec *val); +int scoutfs_item_delete(struct super_block *sb, struct kvec *key); + +long scoutfs_item_dirty_bytes(struct super_block *sb); +int scoutfs_item_dirty_seg(struct super_block *sb, struct scoutfs_segment *seg); int scoutfs_item_setup(struct super_block *sb); void scoutfs_item_destroy(struct super_block *sb); diff --git a/kmod/src/kvec.c b/kmod/src/kvec.c index e2b26061..6cddb073 100644 --- a/kmod/src/kvec.c +++ b/kmod/src/kvec.c @@ -112,6 +112,28 @@ int scoutfs_kvec_memcpy(struct kvec *dst, struct kvec *src) return copied; } +/* + * Copy bytes in src into dst, stopping if dst is full. The number of copied + * bytes is returned and the lengths of dst are updated if the size changes. + * The pointers in dst are not changed. + */ +int scoutfs_kvec_memcpy_truncate(struct kvec *dst, struct kvec *src) +{ + int copied = scoutfs_kvec_memcpy(dst, src); + size_t bytes; + int i; + + if (copied < scoutfs_kvec_length(dst)) { + bytes = copied; + for (i = 0; i < SCOUTFS_KVEC_NR; i++) { + dst[i].iov_len = min(dst[i].iov_len, bytes); + bytes -= dst[i].iov_len; + } + } + + return copied; +} + /* * Copy the src key vector into one new allocation in the dst. The existing * dst is clobbered. The source isn't changed. 
@@ -139,3 +161,17 @@ void scoutfs_kvec_kfree(struct kvec *kvec) while (kvec->iov_base) kfree((kvec++)->iov_base); } + +void scoutfs_kvec_init_null(struct kvec *kvec) +{ + memset(kvec, 0, SCOUTFS_KVEC_NR * sizeof(kvec[0])); +} + +void scoutfs_kvec_swap(struct kvec *a, struct kvec *b) +{ + SCOUTFS_DECLARE_KVEC(tmp); + + memcpy(tmp, a, sizeof(tmp)); + memcpy(a, b, sizeof(tmp)); + memcpy(b, tmp, sizeof(tmp)); +} diff --git a/kmod/src/kvec.h b/kmod/src/kvec.h index 600055e9..49d51ae9 100644 --- a/kmod/src/kvec.h +++ b/kmod/src/kvec.h @@ -61,7 +61,10 @@ int scoutfs_kvec_memcmp(struct kvec *a, struct kvec *b); int scoutfs_kvec_cmp_overlap(struct kvec *a, struct kvec *b, struct kvec *c, struct kvec *d); int scoutfs_kvec_memcpy(struct kvec *dst, struct kvec *src); +int scoutfs_kvec_memcpy_truncate(struct kvec *dst, struct kvec *src); int scoutfs_kvec_dup_flatten(struct kvec *dst, struct kvec *src); void scoutfs_kvec_kfree(struct kvec *kvec); +void scoutfs_kvec_init_null(struct kvec *kvec); +void scoutfs_kvec_swap(struct kvec *a, struct kvec *b); #endif diff --git a/kmod/src/manifest.c b/kmod/src/manifest.c index c6b1a33f..15c22fd5 100644 --- a/kmod/src/manifest.c +++ b/kmod/src/manifest.c @@ -20,6 +20,7 @@ #include "kvec.h" #include "seg.h" #include "item.h" +#include "ring.h" #include "manifest.h" struct manifest { @@ -30,6 +31,8 @@ struct manifest { u8 last_level; struct rb_root level_roots[SCOUTFS_MANIFEST_MAX_LEVEL + 1]; + + struct list_head dirty_list; }; #define DECLARE_MANIFEST(sb, name) \ @@ -40,12 +43,11 @@ struct manifest_entry { struct list_head level0_entry; struct rb_node node; }; + struct list_head dirty_entry; - SCOUTFS_DECLARE_KVEC(first); - SCOUTFS_DECLARE_KVEC(last); - u64 segno; - u64 seq; - u8 level; + struct scoutfs_ring_add_manifest am; + /* u8 key_bytes[am.first_key_len]; */ + /* u8 val_bytes[am.last_key_len]; */ }; /* @@ -60,6 +62,32 @@ struct manifest_ref { u8 level; }; +static void init_ment_keys(struct manifest_entry *ment, struct kvec *first, + struct kvec *last) +{ + scoutfs_kvec_init(first, &ment->am + 1, + le16_to_cpu(ment->am.first_key_len)); + scoutfs_kvec_init(last, &ment->am + 1 + + le16_to_cpu(ment->am.first_key_len), + le16_to_cpu(ment->am.last_key_len)); +} + +/* + * returns: + * < 0 : key < ment->first_key + * > 0 : key > ment->first_key + * == 0 : ment->first_key <= key <= ment->last_key + */ +static bool cmp_key_ment(struct kvec *key, struct manifest_entry *ment) +{ + SCOUTFS_DECLARE_KVEC(first); + SCOUTFS_DECLARE_KVEC(last); + + init_ment_keys(ment, first, last); + + return scoutfs_kvec_cmp_overlap(key, key, first, last); +} + static struct manifest_entry *find_ment(struct rb_root *root, struct kvec *key) { struct rb_node *node = root->rb_node; @@ -69,8 +97,7 @@ static struct manifest_entry *find_ment(struct rb_root *root, struct kvec *key) while (node) { ment = container_of(node, struct manifest_entry, node); - cmp = scoutfs_kvec_cmp_overlap(key, key, - ment->first, ment->last); + cmp = cmp_key_ment(key, ment); if (cmp < 0) node = node->rb_left; else if (cmp > 0) @@ -91,14 +118,17 @@ static int insert_ment(struct rb_root *root, struct manifest_entry *ins) struct rb_node **node = &root->rb_node; struct rb_node *parent = NULL; struct manifest_entry *ment; + SCOUTFS_DECLARE_KVEC(key); int cmp; + /* either first or last works */ + init_ment_keys(ins, key, key); + while (*node) { parent = *node; ment = container_of(*node, struct manifest_entry, node); - cmp = scoutfs_kvec_cmp_overlap(ins->first, ins->last, - ment->first, ment->last); + cmp = cmp_key_ment(key, 
ment); if (cmp < 0) { node = &(*node)->rb_left; } else if (cmp > 0) { @@ -116,29 +146,32 @@ static int insert_ment(struct rb_root *root, struct manifest_entry *ins) static void free_ment(struct manifest_entry *ment) { - if (!IS_ERR_OR_NULL(ment)) { - scoutfs_kvec_kfree(ment->first); - scoutfs_kvec_kfree(ment->last); + if (!IS_ERR_OR_NULL(ment)) kfree(ment); - } } -static int add_ment(struct manifest *mani, struct manifest_entry *ment) +static int add_ment(struct manifest *mani, struct manifest_entry *ment, + bool dirty) { + u8 level = ment->am.level; int ret; - trace_printk("adding ment %p level %u\n", ment, ment->level); - if (ment->level) { - ret = insert_ment(&mani->level_roots[ment->level], ment); + trace_printk("adding ment %p level %u\n", ment, level); + + if (level) { + ret = insert_ment(&mani->level_roots[level], ment); if (!ret) - mani->last_level = max(mani->last_level, ment->level); + mani->last_level = max(mani->last_level, level); } else { list_add_tail(&ment->level0_entry, &mani->level0_list); mani->level0_nr++; ret = 0; } + if (dirty) + list_add_tail(&ment->dirty_entry, &mani->dirty_list); + return ret; } @@ -155,41 +188,52 @@ static void update_last_level(struct manifest *mani) static void remove_ment(struct manifest *mani, struct manifest_entry *ment) { - if (ment->level) { - rb_erase(&ment->node, &mani->level_roots[ment->level]); + u8 level = ment->am.level; + + if (level) { + rb_erase(&ment->node, &mani->level_roots[level]); update_last_level(mani); } else { list_del_init(&ment->level0_entry); mani->level0_nr--; } + + /* XXX more carefully remove dirty ments.. should be exceptional */ + if (!list_empty(&ment->dirty_entry)) + list_del_init(&ment->dirty_entry); } int scoutfs_manifest_add(struct super_block *sb, struct kvec *first, - struct kvec *last, u64 segno, u64 seq, u8 level) + struct kvec *last, u64 segno, u64 seq, u8 level, + bool dirty) { DECLARE_MANIFEST(sb, mani); struct manifest_entry *ment; unsigned long flags; + int bytes; int ret; - ment = kmalloc(sizeof(struct manifest_entry), GFP_NOFS); + bytes = sizeof(struct manifest_entry) + scoutfs_kvec_length(first), + scoutfs_kvec_length(last); + ment = kmalloc(bytes, GFP_NOFS); if (!ment) return -ENOMEM; - ret = scoutfs_kvec_dup_flatten(ment->first, first) ?: - scoutfs_kvec_dup_flatten(ment->last, last); - if (ret) { - free_ment(ment); - return -ENOMEM; - } + if (level) + RB_CLEAR_NODE(&ment->node); + else + INIT_LIST_HEAD(&ment->level0_entry); + INIT_LIST_HEAD(&ment->dirty_entry); - ment->segno = segno; - ment->seq = seq; - ment->level = level; + ment->am.eh.type = SCOUTFS_RING_ADD_MANIFEST; + ment->am.eh.len = cpu_to_le16(bytes); + ment->am.segno = cpu_to_le64(segno); + ment->am.seq = cpu_to_le64(seq); + ment->am.level = level; /* XXX think about where to insert level 0 */ spin_lock_irqsave(&mani->lock, flags); - ret = add_ment(mani, ment); + ret = add_ment(mani, ment, dirty); spin_unlock_irqrestore(&mani->lock, flags); if (WARN_ON_ONCE(ret)) /* XXX can this happen? ring corruption? 
*/ free_ment(ment); @@ -197,11 +241,11 @@ int scoutfs_manifest_add(struct super_block *sb, struct kvec *first, return ret; } -static void set_ref(struct manifest_ref *ref, struct manifest_entry *mani) +static void set_ref(struct manifest_ref *ref, struct manifest_entry *ment) { - ref->segno = mani->segno; - ref->seq = mani->seq; - ref->level = mani->level; + ref->segno = le64_to_cpu(ment->am.segno); + ref->seq = le64_to_cpu(ment->am.seq); + ref->level = ment->am.level; } /* @@ -242,8 +286,7 @@ static struct manifest_ref *get_key_refs(struct manifest *mani, list_for_each_entry(ment, &mani->level0_list, level0_entry) { trace_printk("trying l0 ment %p\n", ment); - if (scoutfs_kvec_cmp_overlap(key, key, - ment->first, ment->last)) + if (cmp_key_ment(key, ment)) continue; set_ref(&refs[nr++], ment); @@ -410,6 +453,32 @@ out: return ret; } +int scoutfs_manifest_has_dirty(struct super_block *sb) +{ + DECLARE_MANIFEST(sb, mani); + + return !list_empty_careful(&mani->dirty_list); +} + +/* + * Append the dirty manifest entries to the end of the ring. + * + * This returns 0 but can't fail. + */ +int scoutfs_manifest_dirty_ring(struct super_block *sb) +{ + DECLARE_MANIFEST(sb, mani); + struct manifest_entry *ment; + struct manifest_entry *tmp; + + list_for_each_entry_safe(ment, tmp, &mani->dirty_list, dirty_entry) { + scoutfs_ring_append(sb, &ment->am.eh); + list_del_init(&ment->dirty_entry); + } + + return 0; +} + int scoutfs_manifest_setup(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); @@ -423,6 +492,7 @@ int scoutfs_manifest_setup(struct super_block *sb) spin_lock_init(&mani->lock); INIT_LIST_HEAD(&mani->level0_list); + INIT_LIST_HEAD(&mani->dirty_list); for (i = 0; i < ARRAY_SIZE(mani->level_roots); i++) mani->level_roots[i] = RB_ROOT; diff --git a/kmod/src/manifest.h b/kmod/src/manifest.h index c1ea0160..f3bea21a 100644 --- a/kmod/src/manifest.h +++ b/kmod/src/manifest.h @@ -2,7 +2,11 @@ #define _SCOUTFS_MANIFEST_H_ int scoutfs_manifest_add(struct super_block *sb, struct kvec *first, - struct kvec *last, u64 segno, u64 seq, u8 level); + struct kvec *last, u64 segno, u64 seq, u8 level, + bool dirty); +int scoutfs_manifest_has_dirty(struct super_block *sb); +int scoutfs_manifest_dirty_ring(struct super_block *sb); + int scoutfs_manifest_read_items(struct super_block *sb, struct kvec *key); int scoutfs_manifest_setup(struct super_block *sb); diff --git a/kmod/src/ring.c b/kmod/src/ring.c index 867acd3b..3cda13b5 100644 --- a/kmod/src/ring.c +++ b/kmod/src/ring.c @@ -13,126 +13,140 @@ #include #include #include +#include #include "super.h" #include "format.h" #include "kvec.h" #include "bio.h" #include "manifest.h" +#include "alloc.h" #include "ring.h" +#include "crc.h" + /* - * OK, log: - * - big preallocated ring of variable length entries - * - entries are rounded to 4k blocks - * - entire thing is read and indexed in rbtree - * - static allocated page is kept around to record and write entries - * - indexes have cursor that points to next node to migrate - * - any time an entry is written an entry is migrated - * - allocate room for 4x (maybe including worst case rounding) - * - mount does binary search looking for newest entry - * - newest entry describes block where we started migrating - * - replay then walks from oldest to newest replaying - * - entries are marked with migration so we know where to set cursor after + * Right now we're only writing a segment a time. The entries needed to + * write a segment will always be smaller than a segment itself. 
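Each manifest entry now embeds its on-disk ring record directly: a fixed scoutfs_ring_add_manifest header followed by the first and last key bytes, so the same allocation can sit in the rbtree, the dirty list, and the ring append without copying. A minimal userspace sketch of that header-plus-trailing-keys layout, using simplified stand-in structs rather than the real format.h definitions:

#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* simplified stand-ins for the ring entry and add-manifest headers */
struct entry_header {
        uint8_t type;
        uint16_t len;
};

struct add_manifest {
        struct entry_header eh;
        uint16_t first_key_len;
        uint16_t last_key_len;
        /* u8 first_key[first_key_len]; u8 last_key[last_key_len]; */
};

static struct add_manifest *build_entry(const void *first, uint16_t first_len,
                                        const void *last, uint16_t last_len)
{
        size_t bytes = sizeof(struct add_manifest) + first_len + last_len;
        struct add_manifest *am = calloc(1, bytes);

        if (!am)
                return NULL;

        /* eh.len covers the fixed header and both trailing keys */
        am->eh.len = (uint16_t)bytes;
        am->first_key_len = first_len;
        am->last_key_len = last_len;

        /* the keys trail the fixed struct, first key then last key */
        memcpy(am + 1, first, first_len);
        memcpy((char *)(am + 1) + first_len, last, last_len);

        return am;
}

With this shape, appending a dirty entry to the ring only needs the embedded header's length; the key bytes come along for free.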
* + * XXX This'll get more clever as we can write multiple segments and build + * up dirty entries while processing compaction results. + */ +struct ring_info { + struct page *pages[SCOUTFS_SEGMENT_PAGES]; + struct scoutfs_ring_block *ring; + struct scoutfs_ring_entry_header *next_eh; + unsigned int nr_blocks; + unsigned int space; +}; + +#define DECLARE_RING_INFO(sb, name) \ + struct ring_info *name = SCOUTFS_SB(sb)->ring_info + +/* * XXX * - verify blocks * - could compress + * - have all entry sources dirty at cursors before dirtying + * - advancing cursor updates head as cursor wraps */ -/* read in a meg at a time */ -#define NR_PAGES DIV_ROUND_UP(1024 * 1024, PAGE_SIZE) -#define NR_BLOCKS (NR_PAGES * SCOUTFS_BLOCKS_PER_PAGE) - -#if 0 -#define BLOCKS_PER_PAGE (PAGE_SIZE / SCOUTFS_BLOCK_SIZE) -static void read_page_end_io(struct bio *bio, int err) +/* + * The space calculation when starting a block included a final empty + * entry header. That is zeroed here. + */ +static void finish_block(struct scoutfs_ring_block *ring, unsigned int tail) { - struct bio_vec *bvec; - struct page *page; - unsigned long i; + memset((char *)ring + tail, 0, SCOUTFS_BLOCK_SIZE - tail); + scoutfs_crc_block(&ring->hdr); +} - for_each_bio_segment(bio, bvec, i) { - page = bvec->bv_page; +void scoutfs_ring_append(struct super_block *sb, + struct scoutfs_ring_entry_header *eh) +{ + DECLARE_RING_INFO(sb, rinf); + struct scoutfs_ring_block *ring = rinf->ring; + unsigned int len = le16_to_cpu(eh->len); - if (err) - SetPageError(page); - else - SetPageUptodate(page); - unlock_page(page); + if (rinf->space < len) { + if (ring) + finish_block(ring, rinf->space); + ring = scoutfs_page_block_address(rinf->pages, rinf->nr_blocks); + rinf->ring = ring; + + memset(ring, 0, sizeof(struct scoutfs_ring_block)); + + rinf->nr_blocks++; + rinf->next_eh = ring->entries; + rinf->space = SCOUTFS_BLOCK_SIZE - + offsetof(struct scoutfs_ring_block, entries) - + sizeof(struct scoutfs_ring_entry_header); } - bio_put(bio); + memcpy(rinf->next_eh, eh, len); + rinf->next_eh = (void *)((char *)eh + len); + rinf->space -= len; } /* - * Read the given number of 4k blocks into the pages provided by the - * caller. We translate the block count into a page count and fill - * bios a page at a time. + * Kick off the writes to update the ring. Update the dirty super to + * reference the written ring. 
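scoutfs_ring_append() budgets space for one extra, empty entry header when it opens a block, and finish_block() zeroes the unused tail, so readers can walk entries until they hit a zero length without needing a separate count. A rough standalone model of that packing and termination invariant, with made-up block and header shapes:

#include <stdint.h>
#include <string.h>

#define BLK_SIZE 4096

struct entry_header {
        uint8_t type;
        uint16_t len;
} __attribute__((packed));

struct ring_block {
        uint32_t crc;
        struct entry_header entries[];
};

struct writer {
        struct ring_block *blk;
        char *next;             /* where the next entry header lands */
        unsigned int space;     /* bytes left for entries in this block */
};

static void start_block(struct writer *w, void *mem)
{
        w->blk = memset(mem, 0, BLK_SIZE);
        w->next = (char *)w->blk->entries;
        /* reserve room for the zeroed header that terminates the walk */
        w->space = BLK_SIZE - sizeof(struct ring_block) -
                   sizeof(struct entry_header);
}

static int append(struct writer *w, struct entry_header *eh)
{
        if (w->space < eh->len)
                return -1;      /* caller finishes this block, starts the next */

        memcpy(w->next, eh, eh->len);
        w->next += eh->len;     /* advance past the bytes just copied */
        w->space -= eh->len;
        return 0;
}

/* readers stop at the first zero-length header left in the zeroed tail */
static void walk(struct ring_block *blk)
{
        struct entry_header *eh;

        for (eh = blk->entries; eh->len;
             eh = (struct entry_header *)((char *)eh + eh->len))
                ;               /* process the entry at eh */
}

Note that the write cursor advances from the destination of the copy: the freshly copied bytes, not the caller's buffer, define where the next header lands.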
*/ -static int read_blocks(struct super_block *sb, struct page **pages, - u64 blkno, unsigned int nr_blocks) +int scoutfs_ring_submit_write(struct super_block *sb, + struct scoutfs_bio_completion *comp) { - unsigned int nr_pages = DIV_ROUND_UP(nr_blocks, PAGES_PER_BLOCK); - unsigned int bytes; - struct bio *bio; - int ret = 0; + struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; + DECLARE_RING_INFO(sb, rinf); + u64 head_blocks; + u64 blocks; + u64 blkno; + u64 ind; - for (i = 0; i < nr_pages; i++) { - page = pages[i]; + if (!rinf->nr_blocks) + return 0; - if (!bio) { - bio = bio_alloc(GFP_NOFS, nr_pages - i); - if (!bio) - bio = bio_alloc(GFP_NOFS, 1); - if (!bio) { - ret = -ENOMEM; - break; - } + if (rinf->space) + finish_block(rinf->ring, rinf->space); - bio->bi_sector = blkno << (SCOUTFS_BLOCK_SHIFT - 9); - bio->bi_bdev = sb->s_bdev; - bio->bi_end_io = read_pages_end_io; - } + ind = le64_to_cpu(super->ring_tail_index) + 1; + blocks = rinf->nr_blocks; + blkno = le64_to_cpu(super->ring_blkno) + ind; - lock_page(page); - ClearPageError(page); - ClearPageUptodate(page); + /* + * If the log wrapped then we have to write two fragments to the + * tail and head of the ring. We submit the head fragment + * first. + * + * The head fragment starts at some block offset in the + * preallocated pages. This hacky page math only works when our + * 4k blocks size == page_size. To fix it we'd add a offset + * block to the bio submit loop which could add an initial + * partial page vec to the bios. + */ + BUILD_BUG_ON(SCOUTFS_BLOCK_SIZE != PAGE_SIZE); - bytes = min(nr_blocks << SCOUTFS_BLOCK_SHIFT, PAGE_SIZE); - - if (bio_add_page(bio, page, bytes, 0) != bytes) { - /* submit the full bio and retry this page */ - submit_bio(READ, bio); - bio = NULL; - unlock_page(page); - i--; - continue; - } - - blkno += BLOCKS_PER_PAGE; - nr_blocks -= BLOCKS_PER_PAGE; + if (ind + blocks > le64_to_cpu(super->ring_blocks)) { + head_blocks = (ind + blocks) - le64_to_cpu(super->ring_blocks); + blocks -= head_blocks; + scoutfs_bio_submit_comp(sb, WRITE, rinf->pages + blocks, + le64_to_cpu(super->ring_blkno), + head_blocks, comp); } - if (bio) - submit_bio(READ, bio); + scoutfs_bio_submit_comp(sb, WRITE, rinf->pages, blkno, blocks, comp); - for (i = 0; i < nr_pages; i++) { - page = pages[i]; + ind += blocks; + if (ind == le64_to_cpu(super->ring_blocks)) + ind = 0; + super->ring_tail_index = cpu_to_le64(ind); - wait_on_page_locked(page); - if (!ret && (!PageUptodate(page) || PageError(page))) - ret = -EIO; - } - - return ret; + return 0; } -#endif - static int read_one_entry(struct super_block *sb, struct scoutfs_ring_entry_header *eh) { + struct scoutfs_ring_alloc_region *reg; struct scoutfs_ring_add_manifest *am; SCOUTFS_DECLARE_KVEC(first); SCOUTFS_DECLARE_KVEC(last); @@ -156,7 +170,13 @@ static int read_one_entry(struct super_block *sb, ret = scoutfs_manifest_add(sb, first, last, le64_to_cpu(am->segno), - le64_to_cpu(am->seq), am->level); + le64_to_cpu(am->seq), am->level, + false); + break; + + case SCOUTFS_RING_ADD_ALLOC: + reg = container_of(eh, struct scoutfs_ring_alloc_region, eh); + ret = scoutfs_alloc_add(sb, reg); break; default: @@ -171,33 +191,22 @@ static int read_entries(struct super_block *sb, { struct scoutfs_ring_entry_header *eh; int ret = 0; - int i; - trace_printk("reading %u entries\n", le32_to_cpu(ring->nr_entries)); + for (eh = ring->entries; eh->len; + eh = (void *)eh + le16_to_cpu(eh->len)) { - eh = ring->entries; - - for (i = 0; i < le32_to_cpu(ring->nr_entries); i++) { ret = 
read_one_entry(sb, eh); if (ret) break; - - eh = (void *)eh + le16_to_cpu(eh->len); } return ret; } -#if 0 -/* return pointer to the blk 4k block offset amongst the pages */ -static void *page_block_address(struct page **pages, unsigned int blk) -{ - unsigned int i = blk / BLOCKS_PER_PAGE; - unsigned int off = (blk % BLOCKS_PER_PAGE) << SCOUTFS_BLOCK_SHIFT; - return page_address(pages[i]) + off; -} -#endif +/* read in a meg at a time */ +#define NR_PAGES DIV_ROUND_UP(1024 * 1024, PAGE_SIZE) +#define NR_BLOCKS (NR_PAGES * SCOUTFS_BLOCKS_PER_PAGE) int scoutfs_ring_read(struct super_block *sb) { @@ -274,3 +283,43 @@ out: return ret; } + +int scoutfs_ring_setup(struct super_block *sb) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct ring_info *rinf; + struct page *page; + int i; + + rinf = kzalloc(sizeof(struct ring_info), GFP_KERNEL); + if (!rinf) + return -ENOMEM; + sbi->ring_info = rinf; + + for (i = 0; i < ARRAY_SIZE(rinf->pages); i++) { + page = alloc_page(GFP_KERNEL); + if (!page) { + while (--i >= 0) + __free_page(rinf->pages[i]); + return -ENOMEM; + } + + rinf->pages[i] = page; + } + + return 0; +} + +void scoutfs_ring_destroy(struct super_block *sb) +{ + DECLARE_RING_INFO(sb, rinf); + int i; + + if (rinf) { + for (i = 0; i < ARRAY_SIZE(rinf->pages); i++) + __free_page(rinf->pages[i]); + + kfree(rinf); + } +} + diff --git a/kmod/src/ring.h b/kmod/src/ring.h index 4f6930c9..94eb84c3 100644 --- a/kmod/src/ring.h +++ b/kmod/src/ring.h @@ -3,6 +3,16 @@ #include +struct scoutfs_bio_completion; + int scoutfs_ring_read(struct super_block *sb); +void scoutfs_ring_append(struct super_block *sb, + struct scoutfs_ring_entry_header *eh); + +int scoutfs_ring_submit_write(struct super_block *sb, + struct scoutfs_bio_completion *comp); + +int scoutfs_ring_setup(struct super_block *sb); +void scoutfs_ring_destroy(struct super_block *sb); #endif diff --git a/kmod/src/seg.c b/kmod/src/seg.c index 4537c50c..e86d595d 100644 --- a/kmod/src/seg.c +++ b/kmod/src/seg.c @@ -21,6 +21,8 @@ #include "seg.h" #include "bio.h" #include "kvec.h" +#include "manifest.h" +#include "alloc.h" /* * seg.c should just be about the cache and io, and maybe @@ -127,8 +129,9 @@ static struct scoutfs_segment *find_seg(struct rb_root *root, u64 segno) /* * This always inserts the segment into the rbtree. If there's already - * a segment at the given seg then it is removed and returned. The caller - * doesn't have to erase it from the tree if it's returned. + * a segment at the given seg then it is removed and returned. The + * caller doesn't have to erase it from the tree if it's returned but it + * does have to put the reference that it's given. */ static struct scoutfs_segment *replace_seg(struct rb_root *root, struct scoutfs_segment *ins) @@ -205,6 +208,45 @@ static u64 segno_to_blkno(u64 blkno) return blkno << (SCOUTFS_SEGMENT_SHIFT - SCOUTFS_BLOCK_SHIFT); } +int scoutfs_seg_alloc(struct super_block *sb, struct scoutfs_segment **seg_ret) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct segment_cache *cac = sbi->segment_cache; + struct scoutfs_segment *existing; + struct scoutfs_segment *seg; + unsigned long flags; + u64 segno; + int ret; + + *seg_ret = NULL; + + ret = scoutfs_alloc_segno(sb, &segno); + if (ret) + goto out; + + seg = alloc_seg(segno); + if (!seg) { + ret = scoutfs_alloc_free(sb, segno); + BUG_ON(ret); /* XXX could make pending when allocating */ + ret = -ENOMEM; + goto out; + } + + /* XXX always remove existing segs, is that necessary? 
*/ + spin_lock_irqsave(&cac->lock, flags); + atomic_inc(&seg->refcount); + existing = replace_seg(&cac->root, seg); + spin_unlock_irqrestore(&cac->lock, flags); + if (existing) + scoutfs_seg_put(existing); + + *seg_ret = seg; + ret = 0; +out: + return ret; + +} + /* * The bios submitted by this don't have page references themselves. If * this succeeds then the caller must call _wait before putting their @@ -248,6 +290,19 @@ struct scoutfs_segment *scoutfs_seg_submit_read(struct super_block *sb, return seg; } +int scoutfs_seg_submit_write(struct super_block *sb, + struct scoutfs_segment *seg, + struct scoutfs_bio_completion *comp) +{ + trace_printk("submitting segno %llu\n", seg->segno); + + scoutfs_bio_submit_comp(sb, WRITE, seg->pages, + segno_to_blkno(seg->segno), + SCOUTFS_SEGMENT_BLOCKS, comp); + + return 0; +} + int scoutfs_seg_wait(struct super_block *sb, struct scoutfs_segment *seg) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); @@ -270,29 +325,67 @@ static void *off_ptr(struct scoutfs_segment *seg, u32 off) return page_address(seg->pages[pg]) + pg_off; } -/* - * Return a pointer to the item in the array at the given position. - * - * The item structs fill the first block in the segment after the - * initial segment block struct. Item structs don't cross block - * boundaries so the final bytes that would make up a partial item - * struct are skipped. - */ -static struct scoutfs_segment_item *pos_item(struct scoutfs_segment *seg, - int pos) +static u32 pos_off(struct scoutfs_segment *seg, u32 pos) { - u32 off; + /* items need of be a power of two */ + BUILD_BUG_ON(!is_power_of_2(sizeof(struct scoutfs_segment_item))); + /* and the first item has to be naturally aligned */ + BUILD_BUG_ON(offsetof(struct scoutfs_segment_block, items) & + sizeof(struct scoutfs_segment_item)); - if (pos < SCOUTFS_SEGMENT_FIRST_BLOCK_ITEMS) { - off = sizeof(struct scoutfs_segment_block); - } else { - pos -= SCOUTFS_SEGMENT_FIRST_BLOCK_ITEMS; - off = (1 + (pos / SCOUTFS_SEGMENT_ITEMS_PER_BLOCK)) * - SCOUTFS_BLOCK_SIZE; - pos %= SCOUTFS_SEGMENT_ITEMS_PER_BLOCK; - } + return offsetof(struct scoutfs_segment_block, items[pos]); +} - return off_ptr(seg, off + (pos * sizeof(struct scoutfs_segment_item))); +static void *pos_ptr(struct scoutfs_segment *seg, u32 pos) +{ + return off_ptr(seg, pos_off(seg, pos)); +} + +/* + * The persistent item fields that are stored in the segment are packed + * with funny precision. We translate those to and from a much more + * natural native representation of the fields. 
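load_item() and store_item() below pack each item's offset and length into one little-endian 32-bit word using SCOUTFS_SEGMENT_ITEM_OFF_SHIFT and SCOUTFS_SEGMENT_ITEM_LEN_MASK. A small round-trip sketch of that packing; the shift and mask values here are illustrative only, the real constants live in format.h:

#include <assert.h>
#include <stdint.h>

/* illustrative values only; the real constants are defined in format.h */
#define ITEM_OFF_SHIFT  12
#define ITEM_LEN_MASK   ((1u << ITEM_OFF_SHIFT) - 1)

static uint32_t pack_off_len(uint32_t off, uint16_t len)
{
        return (off << ITEM_OFF_SHIFT) | (len & ITEM_LEN_MASK);
}

static void unpack_off_len(uint32_t packed, uint32_t *off, uint16_t *len)
{
        *off = packed >> ITEM_OFF_SHIFT;
        *len = packed & ITEM_LEN_MASK;
}

int main(void)
{
        uint32_t off;
        uint16_t len;

        unpack_off_len(pack_off_len(8192, 100), &off, &len);
        assert(off == 8192 && len == 100);
        return 0;
}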
+ */ +struct native_item { + u64 seq; + u32 key_off; + u32 val_off; + u16 key_len; + u16 val_len; +}; + +static void load_item(struct scoutfs_segment *seg, u32 pos, + struct native_item *item) +{ + struct scoutfs_segment_item *sitem = pos_ptr(seg, pos); + u32 packed; + + item->seq = le64_to_cpu(sitem->seq); + + packed = le32_to_cpu(sitem->key_off_len); + item->key_off = packed >> SCOUTFS_SEGMENT_ITEM_OFF_SHIFT; + item->key_len = packed & SCOUTFS_SEGMENT_ITEM_LEN_MASK; + + packed = le32_to_cpu(sitem->val_off_len); + item->val_off = packed >> SCOUTFS_SEGMENT_ITEM_OFF_SHIFT; + item->val_len = packed & SCOUTFS_SEGMENT_ITEM_LEN_MASK; +} + +static void store_item(struct scoutfs_segment *seg, u32 pos, + struct native_item *item) +{ + struct scoutfs_segment_item *sitem = pos_ptr(seg, pos); + u32 packed; + + sitem->seq = cpu_to_le64(item->seq); + + packed = (item->key_off << SCOUTFS_SEGMENT_ITEM_OFF_SHIFT) | + (item->key_len & SCOUTFS_SEGMENT_ITEM_LEN_MASK); + sitem->key_off_len = cpu_to_le32(packed); + + packed = (item->val_off << SCOUTFS_SEGMENT_ITEM_OFF_SHIFT) | + (item->val_len & SCOUTFS_SEGMENT_ITEM_LEN_MASK); + sitem->val_off_len = cpu_to_le32(packed); } static void kvec_from_pages(struct scoutfs_segment *seg, @@ -313,19 +406,17 @@ int scoutfs_seg_item_kvecs(struct scoutfs_segment *seg, int pos, struct kvec *key, struct kvec *val) { struct scoutfs_segment_block *sblk = off_ptr(seg, 0); - struct scoutfs_segment_item *item; + struct native_item item; if (pos < 0 || pos >= le32_to_cpu(sblk->nr_items)) return -ENOENT; - item = pos_item(seg, pos); + load_item(seg, pos, &item); if (key) - kvec_from_pages(seg, key, le32_to_cpu(item->key_off), - le16_to_cpu(item->key_len)); + kvec_from_pages(seg, key, item.key_off, item.key_len); if (val) - kvec_from_pages(seg, val, le32_to_cpu(item->val_off), - le16_to_cpu(item->val_len)); + kvec_from_pages(seg, val, item.val_off, item.val_len); return 0; } @@ -365,6 +456,90 @@ int scoutfs_seg_find_pos(struct scoutfs_segment *seg, struct kvec *key) return find_key_pos(seg, key); } +/* + * Store the first item in the segment. The caller knows the number + * of items and bytes of keys that determine where the keys and values + * start. Future items are appended by looking at the last item. + * + * This should never fail because any item must always fit in a segment. 
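The writer lays a segment out as the item array, then all of the keys, then all of the values: the first key starts just past the slot reserved for item nr_items - 1, values start key_bytes later, and every appended item simply extends the previous item's key and value ranges. A standalone sketch of that offset bookkeeping, with simplified struct shapes standing in for the real segment format:

#include <stdint.h>

struct item {
        uint64_t seq;
        uint32_t key_off;
        uint32_t key_len;
        uint32_t val_off;
        uint32_t val_len;
};

struct segment_block {
        uint32_t nr_items;
        struct item items[];
};

/* place the first item once the total item count and key bytes are known */
static void first_item(struct segment_block *sblk, uint32_t nr_items,
                       uint32_t key_bytes, uint32_t key_len, uint32_t val_len)
{
        /* keys start just past the slot reserved for item nr_items - 1 */
        uint32_t key_off = sizeof(struct segment_block) +
                           nr_items * sizeof(struct item);
        uint32_t val_off = key_off + key_bytes;

        sblk->nr_items = 1;
        sblk->items[0] = (struct item){
                .seq = 1,
                .key_off = key_off, .key_len = key_len,
                .val_off = val_off, .val_len = val_len,
        };
}

/* each appended item starts where the previous key and value ended */
static void append_item(struct segment_block *sblk, uint32_t key_len,
                        uint32_t val_len)
{
        uint32_t nr = sblk->nr_items++;
        struct item *prev = &sblk->items[nr - 1];

        sblk->items[nr] = (struct item){
                .seq = 1,
                .key_off = prev->key_off + prev->key_len, .key_len = key_len,
                .val_off = prev->val_off + prev->val_len, .val_len = val_len,
        };
}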
+ */ +void scoutfs_seg_first_item(struct super_block *sb, struct scoutfs_segment *seg, + struct kvec *key, struct kvec *val, + unsigned int nr_items, unsigned int key_bytes) +{ + struct scoutfs_segment_block *sblk = off_ptr(seg, 0); + struct native_item item; + SCOUTFS_DECLARE_KVEC(item_key); + SCOUTFS_DECLARE_KVEC(item_val); + u32 key_off; + u32 val_off; + + key_off = pos_off(seg, nr_items); + val_off = key_off + key_bytes; + + sblk->nr_items = cpu_to_le32(1); + + item.seq = 1; + item.key_off = key_off; + item.val_off = val_off; + item.key_len = scoutfs_kvec_length(key); + item.val_len = scoutfs_kvec_length(val); + store_item(seg, 0, &item); + + scoutfs_seg_item_kvecs(seg, 0, key, val); + scoutfs_kvec_memcpy(item_key, key); + scoutfs_kvec_memcpy(item_val, val); +} + +void scoutfs_seg_append_item(struct super_block *sb, + struct scoutfs_segment *seg, + struct kvec *key, struct kvec *val) +{ + struct scoutfs_segment_block *sblk = off_ptr(seg, 0); + struct native_item item; + struct native_item prev; + SCOUTFS_DECLARE_KVEC(item_key); + SCOUTFS_DECLARE_KVEC(item_val); + u32 nr; + + nr = le32_to_cpu(sblk->nr_items); + sblk->nr_items = cpu_to_le32(nr + 1); + + load_item(seg, nr - 1, &prev); + + item.seq = 1; + item.key_off = prev.key_off + prev.key_len; + item.key_len = scoutfs_kvec_length(key); + item.val_off = prev.val_off + prev.val_len; + item.val_len = scoutfs_kvec_length(val); + store_item(seg, 0, &item); + + scoutfs_seg_item_kvecs(seg, nr, key, val); + scoutfs_kvec_memcpy(item_key, key); + scoutfs_kvec_memcpy(item_val, val); +} + +/* + * Add a dirty manifest entry for the given segment at the given level. + */ +int scoutfs_seg_add_ment(struct super_block *sb, struct scoutfs_segment *seg, + u8 level) +{ + struct scoutfs_segment_block *sblk = off_ptr(seg, 0); + struct native_item item; + SCOUTFS_DECLARE_KVEC(first); + SCOUTFS_DECLARE_KVEC(last); + + load_item(seg, 0, &item); + kvec_from_pages(seg, first, item.key_off, item.key_len); + + load_item(seg, le32_to_cpu(sblk->nr_items) - 1, &item); + kvec_from_pages(seg, last, item.key_off, item.key_len); + + return scoutfs_manifest_add(sb, first, last, le64_to_cpu(sblk->segno), + le64_to_cpu(sblk->max_seq), level, true); +} + int scoutfs_seg_setup(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); @@ -400,4 +575,3 @@ void scoutfs_seg_destroy(struct super_block *sb) kfree(cac); } } - diff --git a/kmod/src/seg.h b/kmod/src/seg.h index 1957a308..c5ae81d4 100644 --- a/kmod/src/seg.h +++ b/kmod/src/seg.h @@ -1,6 +1,7 @@ #ifndef _SCOUTFS_SEG_H_ #define _SCOUTFS_SEG_H_ +struct scoutfs_bio_completion; struct scoutfs_segment; struct kvec; @@ -14,6 +15,20 @@ int scoutfs_seg_item_kvecs(struct scoutfs_segment *seg, int pos, void scoutfs_seg_put(struct scoutfs_segment *seg); +int scoutfs_seg_alloc(struct super_block *sb, struct scoutfs_segment **seg_ret); +void scoutfs_seg_first_item(struct super_block *sb, struct scoutfs_segment *seg, + struct kvec *key, struct kvec *val, + unsigned int nr_items, unsigned int key_bytes); +void scoutfs_seg_append_item(struct super_block *sb, + struct scoutfs_segment *seg, + struct kvec *key, struct kvec *val); +int scoutfs_seg_add_ment(struct super_block *sb, struct scoutfs_segment *seg, + u8 level); + +int scoutfs_seg_submit_write(struct super_block *sb, + struct scoutfs_segment *seg, + struct scoutfs_bio_completion *comp); + int scoutfs_seg_setup(struct super_block *sb); void scoutfs_seg_destroy(struct super_block *sb); diff --git a/kmod/src/super.c b/kmod/src/super.c index 7866185a..5ce1a2b7 100644 
--- a/kmod/src/super.c +++ b/kmod/src/super.c @@ -33,6 +33,7 @@ #include "manifest.h" #include "seg.h" #include "bio.h" +#include "alloc.h" #include "scoutfs_trace.h" static struct kset *scoutfs_kset; @@ -226,6 +227,8 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent) scoutfs_seg_setup(sb) ?: scoutfs_manifest_setup(sb) ?: scoutfs_item_setup(sb) ?: + scoutfs_alloc_setup(sb) ?: + scoutfs_ring_setup(sb) ?: scoutfs_ring_read(sb) ?: // scoutfs_buddy_setup(sb) ?: scoutfs_setup_trans(sb); @@ -264,8 +267,10 @@ static void scoutfs_kill_sb(struct super_block *sb) if (sbi->block_shrinker.shrink == scoutfs_block_shrink) unregister_shrinker(&sbi->block_shrinker); scoutfs_item_destroy(sb); + scoutfs_alloc_destroy(sb); scoutfs_manifest_destroy(sb); scoutfs_seg_destroy(sb); + scoutfs_ring_destroy(sb); scoutfs_block_destroy(sb); scoutfs_destroy_counters(sb); if (sbi->kset) @@ -285,7 +290,6 @@ static struct file_system_type scoutfs_fs_type = { /* safe to call at any failure point in _init */ static void teardown_module(void) { - scoutfs_dir_exit(); scoutfs_inode_exit(); if (scoutfs_kset) kset_unregister(scoutfs_kset); @@ -302,7 +306,6 @@ static int __init scoutfs_module_init(void) return -ENOMEM; ret = scoutfs_inode_init() ?: - scoutfs_dir_init() ?: register_filesystem(&scoutfs_fs_type); if (ret) teardown_module(); diff --git a/kmod/src/super.h b/kmod/src/super.h index b1b20e97..bb803105 100644 --- a/kmod/src/super.h +++ b/kmod/src/super.h @@ -12,6 +12,7 @@ struct buddy_info; struct item_cache; struct manifest; struct segment_cache; +struct ring_info; struct scoutfs_sb_info { struct super_block *sb; @@ -34,6 +35,8 @@ struct scoutfs_sb_info { struct manifest *manifest; struct item_cache *item_cache; struct segment_cache *segment_cache; + struct seg_alloc *seg_alloc; + struct ring_info *ring_info; struct buddy_info *buddy_info; diff --git a/kmod/src/trans.c b/kmod/src/trans.c index b9108500..217ec29f 100644 --- a/kmod/src/trans.c +++ b/kmod/src/trans.c @@ -22,6 +22,12 @@ #include "trans.h" #include "buddy.h" #include "filerw.h" +#include "bio.h" +#include "item.h" +#include "manifest.h" +#include "seg.h" +#include "alloc.h" +#include "ring.h" #include "scoutfs_trace.h" /* @@ -74,37 +80,43 @@ void scoutfs_trans_write_func(struct work_struct *work) struct scoutfs_sb_info *sbi = container_of(work, struct scoutfs_sb_info, trans_write_work); struct super_block *sb = sbi->sb; + struct scoutfs_bio_completion comp; + struct scoutfs_segment *seg; bool advance = false; int ret = 0; - bool have_umount; - sbi->trans_task = current; + scoutfs_bio_init_comp(&comp); + sbi->trans_task = NULL; wait_event(sbi->trans_hold_wq, atomic_cmpxchg(&sbi->trans_holds, 0, -1) == 0); - if (scoutfs_block_has_dirty(sb)) { - /* XXX need writeback errors from inode address spaces? */ + /* XXX file data needs to be updated to the new item api */ +#if 0 + scoutfs_filerw_free_alloc(sb); +#endif - /* XXX definitely don't understand this */ - have_umount = down_read_trylock(&sb->s_umount); + /* + * We only have to check if there are dirty items or manifest + * entries. You can't have dirty alloc regions without having + * changed references to the allocated segments which produces + * dirty manfiest entries. 
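The commit path below submits the ring fragments and the segment against a single scoutfs_bio_completion and then waits once for all of the IO. The completion type itself isn't part of this patch; a common shape for such a counted completion, shown here purely as a hypothetical stand-in, is a bias-counted atomic paired with a struct completion:

#include <linux/atomic.h>
#include <linux/completion.h>

/* hypothetical stand-in, not the actual scoutfs_bio_completion */
struct counted_comp {
        atomic_t pending;
        struct completion done;
};

static void counted_comp_init(struct counted_comp *comp)
{
        atomic_set(&comp->pending, 1);  /* bias held by the waiter */
        init_completion(&comp->done);
}

/* called once per submitted bio */
static void counted_comp_get(struct counted_comp *comp)
{
        atomic_inc(&comp->pending);
}

/* called from each bio's end_io, and once by the waiter to drop its bias */
static void counted_comp_put(struct counted_comp *comp)
{
        if (atomic_dec_and_test(&comp->pending))
                complete(&comp->done);
}

static void counted_comp_wait(struct counted_comp *comp)
{
        counted_comp_put(comp);
        wait_for_completion(&comp->done);
}

The waiter's initial bias keeps the count from reaching zero between submissions, so any number of bios can be queued before the single wait.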
+ */ + if (scoutfs_item_dirty_bytes(sb) || scoutfs_manifest_has_dirty(sb)) { - sync_inodes_sb(sb); - - if (have_umount) - up_read(&sb->s_umount); - - scoutfs_filerw_free_alloc(sb); - - ret = scoutfs_buddy_apply_pending(sb, false) ?: - scoutfs_block_write_dirty(sb) ?: + ret = scoutfs_seg_alloc(sb, &seg) ?: + scoutfs_item_dirty_seg(sb, seg); + scoutfs_seg_add_ment(sb, seg, 0) ?: + scoutfs_manifest_dirty_ring(sb) ?: + scoutfs_alloc_dirty_ring(sb) ?: + scoutfs_ring_submit_write(sb, &comp) ?: + scoutfs_seg_submit_write(sb, seg, &comp) ?: + scoutfs_bio_wait_comp(sb, &comp) ?: scoutfs_write_dirty_super(sb); - if (ret) { - scoutfs_buddy_apply_pending(sb, true); - } else { - scoutfs_buddy_committed(sb); - advance = 1; - } + BUG_ON(ret); + + scoutfs_seg_put(seg); + advance = true; } spin_lock(&sbi->trans_write_lock); @@ -183,6 +195,10 @@ int scoutfs_file_fsync(struct file *file, loff_t start, loff_t end, return scoutfs_sync_fs(file->f_inode->i_sb, 1); } +/* + * The first holders race to try and allocate the segment that will be + * written by the next commit. + */ int scoutfs_hold_trans(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); @@ -195,21 +211,28 @@ int scoutfs_hold_trans(struct super_block *sb) } /* - * As we release we ask the allocator how many blocks have been - * allocated since the last transaction was successfully committed. If - * it's large enough we kick off a write. This is mostly to reduce the - * commit latency. We also don't want to let the IO pipeline sit idle. - * Once we have enough blocks to write efficiently we should do so. + * As we release we kick off a commit if we have a segment's worth of + * dirty items. + * + * Right now it's conservatively kicking off writes at ~95% full blocks. + * This leaves a lot of slop for the largest item bytes created by a + * holder and overrun by concurrent holders (who aren't accounted + * today). + * + * It should more precisely know the worst case item byte consumption of + * holders and only kick off a write when someone tries to hold who + * might fill the segment. */ void scoutfs_release_trans(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + unsigned int target = (SCOUTFS_SEGMENT_SIZE * 95 / 100); if (current == sbi->trans_task) return; if (atomic_sub_return(1, &sbi->trans_holds) == 0) { - if (scoutfs_buddy_alloc_count(sb) >= SCOUTFS_MAX_TRANS_BLOCKS) + if (scoutfs_item_dirty_bytes(sb) >= target) scoutfs_sync_fs(sb, 0); wake_up(&sbi->trans_hold_wq);
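Both the setup chain in scoutfs_fill_super() and the commit chain in scoutfs_trans_write_func() rely on the GNU a ?: b extension: each step runs only while everything before it returned zero, and the first nonzero error short-circuits the rest and becomes the chain's value. A tiny standalone illustration of that behavior:

#include <stdio.h>

static int step(const char *name, int err)
{
        printf("%s\n", name);
        return err;
}

int main(void)
{
        /* GNU extension: a ?: b evaluates b only when a is zero */
        int ret = step("first", 0) ?:
                  step("second", -5) ?:
                  step("third", 0);     /* -5 short-circuits, never runs */

        printf("ret %d\n", ret);        /* prints ret -5 */
        return 0;
}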