diff --git a/kmod/src/block.c b/kmod/src/block.c
index cb403465..9102aaaa 100644
--- a/kmod/src/block.c
+++ b/kmod/src/block.c
@@ -58,3 +58,54 @@ struct buffer_head *scoutfs_read_block(struct super_block *sb, u64 blkno)
 	brelse(bh);
 	return NULL;
 }
+
+/*
+ * Return a locked dirty buffer with undefined contents. The caller is
+ * responsible for initializing the entire block. Callers can try to
+ * read from these dirty blocks, so we mark them verified so that they
+ * don't try to check uninitialized crcs.
+ */
+struct buffer_head *scoutfs_dirty_bh(struct super_block *sb, u64 blkno)
+{
+	struct buffer_head *bh;
+
+	bh = sb_getblk(sb, blkno);
+	if (bh) {
+		lock_buffer(bh);
+		set_buffer_uptodate(bh);
+		mark_buffer_dirty(bh);
+		set_buffer_private_verified(bh);
+	}
+
+	return bh;
+}
+
+/*
+ * Return a locked dirty buffer with a partially initialized block
+ * header. The caller has to calculate the header crc before unlocking
+ * the block. The header will have the sequence number of the dirty
+ * super by default.
+ */
+struct buffer_head *scoutfs_dirty_block(struct super_block *sb, u64 blkno)
+{
+	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
+	struct scoutfs_super_block *super = &sbi->super;
+	struct scoutfs_block_header *hdr;
+	struct buffer_head *bh;
+
+	bh = scoutfs_dirty_bh(sb, blkno);
+	if (bh) {
+		hdr = (void *)bh->b_data;
+		*hdr = super->hdr;
+		hdr->blkno = cpu_to_le64(blkno);
+	}
+
+	return bh;
+}
+
+void scoutfs_calc_hdr_crc(struct buffer_head *bh)
+{
+	struct scoutfs_block_header *hdr = (void *)bh->b_data;
+
+	hdr->crc = cpu_to_le32(scoutfs_crc_block(hdr));
+}
diff --git a/kmod/src/block.h b/kmod/src/block.h
index c87fb6b8..30d79864 100644
--- a/kmod/src/block.h
+++ b/kmod/src/block.h
@@ -2,5 +2,8 @@
 #define _SCOUTFS_BLOCK_H_
 
 struct buffer_head *scoutfs_read_block(struct super_block *sb, u64 blkno);
+struct buffer_head *scoutfs_dirty_bh(struct super_block *sb, u64 blkno);
+struct buffer_head *scoutfs_dirty_block(struct super_block *sb, u64 blkno);
+void scoutfs_calc_hdr_crc(struct buffer_head *bh);
 
 #endif
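For reference, the helpers above are meant to be used as a dirty/fill/crc/unlock
cycle. A minimal sketch of a caller that just zeroes the space after the
header (example_write_block() is hypothetical, an illustration rather than part
of the patch):

	static int example_write_block(struct super_block *sb, u64 blkno)
	{
		struct buffer_head *bh;

		/* locked, dirty, uptodate buffer with a seeded header */
		bh = scoutfs_dirty_block(sb, blkno);
		if (!bh)
			return -ENOMEM;

		/* initialize everything past the header before unlocking */
		memset(bh->b_data + sizeof(struct scoutfs_block_header), 0,
		       SCOUTFS_BLOCK_SIZE -
		       sizeof(struct scoutfs_block_header));

		/* the crc has to cover the finished contents */
		scoutfs_calc_hdr_crc(bh);
		unlock_buffer(bh);
		brelse(bh);
		return 0;
	}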
diff --git a/kmod/src/chunk.c b/kmod/src/chunk.c
index 6b5758af..4b7a24ec 100644
--- a/kmod/src/chunk.c
+++ b/kmod/src/chunk.c
@@ -24,16 +24,65 @@
 #include "dir.h"
 #include "msg.h"
 #include "block.h"
+#include "ring.h"
 
 void scoutfs_set_chunk_alloc_bits(struct super_block *sb,
 				  struct scoutfs_ring_bitmap *bm)
 {
 	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
-	u64 off = le64_to_cpu(bm->offset);
+	u64 off = le64_to_cpu(bm->offset) * ARRAY_SIZE(bm->bits);
 
 	/* XXX check for corruption */
 	sbi->chunk_alloc_bits[off] = bm->bits[0];
 	sbi->chunk_alloc_bits[off + 1] = bm->bits[1];
-
 }
+
+/*
+ * Return the block number of the first block in a free chunk.
+ *
+ * The region around the cleared free bit for the allocation is always
+ * added to the ring and will generate a ton of overlapping ring
+ * entries. This is fine for initial testing but won't be good enough
+ * for real use. We'll have a bitmap of dirtied regions that are only
+ * logged as the update is written out.
+ */
+int scoutfs_alloc_chunk(struct super_block *sb, u64 *blkno)
+{
+	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
+	struct scoutfs_super_block *super = &sbi->super;
+	unsigned long size = le64_to_cpu(super->total_chunks);
+	struct scoutfs_ring_bitmap bm;
+	unsigned long off;
+	unsigned long bit;
+	int ret;
+
+	spin_lock(&sbi->chunk_alloc_lock);
+
+	bit = find_next_bit_le(sbi->chunk_alloc_bits, size, 0);
+	if (bit >= size) {
+		ret = -ENOSPC;
+	} else {
+		clear_bit_le(bit, sbi->chunk_alloc_bits);
+
+		off = bit / (sizeof(bm.bits) * 8);
+		bm.offset = cpu_to_le64(off);
+
+		off *= ARRAY_SIZE(bm.bits);
+		bm.bits[0] = sbi->chunk_alloc_bits[off];
+		bm.bits[1] = sbi->chunk_alloc_bits[off + 1];
+
+		*blkno = bit << SCOUTFS_CHUNK_BLOCK_SHIFT;
+		ret = 0;
+	}
+
+	spin_unlock(&sbi->chunk_alloc_lock);
+
+	if (!ret) {
+		ret = scoutfs_dirty_ring_entry(sb, SCOUTFS_RING_BITMAP, &bm,
+					       sizeof(bm));
+		WARN_ON_ONCE(ret);
+	}
+
+	return ret;
+}
diff --git a/kmod/src/chunk.h b/kmod/src/chunk.h
index b2cb6ff7..eb6615c7 100644
--- a/kmod/src/chunk.h
+++ b/kmod/src/chunk.h
@@ -3,5 +3,6 @@
 
 void scoutfs_set_chunk_alloc_bits(struct super_block *sb,
 				  struct scoutfs_ring_bitmap *bm);
+int scoutfs_alloc_chunk(struct super_block *sb, u64 *blkno);
 
 #endif
diff --git a/kmod/src/manifest.c b/kmod/src/manifest.c
index ec1773ac..8666ac49 100644
--- a/kmod/src/manifest.c
+++ b/kmod/src/manifest.c
@@ -18,6 +18,7 @@
 #include "format.h"
 #include "manifest.h"
 #include "key.h"
+#include "ring.h"
 
 /*
  * The manifest organizes log segment blocks into a tree structure.
@@ -195,6 +196,21 @@ int scoutfs_add_manifest(struct super_block *sb,
 	return 0;
 }
 
+/*
+ * The caller is writing a new log segment. We add it to the in-memory
+ * manifest and write it to dirty ring blocks.
+ *
+ * XXX we'd also need to add stale manifest entries to the ring
+ * XXX in the future we'd send it to the leader
+ */
+int scoutfs_new_manifest(struct super_block *sb,
+			 struct scoutfs_ring_manifest_entry *ment)
+{
+	return scoutfs_add_manifest(sb, ment) ?:
+	       scoutfs_dirty_ring_entry(sb, SCOUTFS_RING_ADD_MANIFEST,
+					ment, sizeof(*ment));
+}
+
 /*
  * Fill the caller's ment with the next log segment in the manifest that
  * might contain the given key. The ment is initialized to 0 to return
diff --git a/kmod/src/manifest.h b/kmod/src/manifest.h
index f22b3709..407bfa28 100644
--- a/kmod/src/manifest.h
+++ b/kmod/src/manifest.h
@@ -6,6 +6,8 @@ void scoutfs_destroy_manifest(struct super_block *sb);
 
 int scoutfs_add_manifest(struct super_block *sb,
 			 struct scoutfs_ring_manifest_entry *ment);
+int scoutfs_new_manifest(struct super_block *sb,
+			 struct scoutfs_ring_manifest_entry *ment);
 void scoutfs_delete_manifest(struct super_block *sb, u64 blkno);
 
 bool scoutfs_next_manifest_segment(struct super_block *sb,
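The offset arithmetic in scoutfs_alloc_chunk() and
scoutfs_set_chunk_alloc_bits() above is easiest to see with concrete numbers:
each scoutfs_ring_bitmap carries two 64-bit words, so one ring entry covers
128 chunk allocation bits. A standalone sketch of the round trip (the values
are illustrative):

	#include <stdio.h>

	int main(void)
	{
		unsigned long bit = 300;	/* chunk cleared by the allocator */
		unsigned long group = bit / 128;/* stored in bm.offset: 2 */
		unsigned long word = group * 2;	/* replay writes words 4 and 5 */

		printf("bit %lu -> bm.offset %lu -> chunk_alloc_bits[%lu..%lu]\n",
		       bit, group, word, word + 1);
		return 0;
	}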
diff --git a/kmod/src/ring.c b/kmod/src/ring.c
index aeee472b..095c30b2 100644
--- a/kmod/src/ring.c
+++ b/kmod/src/ring.c
@@ -23,6 +23,7 @@
 #include "manifest.h"
 #include "chunk.h"
 #include "block.h"
+#include "ring.h"
 
 static int replay_ring_block(struct super_block *sb, struct buffer_head *bh)
 {
@@ -62,11 +63,11 @@ static int replay_ring_block(struct super_block *sb, struct buffer_head *bh)
 }
 
 /*
- * Read a given logical ring block.
- *
- * Each ring map block entry maps a chunk's worth of ring blocks.
+ * Return the device block number that backs the given logical ring
+ * block. Ring blocks are mapped a chunk at a time by the map blocks
+ * in the chunk that the super describes.
  */
-static struct buffer_head *read_ring_block(struct super_block *sb, u64 block)
+static u64 map_ring_block(struct super_block *sb, u64 block)
 {
 	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
 	struct scoutfs_super_block *super = &sbi->super;
@@ -85,7 +86,7 @@ static struct buffer_head *read_ring_block(struct super_block *sb, u64 block)
 
 	bh = scoutfs_read_block(sb, le64_to_cpu(super->ring_map_blkno) + div);
 	if (!bh)
-		return NULL;
+		return 0;
 
 	/* XXX verify map block */
 
@@ -93,9 +94,35 @@ static struct buffer_head *read_ring_block(struct super_block *sb, u64 block)
 	blkno = le64_to_cpu(map->blknos[rem]) + ring_block;
 	brelse(bh);
 
+	return blkno;
+}
+
+/*
+ * Read a given logical ring block.
+ */
+static struct buffer_head *read_ring_block(struct super_block *sb, u64 block)
+{
+	u64 blkno = map_ring_block(sb, block);
+
+	if (!blkno)
+		return NULL;
+
 	return scoutfs_read_block(sb, blkno);
 }
 
+/*
+ * Return a dirty locked logical ring block.
+ */
+static struct buffer_head *dirty_ring_block(struct super_block *sb, u64 block)
+{
+	u64 blkno = map_ring_block(sb, block);
+
+	if (!blkno)
+		return NULL;
+
+	return scoutfs_dirty_block(sb, blkno);
+}
+
 int scoutfs_replay_ring(struct super_block *sb)
 {
 	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
@@ -126,3 +153,99 @@ int scoutfs_replay_ring(struct super_block *sb)
 
 	return ret;
 }
+
+/*
+ * Callers generate ring entries for the manifest and the allocator
+ * bitmap as they write items to blocks. We pin the ring block that
+ * we're working on so that it isn't written out until we fill it and
+ * calculate its checksum.
+ */
+int scoutfs_dirty_ring_entry(struct super_block *sb, u8 type, void *data,
+			     u16 len)
+{
+	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
+	struct scoutfs_super_block *super = &sbi->super;
+	struct scoutfs_ring_block *ring;
+	struct scoutfs_ring_entry *ent;
+	struct buffer_head *bh;
+	unsigned int avail;
+	u64 block;
+	int ret = 0;
+
+	bh = sbi->dirty_ring_bh;
+	ent = sbi->dirty_ring_ent;
+	avail = sbi->dirty_ring_ent_avail;
+
+	if (bh && len > avail) {
+		scoutfs_finish_dirty_ring(sb);
+		bh = NULL;
+	}
+	if (!bh) {
+		block = le64_to_cpu(super->ring_first_block) +
+			le64_to_cpu(super->ring_active_blocks);
+		if (block >= le64_to_cpu(super->ring_total_blocks))
+			block -= le64_to_cpu(super->ring_total_blocks);
+
+		bh = dirty_ring_block(sb, block);
+		if (!bh) {
+			ret = -ENOMEM;
+			goto out;
+		}
+
+		ring = (void *)bh->b_data;
+		ring->nr_entries = 0;
+		ent = (void *)(ring + 1);
+		/* assuming len fits in new empty block */
+	}
+
+	ring = (void *)bh->b_data;
+
+	ent->type = type;
+	ent->len = cpu_to_le16(len);
+	memcpy(ent + 1, data, len);
+	le16_add_cpu(&ring->nr_entries, 1);
+
+	ent = (void *)(ent + 1) + le16_to_cpu(ent->len);
+	avail = SCOUTFS_BLOCK_SIZE - ((char *)(ent + 1) - (char *)ring);
+out:
+	sbi->dirty_ring_bh = bh;
+	sbi->dirty_ring_ent = ent;
+	sbi->dirty_ring_ent_avail = avail;
+
+	return ret;
+}
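The ring block that scoutfs_dirty_ring_entry() builds is a
scoutfs_ring_block header followed by packed entries, each a
scoutfs_ring_entry header and its payload. A sketch of how a reader like
replay_ring_block() steps through them (walk_ring_entries() is hypothetical;
the advance mirrors the one above):

	static void walk_ring_entries(struct scoutfs_ring_block *ring)
	{
		struct scoutfs_ring_entry *ent = (void *)(ring + 1);
		unsigned int i;

		for (i = 0; i < le16_to_cpu(ring->nr_entries); i++) {
			pr_info("entry %u type %u len %u\n", i, ent->type,
				le16_to_cpu(ent->len));
			/* the payload sits right after the entry header */
			ent = (void *)(ent + 1) + le16_to_cpu(ent->len);
		}
	}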
+
+/*
+ * The sb info can hold a pinned, partially filled dirty ring block.
+ * This is called as we fill each block and when the commit is done.
+ * We calculate the checksum and unlock the block so it can be written.
+ *
+ * XXX This is about to write a partial block. We might as well fill
+ * that space with more old entries from the manifest and ring before
+ * we write it.
+ */
+int scoutfs_finish_dirty_ring(struct super_block *sb)
+{
+	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
+	struct scoutfs_super_block *super = &sbi->super;
+	struct buffer_head *bh;
+
+	bh = sbi->dirty_ring_bh;
+	if (!bh)
+		return 0;
+
+	sbi->dirty_ring_bh = NULL;
+
+	/*
+	 * XXX we're not zeroing the tail of the block here. We will
+	 * when we change the item block format to let us append to
+	 * the block without walking all the items.
+	 */
+	scoutfs_calc_hdr_crc(bh);
+	unlock_buffer(bh);
+	brelse(bh);
+
+	le64_add_cpu(&super->ring_active_blocks, 1);
+
+	return 0;
+}
diff --git a/kmod/src/ring.h b/kmod/src/ring.h
index b50b67e3..ee929e20 100644
--- a/kmod/src/ring.h
+++ b/kmod/src/ring.h
@@ -2,5 +2,8 @@
 #define _SCOUTFS_RING_H_
 
 int scoutfs_replay_ring(struct super_block *sb);
+int scoutfs_dirty_ring_entry(struct super_block *sb, u8 type, void *data,
+			     u16 len);
+int scoutfs_finish_dirty_ring(struct super_block *sb);
 
 #endif
diff --git a/kmod/src/segment.c b/kmod/src/segment.c
index 4fbc1dd2..79b40f4f 100644
--- a/kmod/src/segment.c
+++ b/kmod/src/segment.c
@@ -23,6 +23,8 @@
 #include "segment.h"
 #include "manifest.h"
 #include "block.h"
+#include "chunk.h"
+#include "ring.h"
 
 static struct scoutfs_item_header *next_ihdr(struct scoutfs_item_header *ihdr)
 {
@@ -101,3 +103,103 @@ struct scoutfs_item *scoutfs_read_segment_item(struct super_block *sb,
 
 	return item;
 }
+
+static int finish_item_block(struct super_block *sb, struct buffer_head *bh,
+			     void *until)
+{
+	struct scoutfs_item_block *iblk = (void *)bh->b_data;
+	struct scoutfs_ring_manifest_entry ment;
+
+	memset(until, 0, (void *)bh->b_data + SCOUTFS_BLOCK_SIZE - until);
+
+	/* record the entry before we unlock and release the buffer */
+	ment.blkno = cpu_to_le64(bh->b_blocknr);
+	ment.seq = iblk->hdr.seq;
+	ment.level = 0;
+	ment.first = iblk->first;
+	ment.last = iblk->last;
+
+	scoutfs_calc_hdr_crc(bh);
+	unlock_buffer(bh);
+	brelse(bh);
+
+	return scoutfs_new_manifest(sb, &ment);
+}
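The finish paths above all end with scoutfs_calc_hdr_crc() before unlocking.
The read side's counterpart would look something like this sketch, assuming
(as scoutfs_calc_hdr_crc() implies) that scoutfs_crc_block() returns a
cpu-order crc of the block excluding the crc field itself (hdr_crc_valid()
is hypothetical):

	static bool hdr_crc_valid(struct buffer_head *bh)
	{
		struct scoutfs_block_header *hdr = (void *)bh->b_data;

		/* compare the stored crc against a fresh sum of the block */
		return le32_to_cpu(hdr->crc) == scoutfs_crc_block(hdr);
	}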
+
+/*
+ * Write all the currently dirty items into newly allocated log
+ * segments. New ring entries are added as the alloc bitmap is
+ * modified and as the manifest is updated. Once all the item and ring
+ * blocks are written we write a new super that references them.
+ */
+int scoutfs_write_dirty_items(struct super_block *sb)
+{
+	struct address_space *mapping = sb->s_bdev->bd_inode->i_mapping;
+	struct scoutfs_item_header *ihdr;
+	struct scoutfs_item_block *iblk;
+	struct scoutfs_item *item;
+	struct buffer_head *bh;
+	int val_space;
+	u64 blkno;
+	int ret = 0;
+
+	/* XXX wait until transactions are complete */
+
+	item = NULL;
+	iblk = NULL;
+	while ((item = scoutfs_item_next_dirty(sb, item))) {
+
+		if (iblk && (item->val_len > val_space)) {
+			iblk = NULL;
+			ret = finish_item_block(sb, bh, ihdr);
+			if (ret)
+				break;
+		}
+
+		if (!iblk) {
+			/* get the next item block */
+			ret = scoutfs_alloc_chunk(sb, &blkno);
+			if (ret)
+				break;
+
+			bh = scoutfs_dirty_block(sb, blkno);
+			if (!bh) {
+				ret = -ENOMEM;
+				break;
+			}
+
+			iblk = (void *)bh->b_data;
+			iblk->first = item->key;
+			iblk->nr_items = 0;
+			ihdr = (void *)(iblk + 1);
+			/* XXX assuming that val_space is big enough */
+		}
+
+		iblk->last = item->key;
+		ihdr->key = item->key;
+		ihdr->len = cpu_to_le16(item->val_len);
+		memcpy((void *)(ihdr + 1), item->val, item->val_len);
+		le32_add_cpu(&iblk->nr_items, 1);
+
+		/* XXX assuming that the next ihdr fits */
+		ihdr = (void *)(ihdr + 1) + le16_to_cpu(ihdr->len);
+		val_space = (char *)iblk + SCOUTFS_BLOCK_SIZE -
+			    (char *)(ihdr + 1);
+	}
+
+	scoutfs_item_put(item); /* item is non-NULL only if the loop aborted */
+
+	/* finish writing if we did work and haven't failed */
+	if (iblk && !ret) {
+		ret = finish_item_block(sb, bh, ihdr) ?:
+		      scoutfs_finish_dirty_ring(sb) ?:
+		      filemap_write_and_wait(mapping) ?:
+		      scoutfs_write_dirty_super(sb);
+		if (!ret) {
+			scoutfs_advance_dirty_super(sb);
+			scoutfs_item_all_clean(sb);
+		}
+	}
+
+	/* XXX better tear down in the error case */
+
+	return ret;
+}
diff --git a/kmod/src/segment.h b/kmod/src/segment.h
index 6ae33f42..4755e5cd 100644
--- a/kmod/src/segment.h
+++ b/kmod/src/segment.h
@@ -3,5 +3,6 @@
 
 struct scoutfs_item *scoutfs_read_segment_item(struct super_block *sb,
 					       struct scoutfs_key *key);
+int scoutfs_write_dirty_items(struct super_block *sb);
 
 #endif
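The loop above produces item blocks laid out as a scoutfs_item_block header
followed by nr_items packed (scoutfs_item_header, value) pairs. A sketch of
walking a finished block, mirroring the ihdr stepping in
scoutfs_write_dirty_items() (print_items() is hypothetical):

	static void print_items(struct scoutfs_item_block *iblk)
	{
		struct scoutfs_item_header *ihdr = (void *)(iblk + 1);
		u32 i;

		for (i = 0; i < le32_to_cpu(iblk->nr_items); i++) {
			pr_info("item %u val len %u\n", i,
				le16_to_cpu(ihdr->len));
			/* values are stored inline after each header */
			ihdr = (void *)(ihdr + 1) + le16_to_cpu(ihdr->len);
		}
	}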
diff --git a/kmod/src/super.c b/kmod/src/super.c
index 3f9d001f..3bdfc743 100644
--- a/kmod/src/super.c
+++ b/kmod/src/super.c
@@ -26,12 +26,67 @@
 #include "block.h"
 #include "manifest.h"
 #include "ring.h"
+#include "segment.h"
+
+static int scoutfs_sync_fs(struct super_block *sb, int wait)
+{
+	/* XXX always waiting */
+	return scoutfs_write_dirty_items(sb);
+}
 
 static const struct super_operations scoutfs_super_ops = {
 	.alloc_inode = scoutfs_alloc_inode,
 	.destroy_inode = scoutfs_destroy_inode,
+	.sync_fs = scoutfs_sync_fs,
 };
 
+/*
+ * Callers advance the block number and sequence number in the dirty
+ * super before each commit so that the super that's eventually written
+ * references the newly written dirty data.
+ */
+void scoutfs_advance_dirty_super(struct super_block *sb)
+{
+	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
+	struct scoutfs_super_block *super = &sbi->super;
+	u64 blkno;
+
+	blkno = le64_to_cpu(super->hdr.blkno) - SCOUTFS_SUPER_BLKNO;
+	if (++blkno == SCOUTFS_SUPER_NR)
+		blkno = 0;
+	super->hdr.blkno = cpu_to_le64(SCOUTFS_SUPER_BLKNO + blkno);
+
+	le64_add_cpu(&super->hdr.seq, 1);
+}
+
+/*
+ * We've been modifying the super copy in the sb info as we made
+ * changes. Write it out to commit those changes.
+ */
+int scoutfs_write_dirty_super(struct super_block *sb)
+{
+	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
+	struct scoutfs_super_block *super = &sbi->super;
+	struct buffer_head *bh;
+	size_t sz;
+	int ret;
+
+	bh = scoutfs_dirty_block(sb, le64_to_cpu(super->hdr.blkno));
+	if (!bh)
+		return -ENOMEM;
+
+	sz = sizeof(struct scoutfs_super_block);
+	memcpy(bh->b_data, super, sz);
+	memset(bh->b_data + sz, 0, SCOUTFS_BLOCK_SIZE - sz);
+	scoutfs_calc_hdr_crc(bh);
+
+	unlock_buffer(bh);
+	ret = sync_dirty_buffer(bh);
+	brelse(bh);
+
+	return ret;
+}
+
 static int read_supers(struct super_block *sb)
 {
 	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
@@ -114,6 +169,7 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent)
 	spin_lock_init(&sbi->item_lock);
 	sbi->item_root = RB_ROOT;
 	sbi->dirty_item_root = RB_ROOT;
+	spin_lock_init(&sbi->chunk_alloc_lock);
 
 	if (!sb_set_blocksize(sb, SCOUTFS_BLOCK_SIZE)) {
 		printk(KERN_ERR "couldn't set blocksize\n");
@@ -140,6 +196,8 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent)
 	if (!sb->s_root)
 		return -ENOMEM;
 
+	scoutfs_advance_dirty_super(sb);
+
 	return 0;
 }
 
@@ -151,8 +209,8 @@ static struct dentry *scoutfs_mount(struct file_system_type *fs_type, int flags,
 
 static void scoutfs_kill_sb(struct super_block *sb)
 {
-	scoutfs_destroy_manifest(sb);
 	kill_block_super(sb);
+	scoutfs_destroy_manifest(sb);
 	kfree(sb->s_fs_info);
 }
 
diff --git a/kmod/src/super.h b/kmod/src/super.h
index d7229877..604448ca 100644
--- a/kmod/src/super.h
+++ b/kmod/src/super.h
@@ -18,7 +18,14 @@ struct scoutfs_sb_info {
 
 	struct scoutfs_manifest *mani;
 
+	spinlock_t chunk_alloc_lock;
 	__le64 *chunk_alloc_bits;
+
+	/* pinned dirty ring block during commit */
+	struct buffer_head *dirty_ring_bh;
+	struct scoutfs_ring_entry *dirty_ring_ent;
+	unsigned int dirty_ring_ent_avail;
+
 };
 
 static inline struct scoutfs_sb_info *SCOUTFS_SB(struct super_block *sb)
@@ -26,4 +33,7 @@ static inline struct scoutfs_sb_info *SCOUTFS_SB(struct super_block *sb)
 	return sb->s_fs_info;
 }
 
+void scoutfs_advance_dirty_super(struct super_block *sb);
+int scoutfs_write_dirty_super(struct super_block *sb);
+
 #endif
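scoutfs_advance_dirty_super() rotates the dirty super through the
SCOUTFS_SUPER_NR copies that follow SCOUTFS_SUPER_BLKNO and bumps its
sequence number, presumably so that mount can pick the newest copy. A
standalone sketch of the rotation (the base and count here are assumptions
for illustration, not values from format.h):

	#include <stdio.h>

	#define SUPER_BLKNO 1	/* assumed base block of the super copies */
	#define SUPER_NR    2	/* assumed number of rotating copies */

	int main(void)
	{
		unsigned long long blkno = SUPER_BLKNO, seq = 1;
		int commit;

		for (commit = 1; commit <= 4; commit++) {
			unsigned long long off = blkno - SUPER_BLKNO;

			if (++off == SUPER_NR)
				off = 0;
			blkno = SUPER_BLKNO + off;
			seq++;
			printf("commit %d: blkno %llu seq %llu\n",
			       commit, blkno, seq);
		}
		return 0;
	}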