diff --git a/kmod/src/Makefile b/kmod/src/Makefile index 143929b7..d808143b 100644 --- a/kmod/src/Makefile +++ b/kmod/src/Makefile @@ -2,6 +2,6 @@ obj-$(CONFIG_SCOUTFS_FS) := scoutfs.o CFLAGS_scoutfs_trace.o = -I$(src) # define_trace.h double include -scoutfs-y += block.o btree.o buddy.o counters.o crc.o dir.o filerw.o \ - inode.o ioctl.o msg.o name.o scoutfs_trace.o super.o trans.o \ - xattr.o +scoutfs-y += bio.o block.o btree.o buddy.o counters.o crc.o dir.o filerw.o \ + kvec.o inode.o ioctl.o item.o manifest.o msg.o name.o ring.o \ + seg.o scoutfs_trace.o super.o trans.o xattr.o diff --git a/kmod/src/bio.c b/kmod/src/bio.c new file mode 100644 index 00000000..d58eebe9 --- /dev/null +++ b/kmod/src/bio.c @@ -0,0 +1,169 @@ +/* + * Copyright (C) 2016 Versity Software, Inc. All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + */ +#include +#include +#include +#include + +#include "super.h" +#include "format.h" +#include "bio.h" + +struct bio_end_io_args { + struct super_block *sb; + atomic_t bytes_in_flight; + int err; + scoutfs_bio_end_io_t end_io; + void *data; +}; + +static void dec_end_io(struct bio_end_io_args *args, size_t bytes, int err) +{ + if (err && !args->err) + args->err = err; + + if (atomic_sub_return(bytes, &args->bytes_in_flight) == 0) { + args->end_io(args->sb, args->data, args->err); + kfree(args); + } +} + +static void bio_end_io(struct bio *bio, int err) +{ + struct bio_end_io_args *args = bio->bi_private; + + dec_end_io(args, bio->bi_size, err); + bio_put(bio); +} + +/* + * Read or write the given number of 4k blocks from the front of the + * pages provided by the caller. We translate the block count into a + * page count and fill bios a page at a time. + * + * The caller is responsible for ensuring that the pages aren't freed + * while bios are in flight. + * + * The end_io function is always called once with the error result of + * the IO. It can be called before _submit returns. 
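+ *
+ * A bias of one byte is held on args->bytes_in_flight while bios are
+ * built: each bio adds its bi_size before submit_bio() and the bias is
+ * dropped with dec_end_io(args, 1, ret) after the loop, so end_io runs
+ * exactly once after the last completion, or immediately when nothing
+ * could be submitted.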
+ */ +void scoutfs_bio_submit(struct super_block *sb, int rw, struct page **pages, + u64 blkno, unsigned int nr_blocks, + scoutfs_bio_end_io_t end_io, void *data) +{ + unsigned int nr_pages = DIV_ROUND_UP(nr_blocks, + SCOUTFS_BLOCKS_PER_PAGE); + struct bio_end_io_args *args; + struct blk_plug plug; + unsigned int bytes; + struct page *page; + struct bio *bio = NULL; + int ret = 0; + int i; + + args = kmalloc(sizeof(struct bio_end_io_args), GFP_NOFS); + if (!args) { + end_io(sb, data, -ENOMEM); + return; + } + + args->sb = sb; + atomic_set(&args->bytes_in_flight, 1); + args->err = 0; + args->end_io = end_io; + args->data = data; + + blk_start_plug(&plug); + + for (i = 0; i < nr_pages; i++) { + page = pages[i]; + + if (!bio) { + bio = bio_alloc(GFP_NOFS, nr_pages - i); + if (!bio) + bio = bio_alloc(GFP_NOFS, 1); + if (!bio) { + ret = -ENOMEM; + break; + } + + bio->bi_sector = blkno << (SCOUTFS_BLOCK_SHIFT - 9); + bio->bi_bdev = sb->s_bdev; + bio->bi_end_io = bio_end_io; + bio->bi_private = args; + } + + bytes = min_t(int, nr_blocks << SCOUTFS_BLOCK_SHIFT, PAGE_SIZE); + + if (bio_add_page(bio, page, bytes, 0) != bytes) { + /* submit the full bio and retry this page */ + atomic_add(bio->bi_size, &args->bytes_in_flight); + submit_bio(rw, bio); + bio = NULL; + i--; + continue; + } + + blkno += SCOUTFS_BLOCKS_PER_PAGE; + nr_blocks -= SCOUTFS_BLOCKS_PER_PAGE; + } + + if (bio) { + atomic_add(bio->bi_size, &args->bytes_in_flight); + submit_bio(rw, bio); + } + + blk_finish_plug(&plug); + dec_end_io(args, 1, ret); +} + +struct end_io_completion { + struct completion comp; + int err; +}; + +static void end_io_complete(struct super_block *sb, void *data, int err) +{ + struct end_io_completion *comp = data; + + comp->err = err; + complete(&comp->comp); +} + +/* + * A synchronous read of the given blocks. + * + * XXX we could make this interruptible. + */ +int scoutfs_bio_read(struct super_block *sb, struct page **pages, + u64 blkno, unsigned int nr_blocks) +{ + struct end_io_completion comp = { + .comp = COMPLETION_INITIALIZER(comp.comp), + }; + + scoutfs_bio_submit(sb, READ, pages, blkno, nr_blocks, + end_io_complete, &comp); + wait_for_completion(&comp.comp); + return comp.err; +} + +/* return pointer to the blk 4k block offset amongst the pages */ +void *scoutfs_page_block_address(struct page **pages, unsigned int blk) +{ + unsigned int i = blk / SCOUTFS_BLOCKS_PER_PAGE; + unsigned int off = (blk % SCOUTFS_BLOCKS_PER_PAGE) << + SCOUTFS_BLOCK_SHIFT; + + return page_address(pages[i]) + off; +} diff --git a/kmod/src/bio.h b/kmod/src/bio.h new file mode 100644 index 00000000..094f6038 --- /dev/null +++ b/kmod/src/bio.h @@ -0,0 +1,23 @@ +#ifndef _SCOUTFS_BIO_H_ +#define _SCOUTFS_BIO_H_ + +/* + * Our little block IO wrapper is just a convenience wrapper that takes + * our block size units and handles tracks multiple bios per larger io. + * + * If bios could hold an unlimited number of pages instead of + * BIO_MAX_PAGES then this would just use a single bio directly. 
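+ *
+ * blkno and nr_blocks are in 4k SCOUTFS_BLOCK_SIZE units and the pages
+ * must stay allocated until end_io is called.  A minimal synchronous
+ * read, assuming the caller has already allocated enough pages, looks
+ * like:
+ *
+ *	ret = scoutfs_bio_read(sb, pages, blkno, nr_blocks);
+ *	if (ret == 0)
+ *		data = scoutfs_page_block_address(pages, 0);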
+ */ + +typedef void (*scoutfs_bio_end_io_t)(struct super_block *sb, void *data, + int err); + +void scoutfs_bio_submit(struct super_block *sb, int rw, struct page **pages, + u64 blkno, unsigned int nr_blocks, + scoutfs_bio_end_io_t end_io, void *data); +int scoutfs_bio_read(struct super_block *sb, struct page **pages, + u64 blkno, unsigned int nr_blocks); + +void *scoutfs_page_block_address(struct page **pages, unsigned int blk); + +#endif diff --git a/kmod/src/format.h b/kmod/src/format.h index 8c7bb7a1..d0efbae4 100644 --- a/kmod/src/format.h +++ b/kmod/src/format.h @@ -6,9 +6,23 @@ /* super block id */ #define SCOUTFS_SUPER_ID 0x2e736674756f6373ULL /* "scoutfs." */ +/* + * The super block and ring blocks are fixed 4k. + */ #define SCOUTFS_BLOCK_SHIFT 12 #define SCOUTFS_BLOCK_SIZE (1 << SCOUTFS_BLOCK_SHIFT) #define SCOUTFS_BLOCK_MASK (SCOUTFS_BLOCK_SIZE - 1) +#define SCOUTFS_BLOCKS_PER_PAGE (PAGE_SIZE / SCOUTFS_BLOCK_SIZE) + +/* + * FS data is stored in segments, for now they're fixed size. They'll + * be dynamic. + */ +#define SCOUTFS_SEGMENT_SHIFT 20 +#define SCOUTFS_SEGMENT_SIZE (1 << SCOUTFS_SEGMENT_SHIFT) +#define SCOUTFS_SEGMENT_MASK (SCOUTFS_SEGMENT_SIZE - 1) +#define SCOUTFS_SEGMENT_PAGES (SCOUTFS_SEGMENT_SIZE / PAGE_SIZE) +#define SCOUTFS_SEGMENT_BLOCKS (SCOUTFS_SEGMENT_SIZE / BLOCK_SIZE) #define SCOUTFS_PAGES_PER_BLOCK (SCOUTFS_BLOCK_SIZE / PAGE_SIZE) #define SCOUTFS_BLOCK_PAGE_ORDER (SCOUTFS_BLOCK_SHIFT - PAGE_SHIFT) @@ -37,6 +51,67 @@ struct scoutfs_block_header { __le64 blkno; } __packed; +struct scoutfs_ring_entry_header { + __u8 type; + __le16 len; +} __packed; + +#define SCOUTFS_RING_ADD_MANIFEST 1 + +struct scoutfs_ring_add_manifest { + struct scoutfs_ring_entry_header eh; + __le64 segno; + __le64 seq; + __le16 first_key_len; + __le16 last_key_len; + __u8 level; + /* first and last key bytes */ +} __packed; + +/* + * This is absurdly huge. If there was only ever 1 item per segment and + * 2^64 items the tree could get this deep. + */ +#define SCOUTFS_MANIFEST_MAX_LEVEL 20 + +struct scoutfs_ring_block { + struct scoutfs_block_header hdr; + __le32 nr_entries; + struct scoutfs_ring_entry_header entries[0]; +} __packed; + +struct scoutfs_segment_item { + __le64 seq; + __le32 key_off; + __le32 val_off; + __le16 key_len; + __le16 val_len; +} __packed; + +/* + * Each large segment starts with a segment block that describes the + * rest of the blocks that make up the segment. + */ +struct scoutfs_segment_block { + __le32 crc; + __le32 _padding; + __le64 segno; + __le64 max_seq; + __le32 nr_items; + /* item array with gaps so they don't cross 4k blocks */ + /* packed keys */ + /* packed vals */ +} __packed; + +/* the first block in the segment has the header and items */ +#define SCOUTFS_SEGMENT_FIRST_BLOCK_ITEMS \ + ((SCOUTFS_BLOCK_SIZE - sizeof(struct scoutfs_segment_block)) / \ + sizeof(struct scoutfs_segment_item)) + +/* the rest of the header blocks are full of items */ +#define SCOUTFS_SEGMENT_ITEMS_PER_BLOCK \ + (SCOUTFS_BLOCK_SIZE / sizeof(struct scoutfs_segment_item)) + /* * Block references include the sequence number so that we can detect * readers racing with writers and so that we can tell that we don't @@ -118,6 +193,11 @@ struct scoutfs_key { #define SCOUTFS_MAX_ITEM_LEN 512 +struct scoutfs_inode_key { + __u8 type; + __be64 ino; +} __packed; + struct scoutfs_btree_root { u8 height; struct scoutfs_block_ref ref; @@ -180,6 +260,11 @@ struct scoutfs_btree_item { #define SCOUTFS_UUID_BYTES 16 +/* + * The ring fields describe the statically allocated ring log. 
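+ * The ring occupies ring_blocks fixed 4k blocks starting at block
+ * ring_blkno and is read and replayed in order at mount.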
The + * head and tail indexes are logical 4k blocks offsets inside the ring. + * The head block should contain the seq. + */ struct scoutfs_super_block { struct scoutfs_block_header hdr; __le64 id; @@ -187,6 +272,11 @@ struct scoutfs_super_block { __le64 next_ino; __le64 total_blocks; __le64 free_blocks; + __le64 ring_blkno; + __le64 ring_blocks; + __le64 ring_head_index; + __le64 ring_tail_index; + __le64 ring_head_seq; __le64 buddy_blocks; struct scoutfs_buddy_root buddy_root; struct scoutfs_btree_root btree_root; diff --git a/kmod/src/inode.c b/kmod/src/inode.c index f990dcf9..0ccfe006 100644 --- a/kmod/src/inode.c +++ b/kmod/src/inode.c @@ -29,6 +29,8 @@ #include "trans.h" #include "btree.h" #include "msg.h" +#include "kvec.h" +#include "item.h" /* * XXX @@ -126,25 +128,28 @@ static void load_inode(struct inode *inode, struct scoutfs_inode *cinode) ci->data_version = le64_to_cpu(cinode->data_version); } +static void set_inode_key(struct scoutfs_inode_key *ikey, u64 ino) +{ + ikey->type = SCOUTFS_INODE_KEY; + ikey->ino = cpu_to_be64(ino); +} + static int scoutfs_read_locked_inode(struct inode *inode) { struct super_block *sb = inode->i_sb; - struct scoutfs_btree_root *meta = SCOUTFS_META(sb); - struct scoutfs_btree_val val; + struct scoutfs_inode_key ikey; struct scoutfs_inode sinode; - struct scoutfs_key key; + SCOUTFS_DECLARE_KVEC(key); + SCOUTFS_DECLARE_KVEC(val); int ret; - scoutfs_set_key(&key, scoutfs_ino(inode), SCOUTFS_INODE_KEY, 0); - scoutfs_btree_init_val(&val, &sinode, sizeof(sinode)); + set_inode_key(&ikey, scoutfs_ino(inode)); + scoutfs_kvec_init(key, &ikey, sizeof(ikey)); + scoutfs_kvec_init(val, &sinode, sizeof(sinode)); - ret = scoutfs_btree_lookup(sb, meta, &key, &val); - if (ret == sizeof(sinode)) { + ret = scoutfs_item_lookup_exact(sb, key, val, sizeof(sinode)); + if (ret == 0) load_inode(inode, &sinode); - ret = 0; - } else if (ret >= 0) { - ret = -EIO; - } return ret; } diff --git a/kmod/src/inode.h b/kmod/src/inode.h index f0f74024..0d48f158 100644 --- a/kmod/src/inode.h +++ b/kmod/src/inode.h @@ -49,4 +49,7 @@ u64 scoutfs_last_ino(struct super_block *sb); void scoutfs_inode_exit(void); int scoutfs_inode_init(void); +int scoutfs_item_setup(struct super_block *sb); +void scoutfs_item_destroy(struct super_block *sb); + #endif diff --git a/kmod/src/item.c b/kmod/src/item.c new file mode 100644 index 00000000..73e9665b --- /dev/null +++ b/kmod/src/item.c @@ -0,0 +1,217 @@ +/* + * Copyright (C) 2016 Versity Software, Inc. All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. 
+ */ +#include +#include +#include +#include + +#include "super.h" +#include "format.h" +#include "kvec.h" +#include "manifest.h" +#include "item.h" + +struct item_cache { + spinlock_t lock; + struct rb_root root; +}; + +struct cached_item { + struct rb_node node; + + SCOUTFS_DECLARE_KVEC(key); + SCOUTFS_DECLARE_KVEC(val); +}; + +static struct cached_item *find_item(struct rb_root *root, struct kvec *key) +{ + struct rb_node *node = root->rb_node; + struct rb_node *parent = NULL; + struct cached_item *item; + int cmp; + + while (node) { + parent = node; + item = container_of(node, struct cached_item, node); + + cmp = scoutfs_kvec_memcmp(key, item->key); + if (cmp < 0) + node = node->rb_left; + else if (cmp > 0) + node = node->rb_right; + else + return item; + } + + return NULL; +} + +static struct cached_item *insert_item(struct rb_root *root, + struct cached_item *ins) +{ + struct rb_node **node = &root->rb_node; + struct rb_node *parent = NULL; + struct cached_item *found = NULL; + struct cached_item *item; + int cmp; + + while (*node) { + parent = *node; + item = container_of(*node, struct cached_item, node); + + cmp = scoutfs_kvec_memcmp(ins->key, item->key); + if (cmp < 0) { + node = &(*node)->rb_left; + } else if (cmp > 0) { + node = &(*node)->rb_right; + } else { + rb_replace_node(&item->node, &ins->node, root); + found = item; + break; + } + } + + if (!found) { + rb_link_node(&ins->node, parent, node); + rb_insert_color(&ins->node, root); + } + + return found; +} + +/* + * Find an item with the given key and copy its value into the caller's + * value vector. The amount of bytes copied is returned which can be + * 0 or truncated if the caller's buffer isn't big enough. + */ +int scoutfs_item_lookup(struct super_block *sb, struct kvec *key, + struct kvec *val) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct item_cache *cac = sbi->item_cache; + struct cached_item *item; + unsigned long flags; + int ret; + + do { + spin_lock_irqsave(&cac->lock, flags); + + item = find_item(&cac->root, key); + if (item) + ret = scoutfs_kvec_memcpy(val, item->val); + else + ret = -ENOENT; + + spin_unlock_irqrestore(&cac->lock, flags); + + } while (!item && ((ret = scoutfs_manifest_read_items(sb, key)) == 0)); + + return ret; +} + +/* + * This requires that the item at the specified key has a value of the + * same length as the specified value. Callers are asserting that + * mismatched size are corruption so it returns -EIO if the sizes don't + * match. This isn't the fast path so we don't mind the copying + * overhead that comes from only detecting the size mismatch after the + * copy by reusing the more permissive _lookup(). + */ +int scoutfs_item_lookup_exact(struct super_block *sb, struct kvec *key, + struct kvec *val, int size) +{ + int ret; + + ret = scoutfs_item_lookup(sb, key, val); + if (ret >= 0 && ret != size) + ret = -EIO; + + return ret; +} + +static void free_item(struct cached_item *item) +{ + if (!IS_ERR_OR_NULL(item)) { + scoutfs_kvec_kfree(item->val); + scoutfs_kvec_kfree(item->key); + kfree(item); + } +} + +/* + * Add an item with the key and value to the item cache. The new item + * is clean. Any existing item at the key will be removed and freed. 
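+ *
+ * The key and value are copied into new single allocations with
+ * scoutfs_kvec_dup_flatten() so the caller's vectors can point at
+ * transient buffers.  -ENOMEM is returned if either copy fails.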
+ */ +int scoutfs_item_insert(struct super_block *sb, struct kvec *key, + struct kvec *val) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct item_cache *cac = sbi->item_cache; + struct cached_item *found; + struct cached_item *item; + unsigned long flags; + int ret; + + item = kmalloc(sizeof(struct cached_item), GFP_NOFS); + if (!item) + return -ENOMEM; + + ret = scoutfs_kvec_dup_flatten(item->key, key) ?: + scoutfs_kvec_dup_flatten(item->val, val); + if (ret) { + free_item(item); + return ret; + } + + spin_lock_irqsave(&cac->lock, flags); + found = insert_item(&cac->root, item); + spin_unlock_irqrestore(&cac->lock, flags); + free_item(found); + + return 0; +} + +int scoutfs_item_setup(struct super_block *sb) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct item_cache *cac; + + cac = kzalloc(sizeof(struct item_cache), GFP_KERNEL); + if (!cac) + return -ENOMEM; + sbi->item_cache = cac; + + spin_lock_init(&cac->lock); + cac->root = RB_ROOT; + + return 0; +} + +void scoutfs_item_destroy(struct super_block *sb) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct item_cache *cac = sbi->item_cache; + struct rb_node *node; + struct cached_item *item; + + if (cac) { + for (node = rb_first(&cac->root); node; ) { + item = container_of(node, struct cached_item, node); + node = rb_next(node); + free_item(item); + } + + kfree(cac); + } + +} diff --git a/kmod/src/item.h b/kmod/src/item.h new file mode 100644 index 00000000..bfaae9db --- /dev/null +++ b/kmod/src/item.h @@ -0,0 +1,16 @@ +#ifndef _SCOUTFS_ITEM_H_ +#define _SCOUTFS_ITEM_H_ + +#include + +int scoutfs_item_lookup(struct super_block *sb, struct kvec *key, + struct kvec *val); +int scoutfs_item_lookup_exact(struct super_block *sb, struct kvec *key, + struct kvec *val, int size); +int scoutfs_item_insert(struct super_block *sb, struct kvec *key, + struct kvec *val); + +int scoutfs_item_setup(struct super_block *sb); +void scoutfs_item_destroy(struct super_block *sb); + +#endif diff --git a/kmod/src/kvec.c b/kmod/src/kvec.c new file mode 100644 index 00000000..e2b26061 --- /dev/null +++ b/kmod/src/kvec.c @@ -0,0 +1,141 @@ +/* + * Copyright (C) 2016 Versity Software, Inc. All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + */ +#include +#include +#include +#include +#include +#include +#include + +#include "super.h" +#include "format.h" +#include "inode.h" +#include "dir.h" +#include "xattr.h" +#include "msg.h" +#include "block.h" +#include "counters.h" +#include "trans.h" +#include "buddy.h" +#include "kvec.h" +#include "scoutfs_trace.h" + +/* + * Return the result of memcmp between the min of the two total lengths. + * If their shorter lengths are equal than the shorter length is considered + * smaller than the longer. 
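+ *
+ * For example, comparing a vector holding "ab" with one holding "abc"
+ * compares the two common bytes; they're equal, so the shorter "ab"
+ * vector is returned as smaller.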
+ */ +int scoutfs_kvec_memcmp(struct kvec *a, struct kvec *b) +{ + int b_off = 0; + int a_off = 0; + int len; + int ret; + + while (a->iov_base && b->iov_base) { + len = min(a->iov_len - a_off, b->iov_len - b_off); + ret = memcmp(a->iov_base + a_off, b->iov_base + b_off, len); + if (ret) + return ret; + + b_off += len; + if (b_off == b->iov_len) + b++; + a_off += len; + if (a_off == a->iov_len) + a++; + } + + return a->iov_base ? 1 : b->iov_base ? -1 : 0; +} + +/* + * Returns 0 if [a,b] overlaps with [c,d]. Returns -1 if a < c and + * 1 if b > d. + */ +int scoutfs_kvec_cmp_overlap(struct kvec *a, struct kvec *b, + struct kvec *c, struct kvec *d) +{ + return scoutfs_kvec_memcmp(a, c) < 0 ? -1 : + scoutfs_kvec_memcmp(b, d) > 0 ? 1 : 0; +} + +/* + * Set just the pointers and length fields in the dst vector to point to + * the source vector. + */ +void scoutfs_kvec_clone(struct kvec *dst, struct kvec *src) +{ + int i; + + for (i = 0; i < SCOUTFS_KVEC_NR; i++) + *(dst++) = *(src++); +} + +/* + * Copy as much of src as fits in dst. Null base pointers termintae the + * copy. The number of bytes copied is returned. Only the buffers + * pointed to by dst are changed, the kvec elements are not changed. + */ +int scoutfs_kvec_memcpy(struct kvec *dst, struct kvec *src) +{ + int src_off = 0; + int dst_off = 0; + int copied = 0; + int len; + + while (dst->iov_base && src->iov_base) { + len = min(dst->iov_len - dst_off, src->iov_len - src_off); + memcpy(dst->iov_base + dst_off, src->iov_base + src_off, len); + + copied += len; + + src_off += len; + if (src_off == src->iov_len) + src++; + dst_off += len; + if (dst_off == dst->iov_len) + dst++; + } + + return copied; +} + +/* + * Copy the src key vector into one new allocation in the dst. The existing + * dst is clobbered. The source isn't changed. + */ +int scoutfs_kvec_dup_flatten(struct kvec *dst, struct kvec *src) +{ + void *ptr; + size_t len = scoutfs_kvec_length(src); + + ptr = kmalloc(len, GFP_NOFS); + if (!ptr) + return -ENOMEM; + + scoutfs_kvec_init(dst, ptr, len); + scoutfs_kvec_memcpy(dst, src); + return 0; +} + +/* + * Free all the set pointers in the kvec. The pointer values aren't modified + * if they're freed. + */ +void scoutfs_kvec_kfree(struct kvec *kvec) +{ + while (kvec->iov_base) + kfree((kvec++)->iov_base); +} diff --git a/kmod/src/kvec.h b/kmod/src/kvec.h new file mode 100644 index 00000000..600055e9 --- /dev/null +++ b/kmod/src/kvec.h @@ -0,0 +1,67 @@ +#ifndef _SCOUTFS_KVEC_H_ +#define _SCOUTFS_KVEC_H_ + +#include + +/* + * The item APIs use kvecs to represent variable size item keys and + * values. + */ + +/* + * This ends up defining the max item size as nr - 1 * page _size. + */ +#define SCOUTFS_KVEC_NR 4 + +#define SCOUTFS_DECLARE_KVEC(name) \ + struct kvec name[SCOUTFS_KVEC_NR] + +static inline void scoutfs_kvec_init_all(struct kvec *kvec, + void *ptr0, size_t len0, + void *ptr1, size_t len1, + void *ptr2, size_t len2, + void *ptr3, size_t len3, ...) +{ + BUG_ON(ptr3 != NULL); + + kvec[0].iov_base = ptr0; + kvec[0].iov_len = len0; + kvec[1].iov_base = ptr1; + kvec[1].iov_len = len1; + kvec[2].iov_base = ptr2; + kvec[2].iov_len = len2; + kvec[3].iov_base = ptr3; + kvec[3].iov_len = len3; +} + +/* + * Provide a nice variadic initialization function without having to + * iterate over the callers arg types. We play some macro games to pad + * out the callers ptr/len pairs to the full possible number. 
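+ * For example, scoutfs_kvec_init(key, &ikey, sizeof(ikey)) expands to
+ * scoutfs_kvec_init_all(key, &ikey, sizeof(ikey), NULL, 0, NULL, 0,
+ * NULL, 0), NULL terminating the unused entries.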
This will + * produce confusing errors if an odd number of arguments is given and + * the padded ptr/length types aren't compatible with the fixed + * arguments in the static inline. + */ +#define scoutfs_kvec_init(val, ...) \ + scoutfs_kvec_init_all(val, __VA_ARGS__, NULL, 0, NULL, 0, NULL, 0) + +static inline int scoutfs_kvec_length(struct kvec *kvec) +{ + BUILD_BUG_ON(sizeof(struct kvec) != sizeof(struct iovec)); + BUILD_BUG_ON(offsetof(struct kvec, iov_len) != + offsetof(struct iovec, iov_len)); + BUILD_BUG_ON(member_sizeof(struct kvec, iov_len) != + member_sizeof(struct iovec, iov_len)); + + return iov_length((struct iovec *)kvec, SCOUTFS_KVEC_NR); +} + +void scoutfs_kvec_clone(struct kvec *dst, struct kvec *src); +int scoutfs_kvec_memcmp(struct kvec *a, struct kvec *b); +int scoutfs_kvec_cmp_overlap(struct kvec *a, struct kvec *b, + struct kvec *c, struct kvec *d); +int scoutfs_kvec_memcpy(struct kvec *dst, struct kvec *src); +int scoutfs_kvec_dup_flatten(struct kvec *dst, struct kvec *src); +void scoutfs_kvec_kfree(struct kvec *kvec); + +#endif diff --git a/kmod/src/manifest.c b/kmod/src/manifest.c new file mode 100644 index 00000000..a7ea9dc0 --- /dev/null +++ b/kmod/src/manifest.c @@ -0,0 +1,449 @@ +/* + * Copyright (C) 2016 Versity Software, Inc. All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + */ +#include +#include +#include +#include + +#include "super.h" +#include "format.h" +#include "kvec.h" +#include "seg.h" +#include "item.h" +#include "manifest.h" + +struct manifest { + spinlock_t lock; + + struct list_head level0_list; + unsigned int level0_nr; + + u8 last_level; + struct rb_root level_roots[SCOUTFS_MANIFEST_MAX_LEVEL + 1]; +}; + +#define DECLARE_MANIFEST(sb, name) \ + struct manifest *name = SCOUTFS_SB(sb)->manifest + +struct manifest_entry { + union { + struct list_head level0_entry; + struct rb_node node; + }; + + struct kvec *first; + struct kvec *last; + u64 segno; + u64 seq; + u8 level; +}; + +/* + * A path tracks all the segments from level 0 to the last level that + * overlap with the search key. + */ +struct manifest_ref { + u64 segno; + u64 seq; + struct scoutfs_segment *seg; + int pos; + u8 level; +}; + +static struct manifest_entry *find_ment(struct rb_root *root, struct kvec *key) +{ + struct rb_node *node = root->rb_node; + struct manifest_entry *ment; + int cmp; + + while (node) { + ment = container_of(node, struct manifest_entry, node); + + cmp = scoutfs_kvec_cmp_overlap(key, key, + ment->first, ment->last); + if (cmp < 0) + node = node->rb_left; + else if (cmp > 0) + node = node->rb_right; + else + return ment; + } + + return NULL; +} + +/* + * Insert a new entry into one of the L1+ trees. There should never be + * entries that overlap. 
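+ * Range comparison is done with scoutfs_kvec_cmp_overlap(); an entry
+ * that compares equal to an existing entry is returned as -EEXIST
+ * rather than replacing it, and the caller warns and frees it.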
+ */ +static int insert_ment(struct rb_root *root, struct manifest_entry *ins) +{ + struct rb_node **node = &root->rb_node; + struct rb_node *parent = NULL; + struct manifest_entry *ment; + int cmp; + + while (*node) { + parent = *node; + ment = container_of(*node, struct manifest_entry, node); + + cmp = scoutfs_kvec_cmp_overlap(ins->first, ins->last, + ment->first, ment->last); + if (cmp < 0) { + node = &(*node)->rb_left; + } else if (cmp > 0) { + node = &(*node)->rb_right; + } else { + return -EEXIST; + } + } + + rb_link_node(&ins->node, parent, node); + rb_insert_color(&ins->node, root); + + return 0; +} + +static void free_ment(struct manifest_entry *ment) +{ + if (!IS_ERR_OR_NULL(ment)) { + scoutfs_kvec_kfree(ment->first); + scoutfs_kvec_kfree(ment->last); + kfree(ment); + } +} + +static int add_ment(struct manifest *mani, struct manifest_entry *ment) +{ + int ret; + + if (ment->level) { + ret = insert_ment(&mani->level_roots[ment->level], ment); + if (!ret) + mani->last_level = max(mani->last_level, ment->level); + } else { + list_add_tail(&ment->level0_entry, &mani->level0_list); + mani->level0_nr++; + ret = 0; + } + + return ret; +} + +static void update_last_level(struct manifest *mani) +{ + int i; + + for (i = mani->last_level; + i > 0 && RB_EMPTY_ROOT(&mani->level_roots[i]); i--) + ; + + mani->last_level = i; +} + +static void remove_ment(struct manifest *mani, struct manifest_entry *ment) +{ + if (ment->level) { + rb_erase(&ment->node, &mani->level_roots[ment->level]); + update_last_level(mani); + } else { + list_del_init(&ment->level0_entry); + mani->level0_nr--; + } +} + +int scoutfs_manifest_add(struct super_block *sb, struct kvec *first, + struct kvec *last, u64 segno, u64 seq, u8 level) +{ + DECLARE_MANIFEST(sb, mani); + struct manifest_entry *ment; + unsigned long flags; + int ret; + + ment = kmalloc(sizeof(struct manifest_entry), GFP_NOFS); + if (!ment) + return -ENOMEM; + + ret = scoutfs_kvec_dup_flatten(ment->first, first) ?: + scoutfs_kvec_dup_flatten(ment->first, last); + if (ret) { + free_ment(ment); + return -ENOMEM; + } + + ment->segno = segno; + ment->seq = seq; + ment->level = level; + + /* XXX think about where to insert level 0 */ + spin_lock_irqsave(&mani->lock, flags); + ret = add_ment(mani, ment); + spin_unlock_irqrestore(&mani->lock, flags); + if (WARN_ON_ONCE(ret)) /* XXX can this happen? ring corruption? */ + free_ment(ment); + + return ret; +} + +static void set_ref(struct manifest_ref *ref, struct manifest_entry *mani) +{ + ref->segno = mani->segno; + ref->seq = mani->seq; + ref->level = mani->level; +} + +/* + * Returns refs if intersecting segments are found, NULL if none intersect, + * and PTR_ERR on failure. 
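+ *
+ * The ref array is allocated outside the lock and the allocation is
+ * retried if the manifest changed while the lock was dropped.  The
+ * caller is responsible for freeing the returned array.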
+ */ +static struct manifest_ref *get_key_refs(struct manifest *mani, + struct kvec *key, + unsigned int *nr_ret) +{ + struct manifest_ref *refs = NULL; + struct manifest_entry *ment; + struct rb_root *root; + unsigned long flags; + unsigned int total; + unsigned int nr; + int i; + + spin_lock_irqsave(&mani->lock, flags); + + total = mani->level0_nr + mani->last_level; + while (nr != total) { + nr = total; + spin_unlock_irqrestore(&mani->lock, flags); + + kfree(refs); + refs = kcalloc(total, sizeof(struct manifest_ref), GFP_NOFS); + if (!refs) + return ERR_PTR(-ENOMEM); + + spin_lock_irqsave(&mani->lock, flags); + } + + nr = 0; + + list_for_each_entry(ment, &mani->level0_list, level0_entry) { + if (scoutfs_kvec_cmp_overlap(key, key, + ment->first, ment->last)) + continue; + + set_ref(&refs[nr++], ment); + } + + for (i = 1; i <= mani->last_level; i++) { + root = &mani->level_roots[i]; + if (RB_EMPTY_ROOT(root)) + continue; + + ment = find_ment(root, key); + if (ment) + set_ref(&refs[nr++], ment); + } + + spin_unlock_irqrestore(&mani->lock, flags); + + *nr_ret = nr; + if (!nr) { + kfree(refs); + refs = NULL; + } + + return refs; +} + +/* + * The caller didn't find an item for the given key in the item cache + * and wants us to search for it in the lsm segments. We search the + * manifest for all the segments that contain the key. We then read the + * segments and iterate over their items looking for ours. We insert it + * and some number of other surrounding items to amortize the relatively + * expensive multi-segment searches. + * + * This is asking the seg code to read each entire segment. The seg + * code could give it it helpers to submit and wait on blocks within the + * segment so that we don't have wild bandwidth amplification in the + * cold random read case. + * + * The segments are immutable at this point so we can use their contents + * as long as we hold refs. 
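+ *
+ * Each pass of the resolution loop finds the least key across all the
+ * segment positions, inserts that item into the cache, and advances
+ * every position that had the key so duplicates are skipped.  At most
+ * 16 items are inserted per call, and -ENOENT is returned if the first
+ * pass doesn't produce the caller's key.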
+ */ +int scoutfs_manifest_read_items(struct super_block *sb, struct kvec *key) +{ + DECLARE_MANIFEST(sb, mani); + SCOUTFS_DECLARE_KVEC(item_key); + SCOUTFS_DECLARE_KVEC(item_val); + SCOUTFS_DECLARE_KVEC(found_key); + SCOUTFS_DECLARE_KVEC(found_val); + struct scoutfs_segment *seg; + struct manifest_ref *refs; + unsigned long had_found; + bool found; + int ret = 0; + int err; + int nr_refs; + int cmp; + int i; + int n; + + refs = get_key_refs(mani, key, &nr_refs); + if (IS_ERR(refs)) + return PTR_ERR(refs); + if (!refs) + return -ENOENT; + + /* submit reads for all the segments */ + for (i = 0; i < nr_refs; i++) { + seg = scoutfs_seg_submit_read(sb, refs[i].segno); + if (IS_ERR(seg)) { + ret = PTR_ERR(seg); + break; + } + + refs[i].seg = seg; + } + + /* wait for submitted segments and search if we haven't seen failure */ + for (n = 0; n < i; n++) { + seg = refs[i].seg; + + err = scoutfs_seg_wait(sb, seg); + if (err && !ret) + ret = err; + + if (!ret) + refs[i].pos = scoutfs_seg_find_pos(seg, key); + } + + /* done if we saw errors */ + if (ret) + goto out; + + /* walk sorted items, resolving across segments, and insert */ + for (n = 0; n < 16; n++) { + + found = false; + + /* find the most recent least key */ + for (i = 0; i < nr_refs; i++) { + seg = refs[i].seg; + if (!seg) + continue; + + /* get kvecs, removing if we ran out of items */ + ret = scoutfs_seg_item_kvecs(seg, refs[i].pos, + item_key, item_val); + if (ret < 0) { + scoutfs_seg_put(seg); + refs[i].seg = NULL; + continue; + } + + if (found) { + cmp = scoutfs_kvec_memcmp(item_key, found_key); + if (cmp >= 0) { + if (cmp == 0) + set_bit(i, &had_found); + continue; + } + } + + /* remember new least key */ + scoutfs_kvec_clone(found_key, key); + scoutfs_kvec_clone(found_val, item_val); + found = true; + had_found = 0; + set_bit(i, &had_found); + } + + /* return -ENOENT if we didn't find any or the callers item */ + if (n == 0 && + (!found || scoutfs_kvec_memcmp(key, found_key))) { + ret = -ENOENT; + break; + } + + if (!found) { + ret = 0; + break; + } + + ret = scoutfs_item_insert(sb, item_key, item_val); + if (ret) + break; + + /* advance all the positions past the found key */ + for_each_set_bit(i, &had_found, BITS_PER_LONG) + refs[i].pos++; + } + +out: + for (i = 0; i < nr_refs; i++) + scoutfs_seg_put(refs[i].seg); + + kfree(refs); + return ret; +} + +int scoutfs_manifest_setup(struct super_block *sb) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct manifest *mani; + int i; + + mani = kzalloc(sizeof(struct manifest), GFP_KERNEL); + if (!mani) + return -ENOMEM; + sbi->manifest = mani; + + spin_lock_init(&mani->lock); + INIT_LIST_HEAD(&mani->level0_list); + for (i = 0; i < ARRAY_SIZE(mani->level_roots); i++) + mani->level_roots[i] = RB_ROOT; + + return 0; +} + +void scoutfs_manifest_destroy(struct super_block *sb) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct manifest *mani = sbi->manifest; + struct manifest_entry *ment; + struct manifest_entry *tmp; + struct rb_node *node; + struct rb_root *root; + int i; + + if (!mani) + return; + + for (i = 1; i <= mani->last_level; i++) { + root = &mani->level_roots[i]; + + for (node = rb_first(root); node; ) { + ment = container_of(node, struct manifest_entry, node); + node = rb_next(node); + remove_ment(mani, ment); + free_ment(ment); + } + } + + list_for_each_entry_safe(ment, tmp, &mani->level0_list, level0_entry) { + remove_ment(mani, ment); + free_ment(ment); + } + + kfree(mani); +} diff --git a/kmod/src/manifest.h b/kmod/src/manifest.h new file mode 100644 index 
00000000..c1ea0160 --- /dev/null +++ b/kmod/src/manifest.h @@ -0,0 +1,11 @@ +#ifndef _SCOUTFS_MANIFEST_H_ +#define _SCOUTFS_MANIFEST_H_ + +int scoutfs_manifest_add(struct super_block *sb, struct kvec *first, + struct kvec *last, u64 segno, u64 seq, u8 level); +int scoutfs_manifest_read_items(struct super_block *sb, struct kvec *key); + +int scoutfs_manifest_setup(struct super_block *sb); +void scoutfs_manifest_destroy(struct super_block *sb); + +#endif diff --git a/kmod/src/ring.c b/kmod/src/ring.c new file mode 100644 index 00000000..865071aa --- /dev/null +++ b/kmod/src/ring.c @@ -0,0 +1,263 @@ +/* + * Copyright (C) 2016 Versity Software, Inc. All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + */ +#include +#include +#include + +#include "super.h" +#include "format.h" +#include "kvec.h" +#include "bio.h" +#include "manifest.h" +#include "ring.h" + +/* + * OK, log: + * - big preallocated ring of variable length entries + * - entries are rounded to 4k blocks + * - entire thing is read and indexed in rbtree + * - static allocated page is kept around to record and write entries + * - indexes have cursor that points to next node to migrate + * - any time an entry is written an entry is migrated + * - allocate room for 4x (maybe including worst case rounding) + * - mount does binary search looking for newest entry + * - newest entry describes block where we started migrating + * - replay then walks from oldest to newest replaying + * - entries are marked with migration so we know where to set cursor after + * + * XXX + * - verify blocks + * - could compress + */ + +/* read in a meg at a time */ +#define NR_PAGES DIV_ROUND_UP(1024 * 1024, PAGE_SIZE) +#define NR_BLOCKS (NR_PAGES * SCOUTFS_BLOCKS_PER_PAGE) + +#if 0 +#define BLOCKS_PER_PAGE (PAGE_SIZE / SCOUTFS_BLOCK_SIZE) +static void read_page_end_io(struct bio *bio, int err) +{ + struct bio_vec *bvec; + struct page *page; + unsigned long i; + + for_each_bio_segment(bio, bvec, i) { + page = bvec->bv_page; + + if (err) + SetPageError(page); + else + SetPageUptodate(page); + unlock_page(page); + } + + bio_put(bio); +} + +/* + * Read the given number of 4k blocks into the pages provided by the + * caller. We translate the block count into a page count and fill + * bios a page at a time. 
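+ *
+ * This open-coded page reader is compiled out under #if 0 above; the
+ * live read path below goes through scoutfs_bio_read() instead.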
+ */ +static int read_blocks(struct super_block *sb, struct page **pages, + u64 blkno, unsigned int nr_blocks) +{ + unsigned int nr_pages = DIV_ROUND_UP(nr_blocks, PAGES_PER_BLOCK); + unsigned int bytes; + struct bio *bio; + int ret = 0; + + for (i = 0; i < nr_pages; i++) { + page = pages[i]; + + if (!bio) { + bio = bio_alloc(GFP_NOFS, nr_pages - i); + if (!bio) + bio = bio_alloc(GFP_NOFS, 1); + if (!bio) { + ret = -ENOMEM; + break; + } + + bio->bi_sector = blkno << (SCOUTFS_BLOCK_SHIFT - 9); + bio->bi_bdev = sb->s_bdev; + bio->bi_end_io = read_pages_end_io; + } + + lock_page(page); + ClearPageError(page); + ClearPageUptodate(page); + + bytes = min(nr_blocks << SCOUTFS_BLOCK_SHIFT, PAGE_SIZE); + + if (bio_add_page(bio, page, bytes, 0) != bytes) { + /* submit the full bio and retry this page */ + submit_bio(READ, bio); + bio = NULL; + unlock_page(page); + i--; + continue; + } + + blkno += BLOCKS_PER_PAGE; + nr_blocks -= BLOCKS_PER_PAGE; + } + + if (bio) + submit_bio(READ, bio); + + for (i = 0; i < nr_pages; i++) { + page = pages[i]; + + wait_on_page_locked(page); + if (!ret && (!PageUptodate(page) || PageError(page))) + ret = -EIO; + } + + return ret; +} +#endif + + +static int read_one_entry(struct super_block *sb, + struct scoutfs_ring_entry_header *eh) +{ + struct scoutfs_ring_add_manifest *am; + SCOUTFS_DECLARE_KVEC(first); + SCOUTFS_DECLARE_KVEC(last); + int ret; + + switch(eh->type) { + case SCOUTFS_RING_ADD_MANIFEST: + am = container_of(eh, struct scoutfs_ring_add_manifest, eh); + + scoutfs_kvec_init(first, am + 1, + le16_to_cpu(am->first_key_len)); + scoutfs_kvec_init(last, + first[0].iov_base + first[0].iov_len, + le16_to_cpu(am->last_key_len)); + + ret = scoutfs_manifest_add(sb, first, last, + le64_to_cpu(am->segno), + le64_to_cpu(am->seq), am->level); + break; + + default: + ret = -EINVAL; + } + + return ret; +} + +static int read_entries(struct super_block *sb, + struct scoutfs_ring_block *ring) +{ + struct scoutfs_ring_entry_header *eh; + int ret = 0; + int i; + + eh = ring->entries; + + for (i = 0; i < le32_to_cpu(ring->nr_entries); i++) { + ret = read_one_entry(sb, eh); + if (ret) + break; + + eh = (void *)eh + le16_to_cpu(eh->len); + } + + return ret; +} + +#if 0 +/* return pointer to the blk 4k block offset amongst the pages */ +static void *page_block_address(struct page **pages, unsigned int blk) +{ + unsigned int i = blk / BLOCKS_PER_PAGE; + unsigned int off = (blk % BLOCKS_PER_PAGE) << SCOUTFS_BLOCK_SHIFT; + + return page_address(pages[i]) + off; +} +#endif + +int scoutfs_ring_read(struct super_block *sb) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct scoutfs_super_block *super = &sbi->super; + struct scoutfs_ring_block *ring; + struct page **pages; + struct page *page; + u64 index; + u64 blkno; + u64 tail; + u64 seq; + int ret; + int nr; + int i; + + /* nr_blocks/pages calc doesn't handle multiple pages per block */ + BUILD_BUG_ON(PAGE_SIZE < SCOUTFS_BLOCK_SIZE); + + pages = kcalloc(NR_PAGES, sizeof(struct page *), GFP_NOFS); + if (!pages) + return -ENOMEM; + + for (i = 0; i < NR_PAGES; i++) { + page = alloc_page(GFP_NOFS); + if (!page) { + ret = -ENOMEM; + goto out; + } + + pages[i] = page; + } + + index = le64_to_cpu(super->ring_head_index); + tail = le64_to_cpu(super->ring_tail_index); + seq = le64_to_cpu(super->ring_head_seq); + + do { + blkno = le64_to_cpu(super->ring_blkno) + index; + + if (index <= tail) + nr = tail - index + 1; + else + nr = le64_to_cpu(super->ring_blocks) - index; + nr = min_t(int, nr, NR_BLOCKS); + + ret = scoutfs_bio_read(sb, 
pages, index, nr); + if (ret) + goto out; + + /* XXX verify block header */ + + for (i = 0; i < nr; i++) { + ring = scoutfs_page_block_address(pages, i); + ret = read_entries(sb, ring); + if (ret) + goto out; + } + + index += nr; + if (index == le64_to_cpu(super->ring_blocks)) + index = 0; + } while (index != tail); + +out: + for (i = 0; i < NR_PAGES && pages && pages[i]; i++) + __free_page(pages[i]); + kfree(pages); + + return ret; +} diff --git a/kmod/src/ring.h b/kmod/src/ring.h new file mode 100644 index 00000000..4f6930c9 --- /dev/null +++ b/kmod/src/ring.h @@ -0,0 +1,8 @@ +#ifndef _SCOUTFS_RING_H_ +#define _SCOUTFS_RING_H_ + +#include + +int scoutfs_ring_read(struct super_block *sb); + +#endif diff --git a/kmod/src/seg.c b/kmod/src/seg.c new file mode 100644 index 00000000..9f884845 --- /dev/null +++ b/kmod/src/seg.c @@ -0,0 +1,399 @@ +/* + * Copyright (C) 2016 Versity Software, Inc. All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + */ +#include +#include +#include +#include +#include + +#include "super.h" +#include "format.h" +#include "seg.h" +#include "bio.h" +#include "kvec.h" + +/* + * seg.c should just be about the cache and io, and maybe + * iteration and stuff. + * + * XXX: + * - lru and shrinker + * - verify csum + * - make sure item headers don't cross page boundaries + * - just wait on pages instead of weird flags? + */ + +struct segment_cache { + spinlock_t lock; + struct rb_root root; + wait_queue_head_t waitq; +}; + +struct scoutfs_segment { + struct rb_node node; + atomic_t refcount; + u64 segno; + unsigned long flags; + int err; + struct page *pages[SCOUTFS_SEGMENT_PAGES]; +}; + +enum { + SF_END_IO = 0, +}; + +static struct scoutfs_segment *alloc_seg(u64 segno) +{ + struct scoutfs_segment *seg; + struct page *page; + int i; + + /* don't waste the tail of pages */ + BUILD_BUG_ON(SCOUTFS_SEGMENT_SIZE % PAGE_SIZE); + + seg = kzalloc(sizeof(struct scoutfs_segment), GFP_NOFS); + if (!seg) + return seg; + + RB_CLEAR_NODE(&seg->node); + atomic_set(&seg->refcount, 1); + seg->segno = segno; + + for (i = 0; i < SCOUTFS_SEGMENT_PAGES; i++) { + page = alloc_page(GFP_NOFS); + if (!page) { + scoutfs_seg_put(seg); + return ERR_PTR(-ENOMEM); + } + + seg->pages[i] = page; + } + + return seg; +} + +void scoutfs_seg_put(struct scoutfs_segment *seg) +{ + int i; + + if (!IS_ERR_OR_NULL(seg) && atomic_dec_and_test(&seg->refcount)) { + WARN_ON_ONCE(!RB_EMPTY_NODE(&seg->node)); + for (i = 0; i < SCOUTFS_SEGMENT_PAGES; i++) + if (seg->pages[i]) + __free_page(seg->pages[i]); + kfree(seg); + } +} + +static int cmp_u64s(u64 a, u64 b) +{ + return a < b ? -1 : a > b ? 1 : 0; +} + +static struct scoutfs_segment *find_seg(struct rb_root *root, u64 segno) +{ + struct rb_node *node = root->rb_node; + struct rb_node *parent = NULL; + struct scoutfs_segment *seg; + int cmp; + + while (node) { + parent = node; + seg = container_of(node, struct scoutfs_segment, node); + + cmp = cmp_u64s(segno, seg->segno); + if (cmp < 0) + node = node->rb_left; + else if (cmp > 0) + node = node->rb_right; + else + return seg; + } + + return NULL; +} + +/* + * This always inserts the segment into the rbtree. 
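+ * The caller holds the cache lock and has already taken a segment
+ * reference on behalf of the tree.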
If there's already + * a segment at the given seg then it is removed and returned. The caller + * doesn't have to erase it from the tree if it's returned. + */ +static struct scoutfs_segment *replace_seg(struct rb_root *root, + struct scoutfs_segment *ins) +{ + struct rb_node **node = &root->rb_node; + struct rb_node *parent = NULL; + struct scoutfs_segment *seg; + struct scoutfs_segment *found = NULL; + int cmp; + + while (*node) { + parent = *node; + seg = container_of(*node, struct scoutfs_segment, node); + + cmp = cmp_u64s(ins->segno, seg->segno); + if (cmp < 0) { + node = &(*node)->rb_left; + } else if (cmp > 0) { + node = &(*node)->rb_right; + } else { + rb_replace_node(&seg->node, &ins->node, root); + found = seg; + break; + } + } + + if (!found) { + rb_link_node(&ins->node, parent, node); + rb_insert_color(&ins->node, root); + } + + return found; +} + +static bool erase_seg(struct rb_root *root, struct scoutfs_segment *seg) +{ + if (!RB_EMPTY_NODE(&seg->node)) { + rb_erase(&seg->node, root); + RB_CLEAR_NODE(&seg->node); + return true; + } + + return false; +} + +static void seg_end_io(struct super_block *sb, void *data, int err) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct segment_cache *cac = sbi->segment_cache; + struct scoutfs_segment *seg = data; + unsigned long flags; + bool erased; + + if (err) { + seg->err = err; + + spin_lock_irqsave(&cac->lock, flags); + erased = erase_seg(&cac->root, seg); + spin_unlock_irqrestore(&cac->lock, flags); + if (erased) + scoutfs_seg_put(seg); + } + + set_bit(SF_END_IO, &seg->flags); + smp_mb__after_atomic(); + if (waitqueue_active(&cac->waitq)) + wake_up(&cac->waitq); + + scoutfs_seg_put(seg); +} + +static u64 segno_to_blkno(u64 blkno) +{ + return blkno << (SCOUTFS_SEGMENT_SHIFT - SCOUTFS_BLOCK_SHIFT); +} + +/* + * The bios submitted by this don't have page references themselves. If + * this succeeds then the caller must call _wait before putting their + * seg ref. + */ +struct scoutfs_segment *scoutfs_seg_submit_read(struct super_block *sb, + u64 segno) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct segment_cache *cac = sbi->segment_cache; + struct scoutfs_segment *existing; + struct scoutfs_segment *seg; + unsigned long flags; + + spin_lock_irqsave(&cac->lock, flags); + seg = find_seg(&cac->root, segno); + if (seg) + atomic_inc(&seg->refcount); + spin_unlock_irqrestore(&cac->lock, flags); + if (seg) + return seg; + + seg = alloc_seg(segno); + if (IS_ERR(seg)) + return seg; + + /* always drop existing segs, could compare seqs */ + spin_lock_irqsave(&cac->lock, flags); + atomic_inc(&seg->refcount); + existing = replace_seg(&cac->root, seg); + spin_unlock_irqrestore(&cac->lock, flags); + if (existing) + scoutfs_seg_put(existing); + + atomic_inc(&seg->refcount); + scoutfs_bio_submit(sb, READ, seg->pages, segno_to_blkno(seg->segno), + SCOUTFS_SEGMENT_BLOCKS, seg_end_io, seg); + + return seg; +} + +int scoutfs_seg_wait(struct super_block *sb, struct scoutfs_segment *seg) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct segment_cache *cac = sbi->segment_cache; + int ret; + + ret = wait_event_interruptible(cac->waitq, + test_bit(SF_END_IO, &seg->flags)); + if (!ret) + ret = seg->err; + + return ret; +} + +static void *off_ptr(struct scoutfs_segment *seg, u32 off) +{ + unsigned int pg = off >> PAGE_SHIFT; + unsigned int pg_off = off & ~PAGE_MASK; + + return page_address(seg->pages[pg]) + pg_off; +} + +/* + * Return a pointer to the item in the array at the given position. 
+ * + * The item structs fill the first block in the segment after the + * initial segment block struct. Item structs don't cross block + * boundaries so the final bytes that would make up a partial item + * struct are skipped. + */ +static struct scoutfs_segment_item *pos_item(struct scoutfs_segment *seg, + int pos) +{ + u32 off; + + if (pos < SCOUTFS_SEGMENT_FIRST_BLOCK_ITEMS) { + off = sizeof(struct scoutfs_segment_block); + } else { + pos -= SCOUTFS_SEGMENT_FIRST_BLOCK_ITEMS; + off = (1 + (pos / SCOUTFS_SEGMENT_ITEMS_PER_BLOCK)) * + SCOUTFS_BLOCK_SIZE; + pos %= SCOUTFS_SEGMENT_ITEMS_PER_BLOCK; + } + + return off_ptr(seg, off + (pos * sizeof(struct scoutfs_segment_item))); +} + +static void kvec_from_pages(struct scoutfs_segment *seg, + struct kvec *kvec, u32 off, u16 len) +{ + u32 first; + + first = min_t(int, len, PAGE_SIZE - (off & ~PAGE_MASK)); + + if (first == len) + scoutfs_kvec_init(kvec, off_ptr(seg, off), len); + else + scoutfs_kvec_init(kvec, off_ptr(seg, off), first, + off_ptr(seg, off + first), len - first); +} + +int scoutfs_seg_item_kvecs(struct scoutfs_segment *seg, int pos, + struct kvec *key, struct kvec *val) +{ + struct scoutfs_segment_block *sblk = off_ptr(seg, 0); + struct scoutfs_segment_item *item; + + if (pos < 0 || pos >= le32_to_cpu(sblk->nr_items)) + return -ENOENT; + + item = pos_item(seg, pos); + + if (key) + kvec_from_pages(seg, key, le32_to_cpu(item->key_off), + le16_to_cpu(item->key_len)); + if (val) + kvec_from_pages(seg, val, le32_to_cpu(item->val_off), + le16_to_cpu(item->val_len)); + + return 0; +} + +/* + * Find the first item array position whose key is >= the search key. + * This can return the number of positions if the key is greater than + * all the keys. + */ +static int find_key_pos(struct scoutfs_segment *seg, struct kvec *search) +{ + struct scoutfs_segment_block *sblk = off_ptr(seg, 0); + SCOUTFS_DECLARE_KVEC(key); + unsigned int start = 0; + unsigned int end = le32_to_cpu(sblk->nr_items); + unsigned int pos = 0; + int cmp; + + while (start < end) { + pos = start + (end - start) / 2; + scoutfs_seg_item_kvecs(seg, pos, key, NULL); + + cmp = scoutfs_kvec_memcmp(search, key); + if (cmp < 0) + end = pos; + else if (cmp > 0) + start = ++pos; + else + break; + } + + return pos; +} + +int scoutfs_seg_find_pos(struct scoutfs_segment *seg, struct kvec *key) +{ + return find_key_pos(seg, key); +} + +int scoutfs_seg_setup(struct super_block *sb) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct segment_cache *cac; + + cac = kzalloc(sizeof(struct segment_cache), GFP_KERNEL); + if (!cac) + return -ENOMEM; + sbi->segment_cache = cac; + + spin_lock_init(&cac->lock); + cac->root = RB_ROOT; + init_waitqueue_head(&cac->waitq); + + return 0; +} + +void scoutfs_seg_destroy(struct super_block *sb) +{ + struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); + struct segment_cache *cac = sbi->segment_cache; + struct scoutfs_segment *seg; + struct rb_node *node; + + if (cac) { + for (node = rb_first(&cac->root); node; ) { + seg = container_of(node, struct scoutfs_segment, node); + node = rb_next(node); + erase_seg(&cac->root, seg); + scoutfs_seg_put(seg); + } + + kfree(cac); + } +} + diff --git a/kmod/src/seg.h b/kmod/src/seg.h new file mode 100644 index 00000000..1957a308 --- /dev/null +++ b/kmod/src/seg.h @@ -0,0 +1,20 @@ +#ifndef _SCOUTFS_SEG_H_ +#define _SCOUTFS_SEG_H_ + +struct scoutfs_segment; +struct kvec; + +struct scoutfs_segment *scoutfs_seg_submit_read(struct super_block *sb, + u64 segno); +int scoutfs_seg_wait(struct super_block *sb, struct 
scoutfs_segment *seg); + +int scoutfs_seg_find_pos(struct scoutfs_segment *seg, struct kvec *key); +int scoutfs_seg_item_kvecs(struct scoutfs_segment *seg, int pos, + struct kvec *key, struct kvec *val); + +void scoutfs_seg_put(struct scoutfs_segment *seg); + +int scoutfs_seg_setup(struct super_block *sb); +void scoutfs_seg_destroy(struct super_block *sb); + +#endif diff --git a/kmod/src/super.c b/kmod/src/super.c index ca085815..4fdcb1f5 100644 --- a/kmod/src/super.c +++ b/kmod/src/super.c @@ -28,6 +28,10 @@ #include "counters.h" #include "trans.h" #include "buddy.h" +#include "ring.h" +#include "item.h" +#include "manifest.h" +#include "seg.h" #include "scoutfs_trace.h" static struct kset *scoutfs_kset; @@ -212,7 +216,11 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent) ret = scoutfs_setup_counters(sb) ?: read_supers(sb) ?: - scoutfs_buddy_setup(sb) ?: + scoutfs_seg_setup(sb) ?: + scoutfs_manifest_setup(sb) ?: + scoutfs_item_setup(sb) ?: + scoutfs_ring_read(sb) ?: +// scoutfs_buddy_setup(sb) ?: scoutfs_setup_trans(sb); if (ret) return ret; @@ -227,7 +235,7 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent) if (!sb->s_root) return -ENOMEM; - scoutfs_scan_orphans(sb); +// scoutfs_scan_orphans(sb); return 0; } @@ -248,6 +256,9 @@ static void scoutfs_kill_sb(struct super_block *sb) scoutfs_buddy_destroy(sb); if (sbi->block_shrinker.shrink == scoutfs_block_shrink) unregister_shrinker(&sbi->block_shrinker); + scoutfs_item_destroy(sb); + scoutfs_manifest_destroy(sb); + scoutfs_seg_destroy(sb); scoutfs_block_destroy(sb); scoutfs_destroy_counters(sb); if (sbi->kset) diff --git a/kmod/src/super.h b/kmod/src/super.h index 25e6eec1..b1b20e97 100644 --- a/kmod/src/super.h +++ b/kmod/src/super.h @@ -9,6 +9,9 @@ struct scoutfs_counters; struct buddy_info; +struct item_cache; +struct manifest; +struct segment_cache; struct scoutfs_sb_info { struct super_block *sb; @@ -28,6 +31,10 @@ struct scoutfs_sb_info { struct list_head block_lru_list; unsigned long block_lru_nr; + struct manifest *manifest; + struct item_cache *item_cache; + struct segment_cache *segment_cache; + struct buddy_info *buddy_info; struct rw_semaphore btree_rwsem;