diff --git a/kmod/src/Makefile b/kmod/src/Makefile index 8eefa971..6f0d66d8 100644 --- a/kmod/src/Makefile +++ b/kmod/src/Makefile @@ -4,5 +4,4 @@ CFLAGS_scoutfs_trace.o = -I$(src) # define_trace.h double include scoutfs-y += alloc.o bio.o btree.o compact.o counters.o data.o dir.o kvec.o \ inode.o ioctl.o item.o key.o lock.o manifest.o msg.o net.o \ - options.o ring.o seg.o scoutfs_trace.o sort_priv.o super.o trans.o \ - xattr.o + options.o seg.o scoutfs_trace.o sort_priv.o super.o trans.o xattr.o diff --git a/kmod/src/format.h b/kmod/src/format.h index 0f5b8638..1ccac901 100644 --- a/kmod/src/format.h +++ b/kmod/src/format.h @@ -7,7 +7,7 @@ #define SCOUTFS_SUPER_ID 0x2e736674756f6373ULL /* "scoutfs." */ /* - * The super block and ring blocks are fixed 4k. + * The super block and btree blocks are fixed 4k. */ #define SCOUTFS_BLOCK_SHIFT 12 #define SCOUTFS_BLOCK_SIZE (1 << SCOUTFS_BLOCK_SHIFT) @@ -50,32 +50,6 @@ struct scoutfs_block_header { __le64 blkno; } __packed; -struct scoutfs_ring_entry { - __le16 data_len; - __u8 flags; - __u8 data[0]; -} __packed; - -#define SCOUTFS_RING_ENTRY_FLAG_DELETION (1 << 0) - -struct scoutfs_ring_block { - __le32 crc; - __le32 pad; - __le64 fsid; - __le64 seq; - __le64 block; - __le32 nr_entries; - struct scoutfs_ring_entry entries[0]; -} __packed; - -struct scoutfs_ring_descriptor { - __le64 blkno; - __le64 total_blocks; - __le64 first_block; - __le64 first_seq; - __le64 nr_blocks; -} __packed; - /* * Assert that we'll be able to represent all possible keys with 8 64bit * primary sort values. @@ -401,11 +375,6 @@ struct scoutfs_inet_addr { #define SCOUTFS_DEFAULT_PORT 12345 -/* - * The ring fields describe the statically allocated ring log. The - * head and tail indexes are logical 4k blocks offsets inside the ring. - * The head block should contain the seq. - */ struct scoutfs_super_block { struct scoutfs_block_header hdr; __le64 id; @@ -415,10 +384,6 @@ struct scoutfs_super_block { __le64 alloc_uninit; __le64 total_segs; __le64 free_segs; - __le64 ring_blkno; - __le64 ring_blocks; - __le64 ring_tail_block; - __le64 ring_gen; struct scoutfs_btree_ring bring; __le64 next_seg_seq; struct scoutfs_btree_root alloc_root; diff --git a/kmod/src/ring.c b/kmod/src/ring.c deleted file mode 100644 index 1e5f8e78..00000000 --- a/kmod/src/ring.c +++ /dev/null @@ -1,817 +0,0 @@ -/* - * Copyright (C) 2017 Versity Software, Inc. All rights reserved. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public - * License v2 as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - */ -#include -#include -#include -#include -#include - -#include "super.h" -#include "format.h" -#include "bio.h" -#include "ring.h" - -/* - * scoutfs stores the persistent indexes for the server in a simple log - * entries in a preallocated ring of blocks. - * - * The index is read from the log and loaded in to an rbtree in memory. - * Callers then lock around operations that work on the rbtrees. Dirty - * and deleted nodes are tracked and are eventually copied to pages that - * are written to the tail of the log. - * - * This has the great benefit of updating an index with very few (often - * one) contiguous block writes with low write amplification. - * - * This has the significant cost of requiring reading the indexes in to - * memory before doing any work and then having to hold them resident. - * This is fine for now but we'll have to address these latency and - * capacity limitations before too long. - * - * Callers are entirely responsible for locking. - */ - -/* - * XXX - * - deletion entries could be smaller if we understood keys - * - shouldn't be too hard to compress - */ - -/* - * @block records the logical ring index of the block that contained the - * node. As we commit a ring update we can look at the clean list to - * find the first block that we have to read out of the ring. This - * helps minimize the active region of the ring. - * - * @in_ring is used to mark nodes that were present in the ring and - * which need deletion entries written to the ring before they can be - * freed. - */ -struct ring_node { - struct rb_node rb_node; - struct list_head head; - u64 block; - - u16 data_len; - - u8 dirty:1, - deleted:1, - in_ring:1; - - /* data is packed but callers perform native long bitops */ - u8 data[0] __aligned(__alignof__(long)); -}; - -static struct ring_node *data_rnode(void *data) -{ - return data ? container_of(data, struct ring_node, data) : NULL; -} - -static void *rnode_data(struct ring_node *rnode) -{ - return rnode ? rnode->data : NULL; -} - -static unsigned total_entry_bytes(unsigned data_len) -{ - return offsetof(struct scoutfs_ring_entry, data[data_len]); -} - -/* - * Each time we mark a node dirty we also dirty the oldest clean entry. - * This ensures that we never overwrite stable data. - * - * Picture a ring of blocks where the first half of the ring is full of - * existing entries. Imagine that we continuously update a set of - * entries that make up a single block. Each new update block - * invalidates the previous update block but it advances through the - * ring while the old entries are sitting idle in the first half. - * Eventually the new update blocks wrap around and clobber the old - * blocks. - * - * Now instead imagine that each time we dirty an entry in this set of - * constantly changing entries that we also go and dirty the earliest - * existing entry in the ring. Now each update is a block of the - * useless updating entries and a block of old entries that have been - * migrated. Each time we write two blocks to the ring we migrate one - * block from the start of the ring. Now by the time we fill the second - * half of the ring we've reclaimed half of the first half of the ring. - * - * So we size the ring to fit 4x the largest possible index. Now we're - * sure that we'll be able to fully migrate the index from the first - * half of the ring into the second half before it wraps around and - * starts overwriting the first. - */ -static void mark_node_dirty(struct scoutfs_ring_info *ring, - struct ring_node *rnode, bool migrate) -{ - struct ring_node *pos; - long total; - - if (!rnode || rnode->dirty) - return; - - list_move_tail(&rnode->head, &ring->dirty_list); - rnode->dirty = 1; - ring->dirty_bytes += total_entry_bytes(rnode->data_len); - - if (migrate) { - total = total_entry_bytes(rnode->data_len); - - list_for_each_entry_safe(rnode, pos, &ring->clean_list, head) { - mark_node_dirty(ring, rnode, false); - total -= total_entry_bytes(rnode->data_len); - if (total < 0) - break; - } - } -} - -static void mark_node_clean(struct scoutfs_ring_info *ring, - struct ring_node *rnode) -{ - if (!rnode || !rnode->dirty) - return; - - list_move_tail(&rnode->head, &ring->clean_list); - rnode->dirty = 0; - ring->dirty_bytes -= total_entry_bytes(rnode->data_len); -} - -static void free_node(struct scoutfs_ring_info *ring, - struct ring_node *rnode) -{ - if (rnode) { - mark_node_clean(ring, rnode); - - if (!list_empty(&rnode->head)) - list_del_init(&rnode->head); - if (!RB_EMPTY_NODE(&rnode->rb_node)) - rb_erase(&rnode->rb_node, &ring->rb_root); - - kfree(rnode); - } -} - -/* - * Walk the tree and return the last node traversed. cmp gives the - * caller the comparison between their key and the returned node. The - * caller can provide either their key or another nodes data to compare - * with during descent. If we're asked to insert we replace any node we - * find in the key's place. - */ -static struct ring_node *ring_rb_walk(struct scoutfs_ring_info *ring, - void *key, void *data, - struct ring_node *ins, - int *cmp) -{ - struct rb_node **node = &ring->rb_root.rb_node; - struct rb_node *parent = NULL; - struct ring_node *found = NULL; - struct ring_node *rnode = NULL; - - /* only provide one or the other */ - BUG_ON(!!key == !!data); - - while (*node) { - parent = *node; - rnode = container_of(*node, struct ring_node, rb_node); - - if (key) - *cmp = ring->compare_key(key, &rnode->data); - else - *cmp = ring->compare_data(data, &rnode->data); - - if (*cmp < 0) { - node = &(*node)->rb_left; - } else if (*cmp > 0) { - node = &(*node)->rb_right; - } else { - found = rnode; - break; - } - } - - if (ins) { - if (found) { - rb_replace_node(&found->rb_node, &ins->rb_node, - &ring->rb_root); - RB_CLEAR_NODE(&found->rb_node); - free_node(ring, found); - } else { - rb_link_node(&ins->rb_node, parent, node); - rb_insert_color(&ins->rb_node, &ring->rb_root); - } - found = ins; - *cmp = 0; - } - - return rnode; -} - -static struct ring_node *ring_rb_entry(struct rb_node *node) -{ - return node ? rb_entry(node, struct ring_node, rb_node) : NULL; -} - -/* return the next node, skipping deleted */ -static struct ring_node *ring_rb_next(struct ring_node *rnode) -{ - do { - if (rnode) - rnode = ring_rb_entry(rb_next(&rnode->rb_node)); - } while (rnode && rnode->deleted); - - return rnode; -} - -/* return the prev node, skipping deleted */ -static struct ring_node *ring_rb_prev(struct ring_node *rnode) -{ - do { - if (rnode) - rnode = ring_rb_entry(rb_prev(&rnode->rb_node)); - } while (rnode && rnode->deleted); - - return rnode; -} - -/* return the first node, skipping deleted */ -static struct ring_node *ring_rb_first(struct scoutfs_ring_info *ring) -{ - struct ring_node *rnode; - - rnode = ring_rb_entry(rb_first(&ring->rb_root)); - if (rnode && rnode->deleted) - rnode = ring_rb_next(rnode); - return rnode; -} - -static struct ring_node *alloc_node(unsigned data_len) -{ - struct ring_node *rnode; - - rnode = kzalloc(offsetof(struct ring_node, data[data_len]), GFP_NOFS); - if (rnode) { - RB_CLEAR_NODE(&rnode->rb_node); - INIT_LIST_HEAD(&rnode->head); - rnode->data_len = data_len; - } - - return rnode; -} - -/* - * Insert a new node. This will replace any existing node which could - * be in any state. - */ -void *scoutfs_ring_insert(struct scoutfs_ring_info *ring, void *key, - unsigned data_len) -{ - struct ring_node *rnode; - int cmp; - - rnode = alloc_node(data_len); - if (!rnode) - return NULL; - - ring_rb_walk(ring, key, NULL, rnode, &cmp); - /* just put it on a list, dirtying moves it to dirty */ - list_add_tail(&rnode->head, &ring->dirty_list); - mark_node_dirty(ring, rnode, true); - - trace_printk("inserted rnode %p in %u deleted %u dirty %u\n", - rnode, rnode->in_ring, rnode->deleted, - rnode->dirty); - - return rnode->data; -} - -void *scoutfs_ring_first(struct scoutfs_ring_info *ring) -{ - return rnode_data(ring_rb_first(ring)); -} - -void *scoutfs_ring_lookup(struct scoutfs_ring_info *ring, void *key) -{ - struct ring_node *rnode; - int cmp; - - rnode = ring_rb_walk(ring, key, NULL, NULL, &cmp); - if (rnode && (cmp || rnode->deleted)) - rnode = NULL; - - return rnode_data(rnode); -} - -void *scoutfs_ring_lookup_next(struct scoutfs_ring_info *ring, void *key) -{ - struct ring_node *rnode; - int cmp; - - rnode = ring_rb_walk(ring, key, NULL, NULL, &cmp); - if (rnode && (cmp > 0 || rnode->deleted)) - rnode = ring_rb_next(rnode); - - return rnode_data(rnode); -} - -void *scoutfs_ring_lookup_prev(struct scoutfs_ring_info *ring, void *key) -{ - struct ring_node *rnode; - int cmp; - - rnode = ring_rb_walk(ring, key, NULL, NULL, &cmp); - if (rnode && (cmp < 0 || rnode->deleted)) - rnode = ring_rb_prev(rnode); - - return rnode_data(rnode); -} - -void *scoutfs_ring_next(struct scoutfs_ring_info *ring, void *data) -{ - return rnode_data(ring_rb_next(data_rnode(data))); -} - -void *scoutfs_ring_prev(struct scoutfs_ring_info *ring, void *data) -{ - return rnode_data(ring_rb_prev(data_rnode(data))); -} - -/* - * Calculate the most blocks we could have to use to store a given number - * of bytes of entries. - */ -static unsigned most_blocks(unsigned long bytes) -{ - unsigned long space; - - space = SCOUTFS_BLOCK_SIZE - - sizeof(struct scoutfs_ring_block); - - return DIV_ROUND_UP(bytes, space); -} - -static u64 wrap_ring_block(struct scoutfs_ring_descriptor *rdesc, u64 block) -{ - if (block >= le64_to_cpu(rdesc->total_blocks)) - block -= le64_to_cpu(rdesc->total_blocks); - - /* XXX callers should have verified on load */ - BUG_ON(block >= le64_to_cpu(rdesc->total_blocks)); - - return block; -} - -static u64 calc_first_dirty_block(struct scoutfs_ring_descriptor *rdesc) -{ - return wrap_ring_block(rdesc, le64_to_cpu(rdesc->first_block) + - le64_to_cpu(rdesc->nr_blocks)); -} - -static __le32 rblk_crc(struct scoutfs_ring_block *rblk) -{ - unsigned long skip = (char *)(&rblk->crc + 1) - (char *)rblk; - - return cpu_to_le32(crc32c(~0, (char *)rblk + skip, - SCOUTFS_BLOCK_SIZE - skip)); -} - -/* - * This is called after the caller has copied all the dirty nodes into - * blocks in pages for writing. We might be able to dirty a few more - * clean nodes to fill up the end of the last dirty block to keep the - * ring blocks densely populated. - */ -static void fill_last_dirty_block(struct scoutfs_ring_info *ring, - unsigned space) -{ - struct ring_node *rnode; - struct ring_node *pos; - unsigned tot; - - list_for_each_entry_safe(rnode, pos, &ring->clean_list, head) { - - tot = total_entry_bytes(rnode->data_len); - if (tot > space) - break; - - mark_node_dirty(ring, rnode, false); - space -= tot; - } -} - -void scoutfs_ring_dirty(struct scoutfs_ring_info *ring, void *data) -{ - struct ring_node *rnode; - - rnode = data_rnode(data); - if (rnode) - mark_node_dirty(ring, rnode, true); -} - -/* - * Delete the given node. This can free the node so the caller cannot - * use the data after calling this. - * - * If the node previously existed in the ring then we have to save it and - * write a deletion entry before freeing it. - */ -void scoutfs_ring_delete(struct scoutfs_ring_info *ring, void *data) -{ - struct ring_node *rnode = data_rnode(data); - - trace_printk("deleting rnode %p in %u deleted %u dirty %u\n", - rnode, rnode->in_ring, rnode->deleted, rnode->dirty); - - BUG_ON(rnode->deleted); - - if (rnode->in_ring) { - rnode->deleted = 1; - mark_node_dirty(ring, rnode, true); - } else { - free_node(ring, rnode); - } -} - -static struct scoutfs_ring_block *block_in_pages(struct page **pages, - unsigned i) -{ - return page_address(pages[i / SCOUTFS_BLOCKS_PER_PAGE]) + - ((i % SCOUTFS_BLOCKS_PER_PAGE) << SCOUTFS_BLOCK_SHIFT); -} - -static int load_ring_block(struct scoutfs_ring_info *ring, - struct scoutfs_ring_block *rblk) -{ - struct scoutfs_ring_entry *rent; - struct ring_node *rnode; - unsigned data_len; - unsigned i; - int ret = 0; - int cmp; - - trace_printk("block %llu\n", le64_to_cpu(rblk->block)); - - rent = rblk->entries; - for (i = 0; i < le32_to_cpu(rblk->nr_entries); i++) { - - /* XXX verify fields? */ - data_len = le16_to_cpu(rent->data_len); - - trace_printk("rent %u data_len %u\n", i, data_len); - - if (rent->flags & SCOUTFS_RING_ENTRY_FLAG_DELETION) { - rnode = ring_rb_walk(ring, NULL, rent->data, NULL, - &cmp); - if (rnode && cmp == 0) - free_node(ring, rnode); - } else { - rnode = alloc_node(data_len); - if (!rnode) { - ret = -ENOMEM; - break; - } - - rnode->block = le64_to_cpu(rblk->block); - rnode->in_ring = 1; - memcpy(rnode->data, rent->data, data_len); - - ring_rb_walk(ring, NULL, rnode->data, rnode, &cmp); - list_add_tail(&rnode->head, &ring->clean_list); - } - - rent = (void *)&rent->data[data_len]; - } - - return ret; -} - -/* - * Read the ring entries into rb nodes with nice large synchronous reads. - */ -#define LOAD_BYTES (4 * 1024 * 1024) -#define LOAD_BLOCKS DIV_ROUND_UP(LOAD_BYTES, SCOUTFS_BLOCK_SIZE) -#define LOAD_PAGES DIV_ROUND_UP(LOAD_BYTES, PAGE_SIZE) -int scoutfs_ring_load(struct super_block *sb, struct scoutfs_ring_info *ring) -{ - struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; - struct scoutfs_ring_descriptor *rdesc = ring->rdesc; - struct scoutfs_ring_block *rblk; - struct page **pages; - unsigned read_nr; - unsigned i; - __le32 crc; - u64 block; - u64 total; - u64 seq; - u64 nr; - int ret; - - pages = kcalloc(LOAD_PAGES, sizeof(struct page *), GFP_NOFS); - if (!pages) - return -ENOMEM; - - for (i = 0; i < LOAD_PAGES; i++) { - pages[i] = alloc_page(GFP_NOFS); - if (!pages[i]) { - ret = -ENOMEM; - goto out; - } - } - - block = le64_to_cpu(rdesc->first_block); - seq = le64_to_cpu(rdesc->first_seq); - total = le64_to_cpu(rdesc->total_blocks); - nr = le64_to_cpu(rdesc->nr_blocks); - - while (nr) { - read_nr = min3(nr, (u64)LOAD_BLOCKS, total - block); - - ret = scoutfs_bio_read(sb, pages, le64_to_cpu(rdesc->blkno) + - block, read_nr); - if (ret) - goto out; - - for (i = 0; i < read_nr; i++) { - rblk = block_in_pages(pages, i); - crc = rblk_crc(rblk); - - if (rblk->fsid != super->hdr.fsid || - le64_to_cpu(rblk->block) != (block + i) || - le64_to_cpu(rblk->seq) != (seq + i) || - rblk->crc != crc) { - ret = -EIO; - goto out; - } - - ret = load_ring_block(ring, rblk); - if (ret) - goto out; - } - - block = wrap_ring_block(rdesc, block + read_nr); - seq += read_nr; - nr -= read_nr; - } - ret = 0; - -out: - for (i = 0; pages && i < LOAD_PAGES && pages[i]; i++) - __free_page(pages[i]); - kfree(pages); - - if (ret) - scoutfs_ring_destroy(ring); - - return ret; -} - -static struct ring_node *first_dirty_node(struct scoutfs_ring_info *ring) -{ - return list_first_entry_or_null(&ring->dirty_list, struct ring_node, - head); -} - -static struct ring_node *next_dirty_node(struct scoutfs_ring_info *ring, - struct ring_node *rnode) -{ - if (rnode->head.next == &ring->dirty_list) - return NULL; - - return list_next_entry(rnode, head); -} - -static void ring_free_pages(struct scoutfs_ring_info *ring) -{ - unsigned i; - - if (!ring->pages) - return; - - for (i = 0; i < ring->nr_pages; i++) { - if (ring->pages[i]) - __free_page(ring->pages[i]); - } - - kfree(ring->pages); - - ring->pages = NULL; - ring->nr_pages = 0; -} - -int scoutfs_ring_has_dirty(struct scoutfs_ring_info *ring) -{ - return !!ring->dirty_bytes; -} - -int scoutfs_ring_submit_write(struct super_block *sb, - struct scoutfs_ring_info *ring, - struct scoutfs_bio_completion *comp) -{ - struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; - struct scoutfs_ring_descriptor *rdesc = ring->rdesc; - struct scoutfs_ring_block *rblk; - struct scoutfs_ring_entry *rent; - struct ring_node *rnode; - struct ring_node *next; - struct page **pages; - unsigned nr_blocks; - unsigned nr_pages; - unsigned i; - u64 blkno; - u64 block; - u64 first; - u64 last; - u64 seq; - u64 nr; - u8 *end; - int ret; - - if (ring->dirty_bytes == 0) - return 0; - - nr_blocks = most_blocks(ring->dirty_bytes); - nr_pages = DIV_ROUND_UP(nr_blocks, SCOUTFS_BLOCKS_PER_PAGE); - - ring_free_pages(ring); - - pages = kcalloc(nr_pages, sizeof(struct page *), GFP_NOFS); - if (!pages) - return -ENOMEM; - - ring->pages = pages; - ring->nr_pages = nr_pages; - - for (i = 0; i < nr_pages; i++) { - pages[i] = alloc_page(GFP_NOFS | __GFP_ZERO); - if (!pages[i]) { - ret = -ENOMEM; - goto out; - } - } - - block = ring->first_dirty_block; - seq = ring->first_dirty_seq; - rnode = first_dirty_node(ring); - - for (i = 0; rnode && i < nr_blocks; i++) { - - rblk = block_in_pages(pages, i); - end = (u8 *)rblk + SCOUTFS_BLOCK_SIZE; - - rblk->fsid = super->hdr.fsid; - rblk->seq = cpu_to_le64(seq); - rblk->block = cpu_to_le64(block); - - rent = rblk->entries; - - while (rnode && &rent->data[rnode->data_len] <= end) { - - trace_printk("writing ent %u rnode %p in %u deleted %u dirty %u\n", - le32_to_cpu(rblk->nr_entries), - rnode, rnode->in_ring, rnode->deleted, - rnode->dirty); - - rent->data_len = cpu_to_le16(rnode->data_len); - if (rnode->deleted) - rent->flags = SCOUTFS_RING_ENTRY_FLAG_DELETION; - memcpy(rent->data, rnode->data, rnode->data_len); - - le32_add_cpu(&rblk->nr_entries, 1); - - rnode->block = block; - - rent = (void *)&rent->data[le16_to_cpu(rent->data_len)]; - - next = next_dirty_node(ring, rnode); - if (!next) { - fill_last_dirty_block(ring, (char *)end - - (char *)rent); - next = next_dirty_node(ring, rnode); - } - rnode = next; - } - - rblk->crc = rblk_crc(rblk); - - block = wrap_ring_block(rdesc, block + 1); - seq++; - } - - /* update the number of blocks we actually filled */ - nr_blocks = i; - - /* point the descriptor at the new active region of the ring */ - rnode = list_first_entry_or_null(&ring->clean_list, struct ring_node, - head); - if (rnode) - first = rnode->block; - else - first = ring->first_dirty_block; - - last = wrap_ring_block(rdesc, ring->first_dirty_block + nr_blocks); - - if (first < last) - nr = last - first; - else - nr = last + le64_to_cpu(rdesc->total_blocks) - first; - - rdesc->first_block = cpu_to_le64(first); - rdesc->first_seq = cpu_to_le64(ring->first_dirty_seq + nr_blocks - nr); - rdesc->nr_blocks = cpu_to_le64(nr); - - /* the contig dirty blocks in pages might wrap around ring */ - blkno = le64_to_cpu(rdesc->blkno) + ring->first_dirty_block; - nr = min_t(u64, nr_blocks, - le64_to_cpu(rdesc->total_blocks) - ring->first_dirty_block); - - scoutfs_bio_submit_comp(sb, WRITE, pages, blkno, nr, comp); - - if (nr != nr_blocks) { - pages += nr / SCOUTFS_BLOCKS_PER_PAGE; - blkno = le64_to_cpu(rdesc->blkno); - nr = nr_blocks - nr; - - scoutfs_bio_submit_comp(sb, WRITE, pages, blkno, nr, comp); - } - - ret = 0; - -out: - if (ret) - ring_free_pages(ring); - - return ret; -} - -void scoutfs_ring_write_complete(struct scoutfs_ring_info *ring) -{ - struct ring_node *rnode; - struct ring_node *pos; - - list_for_each_entry_safe(rnode, pos, &ring->dirty_list, head) { - if (rnode->deleted) { - free_node(ring, rnode); - } else { - mark_node_clean(ring, rnode); - rnode->in_ring = 1; - } - } - - ring_free_pages(ring); - - ring->dirty_bytes = 0; - ring->first_dirty_block = calc_first_dirty_block(ring->rdesc); - ring->first_dirty_seq = le64_to_cpu(ring->rdesc->first_seq) + - le64_to_cpu(ring->rdesc->nr_blocks); -} - -void scoutfs_ring_init(struct scoutfs_ring_info *ring, - struct scoutfs_ring_descriptor *rdesc, - scoutfs_ring_cmp_t compare_key, - scoutfs_ring_cmp_t compare_data) -{ - ring->rdesc = rdesc; - ring->compare_key = compare_key; - ring->compare_data = compare_data; - ring->rb_root = RB_ROOT; - INIT_LIST_HEAD(&ring->clean_list); - INIT_LIST_HEAD(&ring->dirty_list); - ring->dirty_bytes = 0; - ring->first_dirty_block = calc_first_dirty_block(rdesc); - ring->first_dirty_seq = le64_to_cpu(rdesc->first_seq) + - le64_to_cpu(rdesc->nr_blocks); - ring->pages = NULL; - ring->nr_pages = 0; -} - -void scoutfs_ring_destroy(struct scoutfs_ring_info *ring) -{ - struct ring_node *rnode; - struct ring_node *pos; - - /* XXX we don't really have a coherent forced dirty unmount story */ - WARN_ON_ONCE(!list_empty(&ring->dirty_list)); - - list_splice_init(&ring->dirty_list, &ring->clean_list); - - list_for_each_entry_safe(rnode, pos, &ring->clean_list, head) { - list_del_init(&rnode->head); - kfree(rnode); - } - - ring_free_pages(ring); - scoutfs_ring_init(ring, ring->rdesc, ring->compare_key, - ring->compare_data); -} diff --git a/kmod/src/ring.h b/kmod/src/ring.h deleted file mode 100644 index a9341092..00000000 --- a/kmod/src/ring.h +++ /dev/null @@ -1,55 +0,0 @@ -#ifndef _SCOUTFS_RING_H_ -#define _SCOUTFS_RING_H_ - -struct scoutfs_bio_completion; - -typedef int (*scoutfs_ring_cmp_t)(void *a, void *b); - -struct scoutfs_ring_info { - struct scoutfs_ring_descriptor *rdesc; - - scoutfs_ring_cmp_t compare_key; - scoutfs_ring_cmp_t compare_data; - - struct rb_root rb_root; - - struct list_head clean_list; - struct list_head dirty_list; - - unsigned long dirty_bytes; - u64 first_dirty_block; - u64 first_dirty_seq; - - struct page **pages; - unsigned long nr_pages; -}; - -void scoutfs_ring_init(struct scoutfs_ring_info *ring, - struct scoutfs_ring_descriptor *rdesc, - scoutfs_ring_cmp_t compare_key, - scoutfs_ring_cmp_t compare_data); - -int scoutfs_ring_load(struct super_block *sb, struct scoutfs_ring_info *ring); - -void *scoutfs_ring_insert(struct scoutfs_ring_info *ring, void *key, - unsigned data_len); - -void *scoutfs_ring_first(struct scoutfs_ring_info *ring); -void *scoutfs_ring_lookup(struct scoutfs_ring_info *ring, void *key); -void *scoutfs_ring_lookup_next(struct scoutfs_ring_info *ring, void *key); -void *scoutfs_ring_lookup_prev(struct scoutfs_ring_info *ring, void *key); - -void *scoutfs_ring_next(struct scoutfs_ring_info *ring, void *rdata); -void *scoutfs_ring_prev(struct scoutfs_ring_info *ring, void *rdata); -void scoutfs_ring_dirty(struct scoutfs_ring_info *ring, void *rdata); -void scoutfs_ring_delete(struct scoutfs_ring_info *ring, void *rdata); - -int scoutfs_ring_has_dirty(struct scoutfs_ring_info *ring); -int scoutfs_ring_submit_write(struct super_block *sb, - struct scoutfs_ring_info *ring, - struct scoutfs_bio_completion *comp); -void scoutfs_ring_write_complete(struct scoutfs_ring_info *ring); - -void scoutfs_ring_destroy(struct scoutfs_ring_info *ring); - -#endif