scoutfs: first step towards multiple btrees

Starting to implement LSM merging made me really question if it is the
right approach.  I'd like to try an experiment to see if we can get our
concurrent writes done with much simpler btrees.

This commit removes all the functionality that derives from the large
LSM segments and from distributing the manifest.

What's left is a multi-page block layer and the husk of the btree
implementation that will give callers access to items.  Callers that
worked with the item interface are translated to the btree interface.

This gets as far as reading the super block, but the format changes and
the larger block size mean that the crc check fails and the mount
returns an error.

Signed-off-by: Zach Brown <zab@versity.com>
Zach Brown
2016-04-10 20:45:29 -07:00
parent a07b41fa8b
commit 5369fa1e05
28 changed files with 373 additions and 3788 deletions


@@ -2,6 +2,5 @@ obj-$(CONFIG_SCOUTFS_FS) := scoutfs.o
CFLAGS_scoutfs_trace.o = -I$(src) # define_trace.h double include
scoutfs-y += block.o bloom.o counters.o chunk.o crc.o dir.o filerw.o inode.o \
ival.o manifest.o msg.o ring.o scoutfs_trace.o segment.o skip.o \
super.o
scoutfs-y += block.o counters.o crc.o dir.o filerw.o inode.o msg.o \
scoutfs_trace.o super.o


@@ -1,5 +1,5 @@
/*
* Copyright (C) 2015 Versity Software, Inc. All rights reserved.
* Copyright (C) 2016 Versity Software, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
@@ -10,73 +10,206 @@
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include <linux/buffer_head.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/radix-tree.h>
#include <linux/mm.h>
#include <linux/bio.h>
#include "super.h"
#include "format.h"
#include "block.h"
#include "crc.h"
#include "counters.h"
#define BH_Private_Verified BH_PrivateStart
/*
* XXX
* - tie into reclaim
* - per cpu lru of refs?
* - relax locking
* - get, check, and fill slots instead of full radix walks
* - block slab
* - maybe more clever wait functions
*/
BUFFER_FNS(Private_Verified, private_verified)
static struct scoutfs_block *alloc_block(struct super_block *sb, u64 blkno)
{
struct scoutfs_block *bl;
struct page *page;
static void verify_block_header(struct super_block *sb, struct buffer_head *bh)
/* we'd need to be just a bit more careful */
BUILD_BUG_ON(PAGE_SIZE > SCOUTFS_BLOCK_SIZE);
bl = kzalloc(sizeof(struct scoutfs_block), GFP_NOFS);
if (bl) {
page = alloc_pages(GFP_NOFS, SCOUTFS_BLOCK_PAGE_ORDER);
WARN_ON_ONCE(!page);
if (page) {
init_rwsem(&bl->rwsem);
atomic_set(&bl->refcount, 1);
bl->blkno = blkno;
bl->sb = sb;
bl->page = page;
bl->data = page_address(page);
scoutfs_inc_counter(sb, block_mem_alloc);
} else {
kfree(bl);
bl = NULL;
}
}
return bl;
}
void scoutfs_put_block(struct scoutfs_block *bl)
{
if (!IS_ERR_OR_NULL(bl) && atomic_dec_and_test(&bl->refcount)) {
__free_pages(bl->page, SCOUTFS_BLOCK_PAGE_ORDER);
kfree(bl);
scoutfs_inc_counter(bl->sb, block_mem_free);
}
}
static int verify_block_header(struct super_block *sb, struct scoutfs_block *bl)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_block_header *hdr = (void *)bh->b_data;
struct scoutfs_block_header *hdr = bl->data;
u32 crc = scoutfs_crc_block(hdr);
u64 blkno = bh->b_blocknr;
int ret = -EIO;
if (le32_to_cpu(hdr->crc) != crc) {
printk("blkno %llu hdr crc %x != calculated %x\n", blkno,
printk("blkno %llu hdr crc %x != calculated %x\n", bl->blkno,
le32_to_cpu(hdr->crc), crc);
} else if (super->hdr.fsid && hdr->fsid != super->hdr.fsid) {
printk("blkno %llu fsid %llx != super fsid %llx\n", blkno,
printk("blkno %llu fsid %llx != super fsid %llx\n", bl->blkno,
le64_to_cpu(hdr->fsid), le64_to_cpu(super->hdr.fsid));
} else if (le64_to_cpu(hdr->blkno) != blkno) {
printk("blkno %llu invalid hdr blkno %llx\n", blkno,
} else if (le64_to_cpu(hdr->blkno) != bl->blkno) {
printk("blkno %llu invalid hdr blkno %llx\n", bl->blkno,
le64_to_cpu(hdr->blkno));
} else {
set_buffer_private_verified(bh);
ret = 0;
}
return ret;
}
static void block_read_end_io(struct bio *bio, int err)
{
struct scoutfs_block *bl = bio->bi_private;
struct scoutfs_sb_info *sbi = SCOUTFS_SB(bl->sb);
if (!err && !verify_block_header(bl->sb, bl))
set_bit(SCOUTFS_BLOCK_BIT_UPTODATE, &bl->bits);
else
set_bit(SCOUTFS_BLOCK_BIT_ERROR, &bl->bits);
/*
* uncontended spin_lock in wake_up and unconditional smp_mb to
* make waitqueue_active safe are about the same cost, so we
* prefer the obviously safe choice.
*/
wake_up(&sbi->block_wq);
scoutfs_put_block(bl);
}
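For reference, a sketch (not in this commit) of the waitqueue_active() form
that the comment above decides against; it needs the explicit barrier to
pair with the waiter's prepare_to_wait():

	/* wake side: publish the bit update before checking for waiters */
	smp_mb();
	if (waitqueue_active(&sbi->block_wq))
		wake_up(&sbi->block_wq);

The unconditional wake_up() keeps the ordering reasoning trivial at the cost
of an uncontended spin lock.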
static int block_submit_bio(struct scoutfs_block *bl, int rw)
{
struct super_block *sb = bl->sb;
struct bio *bio;
int ret;
bio = bio_alloc(GFP_NOFS, SCOUTFS_PAGES_PER_BLOCK);
if (WARN_ON_ONCE(!bio))
return -ENOMEM;
bio->bi_sector = bl->blkno << (SCOUTFS_BLOCK_SHIFT - 9);
bio->bi_bdev = sb->s_bdev;
/* XXX can we do that? */
ret = bio_add_page(bio, bl->page, SCOUTFS_BLOCK_SIZE, 0);
if (rw & WRITE)
;
else
bio->bi_end_io = block_read_end_io;
bio->bi_private = bl;
atomic_inc(&bl->refcount);
submit_bio(rw, bio);
return 0;
}
/*
* Read an existing block from the device and verify its metadata header.
*/
struct buffer_head *scoutfs_read_block(struct super_block *sb, u64 blkno)
struct scoutfs_block *scoutfs_read_block(struct super_block *sb, u64 blkno)
{
struct buffer_head *bh;
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_block *found;
struct scoutfs_block *bl;
int ret;
bh = sb_bread(sb, blkno);
if (!bh || buffer_private_verified(bh))
return bh;
/* find an existing block, dropping if it's errored */
spin_lock(&sbi->block_lock);
lock_buffer(bh);
if (!buffer_private_verified(bh))
verify_block_header(sb, bh);
unlock_buffer(bh);
if (!buffer_private_verified(bh)) {
brelse(bh);
bh = NULL;
bl = radix_tree_lookup(&sbi->block_radix, blkno);
if (bl && test_bit(SCOUTFS_BLOCK_BIT_ERROR, &bl->bits)) {
radix_tree_delete(&sbi->block_radix, bl->blkno);
scoutfs_put_block(bl);
bl = NULL;
}
return bh;
}
spin_unlock(&sbi->block_lock);
if (bl)
goto wait;
/*
* Read the block that contains the given byte offset in the given chunk.
*/
struct buffer_head *scoutfs_read_block_off(struct super_block *sb, u64 blkno,
u32 off)
{
if (WARN_ON_ONCE(off >= SCOUTFS_CHUNK_SIZE))
return ERR_PTR(-EINVAL);
/* allocate a new block and try to insert it */
bl = alloc_block(sb, blkno);
if (!bl) {
ret = -EIO;
goto out;
}
return scoutfs_read_block(sb, blkno + (off >> SCOUTFS_BLOCK_SHIFT));
ret = radix_tree_preload(GFP_NOFS);
if (ret)
goto out;
spin_lock(&sbi->block_lock);
found = radix_tree_lookup(&sbi->block_radix, blkno);
if (found) {
scoutfs_put_block(bl);
bl = found;
} else {
radix_tree_insert(&sbi->block_radix, blkno, bl);
atomic_inc(&bl->refcount);
}
spin_unlock(&sbi->block_lock);
radix_tree_preload_end();
if (!found) {
ret = block_submit_bio(bl, READ_SYNC | REQ_META);
if (ret)
goto out;
}
wait:
ret = wait_event_interruptible(sbi->block_wq,
test_bit(SCOUTFS_BLOCK_BIT_UPTODATE, &bl->bits) ||
test_bit(SCOUTFS_BLOCK_BIT_ERROR, &bl->bits));
if (test_bit(SCOUTFS_BLOCK_BIT_UPTODATE, &bl->bits))
ret = 0;
else if (test_bit(SCOUTFS_BLOCK_BIT_ERROR, &bl->bits))
ret = -EIO;
out:
if (ret) {
scoutfs_put_block(bl);
bl = ERR_PTR(ret);
}
return bl;
}
/*
@@ -85,33 +218,56 @@ struct buffer_head *scoutfs_read_block_off(struct super_block *sb, u64 blkno,
* serializing access to the block and for zeroing unwritten block
* contents.
*/
struct buffer_head *scoutfs_new_block(struct super_block *sb, u64 blkno)
struct scoutfs_block *scoutfs_new_block(struct super_block *sb, u64 blkno)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_block_header *hdr;
struct buffer_head *bh;
struct scoutfs_block *found;
struct scoutfs_block *bl;
int ret;
bh = sb_getblk(sb, blkno);
if (bh) {
if (!buffer_uptodate(bh) || buffer_private_verified(bh)) {
lock_buffer(bh);
set_buffer_uptodate(bh);
set_buffer_private_verified(bh);
unlock_buffer(bh);
}
hdr = (void *)bh->b_data;
*hdr = super->hdr;
hdr->blkno = cpu_to_le64(blkno);
/* allocate a new block and try to insert it */
bl = alloc_block(sb, blkno);
if (!bl) {
ret = -EIO;
goto out;
}
return bh;
set_bit(SCOUTFS_BLOCK_BIT_UPTODATE, &bl->bits);
ret = radix_tree_preload(GFP_NOFS);
if (ret)
goto out;
hdr = bl->data;
*hdr = sbi->super.hdr;
hdr->blkno = cpu_to_le64(blkno);
spin_lock(&sbi->block_lock);
found = radix_tree_lookup(&sbi->block_radix, blkno);
if (found) {
radix_tree_delete(&sbi->block_radix, blkno);
scoutfs_put_block(found);
}
radix_tree_insert(&sbi->block_radix, blkno, bl);
atomic_inc(&bl->refcount);
spin_unlock(&sbi->block_lock);
radix_tree_preload_end();
ret = 0;
out:
if (ret) {
scoutfs_put_block(bl);
bl = ERR_PTR(ret);
}
return bl;
}
void scoutfs_calc_hdr_crc(struct buffer_head *bh)
void scoutfs_calc_hdr_crc(struct scoutfs_block *bl)
{
struct scoutfs_block_header *hdr = (void *)bh->b_data;
struct scoutfs_block_header *hdr = bl->data;
hdr->crc = cpu_to_le32(scoutfs_crc_block(hdr));
}


@@ -1,10 +1,30 @@
#ifndef _SCOUTFS_BLOCK_H_
#define _SCOUTFS_BLOCK_H_
struct buffer_head *scoutfs_read_block(struct super_block *sb, u64 blkno);
struct buffer_head *scoutfs_read_block_off(struct super_block *sb, u64 blkno,
u32 off);
struct buffer_head *scoutfs_new_block(struct super_block *sb, u64 blkno);
void scoutfs_calc_hdr_crc(struct buffer_head *bh);
#include <linux/fs.h>
#include <linux/rwlock.h>
#include <linux/atomic.h>
#define SCOUTFS_BLOCK_BIT_UPTODATE (1 << 0)
#define SCOUTFS_BLOCK_BIT_ERROR (1 << 1)
struct scoutfs_block {
struct rw_semaphore rwsem;
atomic_t refcount;
u64 blkno;
unsigned long bits;
struct super_block *sb;
/* only high order page alloc for now */
struct page *page;
void *data;
};
struct scoutfs_block *scoutfs_read_block(struct super_block *sb, u64 blkno);
struct scoutfs_block *scoutfs_new_block(struct super_block *sb, u64 blkno);
void scoutfs_put_block(struct scoutfs_block *bl);
void scoutfs_calc_hdr_crc(struct scoutfs_block *bl);
#endif
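A usage sketch of the interface above (illustrative, not part of this
commit): readers get a referenced block or an ERR_PTR, work with bl->data,
and drop their reference; writers of new blocks fill bl->data and update
the header crc before writeout.  The example_ helpers are hypothetical.

	/* read a block, dump its verified header fields, drop the ref */
	static int example_dump_block(struct super_block *sb, u64 blkno)
	{
		struct scoutfs_block *bl;
		struct scoutfs_block_header *hdr;

		bl = scoutfs_read_block(sb, blkno);
		if (IS_ERR(bl))
			return PTR_ERR(bl);

		hdr = bl->data;
		printk("blkno %llu fsid %llx crc %x\n", bl->blkno,
		       le64_to_cpu(hdr->fsid), le32_to_cpu(hdr->crc));

		scoutfs_put_block(bl);
		return 0;
	}

	/* prepare a brand new block: zero the payload and crc the header */
	static struct scoutfs_block *example_prep_block(struct super_block *sb,
							u64 blkno)
	{
		struct scoutfs_block *bl;

		bl = scoutfs_new_block(sb, blkno);
		if (!IS_ERR(bl)) {
			memset(bl->data + sizeof(struct scoutfs_block_header), 0,
			       SCOUTFS_BLOCK_SIZE - sizeof(struct scoutfs_block_header));
			scoutfs_calc_hdr_crc(bl);
		}
		return bl;
	}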


@@ -1,132 +0,0 @@
/*
* Copyright (C) 2016 Versity Software, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License v2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/buffer_head.h>
#include <linux/random.h>
#include <linux/crc32c.h>
#include "super.h"
#include "format.h"
#include "block.h"
#include "bloom.h"
#include "scoutfs_trace.h"
/*
* Each log segment starts with a bloom filter that spans multiple
* blocks. It's used to test for the presence of a key in the log segment
* without having to read and search the much larger array of items and
* their keys.
*/
/* XXX garbage hack until we have siphash */
static u32 bloom_hash(struct scoutfs_key *key, __le32 salt)
{
return crc32c(le32_to_cpu(salt), key, sizeof(struct scoutfs_key));
}
/*
* Find the bits in the bloom filter for the given key. The caller calculates
* these once and uses them to test all the blocks.
*/
void scoutfs_calc_bloom_bits(struct scoutfs_bloom_bits *bits,
struct scoutfs_key *key, __le32 *salts)
{
unsigned h_bits = 0;
unsigned int b;
unsigned s = 0;
u64 h = 0;
int i;
BUILD_BUG_ON(SCOUTFS_BLOOM_BIT_WIDTH > 32);
for (i = 0; i < SCOUTFS_BLOOM_BITS; i++) {
if (h_bits < SCOUTFS_BLOOM_BIT_WIDTH) {
h = (h << 32) | bloom_hash(key, salts[s++]);
h_bits += 32;
}
b = h & SCOUTFS_BLOOM_BIT_MASK;
h >>= SCOUTFS_BLOOM_BIT_WIDTH;
h_bits -= SCOUTFS_BLOOM_BIT_WIDTH;
bits->block[i] = (b / SCOUTFS_BLOOM_BITS_PER_BLOCK) %
SCOUTFS_BLOOM_BLOCKS;
bits->bit_off[i] = b % SCOUTFS_BLOOM_BITS_PER_BLOCK;
}
}
/*
* Set the caller's bit numbers in the bloom filter contained in bloom
* blocks starting at the given block number. The caller has
initialized the blocks and is responsible for locking, dirtying,
and writeout.
*/
int scoutfs_set_bloom_bits(struct super_block *sb, u64 blkno,
struct scoutfs_bloom_bits *bits)
{
struct scoutfs_bloom_block *blm;
struct buffer_head *bh;
int ret = 0;
int i;
for (i = 0; i < SCOUTFS_BLOOM_BITS; i++) {
bh = scoutfs_read_block(sb, blkno + bits->block[i]);
if (!bh) {
ret = -EIO;
break;
}
blm = (void *)bh->b_data;
set_bit_le(bits->bit_off[i], blm->bits);
brelse(bh);
}
return ret;
}
/*
* Returns zero if the bits' key can't be found in the block, true if it
* might, and -errno if IO fails.
*/
int scoutfs_test_bloom_bits(struct super_block *sb, u64 blkno,
struct scoutfs_key *key,
struct scoutfs_bloom_bits *bits)
{
struct scoutfs_bloom_block *blm;
struct buffer_head *bh;
int ret;
int i;
for (i = 0; i < SCOUTFS_BLOOM_BITS; i++) {
bh = scoutfs_read_block(sb, blkno + bits->block[i]);
if (!bh) {
ret = -EIO;
break;
}
blm = (void *)bh->b_data;
ret = !!test_bit_le(bits->bit_off[i], blm->bits);
brelse(bh);
if (!ret)
break;
}
if (ret)
trace_scoutfs_bloom_hit(key);
else
trace_scoutfs_bloom_miss(key);
return ret;
}


@@ -1,17 +0,0 @@
#ifndef _SCOUTFS_BLOOM_H_
#define _SCOUTFS_BLOOM_H_
struct scoutfs_bloom_bits {
u16 bit_off[SCOUTFS_BLOOM_BITS];
u8 block[SCOUTFS_BLOOM_BITS];
};
void scoutfs_calc_bloom_bits(struct scoutfs_bloom_bits *bits,
struct scoutfs_key *key, __le32 *salts);
int scoutfs_test_bloom_bits(struct super_block *sb, u64 blkno,
struct scoutfs_key *key,
struct scoutfs_bloom_bits *bits);
int scoutfs_set_bloom_bits(struct super_block *sb, u64 blkno,
struct scoutfs_bloom_bits *bits);
#endif

kmod/src/btree.h (new file, 58 lines)

@@ -0,0 +1,58 @@
#ifndef _SCOUTFS_BTREE_H_
#define _SCOUTFS_BTREE_H_
struct scoutfs_btree_cursor {
/* for btree.c */
struct scoutfs_block *bl;
struct scoutfs_btree_item *item;
/* for callers */
struct scoutfs_key *key;
unsigned val_len;
void *val;
};
static inline int scoutfs_btree_lookup(struct super_block *sb,
struct scoutfs_key *key,
struct scoutfs_btree_cursor *curs)
{
return -ENOSYS;
}
static inline int scoutfs_btree_insert(struct super_block *sb,
struct scoutfs_key *key,
unsigned short val_len,
struct scoutfs_btree_cursor *curs)
{
return -ENOSYS;
}
static inline int scoutfs_btree_dirty(struct super_block *sb,
struct scoutfs_key *key,
unsigned short val_len,
struct scoutfs_btree_cursor *curs)
{
return -ENOSYS;
}
static inline int scoutfs_btree_delete(struct super_block *sb,
struct scoutfs_btree_cursor *curs)
{
return -ENOSYS;
}
static inline int scoutfs_btree_next(struct super_block *sb,
struct scoutfs_key *first,
struct scoutfs_key *last,
struct scoutfs_btree_cursor *curs)
{
return -ENOSYS;
}
static inline int scoutfs_btree_release(struct scoutfs_btree_cursor *curs)
{
return -ENOSYS;
}
#endif
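A sketch of the calling convention these stubs imply, mirroring the
converted callers in dir.c and inode.c below; the stubs still return
-ENOSYS, so this only shows the shape of the interface, and the
example_ helper is hypothetical.

	/* look up an inode item and copy it out through the cursor */
	static int example_lookup_inode(struct super_block *sb, u64 ino,
					struct scoutfs_inode *sinode)
	{
		struct scoutfs_btree_cursor curs = {NULL,};
		struct scoutfs_key key;
		int ret;

		scoutfs_set_key(&key, ino, SCOUTFS_INODE_KEY, 0);

		ret = scoutfs_btree_lookup(sb, &key, &curs);
		if (ret == 0) {
			/* curs.key, curs.val, and curs.val_len describe the item */
			memcpy(sinode, curs.val, sizeof(struct scoutfs_inode));
			scoutfs_btree_release(&curs);
		}

		return ret;
	}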


@@ -1,89 +0,0 @@
/*
* Copyright (C) 2016 Versity Software, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License v2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/slab.h>
#include <linux/magic.h>
#include <linux/buffer_head.h>
#include <linux/random.h>
#include "super.h"
#include "format.h"
#include "inode.h"
#include "dir.h"
#include "msg.h"
#include "block.h"
#include "ring.h"
#include "chunk.h"
void scoutfs_set_chunk_alloc_bits(struct super_block *sb,
struct scoutfs_ring_bitmap *bm)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
u64 off = le32_to_cpu(bm->offset) * ARRAY_SIZE(bm->bits);
/* XXX check for corruption */
sbi->chunk_alloc_bits[off] = bm->bits[0];
sbi->chunk_alloc_bits[off + 1] = bm->bits[1];
}
/*
* Return the block number of the first block in a free chunk.
*
* The region around the cleared free bit for the allocation is always
* added to the ring and will generate a ton of overlapping ring
* entries. This is fine for initial testing but won't be good enough
* for real use. We'll have a bitmap of dirtied regions that are only
* logged as the update is written out.
*/
int scoutfs_alloc_chunk(struct super_block *sb, u64 *blkno)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
unsigned long size = le64_to_cpu(super->total_chunks);
struct scoutfs_ring_bitmap bm;
unsigned long off;
unsigned long bit;
int ret;
spin_lock(&sbi->chunk_alloc_lock);
bit = find_next_bit_le(sbi->chunk_alloc_bits, size, 0);
if (bit >= size) {
ret = -ENOSPC;
} else {
clear_bit_le(bit, sbi->chunk_alloc_bits);
off = round_down(bit, sizeof(bm.bits) * 8);
bm.offset = cpu_to_le32(off);
off *= ARRAY_SIZE(bm.bits);
bm.bits[0] = sbi->chunk_alloc_bits[off];
bm.bits[1] = sbi->chunk_alloc_bits[off + 1];
*blkno = bit << SCOUTFS_CHUNK_BLOCK_SHIFT;
ret = 0;
}
spin_unlock(&sbi->chunk_alloc_lock);
if (!ret) {
ret = scoutfs_dirty_ring_entry(sb, SCOUTFS_RING_BITMAP, &bm,
sizeof(bm));
WARN_ON_ONCE(ret);
}
return ret;
}


@@ -1,8 +0,0 @@
#ifndef _SCOUTFS_CHUNK_H_
#define _SCOUTFS_CHUNK_H_
void scoutfs_set_chunk_alloc_bits(struct super_block *sb,
struct scoutfs_ring_bitmap *bm);
int scoutfs_alloc_chunk(struct super_block *sb, u64 *blkno);
#endif


@@ -12,14 +12,11 @@
* other places by this macro. Don't forget to update LAST_COUNTER.
*/
#define EXPAND_EACH_COUNTER \
EXPAND_COUNTER(skip_lookup) \
EXPAND_COUNTER(skip_insert) \
EXPAND_COUNTER(skip_search) \
EXPAND_COUNTER(skip_delete) \
EXPAND_COUNTER(skip_next) \
EXPAND_COUNTER(block_mem_alloc) \
EXPAND_COUNTER(block_mem_free)
#define FIRST_COUNTER skip_lookup
#define LAST_COUNTER skip_next
#define FIRST_COUNTER block_mem_alloc
#define LAST_COUNTER block_mem_free
#undef EXPAND_COUNTER
#define EXPAND_COUNTER(which) struct percpu_counter which;
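For reference, a sketch of what the X-macro pattern expands to now that
only the block counters remain; the struct name below is illustrative
since the real expansion site isn't visible in this hunk.

	struct scoutfs_counters_example {
		EXPAND_EACH_COUNTER
	};

	/* which the preprocessor turns into:
	 *
	 *	struct scoutfs_counters_example {
	 *		struct percpu_counter block_mem_alloc;
	 *		struct percpu_counter block_mem_free;
	 *	};
	 */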


@@ -20,8 +20,8 @@
#include "dir.h"
#include "inode.h"
#include "key.h"
#include "segment.h"
#include "super.h"
#include "btree.h"
/*
* Directory entries are stored in entries with offsets calculated from
@@ -114,9 +114,9 @@ static unsigned int dent_bytes(unsigned int name_len)
return sizeof(struct scoutfs_dirent) + name_len;
}
static unsigned int item_name_len(struct scoutfs_item_ref *ref)
static unsigned int item_name_len(struct scoutfs_btree_cursor *curs)
{
return ref->val_len - sizeof(struct scoutfs_dirent);
return curs->val_len - sizeof(struct scoutfs_dirent);
}
/*
* Store the dirent item hash in the dentry so that we don't have to
@@ -176,8 +176,8 @@ static struct dentry *scoutfs_lookup(struct inode *dir, struct dentry *dentry,
unsigned int flags)
{
struct scoutfs_inode_info *si = SCOUTFS_I(dir);
struct scoutfs_btree_cursor curs = {NULL,};
struct super_block *sb = dir->i_sb;
DECLARE_SCOUTFS_ITEM_REF(ref);
struct scoutfs_dirent *dent;
struct dentry_info *di;
struct scoutfs_key key;
@@ -209,15 +209,14 @@ static struct dentry *scoutfs_lookup(struct inode *dir, struct dentry *dentry,
h = name_hash(dentry->d_name.name, dentry->d_name.len, h);
scoutfs_set_key(&key, scoutfs_ino(dir), SCOUTFS_DIRENT_KEY, h);
scoutfs_put_ref(&ref);
ret = scoutfs_read_item(sb, &key, &ref);
ret = scoutfs_btree_lookup(sb, &key, &curs);
if (ret == -ENOENT)
continue;
if (ret < 0)
break;
dent = ref.val;
name_len = item_name_len(&ref);
dent = curs.val;
name_len = item_name_len(&curs);
if (names_equal(dentry->d_name.name, dentry->d_name.len,
dent->name, name_len)) {
ino = le64_to_cpu(dent->ino);
@@ -228,7 +227,7 @@ static struct dentry *scoutfs_lookup(struct inode *dir, struct dentry *dentry,
}
}
scoutfs_put_ref(&ref);
scoutfs_btree_release(&curs);
out:
if (ret == -ENOENT) {
inode = NULL;
@@ -275,12 +274,11 @@ static int scoutfs_readdir(struct file *file, void *dirent, filldir_t filldir)
{
struct inode *inode = file_inode(file);
struct super_block *sb = inode->i_sb;
DECLARE_SCOUTFS_ITEM_REF(ref);
struct scoutfs_btree_cursor curs = {NULL,};
struct scoutfs_dirent *dent;
struct scoutfs_key first;
struct scoutfs_key last;
unsigned int name_len;
LIST_HEAD(iter_list);
int ret = 0;
u32 pos;
@@ -294,14 +292,13 @@ static int scoutfs_readdir(struct file *file, void *dirent, filldir_t filldir)
scoutfs_set_key(&first, scoutfs_ino(inode), SCOUTFS_DIRENT_KEY,
file->f_pos);
scoutfs_put_ref(&ref);
ret = scoutfs_next_item(sb, &first, &last, &iter_list, &ref);
ret = scoutfs_btree_next(sb, &first, &last, &curs);
if (ret)
break;
dent = ref.val;
name_len = item_name_len(&ref);
pos = scoutfs_key_offset(ref.key);
dent = curs.val;
name_len = item_name_len(&curs);
pos = scoutfs_key_offset(curs.key);
if (filldir(dirent, dent->name, name_len, pos,
le64_to_cpu(dent->ino), dentry_type(dent->type)))
@@ -310,8 +307,7 @@ static int scoutfs_readdir(struct file *file, void *dirent, filldir_t filldir)
file->f_pos = pos + 1;
}
scoutfs_put_ref(&ref);
scoutfs_put_iter_list(&iter_list);
scoutfs_btree_release(&curs);
if (ret == -ENOENT)
ret = 0;
@@ -324,9 +320,9 @@ static int scoutfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode,
{
struct super_block *sb = dir->i_sb;
struct scoutfs_inode_info *si = SCOUTFS_I(dir);
struct scoutfs_btree_cursor curs = {NULL,};
struct inode *inode = NULL;
struct scoutfs_dirent *dent;
DECLARE_SCOUTFS_ITEM_REF(ref);
struct dentry_info *di;
struct scoutfs_key key;
int bytes;
@@ -356,7 +352,7 @@ static int scoutfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode,
h = name_hash(dentry->d_name.name, dentry->d_name.len, h);
scoutfs_set_key(&key, scoutfs_ino(dir), SCOUTFS_DIRENT_KEY, h);
ret = scoutfs_create_item(sb, &key, bytes, &ref);
ret = scoutfs_btree_insert(sb, &key, bytes, &curs);
if (ret != -EEXIST)
break;
}
@@ -366,13 +362,13 @@ static int scoutfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode,
goto out;
}
dent = ref.val;
dent = curs.val;
dent->ino = cpu_to_le64(scoutfs_ino(inode));
dent->type = mode_to_type(inode->i_mode);
memcpy(dent->name, dentry->d_name.name, dentry->d_name.len);
di->hash = h;
scoutfs_put_ref(&ref);
scoutfs_btree_release(&curs);
i_size_write(dir, i_size_read(dir) + dentry->d_name.len);
dir->i_mtime = dir->i_ctime = CURRENT_TIME;
@@ -417,7 +413,7 @@ static int scoutfs_unlink(struct inode *dir, struct dentry *dentry)
struct super_block *sb = dir->i_sb;
struct inode *inode = dentry->d_inode;
struct timespec ts = current_kernel_time();
DECLARE_SCOUTFS_ITEM_REF(ref);
struct scoutfs_btree_cursor curs = {NULL,};
struct dentry_info *di;
struct scoutfs_key key;
int ret = 0;
@@ -436,12 +432,12 @@ static int scoutfs_unlink(struct inode *dir, struct dentry *dentry)
scoutfs_set_key(&key, scoutfs_ino(dir), SCOUTFS_DIRENT_KEY, di->hash);
ret = scoutfs_read_item(sb, &key, &ref);
ret = scoutfs_btree_lookup(sb, &key, &curs);
if (ret)
goto out;
ret = scoutfs_delete_item(sb, &ref);
scoutfs_put_ref(&ref);
ret = scoutfs_btree_delete(sb, &curs);
scoutfs_btree_release(&curs);
if (ret)
goto out;


@@ -15,11 +15,11 @@
#include <linux/pagemap.h>
#include "format.h"
#include "segment.h"
#include "inode.h"
#include "key.h"
#include "filerw.h"
#include "scoutfs_trace.h"
#include "btree.h"
/*
* File data is stored in items just like everything else. This is very
@@ -61,8 +61,8 @@ static bool map_data_region(struct data_region *dr, u64 pos, struct page *page)
dr->item_off = do_div(pos, SCOUTFS_MAX_ITEM_LEN);
dr->item_key = pos;
dr->len = min(SCOUTFS_MAX_ITEM_LEN - dr->item_off,
PAGE_SIZE - dr->page_off);
dr->len = min_t(int, SCOUTFS_MAX_ITEM_LEN - dr->item_off,
PAGE_SIZE - dr->page_off);
return true;
}
@@ -81,8 +81,8 @@ static bool map_data_region(struct data_region *dr, u64 pos, struct page *page)
static int scoutfs_readpage(struct file *file, struct page *page)
{
struct inode *inode = file->f_mapping->host;
struct scoutfs_btree_cursor curs = {NULL,};
struct super_block *sb = inode->i_sb;
DECLARE_SCOUTFS_ITEM_REF(ref);
struct scoutfs_key key;
struct data_region dr;
int ret = 0;
@@ -93,7 +93,7 @@ static int scoutfs_readpage(struct file *file, struct page *page)
scoutfs_set_key(&key, scoutfs_ino(inode), SCOUTFS_DATA_KEY,
dr.item_key);
ret = scoutfs_read_item(sb, &key, &ref);
ret = scoutfs_btree_lookup(sb, &key, &curs);
if (ret == -ENOENT) {
addr = kmap_atomic(page);
memset(addr + dr.page_off, 0, dr.len);
@@ -104,7 +104,7 @@ static int scoutfs_readpage(struct file *file, struct page *page)
break;
addr = kmap_atomic(page);
memcpy(addr + dr.page_off, ref.val + dr.item_off, dr.len);
memcpy(addr + dr.page_off, curs.val + dr.item_off, dr.len);
kunmap_atomic(addr);
}
@@ -125,8 +125,8 @@ static int scoutfs_readpage(struct file *file, struct page *page)
static int scoutfs_writepage(struct page *page, struct writeback_control *wbc)
{
struct inode *inode = page->mapping->host;
struct scoutfs_btree_cursor curs = {NULL,};
struct super_block *sb = inode->i_sb;
DECLARE_SCOUTFS_ITEM_REF(ref);
struct scoutfs_key key;
struct data_region dr;
void *addr;
@@ -139,19 +139,19 @@ static int scoutfs_writepage(struct page *page, struct writeback_control *wbc)
scoutfs_set_key(&key, scoutfs_ino(inode), SCOUTFS_DATA_KEY,
dr.item_key);
ret = scoutfs_dirty_item(sb, &key, SCOUTFS_MAX_ITEM_LEN, &ref);
/* XXX dirty */
ret = scoutfs_btree_insert(sb, &key, SCOUTFS_MAX_ITEM_LEN,
&curs);
if (ret)
break;
addr = kmap_atomic(page);
memcpy(ref.val + dr.item_off, addr + dr.page_off, dr.len);
memcpy(curs.val + dr.item_off, addr + dr.page_off, dr.len);
kunmap_atomic(addr);
scoutfs_put_ref(&ref);
}
scoutfs_put_ref(&ref);
scoutfs_btree_release(&curs);
if (ret) {
SetPageError(page);


@@ -6,27 +6,12 @@
/* super block id */
#define SCOUTFS_SUPER_ID 0x2e736674756f6373ULL /* "scoutfs." */
/*
* Everything is stored in and addressed as 4k fixed size blocks. This
* avoids having to manage contiguous cpu mappings of larger blocks.
* Larger structures are read and written as multiple blocks.
*/
#define SCOUTFS_BLOCK_SHIFT 12
#define SCOUTFS_BLOCK_SHIFT 14
#define SCOUTFS_BLOCK_SIZE (1 << SCOUTFS_BLOCK_SHIFT)
#define SCOUTFS_BLOCK_MASK (SCOUTFS_BLOCK_SIZE - 1)
/*
* The allocator works on larger chunks. Smaller metadata structures
* like the super blocks and the ring are stored in chunks.
*
* A log segment is a collection of smaller blocks (bloom filter, item blocks)
* stored in a chunk.
*/
#define SCOUTFS_CHUNK_SHIFT 22
#define SCOUTFS_CHUNK_SIZE (1 << SCOUTFS_CHUNK_SHIFT)
#define SCOUTFS_CHUNK_BLOCK_SHIFT (SCOUTFS_CHUNK_SHIFT - SCOUTFS_BLOCK_SHIFT)
#define SCOUTFS_CHUNK_BLOCK_MASK ((1 << SCOUTFS_CHUNK_BLOCK_SHIFT) - 1)
#define SCOUTFS_BLOCKS_PER_CHUNK (1 << SCOUTFS_CHUNK_BLOCK_SHIFT)
#define SCOUTFS_PAGES_PER_BLOCK (SCOUTFS_BLOCK_SIZE / PAGE_SIZE)
#define SCOUTFS_BLOCK_PAGE_ORDER (SCOUTFS_BLOCK_SHIFT - PAGE_SHIFT)
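Worked values for the new constants, assuming the common 4 KB PAGE_SIZE
(PAGE_SHIFT of 12); other page sizes change the page math.

	/*
	 * SCOUTFS_BLOCK_SHIFT       14
	 * SCOUTFS_BLOCK_SIZE        1 << 14        = 16384 bytes (16 KB)
	 * SCOUTFS_PAGES_PER_BLOCK   16384 / 4096   = 4
	 * SCOUTFS_BLOCK_PAGE_ORDER  14 - 12        = 2
	 *
	 * so alloc_block() in block.c makes order-2 page allocations and
	 * block_submit_bio() adds the whole 16 KB with one bio_add_page() call.
	 */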
/*
* The super blocks leave some room at the start of the first block for
@@ -35,22 +20,6 @@
#define SCOUTFS_SUPER_BLKNO ((64 * 1024) >> SCOUTFS_BLOCK_SHIFT)
#define SCOUTFS_SUPER_NR 2
/*
* The bloom filters are statically sized. It's a tradeoff between
* storage overhead and false positive rate. At the moment we have
* as few as 1000 and as many as 18000 items in a segment. We can
* get a ~1% false positive rate (triggering header search) rate at
* the high end with a ~20k bloom filter.
*
* n = 18,000, p = 0.01 (1 in 100) → m = 172,532 (21.06KB), k = 7
*/
#define SCOUTFS_BLOOM_BITS 7
#define SCOUTFS_BLOOM_BIT_WIDTH 18 /* 2^18 > m */
#define SCOUTFS_BLOOM_BIT_MASK ((1 << SCOUTFS_BLOOM_BIT_WIDTH) - 1)
#define SCOUTFS_BLOOM_BLOCKS ((20 * 1024) / SCOUTFS_BLOCK_SIZE)
#define SCOUTFS_BLOOM_SALTS \
DIV_ROUND_UP(SCOUTFS_BLOOM_BITS * SCOUTFS_BLOOM_BIT_WIDTH, 32)
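As a quick check of the sizing comment above, the standard bloom filter
formulas reproduce those numbers; this small userspace snippet is
illustrative and not part of the commit.

	#include <math.h>
	#include <stdio.h>

	int main(void)
	{
		double n = 18000, p = 0.01;
		/* m = -n * ln(p) / (ln 2)^2, k = (m / n) * ln 2 */
		double m = -n * log(p) / (log(2) * log(2));
		double k = (m / n) * log(2);

		/* prints m ~= 172514 bits (~21 KB) and k rounding up to 7 */
		printf("m = %.0f bits (%.2f KB), k = %.2f\n",
		       m, m / 8.0 / 1024.0, k);
		return 0;
	}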
/*
* This header is found at the start of every block so that we can
* verify that it's what we were looking for. The crc and padding
@@ -81,14 +50,6 @@ struct scoutfs_super_block {
struct scoutfs_block_header hdr;
__le64 id;
__u8 uuid[SCOUTFS_UUID_BYTES];
__le32 bloom_salts[SCOUTFS_BLOOM_SALTS];
__le64 total_chunks;
__le64 ring_map_blkno;
__le64 ring_map_seq;
__le64 ring_first_block;
__le64 ring_active_blocks;
__le64 ring_total_blocks;
__le64 ring_seq;
} __packed;
/*
@@ -112,110 +73,7 @@ struct scoutfs_key {
#define SCOUTFS_DIRENT_KEY 2
#define SCOUTFS_DATA_KEY 3
struct scoutfs_ring_map_block {
struct scoutfs_block_header hdr;
__le32 nr_chunks;
__le64 blknos[0];
} __packed;
#define SCOUTFS_RING_MAP_BLOCKS \
((SCOUTFS_BLOCK_SIZE - sizeof(struct scoutfs_ring_map_block)) / \
sizeof(__le64))
struct scoutfs_ring_entry {
u8 type;
__le16 len;
} __packed;
/*
* Ring blocks are stored in chunks described by the ring map blocks.
*
* The manifest entries describe the position of a given log segment in
* the manifest. They're keyed by the block number so that we can
* record movement of a log segment in the manifest with one ring entry
* and we can record deletion with just the block number.
*/
struct scoutfs_ring_block {
struct scoutfs_block_header hdr;
__le16 nr_entries;
} __packed;
enum {
SCOUTFS_RING_ADD_MANIFEST = 0,
SCOUTFS_RING_DEL_MANIFEST,
SCOUTFS_RING_BITMAP,
};
/*
* Including both keys might make the manifest too large. It might be
* better to only include one key and infer a block's range from the
* neighbour's key. The downside of that is that we assume that there
* isn't unused key space between blocks in a level. We might search
* blocks when we didn't need to.
*/
struct scoutfs_manifest_entry {
__le64 blkno;
__le64 seq;
__u8 level;
struct scoutfs_key first;
struct scoutfs_key last;
} __packed;
#define SCOUTFS_MANIFESTS_PER_LEVEL 10
/* 2^22 * 10^13 > 2^64 */
#define SCOUTFS_MAX_LEVEL 13
struct scoutfs_ring_bitmap {
__le32 offset;
__le64 bits[2];
} __packed;
struct scoutfs_bloom_block {
struct scoutfs_block_header hdr;
__le64 bits[0];
} __packed;
#define SCOUTFS_BLOOM_BITS_PER_BLOCK \
(((SCOUTFS_BLOCK_SIZE - sizeof(struct scoutfs_block_header)) / 8) * 64)
/*
* Items in log segments are sorted in a skip list by their key. We
* have a rough limit of 64k items.
*/
#define SCOUTFS_SKIP_HEIGHT 16
struct scoutfs_skip_root {
__le32 next[SCOUTFS_SKIP_HEIGHT];
} __packed;
/*
* An item block follows the bloom filter blocks at the start of a log
* segment. Its skip root references the item structs which then
* reference the item values in the rest of the block. The references
* are byte offsets from the start of the chunk.
*/
struct scoutfs_item_block {
struct scoutfs_block_header hdr;
struct scoutfs_key first;
struct scoutfs_key last;
struct scoutfs_skip_root skip_root;
} __packed;
struct scoutfs_item {
struct scoutfs_key key;
__le32 offset;
__le16 len;
u8 skip_height;
__le32 skip_next[0];
} __packed;
/*
* The max item size caps file data item lengths so that they fit in checksummed
* 4k blocks with a bit of expansion room.
*/
#define SCOUTFS_MAX_ITEM_LEN \
(SCOUTFS_BLOCK_SIZE - sizeof(struct scoutfs_block_header) - 32)
#define SCOUTFS_MAX_ITEM_LEN 2048
struct scoutfs_timespec {
__le64 sec;


@@ -19,7 +19,7 @@
#include "super.h"
#include "key.h"
#include "inode.h"
#include "segment.h"
#include "btree.h"
#include "dir.h"
#include "filerw.h"
#include "scoutfs_trace.h"
@@ -112,17 +112,17 @@ static void load_inode(struct inode *inode, struct scoutfs_inode *cinode)
static int scoutfs_read_locked_inode(struct inode *inode)
{
struct scoutfs_btree_cursor curs = {NULL,};
struct super_block *sb = inode->i_sb;
DECLARE_SCOUTFS_ITEM_REF(ref);
struct scoutfs_key key;
int ret;
scoutfs_set_key(&key, scoutfs_ino(inode), SCOUTFS_INODE_KEY, 0);
ret = scoutfs_read_item(sb, &key, &ref);
ret = scoutfs_btree_lookup(sb, &key, &curs);
if (!ret) {
load_inode(inode, ref.val);
scoutfs_put_ref(&ref);
load_inode(inode, curs.val);
scoutfs_btree_release(&curs);
}
return 0;
@@ -213,16 +213,17 @@ static void store_inode(struct scoutfs_inode *cinode, struct inode *inode)
int scoutfs_dirty_inode_item(struct inode *inode)
{
struct super_block *sb = inode->i_sb;
DECLARE_SCOUTFS_ITEM_REF(ref);
struct scoutfs_btree_cursor curs = {NULL,};
struct scoutfs_key key;
int ret;
scoutfs_set_key(&key, scoutfs_ino(inode), SCOUTFS_INODE_KEY, 0);
ret = scoutfs_dirty_item(sb, &key, sizeof(struct scoutfs_inode), &ref);
ret = scoutfs_btree_dirty(sb, &key, sizeof(struct scoutfs_inode),
&curs);
if (!ret) {
store_inode(ref.val, inode);
scoutfs_put_ref(&ref);
store_inode(curs.val, inode);
scoutfs_btree_release(&curs);
trace_scoutfs_dirty_inode(inode);
}
return ret;
@@ -239,18 +240,20 @@ int scoutfs_dirty_inode_item(struct inode *inode)
*/
void scoutfs_update_inode_item(struct inode *inode)
{
struct scoutfs_btree_cursor curs = {NULL,};
struct super_block *sb = inode->i_sb;
DECLARE_SCOUTFS_ITEM_REF(ref);
struct scoutfs_key key;
int ret;
scoutfs_set_key(&key, scoutfs_ino(inode), SCOUTFS_INODE_KEY, 0);
ret = scoutfs_read_item(sb, &key, &ref);
/* XXX maybe just use dirty again? not sure.. */
ret = scoutfs_btree_dirty(sb, &key, sizeof(struct scoutfs_inode),
&curs);
BUG_ON(ret);
store_inode(ref.val, inode);
scoutfs_put_ref(&ref);
store_inode(curs.val, inode);
scoutfs_btree_release(&curs);
trace_scoutfs_update_inode(inode);
}
@@ -262,8 +265,8 @@ struct inode *scoutfs_new_inode(struct super_block *sb, struct inode *dir,
umode_t mode, dev_t rdev)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_btree_cursor curs = {NULL,};
struct scoutfs_inode_info *ci;
DECLARE_SCOUTFS_ITEM_REF(ref);
struct scoutfs_key key;
struct inode *inode;
int ret;
@@ -285,14 +288,14 @@ struct inode *scoutfs_new_inode(struct super_block *sb, struct inode *dir,
scoutfs_set_key(&key, scoutfs_ino(inode), SCOUTFS_INODE_KEY, 0);
ret = scoutfs_create_item(inode->i_sb, &key,
sizeof(struct scoutfs_inode), &ref);
ret = scoutfs_btree_insert(inode->i_sb, &key,
sizeof(struct scoutfs_inode), &curs);
if (ret) {
iput(inode);
return ERR_PTR(ret);
}
scoutfs_put_ref(&ref);
scoutfs_btree_release(&curs);
return inode;
}


@@ -1,147 +0,0 @@
/*
* Copyright (C) 2016 Versity Software, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License v2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include "rbtree_aug.h"
#include "format.h"
#include "key.h"
#include "ival.h"
/*
* scoutfs wants to store overlapping key ranges and find intersections
* for tracking both segments in level 0 and granting access ranges.
*
* We use a simple augmented rbtree of key intervals that tracks the
* greatest end value of all the intervals in a node's subtree. Wikipedia
* data structures 101.
*
* Unfortunately the augmented rbtree callbacks need a tweak to compare
* our key structs. But we don't want to mess around with updating
* distro kernels. So we backport the augmented rbtree code from
* mainline in a private copy. This'll vanish when we bring scoutfs up
* to mainline.
*/
static struct scoutfs_key *node_subtree_end(struct rb_node *node)
{
struct scoutfs_ival *ival;
static struct scoutfs_key static_zero = {0,};
if (!node)
return &static_zero;
ival = container_of(node, struct scoutfs_ival, node);
return &ival->subtree_end;
}
static struct scoutfs_key compute_subtree_end(struct scoutfs_ival *ival)
{
return *scoutfs_max_key(node_subtree_end(ival->node.rb_left),
node_subtree_end(ival->node.rb_right));
}
RB_DECLARE_CALLBACKS(static, ival_rb_cb, struct scoutfs_ival, node,
struct scoutfs_key, subtree_end, compute_subtree_end)
void scoutfs_insert_ival(struct scoutfs_ival_tree *tree,
struct scoutfs_ival *ins)
{
struct rb_node **node = &tree->root.rb_node;
struct rb_node *parent = NULL;
struct scoutfs_ival *ival;
giant_rbtree_hack_build_bugs();
while (*node) {
parent = *node;
ival = container_of(*node, struct scoutfs_ival, node);
/* extend traversed subtree end to cover inserted end */
ival->subtree_end = *scoutfs_max_key(&ival->subtree_end,
&ins->end);
if (scoutfs_key_cmp(&ins->start, &ival->start) < 0)
node = &(*node)->rb_left;
else
node = &(*node)->rb_right;
}
ins->subtree_end = ins->end;
rb_link_node(&ins->node, parent, node);
rb_insert_augmented(&ins->node, &tree->root, &ival_rb_cb);
}
void scoutfs_remove_ival(struct scoutfs_ival_tree *tree,
struct scoutfs_ival *ival)
{
if (!RB_EMPTY_NODE(&ival->node)) {
rb_erase_augmented(&ival->node, &tree->root, &ival_rb_cb);
RB_CLEAR_NODE(&ival->node);
}
}
/*
* Find the interval in the tree with the lowest start value that
* intersects the search range.
*/
static struct scoutfs_ival *first_ival(struct scoutfs_ival_tree *tree,
struct scoutfs_key *start,
struct scoutfs_key *end)
{
struct rb_node *node = tree->root.rb_node;
struct scoutfs_ival *ival;
while (node) {
ival = container_of(node, struct scoutfs_ival, node);
if (scoutfs_key_cmp(node_subtree_end(ival->node.rb_left),
start) >= 0)
node = node->rb_left;
else if (!scoutfs_cmp_key_ranges(start, end,
&ival->start, &ival->end))
return ival;
else if (scoutfs_key_cmp(end, &ival->start) < 0)
break;
else
node = node->rb_right;
}
return NULL;
}
/*
* Find the next interval, sorted by start value, that intersects the
* given search range. Pass a NULL ival to first return the intersection
* with the lowest start value. The caller must serialize access while
* iterating.
*/
struct scoutfs_ival *scoutfs_next_ival(struct scoutfs_ival_tree *tree,
struct scoutfs_key *start,
struct scoutfs_key *end,
struct scoutfs_ival *ival)
{
struct rb_node *node;
if (!ival)
return first_ival(tree, start, end);
node = rb_next(&ival->node);
if (node) {
ival = container_of(node, struct scoutfs_ival, node);
if (!scoutfs_cmp_key_ranges(start, end,
&ival->start, &ival->end))
return ival;
}
return NULL;
}


@@ -1,71 +0,0 @@
#ifndef _SCOUTFS_IVAL_H_
#define _SCOUTFS_IVAL_H_
struct scoutfs_ival_tree {
struct rb_root root;
};
static inline void scoutfs_init_ival_tree(struct scoutfs_ival_tree *tree)
{
tree->root = RB_ROOT;
}
struct scoutfs_ival {
struct rb_node node;
struct scoutfs_key start;
struct scoutfs_key end;
struct scoutfs_key subtree_end;
};
void scoutfs_insert_ival(struct scoutfs_ival_tree *tree,
struct scoutfs_ival *ins);
void scoutfs_remove_ival(struct scoutfs_ival_tree *tree,
struct scoutfs_ival *ival);
struct scoutfs_ival *scoutfs_next_ival(struct scoutfs_ival_tree *tree,
struct scoutfs_key *start,
struct scoutfs_key *end,
struct scoutfs_ival *ival);
/*
* Walk all the intervals in postorder. This lets us free each ival we
* see without erasing and rebalancing.
*/
#define foreach_postorder_ival_safe(itree, ival, node, tmp) \
for (node = rb_first_postorder(&(itree)->root); \
ival = container_of(node, struct scoutfs_ival, node), \
(node && (tmp = *node, 1)), node; \
node = rb_next_postorder(&tmp))
// struct rb_node {
// long unsigned int __rb_parent_color; /* 0 8 */
// struct rb_node * rb_right; /* 8 8 */
// struct rb_node * rb_left; /* 16 8 */
//
// /* size: 24, cachelines: 1, members: 3 */
// /* last cacheline: 24 bytes */
// };
// struct rb_root {
// struct rb_node * rb_node; /* 0 8 */
//
// /* size: 8, cachelines: 1, members: 1 */
// /* last cacheline: 8 bytes */
// };
/*
* Try to find out if the imported hacked rbtree in ival.c goes out of
* sync with the rbtree in the distro kernel.
*/
static inline void giant_rbtree_hack_build_bugs(void)
{
size_t sz = sizeof(long);
BUILD_BUG_ON(offsetof(struct rb_node, __rb_parent_color) != 0);
BUILD_BUG_ON(offsetof(struct rb_node, rb_right) != sz);
BUILD_BUG_ON(offsetof(struct rb_node, rb_left) != (sz * 2));
BUILD_BUG_ON(sizeof(struct rb_node) != (sz * 3));
BUILD_BUG_ON(offsetof(struct rb_root, rb_node) != 0);
BUILD_BUG_ON(sizeof(struct rb_root) != sz);
}
#endif


@@ -1,306 +0,0 @@
/*
* Copyright (C) 2016 Versity Software, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License v2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include <linux/fs.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/sort.h>
#include "super.h"
#include "format.h"
#include "manifest.h"
#include "key.h"
#include "ring.h"
#include "ival.h"
#include "scoutfs_trace.h"
/*
* The manifest organizes log segments into levels of item indexes. New
* segments arrive at level 0 which can have many segments with
* overlapping keys. Then segments are merged into progressively larger
* higher levels which do not have segments with overlapping keys.
*
* All the segments for all the levels are stored in one interval tree.
* This lets reads find all the overlapping segments in all levels with
* one tree walk instead of walks per level. It also lets us move
* segments around the levels by updating their level field rather than
* removing them from one level index and adding them to another.
*/
struct scoutfs_manifest {
spinlock_t lock;
struct scoutfs_ival_tree itree;
};
/*
* There's some redundancy between the interval struct and the manifest
* entry struct. If we re-use both we duplicate fields and memory
* pressure is precious here. So we have a native combination of the
* two.
*/
struct scoutfs_manifest_node {
struct scoutfs_ival ival;
u64 blkno;
u64 seq;
unsigned char level;
};
/*
* Remove an exact match of the entry from the manifest. It's normal
* for ring replay to try to remove an entry that doesn't exist if ring
* wrapping and manifest deletion combine in just the right way.
*/
static void delete_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *ment)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
struct scoutfs_manifest_node *mnode;
struct scoutfs_ival *ival;
ival = NULL;
while ((ival = scoutfs_next_ival(&mani->itree, &ment->first,
&ment->last, ival))) {
mnode = container_of(ival, struct scoutfs_manifest_node, ival);
if (mnode->blkno == le64_to_cpu(ment->blkno) &&
mnode->seq == le64_to_cpu(ment->seq) &&
!scoutfs_key_cmp(&ment->first, &mnode->ival.start) &&
!scoutfs_key_cmp(&ment->last, &mnode->ival.end))
break;
}
if (ival) {
trace_scoutfs_delete_manifest(ment);
scoutfs_remove_ival(&mani->itree, &mnode->ival);
kfree(mnode);
}
}
void scoutfs_delete_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *ment)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
spin_lock(&mani->lock);
delete_manifest(sb, ment);
spin_unlock(&mani->lock);
}
static void insert_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *ment,
struct scoutfs_manifest_node *mnode)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
trace_scoutfs_insert_manifest(ment);
mnode->ival.start = ment->first;
mnode->ival.end = ment->last;
mnode->blkno = le64_to_cpu(ment->blkno);
mnode->seq = le64_to_cpu(ment->seq);
mnode->level = ment->level;
scoutfs_insert_ival(&mani->itree, &mnode->ival);
}
int scoutfs_insert_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *ment)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
struct scoutfs_manifest_node *mnode;
mnode = kzalloc(sizeof(struct scoutfs_manifest_node), GFP_NOFS);
if (!mnode)
return -ENOMEM; /* XXX hmm, fatal? prealloc?*/
spin_lock(&mani->lock);
insert_manifest(sb, ment, mnode);
spin_unlock(&mani->lock);
return 0;
}
/*
* The caller has inserted a temporary manifest entry while they were
* dirtying a segment. It's done now and they want the final segment
* range stored in the manifest and logged in the ring.
*
* If this returns an error then nothing has changed.
*
* XXX we'd also need to add stale manifest entries to the ring
* XXX In the future we'd send it to the leader
*/
int scoutfs_finalize_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *existing,
struct scoutfs_manifest_entry *updated)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
struct scoutfs_manifest_node *mnode;
int ret;
mnode = kzalloc(sizeof(struct scoutfs_manifest_node), GFP_NOFS);
if (!mnode)
return -ENOMEM; /* XXX hmm, fatal? prealloc?*/
ret = scoutfs_dirty_ring_entry(sb, SCOUTFS_RING_ADD_MANIFEST,
updated,
sizeof(struct scoutfs_manifest_entry));
if (ret) {
kfree(mnode);
return ret;
}
spin_lock(&mani->lock);
delete_manifest(sb, existing);
insert_manifest(sb, updated, mnode);
spin_unlock(&mani->lock);
return 0;
}
/* sorted by increasing level then decreasing seq */
static int cmp_ments(const void *A, const void *B)
{
const struct scoutfs_manifest_entry *a = A;
const struct scoutfs_manifest_entry *b = B;
int cmp;
cmp = (int)a->level - (int)b->level;
if (cmp)
return cmp;
if (le64_to_cpu(a->seq) > le64_to_cpu(b->seq))
return -1;
if (le64_to_cpu(a->seq) < le64_to_cpu(b->seq))
return 1;
return 0;
}
static void swap_ments(void *A, void *B, int size)
{
struct scoutfs_manifest_entry *a = A;
struct scoutfs_manifest_entry *b = B;
swap(*a, *b);
}
/*
* Give the caller an allocated array of manifest entries that intersect
* their search key. The array is sorted in the order for searching for
* the most recent item: decreasing sequence in level 0 then increasing
* levels.
*
* The live manifest can change while the caller walks their array but
* the segments will not be reclaimed and the caller has grants that
* protect their items in the segments even if the segments shift over
* time.
*
* The number of elements in the array is returned, or a negative error;
* the array is not allocated if 0 is returned.
*
* XXX need to actually keep the segments from being reclaimed
*/
int scoutfs_manifest_find_key(struct super_block *sb, struct scoutfs_key *key,
struct scoutfs_manifest_entry **ments_ret)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
struct scoutfs_manifest_entry *ments;
struct scoutfs_manifest_node *mnode;
struct scoutfs_ival *ival;
unsigned nr;
int i;
/* make a reasonably large initial guess */
i = 16;
ments = NULL;
do {
kfree(ments);
nr = i;
ments = kmalloc(nr * sizeof(struct scoutfs_manifest_entry),
GFP_NOFS);
if (!ments)
return -ENOMEM;
spin_lock(&mani->lock);
i = 0;
ival = NULL;
while ((ival = scoutfs_next_ival(&mani->itree, key, key,
ival))) {
if (i < nr) {
mnode = container_of(ival,
struct scoutfs_manifest_node, ival);
ments[i].blkno = cpu_to_le64(mnode->blkno);
ments[i].seq = cpu_to_le64(mnode->seq);
ments[i].level = mnode->level;
ments[i].first = ival->start;
ments[i].last = ival->end;
}
i++;
}
spin_unlock(&mani->lock);
} while (i > nr);
if (i) {
sort(ments, i, sizeof(struct scoutfs_manifest_entry),
cmp_ments, swap_ments);
} else {
kfree(ments);
ments = NULL;
}
*ments_ret = ments;
return i;
}
int scoutfs_setup_manifest(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani;
mani = kzalloc(sizeof(struct scoutfs_manifest), GFP_KERNEL);
if (!mani)
return -ENOMEM;
spin_lock_init(&mani->lock);
scoutfs_init_ival_tree(&mani->itree);
sbi->mani = mani;
return 0;
}
/*
* This is called once the manifest will no longer be used.
*/
void scoutfs_destroy_manifest(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest *mani = sbi->mani;
struct scoutfs_ival *ival;
struct rb_node *node;
struct rb_node tmp;
if (mani) {
foreach_postorder_ival_safe(&mani->itree, ival, node, tmp)
kfree(ival);
kfree(mani);
sbi->mani = NULL;
}
}


@@ -1,18 +0,0 @@
#ifndef _SCOUTFS_MANIFEST_H_
#define _SCOUTFS_MANIFEST_H_
int scoutfs_setup_manifest(struct super_block *sb);
void scoutfs_destroy_manifest(struct super_block *sb);
int scoutfs_insert_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *ment);
void scoutfs_delete_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *ment);
int scoutfs_finalize_manifest(struct super_block *sb,
struct scoutfs_manifest_entry *existing,
struct scoutfs_manifest_entry *updated);
int scoutfs_manifest_find_key(struct super_block *sb, struct scoutfs_key *key,
struct scoutfs_manifest_entry **ments_ret);
#endif


@@ -1,996 +0,0 @@
/*
* The upstream augmented rbtree interface currently assumes that it
* can compare the augmented values directly:
*
* if (node->rbaugmented == augmented)
* break;
*
* This doesn't work for our struct key types. The only change needed
* to make this work for us is to turn that into a memcmp. But we're
* developing against distro kernels that sites actually use. For now
* we carry around this giant hack that imports the upstream copy and
* makes the change. It's only used in ival.c.
*
* This is a disgusting hack and also the right thing for this stage of
* the project. We'll fix this up as we submit upstream and trickle
* into distro kernels.
*/
#ifndef _GIANT_RBTREE_HACK_
#define _GIANT_RBTREE_HACK_
/* forbid including kernel rbtree headers by way of includes below */
#define _LINUX_RBTREE_AUGMENTED_H
#define _LINUX_RBTREE_H
#include <linux/kernel.h>
#include <linux/stddef.h>
#include <linux/rcupdate.h>
#include <linux/compiler.h>
#undef EXPORT_SYMBOL
#define EXPORT_SYMBOL(foo)
/*
* then paste rbtree.h, rbtree_augmented.h, and rbtree.c
*/
/* --------- rbtree.h ---------- */
/*
Red Black Trees
(C) 1999 Andrea Arcangeli <andrea@suse.de>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
linux/include/linux/rbtree.h
To use rbtrees you'll have to implement your own insert and search cores.
This will avoid us to use callbacks and to drop drammatically performances.
I know it's not the cleaner way, but in C (not in C++) to get
performances and genericity...
See Documentation/rbtree.txt for documentation and samples.
*/
struct rb_node {
unsigned long __rb_parent_color;
struct rb_node *rb_right;
struct rb_node *rb_left;
} __attribute__((aligned(sizeof(long))));
/* The alignment might seem pointless, but allegedly CRIS needs it */
struct rb_root {
struct rb_node *rb_node;
};
#define rb_parent(r) ((struct rb_node *)((r)->__rb_parent_color & ~3))
#define RB_ROOT (struct rb_root) { NULL, }
#define rb_entry(ptr, type, member) container_of(ptr, type, member)
#define RB_EMPTY_ROOT(root) (READ_ONCE((root)->rb_node) == NULL)
/* 'empty' nodes are nodes that are known not to be inserted in an rbtree */
#define RB_EMPTY_NODE(node) \
((node)->__rb_parent_color == (unsigned long)(node))
#define RB_CLEAR_NODE(node) \
((node)->__rb_parent_color = (unsigned long)(node))
extern void rb_insert_color(struct rb_node *, struct rb_root *);
extern void rb_erase(struct rb_node *, struct rb_root *);
/* Find logical next and previous nodes in a tree */
extern struct rb_node *rb_next(const struct rb_node *);
extern struct rb_node *rb_prev(const struct rb_node *);
extern struct rb_node *rb_first(const struct rb_root *);
extern struct rb_node *rb_last(const struct rb_root *);
/* Postorder iteration - always visit the parent after its children */
extern struct rb_node *rb_first_postorder(const struct rb_root *);
extern struct rb_node *rb_next_postorder(const struct rb_node *);
/* Fast replacement of a single node without remove/rebalance/add/rebalance */
extern void rb_replace_node(struct rb_node *victim, struct rb_node *new,
struct rb_root *root);
static inline void rb_link_node(struct rb_node *node, struct rb_node *parent,
struct rb_node **rb_link)
{
node->__rb_parent_color = (unsigned long)parent;
node->rb_left = node->rb_right = NULL;
*rb_link = node;
}
static inline void rb_link_node_rcu(struct rb_node *node, struct rb_node *parent,
struct rb_node **rb_link)
{
node->__rb_parent_color = (unsigned long)parent;
node->rb_left = node->rb_right = NULL;
rcu_assign_pointer(*rb_link, node);
}
#define rb_entry_safe(ptr, type, member) \
({ typeof(ptr) ____ptr = (ptr); \
____ptr ? rb_entry(____ptr, type, member) : NULL; \
})
/**
* rbtree_postorder_for_each_entry_safe - iterate in post-order over rb_root of
* given type allowing the backing memory of @pos to be invalidated
*
* @pos: the 'type *' to use as a loop cursor.
* @n: another 'type *' to use as temporary storage
* @root: 'rb_root *' of the rbtree.
* @field: the name of the rb_node field within 'type'.
*
* rbtree_postorder_for_each_entry_safe() provides a similar guarantee as
* list_for_each_entry_safe() and allows the iteration to continue independent
* of changes to @pos by the body of the loop.
*
* Note, however, that it cannot handle other modifications that re-order the
* rbtree it is iterating over. This includes calling rb_erase() on @pos, as
* rb_erase() may rebalance the tree, causing us to miss some nodes.
*/
#define rbtree_postorder_for_each_entry_safe(pos, n, root, field) \
for (pos = rb_entry_safe(rb_first_postorder(root), typeof(*pos), field); \
pos && ({ n = rb_entry_safe(rb_next_postorder(&pos->field), \
typeof(*pos), field); 1; }); \
pos = n)
/* --------- rbtree_augmented.h ---------- */
/*
Red Black Trees
(C) 1999 Andrea Arcangeli <andrea@suse.de>
(C) 2002 David Woodhouse <dwmw2@infradead.org>
(C) 2012 Michel Lespinasse <walken@google.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
linux/include/linux/rbtree_augmented.h
*/
/*
* Please note - only struct rb_augment_callbacks and the prototypes for
* rb_insert_augmented() and rb_erase_augmented() are intended to be public.
* The rest are implementation details you are not expected to depend on.
*
* See Documentation/rbtree.txt for documentation and samples.
*/
struct rb_augment_callbacks {
void (*propagate)(struct rb_node *node, struct rb_node *stop);
void (*copy)(struct rb_node *old, struct rb_node *new);
void (*rotate)(struct rb_node *old, struct rb_node *new);
};
extern void __rb_insert_augmented(struct rb_node *node, struct rb_root *root,
void (*augment_rotate)(struct rb_node *old, struct rb_node *new));
/*
* Fixup the rbtree and update the augmented information when rebalancing.
*
* On insertion, the user must update the augmented information on the path
* leading to the inserted node, then call rb_link_node() as usual and
* rb_augment_inserted() instead of the usual rb_insert_color() call.
* If rb_augment_inserted() rebalances the rbtree, it will callback into
* a user provided function to update the augmented information on the
* affected subtrees.
*/
static inline void
rb_insert_augmented(struct rb_node *node, struct rb_root *root,
const struct rb_augment_callbacks *augment)
{
__rb_insert_augmented(node, root, augment->rotate);
}
#define RB_DECLARE_CALLBACKS(rbstatic, rbname, rbstruct, rbfield, \
rbtype, rbaugmented, rbcompute) \
static inline void \
rbname ## _propagate(struct rb_node *rb, struct rb_node *stop) \
{ \
while (rb != stop) { \
rbstruct *node = rb_entry(rb, rbstruct, rbfield); \
rbtype augmented = rbcompute(node); \
if (!memcmp(&node->rbaugmented, &augmented, \
sizeof(augmented))) \
break; \
node->rbaugmented = augmented; \
rb = rb_parent(&node->rbfield); \
} \
} \
static inline void \
rbname ## _copy(struct rb_node *rb_old, struct rb_node *rb_new) \
{ \
rbstruct *old = rb_entry(rb_old, rbstruct, rbfield); \
rbstruct *new = rb_entry(rb_new, rbstruct, rbfield); \
new->rbaugmented = old->rbaugmented; \
} \
static void \
rbname ## _rotate(struct rb_node *rb_old, struct rb_node *rb_new) \
{ \
rbstruct *old = rb_entry(rb_old, rbstruct, rbfield); \
rbstruct *new = rb_entry(rb_new, rbstruct, rbfield); \
new->rbaugmented = old->rbaugmented; \
old->rbaugmented = rbcompute(old); \
} \
rbstatic const struct rb_augment_callbacks rbname = { \
rbname ## _propagate, rbname ## _copy, rbname ## _rotate \
};
#define RB_RED 0
#define RB_BLACK 1
#define __rb_parent(pc) ((struct rb_node *)(pc & ~3))
#define __rb_color(pc) ((pc) & 1)
#define __rb_is_black(pc) __rb_color(pc)
#define __rb_is_red(pc) (!__rb_color(pc))
#define rb_color(rb) __rb_color((rb)->__rb_parent_color)
#define rb_is_red(rb) __rb_is_red((rb)->__rb_parent_color)
#define rb_is_black(rb) __rb_is_black((rb)->__rb_parent_color)
static inline void rb_set_parent(struct rb_node *rb, struct rb_node *p)
{
rb->__rb_parent_color = rb_color(rb) | (unsigned long)p;
}
static inline void rb_set_parent_color(struct rb_node *rb,
struct rb_node *p, int color)
{
rb->__rb_parent_color = (unsigned long)p | color;
}
static inline void
__rb_change_child(struct rb_node *old, struct rb_node *new,
struct rb_node *parent, struct rb_root *root)
{
if (parent) {
if (parent->rb_left == old)
WRITE_ONCE(parent->rb_left, new);
else
WRITE_ONCE(parent->rb_right, new);
} else
WRITE_ONCE(root->rb_node, new);
}
extern void __rb_erase_color(struct rb_node *parent, struct rb_root *root,
void (*augment_rotate)(struct rb_node *old, struct rb_node *new));
static __always_inline struct rb_node *
__rb_erase_augmented(struct rb_node *node, struct rb_root *root,
const struct rb_augment_callbacks *augment)
{
struct rb_node *child = node->rb_right;
struct rb_node *tmp = node->rb_left;
struct rb_node *parent, *rebalance;
unsigned long pc;
if (!tmp) {
/*
* Case 1: node to erase has no more than 1 child (easy!)
*
* Note that if there is one child it must be red due to 5)
* and node must be black due to 4). We adjust colors locally
* so as to bypass __rb_erase_color() later on.
*/
pc = node->__rb_parent_color;
parent = __rb_parent(pc);
__rb_change_child(node, child, parent, root);
if (child) {
child->__rb_parent_color = pc;
rebalance = NULL;
} else
rebalance = __rb_is_black(pc) ? parent : NULL;
tmp = parent;
} else if (!child) {
/* Still case 1, but this time the child is node->rb_left */
tmp->__rb_parent_color = pc = node->__rb_parent_color;
parent = __rb_parent(pc);
__rb_change_child(node, tmp, parent, root);
rebalance = NULL;
tmp = parent;
} else {
struct rb_node *successor = child, *child2;
tmp = child->rb_left;
if (!tmp) {
/*
* Case 2: node's successor is its right child
*
* (n) (s)
* / \ / \
* (x) (s) -> (x) (c)
* \
* (c)
*/
parent = successor;
child2 = successor->rb_right;
augment->copy(node, successor);
} else {
/*
* Case 3: node's successor is leftmost under
* node's right child subtree
*
* (n) (s)
* / \ / \
* (x) (y) -> (x) (y)
* / /
* (p) (p)
* / /
* (s) (c)
* \
* (c)
*/
do {
parent = successor;
successor = tmp;
tmp = tmp->rb_left;
} while (tmp);
child2 = successor->rb_right;
WRITE_ONCE(parent->rb_left, child2);
WRITE_ONCE(successor->rb_right, child);
rb_set_parent(child, successor);
augment->copy(node, successor);
augment->propagate(parent, successor);
}
tmp = node->rb_left;
WRITE_ONCE(successor->rb_left, tmp);
rb_set_parent(tmp, successor);
pc = node->__rb_parent_color;
tmp = __rb_parent(pc);
__rb_change_child(node, successor, tmp, root);
if (child2) {
successor->__rb_parent_color = pc;
rb_set_parent_color(child2, parent, RB_BLACK);
rebalance = NULL;
} else {
unsigned long pc2 = successor->__rb_parent_color;
successor->__rb_parent_color = pc;
rebalance = __rb_is_black(pc2) ? parent : NULL;
}
tmp = successor;
}
augment->propagate(tmp, NULL);
return rebalance;
}
static __always_inline void
rb_erase_augmented(struct rb_node *node, struct rb_root *root,
const struct rb_augment_callbacks *augment)
{
struct rb_node *rebalance = __rb_erase_augmented(node, root, augment);
if (rebalance)
__rb_erase_color(rebalance, root, augment->rotate);
}
/* --------- rbtree.c ---------- */
/*
Red Black Trees
(C) 1999 Andrea Arcangeli <andrea@suse.de>
(C) 2002 David Woodhouse <dwmw2@infradead.org>
(C) 2012 Michel Lespinasse <walken@google.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
linux/lib/rbtree.c
*/
/*
* red-black trees properties: http://en.wikipedia.org/wiki/Rbtree
*
* 1) A node is either red or black
* 2) The root is black
* 3) All leaves (NULL) are black
* 4) Both children of every red node are black
* 5) Every simple path from root to leaves contains the same number
* of black nodes.
*
* 4 and 5 give the O(log n) guarantee, since 4 implies you cannot have two
* consecutive red nodes in a path and every red node is therefore followed by
* a black. So if B is the number of black nodes on every simple path (as per
* 5), then the longest possible path due to 4 is 2B.
*
* We shall indicate color with case, where black nodes are uppercase and red
* nodes will be lowercase. Unknown color nodes shall be drawn as red within
* parentheses and have some accompanying text comment.
*/
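/*
 * Worked example of the bound above (added for illustration): if every
 * simple path from the root to a leaf contains B = 3 black nodes, the
 * shortest possible path has 3 nodes (all black) while the longest
 * alternates black and red for at most 2 * 3 = 6 nodes, so the depth
 * stays within a factor of two of the minimum and lookups remain O(log n).
 */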
/*
* Notes on lockless lookups:
*
* All stores to the tree structure (rb_left and rb_right) must be done using
* WRITE_ONCE(). And we must not inadvertently cause (temporary) loops in the
* tree structure as seen in program order.
*
* These two requirements will allow lockless iteration of the tree -- not
* correct iteration mind you, tree rotations are not atomic so a lookup might
* miss entire subtrees.
*
* But they do guarantee that any such traversal will only see valid elements
* and that it will indeed complete -- does not get stuck in a loop.
*
* It also guarantees that if the lookup returns an element it is the 'correct'
* one. But not returning an element does _NOT_ mean it's not present.
*
* NOTE:
*
* Stores to __rb_parent_color are not important for simple lookups so those
* are left undone as of now. Nor did I check for loops involving parent
* pointers.
*/
static inline void rb_set_black(struct rb_node *rb)
{
rb->__rb_parent_color |= RB_BLACK;
}
static inline struct rb_node *rb_red_parent(struct rb_node *red)
{
return (struct rb_node *)red->__rb_parent_color;
}
/*
* Helper function for rotations:
* - old's parent and color get assigned to new
* - old gets assigned new as a parent and 'color' as a color.
*/
static inline void
__rb_rotate_set_parents(struct rb_node *old, struct rb_node *new,
struct rb_root *root, int color)
{
struct rb_node *parent = rb_parent(old);
new->__rb_parent_color = old->__rb_parent_color;
rb_set_parent_color(old, new, color);
__rb_change_child(old, new, parent, root);
}
static __always_inline void
__rb_insert(struct rb_node *node, struct rb_root *root,
void (*augment_rotate)(struct rb_node *old, struct rb_node *new))
{
struct rb_node *parent = rb_red_parent(node), *gparent, *tmp;
while (true) {
/*
* Loop invariant: node is red
*
* If there is a black parent, we are done.
* Otherwise, take some corrective action as we don't
* want a red root or two consecutive red nodes.
*/
if (!parent) {
rb_set_parent_color(node, NULL, RB_BLACK);
break;
} else if (rb_is_black(parent))
break;
gparent = rb_red_parent(parent);
tmp = gparent->rb_right;
if (parent != tmp) { /* parent == gparent->rb_left */
if (tmp && rb_is_red(tmp)) {
/*
* Case 1 - color flips
*
* G g
* / \ / \
* p u --> P U
* / /
* n n
*
* However, since g's parent might be red, and
* 4) does not allow this, we need to recurse
* at g.
*/
rb_set_parent_color(tmp, gparent, RB_BLACK);
rb_set_parent_color(parent, gparent, RB_BLACK);
node = gparent;
parent = rb_parent(node);
rb_set_parent_color(node, parent, RB_RED);
continue;
}
tmp = parent->rb_right;
if (node == tmp) {
/*
* Case 2 - left rotate at parent
*
* G G
* / \ / \
* p U --> n U
* \ /
* n p
*
* This still leaves us in violation of 4), the
* continuation into Case 3 will fix that.
*/
tmp = node->rb_left;
WRITE_ONCE(parent->rb_right, tmp);
WRITE_ONCE(node->rb_left, parent);
if (tmp)
rb_set_parent_color(tmp, parent,
RB_BLACK);
rb_set_parent_color(parent, node, RB_RED);
augment_rotate(parent, node);
parent = node;
tmp = node->rb_right;
}
/*
* Case 3 - right rotate at gparent
*
* G P
* / \ / \
* p U --> n g
* / \
* n U
*/
WRITE_ONCE(gparent->rb_left, tmp); /* == parent->rb_right */
WRITE_ONCE(parent->rb_right, gparent);
if (tmp)
rb_set_parent_color(tmp, gparent, RB_BLACK);
__rb_rotate_set_parents(gparent, parent, root, RB_RED);
augment_rotate(gparent, parent);
break;
} else {
tmp = gparent->rb_left;
if (tmp && rb_is_red(tmp)) {
/* Case 1 - color flips */
rb_set_parent_color(tmp, gparent, RB_BLACK);
rb_set_parent_color(parent, gparent, RB_BLACK);
node = gparent;
parent = rb_parent(node);
rb_set_parent_color(node, parent, RB_RED);
continue;
}
tmp = parent->rb_left;
if (node == tmp) {
/* Case 2 - right rotate at parent */
tmp = node->rb_right;
WRITE_ONCE(parent->rb_left, tmp);
WRITE_ONCE(node->rb_right, parent);
if (tmp)
rb_set_parent_color(tmp, parent,
RB_BLACK);
rb_set_parent_color(parent, node, RB_RED);
augment_rotate(parent, node);
parent = node;
tmp = node->rb_left;
}
/* Case 3 - left rotate at gparent */
WRITE_ONCE(gparent->rb_right, tmp); /* == parent->rb_left */
WRITE_ONCE(parent->rb_left, gparent);
if (tmp)
rb_set_parent_color(tmp, gparent, RB_BLACK);
__rb_rotate_set_parents(gparent, parent, root, RB_RED);
augment_rotate(gparent, parent);
break;
}
}
}
/*
* Inline version for rb_erase() use - we want to be able to inline
* and eliminate the dummy_rotate callback there
*/
static __always_inline void
____rb_erase_color(struct rb_node *parent, struct rb_root *root,
void (*augment_rotate)(struct rb_node *old, struct rb_node *new))
{
struct rb_node *node = NULL, *sibling, *tmp1, *tmp2;
while (true) {
/*
* Loop invariants:
* - node is black (or NULL on first iteration)
* - node is not the root (parent is not NULL)
* - All leaf paths going through parent and node have a
* black node count that is 1 lower than other leaf paths.
*/
sibling = parent->rb_right;
if (node != sibling) { /* node == parent->rb_left */
if (rb_is_red(sibling)) {
/*
* Case 1 - left rotate at parent
*
* P S
* / \ / \
* N s --> p Sr
* / \ / \
* Sl Sr N Sl
*/
tmp1 = sibling->rb_left;
WRITE_ONCE(parent->rb_right, tmp1);
WRITE_ONCE(sibling->rb_left, parent);
rb_set_parent_color(tmp1, parent, RB_BLACK);
__rb_rotate_set_parents(parent, sibling, root,
RB_RED);
augment_rotate(parent, sibling);
sibling = tmp1;
}
tmp1 = sibling->rb_right;
if (!tmp1 || rb_is_black(tmp1)) {
tmp2 = sibling->rb_left;
if (!tmp2 || rb_is_black(tmp2)) {
/*
* Case 2 - sibling color flip
* (p could be either color here)
*
* (p) (p)
* / \ / \
* N S --> N s
* / \ / \
* Sl Sr Sl Sr
*
* This leaves us violating 5) which
* can be fixed by flipping p to black
* if it was red, or by recursing at p.
* p is red when coming from Case 1.
*/
rb_set_parent_color(sibling, parent,
RB_RED);
if (rb_is_red(parent))
rb_set_black(parent);
else {
node = parent;
parent = rb_parent(node);
if (parent)
continue;
}
break;
}
/*
* Case 3 - right rotate at sibling
* (p could be either color here)
*
* (p) (p)
* / \ / \
* N S --> N Sl
* / \ \
* sl Sr s
* \
* Sr
*/
tmp1 = tmp2->rb_right;
WRITE_ONCE(sibling->rb_left, tmp1);
WRITE_ONCE(tmp2->rb_right, sibling);
WRITE_ONCE(parent->rb_right, tmp2);
if (tmp1)
rb_set_parent_color(tmp1, sibling,
RB_BLACK);
augment_rotate(sibling, tmp2);
tmp1 = sibling;
sibling = tmp2;
}
/*
* Case 4 - left rotate at parent + color flips
* (p and sl could be either color here.
* After rotation, p becomes black, s acquires
* p's color, and sl keeps its color)
*
* (p) (s)
* / \ / \
* N S --> P Sr
* / \ / \
* (sl) sr N (sl)
*/
tmp2 = sibling->rb_left;
WRITE_ONCE(parent->rb_right, tmp2);
WRITE_ONCE(sibling->rb_left, parent);
rb_set_parent_color(tmp1, sibling, RB_BLACK);
if (tmp2)
rb_set_parent(tmp2, parent);
__rb_rotate_set_parents(parent, sibling, root,
RB_BLACK);
augment_rotate(parent, sibling);
break;
} else {
sibling = parent->rb_left;
if (rb_is_red(sibling)) {
/* Case 1 - right rotate at parent */
tmp1 = sibling->rb_right;
WRITE_ONCE(parent->rb_left, tmp1);
WRITE_ONCE(sibling->rb_right, parent);
rb_set_parent_color(tmp1, parent, RB_BLACK);
__rb_rotate_set_parents(parent, sibling, root,
RB_RED);
augment_rotate(parent, sibling);
sibling = tmp1;
}
tmp1 = sibling->rb_left;
if (!tmp1 || rb_is_black(tmp1)) {
tmp2 = sibling->rb_right;
if (!tmp2 || rb_is_black(tmp2)) {
/* Case 2 - sibling color flip */
rb_set_parent_color(sibling, parent,
RB_RED);
if (rb_is_red(parent))
rb_set_black(parent);
else {
node = parent;
parent = rb_parent(node);
if (parent)
continue;
}
break;
}
/* Case 3 - right rotate at sibling */
tmp1 = tmp2->rb_left;
WRITE_ONCE(sibling->rb_right, tmp1);
WRITE_ONCE(tmp2->rb_left, sibling);
WRITE_ONCE(parent->rb_left, tmp2);
if (tmp1)
rb_set_parent_color(tmp1, sibling,
RB_BLACK);
augment_rotate(sibling, tmp2);
tmp1 = sibling;
sibling = tmp2;
}
/* Case 4 - left rotate at parent + color flips */
tmp2 = sibling->rb_right;
WRITE_ONCE(parent->rb_left, tmp2);
WRITE_ONCE(sibling->rb_right, parent);
rb_set_parent_color(tmp1, sibling, RB_BLACK);
if (tmp2)
rb_set_parent(tmp2, parent);
__rb_rotate_set_parents(parent, sibling, root,
RB_BLACK);
augment_rotate(parent, sibling);
break;
}
}
}
/* Non-inline version for rb_erase_augmented() use */
void __rb_erase_color(struct rb_node *parent, struct rb_root *root,
void (*augment_rotate)(struct rb_node *old, struct rb_node *new))
{
____rb_erase_color(parent, root, augment_rotate);
}
EXPORT_SYMBOL(__rb_erase_color);
/*
* Non-augmented rbtree manipulation functions.
*
* We use dummy augmented callbacks here, and have the compiler optimize them
* out of the rb_insert_color() and rb_erase() function definitions.
*/
static inline void dummy_propagate(struct rb_node *node, struct rb_node *stop) {}
static inline void dummy_copy(struct rb_node *old, struct rb_node *new) {}
static inline void dummy_rotate(struct rb_node *old, struct rb_node *new) {}
static const struct rb_augment_callbacks dummy_callbacks = {
dummy_propagate, dummy_copy, dummy_rotate
};
void rb_insert_color(struct rb_node *node, struct rb_root *root)
{
__rb_insert(node, root, dummy_rotate);
}
EXPORT_SYMBOL(rb_insert_color);
void rb_erase(struct rb_node *node, struct rb_root *root)
{
struct rb_node *rebalance;
rebalance = __rb_erase_augmented(node, root, &dummy_callbacks);
if (rebalance)
____rb_erase_color(rebalance, root, dummy_rotate);
}
EXPORT_SYMBOL(rb_erase);
/*
* Augmented rbtree manipulation functions.
*
* This instantiates the same __always_inline functions as in the non-augmented
* case, but this time with user-defined callbacks.
*/
void __rb_insert_augmented(struct rb_node *node, struct rb_root *root,
void (*augment_rotate)(struct rb_node *old, struct rb_node *new))
{
__rb_insert(node, root, augment_rotate);
}
EXPORT_SYMBOL(__rb_insert_augmented);
/*
* This function returns the first node (in sort order) of the tree.
*/
struct rb_node *rb_first(const struct rb_root *root)
{
struct rb_node *n;
n = root->rb_node;
if (!n)
return NULL;
while (n->rb_left)
n = n->rb_left;
return n;
}
EXPORT_SYMBOL(rb_first);
struct rb_node *rb_last(const struct rb_root *root)
{
struct rb_node *n;
n = root->rb_node;
if (!n)
return NULL;
while (n->rb_right)
n = n->rb_right;
return n;
}
EXPORT_SYMBOL(rb_last);
struct rb_node *rb_next(const struct rb_node *node)
{
struct rb_node *parent;
if (RB_EMPTY_NODE(node))
return NULL;
/*
* If we have a right-hand child, go down and then left as far
* as we can.
*/
if (node->rb_right) {
node = node->rb_right;
while (node->rb_left)
node = node->rb_left;
return (struct rb_node *)node;
}
/*
* No right-hand children. Everything down and left is smaller than us,
* so any 'next' node must be in the general direction of our parent.
* Go up the tree; any time the ancestor is a right-hand child of its
* parent, keep going up. First time it's a left-hand child of its
* parent, said parent is our 'next' node.
*/
while ((parent = rb_parent(node)) && node == parent->rb_right)
node = parent;
return parent;
}
EXPORT_SYMBOL(rb_next);
struct rb_node *rb_prev(const struct rb_node *node)
{
struct rb_node *parent;
if (RB_EMPTY_NODE(node))
return NULL;
/*
* If we have a left-hand child, go down and then right as far
* as we can.
*/
if (node->rb_left) {
node = node->rb_left;
while (node->rb_right)
node = node->rb_right;
return (struct rb_node *)node;
}
/*
* No left-hand children. Go up till we find an ancestor which
* is a right-hand child of its parent.
*/
while ((parent = rb_parent(node)) && node == parent->rb_left)
node = parent;
return parent;
}
EXPORT_SYMBOL(rb_prev);
void rb_replace_node(struct rb_node *victim, struct rb_node *new,
struct rb_root *root)
{
struct rb_node *parent = rb_parent(victim);
/* Set the surrounding nodes to point to the replacement */
__rb_change_child(victim, new, parent, root);
if (victim->rb_left)
rb_set_parent(victim->rb_left, new);
if (victim->rb_right)
rb_set_parent(victim->rb_right, new);
/* Copy the pointers/colour from the victim to the replacement */
*new = *victim;
}
EXPORT_SYMBOL(rb_replace_node);
static struct rb_node *rb_left_deepest_node(const struct rb_node *node)
{
for (;;) {
if (node->rb_left)
node = node->rb_left;
else if (node->rb_right)
node = node->rb_right;
else
return (struct rb_node *)node;
}
}
struct rb_node *rb_next_postorder(const struct rb_node *node)
{
const struct rb_node *parent;
if (!node)
return NULL;
parent = rb_parent(node);
/* If we're sitting on node, we've already seen our children */
if (parent && node == parent->rb_left && parent->rb_right) {
/* If we are the parent's left node, go to the parent's right
* node then all the way down to the left */
return rb_left_deepest_node(parent->rb_right);
} else
/* Otherwise we are the parent's right node, and the parent
* should be next */
return (struct rb_node *)parent;
}
EXPORT_SYMBOL(rb_next_postorder);
struct rb_node *rb_first_postorder(const struct rb_root *root)
{
if (!root->rb_node)
return NULL;
return rb_left_deepest_node(root->rb_node);
}
EXPORT_SYMBOL(rb_first_postorder);
#endif /* _GIANT_RBTREE_HACK_ */


@@ -1,250 +0,0 @@
/*
* Copyright (C) 2016 Versity Software, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License v2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include <linux/kernel.h>
#include <linux/buffer_head.h>
#include <linux/fs.h>
#include "format.h"
#include "dir.h"
#include "inode.h"
#include "key.h"
#include "super.h"
#include "manifest.h"
#include "chunk.h"
#include "block.h"
#include "ring.h"
static int replay_ring_block(struct super_block *sb, struct buffer_head *bh)
{
struct scoutfs_ring_block *ring = (void *)bh->b_data;
struct scoutfs_ring_entry *ent = (void *)(ring + 1);
struct scoutfs_manifest_entry *ment;
struct scoutfs_ring_bitmap *bm;
int ret = 0;
int i;
/* XXX verify */
for (i = 0; i < le16_to_cpu(ring->nr_entries); i++) {
switch(ent->type) {
case SCOUTFS_RING_ADD_MANIFEST:
ment = (void *)(ent + 1);
ret = scoutfs_insert_manifest(sb, ment);
break;
case SCOUTFS_RING_DEL_MANIFEST:
ment = (void *)(ent + 1);
scoutfs_delete_manifest(sb, ment);
break;
case SCOUTFS_RING_BITMAP:
bm = (void *)(ent + 1);
scoutfs_set_chunk_alloc_bits(sb, bm);
break;
default:
/* XXX */
break;
}
ent = (void *)(ent + 1) + le16_to_cpu(ent->len);
}
return ret;
}
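/*
 * Illustrative sketch (not part of the original ring code): each ring
 * entry is a struct scoutfs_ring_entry header immediately followed by
 * ent->len payload bytes, so advancing to the next entry skips both, just
 * as the replay loop above does.
 */
static inline struct scoutfs_ring_entry *
sketch_next_ring_entry(struct scoutfs_ring_entry *ent)
{
	/* step over the header and its variable length payload */
	return (void *)(ent + 1) + le16_to_cpu(ent->len);
}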
/*
* Return the block number of the block that contains the given logical
* block in the ring. We look up ring block chunks in the map blocks
* in the chunk described by the super.
*/
static u64 map_ring_block(struct super_block *sb, u64 block)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_ring_map_block *map;
struct buffer_head *bh;
u64 ring_chunk;
u32 ring_block;
u64 blkno;
u64 div;
u32 rem;
ring_block = block & SCOUTFS_CHUNK_BLOCK_MASK;
ring_chunk = block >> SCOUTFS_CHUNK_BLOCK_SHIFT;
div = div_u64_rem(ring_chunk, SCOUTFS_RING_MAP_BLOCKS, &rem);
bh = scoutfs_read_block(sb, le64_to_cpu(super->ring_map_blkno) + div);
if (!bh)
return 0;
/* XXX verify map block */
map = (void *)bh->b_data;
blkno = le64_to_cpu(map->blknos[rem]) + ring_block;
brelse(bh);
return blkno;
}
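/*
 * Worked example of the mapping above, using hypothetical sizes for
 * illustration: if a chunk held 256 ring blocks and SCOUTFS_RING_MAP_BLOCKS
 * were 1024, then logical ring block 300000 would be block
 * 300000 % 256 == 224 of ring chunk 300000 / 256 == 1171.  That chunk's
 * starting blkno is read from slot 1171 % 1024 == 147 of the map block at
 * super->ring_map_blkno + (1171 / 1024) == + 1.
 */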
/*
* Read a given logical ring block.
*/
static struct buffer_head *read_ring_block(struct super_block *sb, u64 block)
{
u64 blkno = map_ring_block(sb, block);
if (!blkno)
return NULL;
return scoutfs_read_block(sb, blkno);
}
/*
* Return a dirty locked logical ring block.
*/
static struct buffer_head *new_ring_block(struct super_block *sb, u64 block)
{
u64 blkno = map_ring_block(sb, block);
if (!blkno)
return NULL;
return scoutfs_new_block(sb, blkno);
}
int scoutfs_replay_ring(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct buffer_head *bh;
u64 block;
int ret = 0;
int i;
/* XXX read-ahead map blocks and each set of ring blocks */
block = le64_to_cpu(super->ring_first_block);
for (i = 0; i < le64_to_cpu(super->ring_active_blocks); i++) {
bh = read_ring_block(sb, block);
if (!bh) {
ret = -EIO;
break;
}
ret = replay_ring_block(sb, bh);
brelse(bh);
if (ret)
break;
if (++block == le64_to_cpu(super->ring_total_blocks))
block = 0;
}
return ret;
}
/*
* The caller generates ring entries for the manifest and allocator
* bitmap as it writes items to blocks. We pin the block that we're
* working on so that it isn't written out until we fill it and
* calculate its checksum.
*/
int scoutfs_dirty_ring_entry(struct super_block *sb, u8 type, void *data,
u16 len)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_ring_block *ring;
struct scoutfs_ring_entry *ent;
struct buffer_head *bh;
unsigned int avail;
u64 block;
int ret = 0;
bh = sbi->dirty_ring_bh;
ent = sbi->dirty_ring_ent;
avail = sbi->dirty_ring_ent_avail;
if (bh && len > avail) {
scoutfs_finish_dirty_ring(sb);
bh = NULL;
}
if (!bh) {
block = le64_to_cpu(super->ring_first_block) +
le64_to_cpu(super->ring_active_blocks);
if (block >= le64_to_cpu(super->ring_total_blocks))
block -= le64_to_cpu(super->ring_total_blocks);
bh = new_ring_block(sb, block);
if (!bh) {
ret = -ENOMEM;
goto out;
}
ring = (void *)bh->b_data;
ring->nr_entries = 0;
ent = (void *)(ring + 1);
/* assuming len fits in new empty block */
}
ring = (void *)bh->b_data;
ent->type = type;
ent->len = cpu_to_le16(len);
memcpy(ent + 1, data, len);
le16_add_cpu(&ring->nr_entries, 1);
ent = (void *)(ent + 1) + le16_to_cpu(ent->len);
avail = SCOUTFS_BLOCK_SIZE - ((char *)(ent + 1) - (char *)ring);
out:
sbi->dirty_ring_bh = bh;
sbi->dirty_ring_ent = ent;
sbi->dirty_ring_ent_avail = avail;
return ret;
}
/*
* We might have a pinned partial dirty ring block. This is
* called as we finish the block or when the commit is done. We
* calculate the checksum and unlock it so it can be written.
*
* XXX This is about to write a partial block. We might as well fill
* that space with more old entries from the manifest and ring before
* we write it.
*/
int scoutfs_finish_dirty_ring(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct buffer_head *bh;
bh = sbi->dirty_ring_bh;
if (!bh)
return 0;
sbi->dirty_ring_bh = NULL;
/*
* XXX we're not zeroing the tail of the block here. We will
* when we change the item block format to let us append to
* the block without walking all the items.
*/
scoutfs_calc_hdr_crc(bh);
mark_buffer_dirty(bh);
unlock_buffer(bh);
brelse(bh);
le64_add_cpu(&super->ring_active_blocks, 1);
return 0;
}


@@ -1,9 +0,0 @@
#ifndef _SCOUTFS_RING_H_
#define _SCOUTFS_RING_H_
int scoutfs_replay_ring(struct super_block *sb);
int scoutfs_dirty_ring_entry(struct super_block *sb, u8 type, void *data,
u16 len);
int scoutfs_finish_dirty_ring(struct super_block *sb);
#endif


@@ -24,9 +24,6 @@
#include "dir.h"
#include "msg.h"
#include "block.h"
#include "manifest.h"
#include "ring.h"
#include "segment.h"
#define CREATE_TRACE_POINTS
#include "scoutfs_trace.h"


@@ -27,48 +27,6 @@
#include "key.h"
#include "format.h"
TRACE_EVENT(scoutfs_bloom_hit,
TP_PROTO(struct scoutfs_key *key),
TP_ARGS(key),
TP_STRUCT__entry(
__field(__u64, inode)
__field(__u8, type)
__field(__u64, offset)
),
TP_fast_assign(
__entry->inode = le64_to_cpu(key->inode);
__entry->type = key->type;
__entry->offset = le64_to_cpu(key->offset);
),
TP_printk("key %llu.%u.%llu",
__entry->inode, __entry->type, __entry->offset)
);
TRACE_EVENT(scoutfs_bloom_miss,
TP_PROTO(struct scoutfs_key *key),
TP_ARGS(key),
TP_STRUCT__entry(
__field(__u64, inode)
__field(__u8, type)
__field(__u64, offset)
),
TP_fast_assign(
__entry->inode = le64_to_cpu(key->inode);
__entry->type = key->type;
__entry->offset = le64_to_cpu(key->offset);
),
TP_printk("key %llu.%u.%llu",
__entry->inode, __entry->type, __entry->offset)
);
TRACE_EVENT(scoutfs_write_begin,
TP_PROTO(u64 ino, loff_t pos, unsigned len),
@@ -151,116 +109,6 @@ TRACE_EVENT(scoutfs_update_inode,
__entry->ino, __entry->size)
);
TRACE_EVENT(scoutfs_dirty_super,
TP_PROTO(struct scoutfs_super_block *super),
TP_ARGS(super),
TP_STRUCT__entry(
__field(__u64, blkno)
__field(__u64, seq)
),
TP_fast_assign(
__entry->blkno = le64_to_cpu(super->hdr.blkno);
__entry->seq = le64_to_cpu(super->hdr.seq);
),
TP_printk("blkno %llu seq %llu",
__entry->blkno, __entry->seq)
);
TRACE_EVENT(scoutfs_write_super,
TP_PROTO(struct scoutfs_super_block *super),
TP_ARGS(super),
TP_STRUCT__entry(
__field(__u64, blkno)
__field(__u64, seq)
),
TP_fast_assign(
__entry->blkno = le64_to_cpu(super->hdr.blkno);
__entry->seq = le64_to_cpu(super->hdr.seq);
),
TP_printk("blkno %llu seq %llu",
__entry->blkno, __entry->seq)
);
TRACE_EVENT(scoutfs_insert_manifest,
TP_PROTO(struct scoutfs_manifest_entry *ment),
TP_ARGS(ment),
TP_STRUCT__entry(
__field(__u64, blkno)
__field(__u64, seq)
__field(__u8, level)
__field(__u64, first_inode)
__field(__u8, first_type)
__field(__u64, first_offset)
__field(__u64, last_inode)
__field(__u8, last_type)
__field(__u64, last_offset)
),
TP_fast_assign(
__entry->blkno = le64_to_cpu(ment->blkno);
__entry->seq = le64_to_cpu(ment->seq);
__entry->level = ment->level;
__entry->first_inode = le64_to_cpu(ment->first.inode);
__entry->first_type = ment->first.type;
__entry->first_offset = le64_to_cpu(ment->first.offset);
__entry->last_inode = le64_to_cpu(ment->last.inode);
__entry->last_type = ment->last.type;
__entry->last_offset = le64_to_cpu(ment->last.offset);
),
TP_printk("blkno %llu seq %llu level %u first "CKF" last "CKF,
__entry->blkno, __entry->seq, __entry->level,
__entry->first_inode, __entry->first_type,
__entry->first_offset, __entry->last_inode,
__entry->last_type, __entry->last_offset)
);
TRACE_EVENT(scoutfs_delete_manifest,
TP_PROTO(struct scoutfs_manifest_entry *ment),
TP_ARGS(ment),
TP_STRUCT__entry(
__field(__u64, blkno)
__field(__u64, seq)
__field(__u8, level)
__field(__u64, first_inode)
__field(__u8, first_type)
__field(__u64, first_offset)
__field(__u64, last_inode)
__field(__u8, last_type)
__field(__u64, last_offset)
),
TP_fast_assign(
__entry->blkno = le64_to_cpu(ment->blkno);
__entry->seq = le64_to_cpu(ment->seq);
__entry->level = ment->level;
__entry->first_inode = le64_to_cpu(ment->first.inode);
__entry->first_type = ment->first.type;
__entry->first_offset = le64_to_cpu(ment->first.offset);
__entry->last_inode = le64_to_cpu(ment->last.inode);
__entry->last_type = ment->last.type;
__entry->last_offset = le64_to_cpu(ment->last.offset);
),
TP_printk("blkno %llu seq %llu level %u first "CKF" last "CKF,
__entry->blkno, __entry->seq, __entry->level,
__entry->first_inode, __entry->first_type,
__entry->first_offset, __entry->last_inode,
__entry->last_type, __entry->last_offset)
);
#endif /* _TRACE_SCOUTFS_H */
/* This part must be outside protection */


@@ -1,805 +0,0 @@
/*
* Copyright (C) 2016 Versity Software, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License v2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include <linux/kernel.h>
#include <linux/buffer_head.h>
#include <linux/fs.h>
#include <linux/spinlock.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
#include "super.h"
#include "key.h"
#include "segment.h"
#include "manifest.h"
#include "block.h"
#include "chunk.h"
#include "ring.h"
#include "bloom.h"
#include "skip.h"
/*
* scoutfs log segments are large multi-block structures that contain
* key/value items. This file implements manipulations of the items.
*
* Each log segment starts with a bloom filter that supports quickly
* testing for key values without having to search the whole block for a
* key.
*
* After the bloom filter come the packed structures that describe the
* items that are present in the block. They're sorted in a skip list
* to support reasonably efficient insertion, sorted iteration, and
* deletion.
*
* Finally the item values are stored at the end of the block. This
* supports finding that an item's key isn't present by only reading the
* item structs, not the values.
*
* All told, should we choose to, we can have three large portions of the
* blocks resident for searching. It's likely that we'll keep the bloom
* filters hot but that the items and especially the values may age out
* of the cache.
*/
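/*
 * A minimal sketch of how the chunk-relative byte offsets stored in items
 * are interpreted throughout this file (illustration only; these helpers
 * aren't used below): the high bits select a block within the segment and
 * the low bits select a byte within that block.
 */
static inline u32 sketch_segment_block(u32 chunk_off)
{
	return chunk_off >> SCOUTFS_BLOCK_SHIFT;	/* block within the chunk */
}

static inline u32 sketch_block_byte(u32 chunk_off)
{
	return chunk_off & SCOUTFS_BLOCK_MASK;		/* byte within that block */
}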
void scoutfs_put_ref(struct scoutfs_item_ref *ref)
{
if (ref->item_bh)
brelse(ref->item_bh);
if (ref->val_bh)
brelse(ref->val_bh);
memset(ref, 0, sizeof(struct scoutfs_item_ref));
}
/* private to here */
struct scoutfs_item_iter {
struct list_head list;
struct buffer_head *bh;
struct scoutfs_item *item;
u64 blkno;
struct scoutfs_key after_seg;
};
void scoutfs_put_iter_list(struct list_head *list)
{
struct scoutfs_item_iter *iter;
struct scoutfs_item_iter *pos;
list_for_each_entry_safe(iter, pos, list, list) {
list_del_init(&iter->list);
brelse(iter->bh);
kfree(iter);
}
}
/*
* The caller has a pointer to an item and a reference to its block. We
* read the value block and populate the reference.
*
* The item references get their own buffer head references so that the
* caller doesn't have to play funny games. The caller always has to
* release its bh. If this succeeds then it also needs to put the
* ref.
*/
static int populate_ref(struct super_block *sb, u64 blkno,
struct buffer_head *item_bh, struct scoutfs_item *item,
struct scoutfs_item_ref *ref)
{
struct buffer_head *bh;
bh = scoutfs_read_block_off(sb, blkno, le32_to_cpu(item->offset));
if (!bh)
return -EIO;
ref->key = &item->key;
ref->val_len = le16_to_cpu(item->len);
ref->val = bh->b_data + (le32_to_cpu(item->offset) &
SCOUTFS_BLOCK_MASK);
get_bh(item_bh);
ref->item_bh = item_bh;
ref->val_bh = bh;
return 0;
}
/*
* Segments are immutable once they're written. As they're being
* dirtied we need to serialize concurrent access. XXX the dirty blkno test
* is probably racy. We could use reader/writer locks here. And we
* could probably make the skip lists support concurrent access.
*/
static bool try_lock_dirty_mutex(struct super_block *sb, u64 blkno)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (blkno == sbi->dirty_blkno) {
mutex_lock(&sbi->dirty_mutex);
if (blkno == sbi->dirty_blkno)
return true;
mutex_unlock(&sbi->dirty_mutex);
}
return false;
}
/*
* Return a reference to the item at the given key. We walk the manifest
* to find blocks that might contain the key from most recent to oldest.
* To find the key in each log segment we test its bloom filter and
* then search through the item keys. The first matching item we find
* is returned.
*
* -ENOENT is returned if the item isn't present. The caller needs to put
* the ref if we return success.
*/
int scoutfs_read_item(struct super_block *sb, struct scoutfs_key *key,
struct scoutfs_item_ref *ref)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_item *item = NULL;
struct scoutfs_bloom_bits bits;
struct scoutfs_manifest_entry *ments;
struct buffer_head *bh;
bool locked;
u64 blkno;
int ret;
int nr;
int i;
/* XXX hold manifest */
scoutfs_calc_bloom_bits(&bits, key, sbi->super.bloom_salts);
item = NULL;
ret = -ENOENT;
nr = scoutfs_manifest_find_key(sb, key, &ments);
if (nr < 0)
return nr;
if (nr == 0)
return -ENOENT;
for (i = 0; i < nr; i++) {
/* XXX read-ahead all bloom blocks */
blkno = le64_to_cpu(ments[i].blkno);
/* XXX verify seqs */
ret = scoutfs_test_bloom_bits(sb, blkno, key, &bits);
if (ret < 0)
break;
if (!ret) {
ret = -ENOENT;
continue;
}
/* XXX read-ahead all item header blocks */
locked = try_lock_dirty_mutex(sb, blkno);
ret = scoutfs_skip_lookup(sb, blkno, key, &bh, &item);
if (locked)
mutex_unlock(&sbi->dirty_mutex);
if (ret) {
if (ret == -ENOENT)
continue;
break;
}
break;
}
kfree(ments);
/* XXX release manifest */
/* XXX read-ahead all value blocks? */
if (!ret) {
ret = populate_ref(sb, blkno, bh, item, ref);
brelse(bh);
}
return ret;
}
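/*
 * Hedged usage sketch (hypothetical caller, not from the original code):
 * a typical read looks up an item by key, copies out its value, and puts
 * the reference, which releases the underlying buffer heads.
 */
static int sketch_read_value(struct super_block *sb, struct scoutfs_key *key,
			     void *buf, unsigned int len)
{
	DECLARE_SCOUTFS_ITEM_REF(ref);
	int ret;

	ret = scoutfs_read_item(sb, key, &ref);
	if (ret)
		return ret;

	/* copy out at most len bytes of the referenced value */
	memcpy(buf, ref.val, min_t(unsigned int, len, ref.val_len));

	scoutfs_put_ref(&ref);
	return 0;
}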
/* return the byte length of the item header including its skip elements */
static int item_bytes(int height)
{
return offsetof(struct scoutfs_item, skip_next[height]);
}
/*
* The dirty_item_off points to the byte offset after the last item.
* Advance it past block tails and initial block headers until there's
* room for an item with the given skip list element height. Then set
* the dirty_item_off past the item offset that we return.
*/
static int add_item_off(struct scoutfs_sb_info *sbi, int height)
{
int len = item_bytes(height);
int off = sbi->dirty_item_off;
int block_off;
int tail_free;
/* items can't start in a block header */
block_off = off & SCOUTFS_BLOCK_MASK;
if (block_off < sizeof(struct scoutfs_block_header))
off += sizeof(struct scoutfs_block_header) - block_off;
/* items can't cross a block boundary */
tail_free = SCOUTFS_BLOCK_SIZE - (off & SCOUTFS_BLOCK_MASK);
if (tail_free < len)
off += tail_free + sizeof(struct scoutfs_block_header);
sbi->dirty_item_off = off + len;
return off;
}
/*
* The dirty_val_off points to the first byte of the last value that
* was allocated. Subtract the offset to make room for a new value
* of the given length. If that crosses a block boundary or wanders
* into the block header then pull it back into the tail of the previous
* block.
*/
static int sub_val_off(struct scoutfs_sb_info *sbi, int len)
{
int off = sbi->dirty_val_off - len;
int block_off;
int tail_free;
/* values can't start in a block header */
block_off = off & SCOUTFS_BLOCK_MASK;
if (block_off < sizeof(struct scoutfs_block_header))
off -= (block_off + 1);
/* values can't cross a block boundary */
tail_free = SCOUTFS_BLOCK_SIZE - (off & SCOUTFS_BLOCK_MASK);
if (tail_free < len)
off -= len - tail_free;
sbi->dirty_val_off = off;
return off;
}
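/*
 * Worked example of the two-ended allocation (hypothetical numbers,
 * ignoring the block boundary adjustments above): if items grow up from
 * chunk offset 68k and values grow down from 256k, creating an item with a
 * 40 byte header and a 100 byte value moves dirty_item_off from 68k to
 * 68k + 40 and dirty_val_off from 256k to 256k - 100.  The segment is
 * declared full the first time a new item's end would pass the value
 * offset, which is the "item_off + item_bytes(height) > val_off" test in
 * scoutfs_create_item() below.
 */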
/*
* Initialize the buffers for the next dirty segment. We have to initialize
* the bloom filter bits and the item block header.
*
* XXX we need to really pin the blocks somehow
*/
static int start_dirty_segment(struct super_block *sb, u64 blkno)
{
struct scoutfs_bloom_block *blm;
struct scoutfs_item_block *iblk;
struct buffer_head *bh;
int ret = 0;
int i;
for (i = 0; i < SCOUTFS_BLOCKS_PER_CHUNK; i++) {
bh = scoutfs_new_block(sb, blkno + i);
if (!bh) {
ret = -EIO;
break;
}
if (i < SCOUTFS_BLOOM_BLOCKS) {
blm = (void *)bh->b_data;
memset(blm->bits, 0, SCOUTFS_BLOCK_SIZE -
offsetof(struct scoutfs_bloom_block, bits));
}
if (i == SCOUTFS_BLOOM_BLOCKS) {
iblk = (void *)bh->b_data;
memset(&iblk->first, ~0, sizeof(struct scoutfs_key));
memset(&iblk->last, 0, sizeof(struct scoutfs_key));
memset(&iblk->skip_root, 0, sizeof(iblk->skip_root) +
sizeof(struct scoutfs_item));
}
/* bh is pinned by sbi->dirty_blkno */
}
while (ret && i--) {
/* unwind pinned blocks on failure */
bh = sb_getblk(sb, blkno + i);
if (bh) {
brelse(bh);
brelse(bh);
}
}
return ret;
}
/*
* As we start to fill a dirty segment we don't know which keys it's
* going to contain. We add a manifest entry in memory that covers all
* possible keys so that readers know to search the dirty
* segment.
*
* Once it's finalized we know the specific range of items it contains
* and we update the manifest entry in memory for that range and write
* that to the ring.
*
* Inserting the updated entry can fail. If we deleted the old entry
* first and insertion then failed, reinserting the original could also fail.
* Instead we briefly allow two manifest entries for the same segment.
*/
static int update_dirty_segment_manifest(struct super_block *sb, u64 blkno,
bool all_items)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest_entry ment;
struct scoutfs_manifest_entry updated;
struct scoutfs_item_block *iblk;
struct buffer_head *bh;
ment.blkno = cpu_to_le64(blkno);
ment.seq = sbi->super.hdr.seq;
ment.level = 0;
memset(&ment.first, 0, sizeof(struct scoutfs_key));
memset(&ment.last, ~0, sizeof(struct scoutfs_key));
if (all_items)
return scoutfs_insert_manifest(sb, &ment);
bh = scoutfs_read_block(sb, blkno + SCOUTFS_BLOOM_BLOCKS);
if (!bh)
return -EIO;
updated = ment;
iblk = (void *)bh->b_data;
updated.first = iblk->first;
updated.last = iblk->last;
brelse(bh);
return scoutfs_finalize_manifest(sb, &ment, &updated);
}
/*
* Zero the portion of this block that intersects with the free space in
* the middle of the segment. @start and @end are chunk-relative byte
* offsets of the inclusive start and exclusive end of the free region.
*/
static void zero_unused_block(struct super_block *sb, struct buffer_head *bh,
u32 start, u32 end)
{
u32 off = bh->b_blocknr << SCOUTFS_BLOCK_SHIFT;
/* see if the segment range falls outside our block */
if (start >= off + SCOUTFS_BLOCK_SIZE || end <= off)
return;
/* convert the chunk offsets to our block offsets */
start = max(start, off) - off;
end = min(off + SCOUTFS_BLOCK_SIZE, end) - off;
/* don't zero block headers */
start = max_t(u32, start, sizeof(struct scoutfs_block_header));
end = max_t(u32, end, sizeof(struct scoutfs_block_header));
if (start < end)
memset(bh->b_data + start, 0, end - start);
}
/*
* Finish off a dirty segment if we have one. Calculate the checksums of
* all the blocks, mark them dirty, and drop their pinned reference.
*
* XXX should do something with empty dirty segments.
*/
static int finish_dirty_segment(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct address_space *mapping = sb->s_bdev->bd_inode->i_mapping;
struct buffer_head *bh;
u64 blkno = sbi->dirty_blkno;
int ret = 0;
u64 i;
WARN_ON_ONCE(!blkno);
for (i = 0; i < SCOUTFS_BLOCKS_PER_CHUNK; i++) {
bh = scoutfs_read_block(sb, blkno + i);
/* should have been pinned */
if (WARN_ON_ONCE(!bh)) {
ret = -EIO;
break;
}
zero_unused_block(sb, bh, sbi->dirty_item_off,
sbi->dirty_val_off);
scoutfs_calc_hdr_crc(bh);
mark_buffer_dirty(bh);
brelse(bh);
/* extra release to unpin */
brelse(bh);
}
/* update manifest with range of items and add to ring */
ret = update_dirty_segment_manifest(sb, blkno, false);
/*
* Try to kick off a background write of the finished segment. Callers
* can wait for the buffers in writeback if they need to.
*/
if (!ret) {
filemap_fdatawrite_range(mapping, blkno << SCOUTFS_CHUNK_SHIFT,
((blkno + 1) << SCOUTFS_CHUNK_SHIFT) - 1);
sbi->dirty_blkno = 0;
}
return ret;
}
/*
* We've been dirtying log segment blocks and ring blocks as items were
* modified. sync makes sure that they're all persistent and updates
* the super.
*
* XXX need to synchronize with transactions
* XXX is state clean after errors?
*/
int scoutfs_sync_fs(struct super_block *sb, int wait)
{
struct address_space *mapping = sb->s_bdev->bd_inode->i_mapping;
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
int ret = 0;
mutex_lock(&sbi->dirty_mutex);
if (sbi->dirty_blkno) {
ret = finish_dirty_segment(sb) ?:
scoutfs_finish_dirty_ring(sb) ?:
filemap_write_and_wait(mapping) ?:
scoutfs_write_dirty_super(sb) ?:
scoutfs_advance_dirty_super(sb);
}
mutex_unlock(&sbi->dirty_mutex);
return ret;
}
/*
* Return a reference to a newly allocated and initialized item in a
* block in the currently dirty log segment.
*
* Item creation is purposely kept very simple. Item and value offset
* allocation proceed from either end of the log segment. Once they
* intersect the log segment is full and written out. Deleted dirty
* items don't reclaim their space. The free space will be reclaimed by
* the level 0 -> level 1 merge that happens anyway. Not reclaiming
* free space makes item location more rigid and lets us relax the
* locking requirements of item references. An item reference doesn't
* have to worry about unrelated item modifications moving its item
* around to, say, defragment free space.
*/
int scoutfs_create_item(struct super_block *sb, struct scoutfs_key *key,
unsigned bytes, struct scoutfs_item_ref *ref)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_bloom_bits bits;
struct scoutfs_item *item;
struct scoutfs_item_block *iblk;
struct buffer_head *bh;
int item_off;
int val_off;
int height;
u64 blkno;
int ret = 0;
/* XXX how big should items really get? */
if (WARN_ON_ONCE(bytes == 0 || bytes > 4096))
return -EINVAL;
height = scoutfs_skip_random_height();
mutex_lock(&sbi->dirty_mutex);
next_chunk:
if (!sbi->dirty_blkno) {
ret = scoutfs_alloc_chunk(sb, &blkno);
if (ret)
goto out;
/* XXX free blkno on error? */
ret = start_dirty_segment(sb, blkno);
if (ret)
goto out;
/* add initial in-memory manifest entry with all items */
ret = update_dirty_segment_manifest(sb, blkno, true);
if (ret)
goto out;
sbi->dirty_blkno = blkno;
sbi->dirty_item_off =
(SCOUTFS_BLOCK_SIZE * SCOUTFS_BLOOM_BLOCKS) +
sizeof(struct scoutfs_item_block);
sbi->dirty_val_off = SCOUTFS_CHUNK_SIZE;
}
item_off = add_item_off(sbi, height);
val_off = sub_val_off(sbi, bytes);
trace_printk("item_off %u val_off %u\n", item_off, val_off);
if (item_off + item_bytes(height) > val_off) {
ret = finish_dirty_segment(sb);
if (ret)
goto out;
goto next_chunk;
}
/* XXX fix up this error handling in general */
bh = scoutfs_read_block_off(sb, sbi->dirty_blkno, item_off);
if (!bh) {
ret = -EIO;
goto out;
}
item = (void *)bh->b_data + (item_off & SCOUTFS_BLOCK_MASK);
item->key = *key;
item->offset = cpu_to_le32(val_off);
item->len = cpu_to_le16(bytes);
item->skip_height = height;
ret = scoutfs_skip_insert(sb, sbi->dirty_blkno, item, item_off);
if (ret)
goto out;
ret = populate_ref(sb, sbi->dirty_blkno, bh, item, ref);
brelse(bh);
if (ret)
goto out;
bh = scoutfs_read_block(sb, sbi->dirty_blkno + SCOUTFS_BLOOM_BLOCKS);
if (!bh) {
ret = -EIO;
goto out;
}
/*
* Update first and last keys as we go. It's ok if future deletions
* make this range larger than the actual keys. That'll almost
* never happen and it'll get fixed up in merging.
*/
iblk = (void *)bh->b_data;
if (scoutfs_key_cmp(key, &iblk->first) < 0)
iblk->first = *key;
if (scoutfs_key_cmp(key, &iblk->last) > 0)
iblk->last = *key;
brelse(bh);
/* XXX delete skip on failure? */
/* set the bloom bits last because we can't unset them */
scoutfs_calc_bloom_bits(&bits, key, sbi->super.bloom_salts);
ret = scoutfs_set_bloom_bits(sb, sbi->dirty_blkno, &bits);
out:
WARN_ON_ONCE(ret); /* XXX error paths are not robust */
mutex_unlock(&sbi->dirty_mutex);
return ret;
}
/*
* Ensure that there is a dirty item with the given key in the current
* dirty segment.
*
* The caller locks access to the item, prevents sync, and has made sure
* that there's enough free space in the segment for its dirty inodes.
*
* This is better than getting -EEXIST from create_item because that
* will leave the allocated item and val dangling in the block when it
* returns the error.
*/
int scoutfs_dirty_item(struct super_block *sb, struct scoutfs_key *key,
unsigned bytes, struct scoutfs_item_ref *ref)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_item *item;
struct buffer_head *bh;
bool create = false;
int ret;
mutex_lock(&sbi->dirty_mutex);
if (sbi->dirty_blkno) {
ret = scoutfs_skip_lookup(sb, sbi->dirty_blkno, key, &bh,
&item);
if (ret == -ENOENT)
create = true;
else if (!ret) {
ret = populate_ref(sb, sbi->dirty_blkno, bh, item,
ref);
brelse(bh);
}
} else {
create = true;
}
mutex_unlock(&sbi->dirty_mutex);
if (create)
ret = scoutfs_create_item(sb, key, bytes, ref);
return ret;
}
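/*
 * Hedged usage sketch (hypothetical caller): updating an item in place
 * dirties it first so that a copy lands in the current dirty segment,
 * then overwrites the value through the returned reference.
 */
static int sketch_update_value(struct super_block *sb, struct scoutfs_key *key,
			       void *buf, unsigned int len)
{
	DECLARE_SCOUTFS_ITEM_REF(ref);
	int ret;

	ret = scoutfs_dirty_item(sb, key, len, &ref);
	if (ret)
		return ret;

	/* the dirty item's value can be modified until the segment is written */
	memcpy(ref.val, buf, min_t(unsigned int, len, ref.val_len));

	scoutfs_put_ref(&ref);
	return 0;
}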
/*
* This is a really cheesy temporary delete method. It only works on items
* that are stored in dirty blocks. The caller is responsible for dropping
* the ref. XXX be less bad.
*/
int scoutfs_delete_item(struct super_block *sb, struct scoutfs_item_ref *ref)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
u64 blkno;
int ret;
mutex_lock(&sbi->dirty_mutex);
blkno = round_down(ref->item_bh->b_blocknr, SCOUTFS_BLOCKS_PER_CHUNK);
if (WARN_ON_ONCE(blkno != sbi->dirty_blkno)) {
ret = -EINVAL;
} else {
ret = scoutfs_skip_delete(sb, blkno, ref->key);
WARN_ON_ONCE(ret);
}
mutex_unlock(&sbi->dirty_mutex);
return ret;
}
/*
* Return a reference to the next item in the inclusive search range.
* The caller should have access to the search key range.
*
* We walk the manifest to find all the log segments that could contain
* the start of the range. We hold cursors on the blocks in the
* segments. Each next item iteration comes from finding the least of
* the next item at all these cursors.
*
* If we exhaust a segment at a given level we may need to search the
* next segment in that level to find the next item. The manifest may
* have changed under us while we walked our old set of segments. So we
* restart the entire search to get another consistent collection of
* segments to search.
*
* We put the segment references and iteration cursors in a list in the
* caller so that they can find many next items by advancing the cursors
* without having to walk the manifest and perform initial skip list
* searches in each segment.
*
* The caller is responsible for putting the item ref if we return
* success. -ENOENT is returned if there are no more items in the
* search range.
*/
int scoutfs_next_item(struct super_block *sb, struct scoutfs_key *first,
struct scoutfs_key *last, struct list_head *iter_list,
struct scoutfs_item_ref *ref)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_manifest_entry *ments = NULL;
struct scoutfs_key key = *first;
struct scoutfs_key least_hole;
struct scoutfs_item_iter *least;
struct scoutfs_item_iter *iter;
struct scoutfs_item_iter *pos;
bool locked;
int ret;
int nr;
int i;
restart:
if (list_empty(iter_list)) {
/* find all the segments that may contain the key */
ret = scoutfs_manifest_find_key(sb, &key, &ments);
if (ret == 0)
ret = -ENOENT;
if (ret < 0)
goto out;
nr = ret;
for (i = 0; i < nr; i++) {
iter = kzalloc(sizeof(struct scoutfs_item_iter),
GFP_NOFS);
if (!iter) {
ret = -ENOMEM;
goto out;
}
iter->blkno = le64_to_cpu(ments[i].blkno);
iter->after_seg = ments[i].last;
scoutfs_inc_key(&iter->after_seg);
list_add_tail(&iter->list, iter_list);
}
kfree(ments);
ments = NULL;
}
memset(&least_hole, ~0, sizeof(least_hole));
least = NULL;
list_for_each_entry_safe(iter, pos, iter_list, list) {
locked = try_lock_dirty_mutex(sb, iter->blkno);
/* search towards the key if we haven't yet */
if (!iter->item) {
ret = scoutfs_skip_search(sb, iter->blkno, &key,
&iter->bh, &iter->item);
} else {
ret = 0;
}
/* then iterate until we find or pass the key */
while (!ret && scoutfs_key_cmp(&iter->item->key, &key) < 0) {
ret = scoutfs_skip_next(sb, iter->blkno,
&iter->bh, &iter->item);
}
if (locked)
mutex_unlock(&sbi->dirty_mutex);
/* we're done with this segment if it has an item after last */
if (!ret && scoutfs_key_cmp(&iter->item->key, last) > 0) {
list_del_init(&iter->list);
brelse(iter->bh);
kfree(iter);
continue;
}
/*
* If we run out of keys in the segment then we don't know
* the state of keys after this segment in this level. If
* the hole after the segment is still inside the search
* range then we might need to search it for the next
* item if the least item of the remaining blocks is
* greater than the hole.
*/
if (ret == -ENOENT) {
if (scoutfs_key_cmp(&iter->after_seg, last) <= 0 &&
scoutfs_key_cmp(&iter->after_seg, &least_hole) < 0)
least_hole = iter->after_seg;
list_del_init(&iter->list);
brelse(iter->bh);
kfree(iter);
continue;
}
/* remember the most recent smallest key */
if (!least ||
scoutfs_key_cmp(&iter->item->key, &least->item->key) < 0)
least = iter;
}
/* if we had a gap before the least then we need a new search */
if (least && scoutfs_key_cmp(&least_hole, &least->item->key) < 0) {
scoutfs_put_iter_list(iter_list);
key = least_hole;
goto restart;
}
if (least)
ret = populate_ref(sb, least->blkno, least->bh, least->item,
ref);
else
ret = -ENOENT;
out:
kfree(ments);
if (ret)
scoutfs_put_iter_list(iter_list);
return ret;
}
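/*
 * Hedged usage sketch (hypothetical caller): iterating a key range calls
 * scoutfs_next_item() repeatedly, advancing the search key past each item
 * it returns, and drops the cached segment cursors when it's done.
 */
static int sketch_count_items(struct super_block *sb,
			      struct scoutfs_key *first,
			      struct scoutfs_key *last)
{
	DECLARE_SCOUTFS_ITEM_REF(ref);
	struct scoutfs_key key = *first;
	LIST_HEAD(iter_list);
	int count = 0;
	int ret;

	while ((ret = scoutfs_next_item(sb, &key, last, &iter_list, &ref)) == 0) {
		count++;
		key = *ref.key;
		scoutfs_inc_key(&key);	/* continue after the returned item */
		scoutfs_put_ref(&ref);
	}

	scoutfs_put_iter_list(&iter_list);
	return ret == -ENOENT ? count : ret;
}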


@@ -1,35 +0,0 @@
#ifndef _SCOUTFS_SEGMENT_H_
#define _SCOUTFS_SEGMENT_H_
struct scoutfs_item_ref {
/* usable by callers */
struct scoutfs_key *key;
unsigned int val_len;
void *val;
/* private buffer head refs */
struct buffer_head *item_bh;
struct buffer_head *val_bh;
};
#define DECLARE_SCOUTFS_ITEM_REF(name) \
struct scoutfs_item_ref name = {NULL, }
void scoutfs_put_ref(struct scoutfs_item_ref *ref);
void scoutfs_put_iter_list(struct list_head *list);
int scoutfs_read_item(struct super_block *sb, struct scoutfs_key *key,
struct scoutfs_item_ref *ref);
int scoutfs_create_item(struct super_block *sb, struct scoutfs_key *key,
unsigned bytes, struct scoutfs_item_ref *ref);
int scoutfs_dirty_item(struct super_block *sb, struct scoutfs_key *key,
unsigned bytes, struct scoutfs_item_ref *ref);
int scoutfs_delete_item(struct super_block *sb, struct scoutfs_item_ref *ref);
int scoutfs_next_item(struct super_block *sb, struct scoutfs_key *first,
struct scoutfs_key *last, struct list_head *iter_list,
struct scoutfs_item_ref *ref);
int scoutfs_sync_fs(struct super_block *sb, int wait);
#endif


@@ -1,338 +0,0 @@
/*
* Copyright (C) 2016 Versity Software, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License v2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include <linux/fs.h>
#include <linux/buffer_head.h>
#include <linux/random.h>
#include "format.h"
#include "key.h"
#include "block.h"
#include "skip.h"
#include "counters.h"
/*
* The items in a log segment block are sorted by their keys in a skip
* list. The skip list was chosen because it is so easy to implement
* and could, maybe some day, offer solid concurrent updates and reads.
* It also adds surprisingly little per-item overhead because half of
* the items only have one link.
*
* The list is rooted in the item block which follows the last bloom
* block in the segment. The links in the skip list elements are byte
* offsets of the start of items relative to the start of the log
* segment.
*
* We chose a limit on the height of 16 links. That gives around 64k
* items without going too crazy. That's around the higher end of the
* number of items we expect in log segments.
*
* This isn't quite a generic implementation. It knows that the items
* are rooted in the item block at a given offset in the log segment.
* It knows that the pointers are items and where the skip links are in
* its struct. It knows to compare the items by their key.
*
* The caller is completely responsible for serialization.
*
* The buffer_head reads here won't be as expensive as they might seem.
* The caller holds the blocks pinned so the worst case are block device
* page radix rcu lookups. Repeated reads of the recent blocks will hit
* the per-cpu lru bh reference caches.
*/
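/*
 * Worked expectation for the overhead claim above (illustration): with a
 * height h chosen with probability 1/2^h, the expected number of __le32
 * links per item is the sum of h / 2^h for h >= 1, which converges to 2,
 * or roughly 8 bytes of link overhead per item on average.
 */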
struct skip_path {
struct buffer_head *root_bh;
/*
* Pointers to the buffer heads which contain the blocks which are
* referenced by the next pointers in the path.
*/
struct buffer_head *bh[SCOUTFS_SKIP_HEIGHT];
/*
* Store the location of the index that references the item that
* we found. Insertion will modify the referenced index to add
* an entry before the item and deletion will modify the referenced
* index to remove the item.
*/
__le32 *next[SCOUTFS_SKIP_HEIGHT];
};
#define DECLARE_SKIP_PATH(name) \
struct skip_path name = {NULL, }
/*
* Not all byte offsets are possible locations of items. Items have to
* be after the bloom blocks and item block header, can't be in
* the block headers for the rest of the blocks, and can't be a partial
* struct at the end of a block.
*
* This is just a rough check. It doesn't catch item offsets that overlap
* with other items or values.
*/
static int invalid_item_off(u32 off)
{
if (off < ((SCOUTFS_BLOCK_SIZE * SCOUTFS_BLOOM_BLOCKS) +
sizeof(struct scoutfs_item_block)) ||
(off & SCOUTFS_BLOCK_MASK) < sizeof(struct scoutfs_block_header) ||
(off & SCOUTFS_BLOCK_MASK) >
(SCOUTFS_BLOCK_SIZE - sizeof(struct scoutfs_item))) {
trace_printk("invalid offset %u\n", off);
return 1;
}
return 0;
}
/*
* Set the caller's item to the item in the segment at the given byte
* offset and set their bh to the block that contains it.
*/
static int skip_read_item(struct super_block *sb, u64 blkno, __le32 off,
struct buffer_head **bh, struct scoutfs_item **item)
{
if (WARN_ON_ONCE(invalid_item_off(le32_to_cpu(off))))
return -EINVAL;
*bh = scoutfs_read_block_off(sb, blkno, le32_to_cpu(off));
if (!(*bh)) {
*bh = NULL;
*item = NULL;
return -EIO;
}
*item = (void *)(*bh)->b_data + (le32_to_cpu(off) & SCOUTFS_BLOCK_MASK);
return 0;
}
/*
* Find the next item in the skiplist with a key greater than or equal
* to the given key. Set the path pointers to the hops before this item
* so that we can modify those pointers to insert an item before it in
* the list or delete it.
*
* The caller is responsible for initializing the path and cleaning it up.
*/
static int skip_search(struct super_block *sb, u64 blkno,
struct skip_path *path, struct scoutfs_key *key,
int *cmp)
{
struct scoutfs_item_block *iblk;
struct scoutfs_item *item;
struct buffer_head *bh;
__le32 *next;
int ret = 0;
int i;
/* fake lesser comparison for insertion into an empty list */
*cmp = -1;
bh = scoutfs_read_block(sb, blkno + SCOUTFS_BLOOM_BLOCKS);
if (!bh)
return -EIO;
/* XXX verify */
iblk = (void *)bh->b_data;
next = iblk->skip_root.next;
path->root_bh = bh;
for (i = SCOUTFS_SKIP_HEIGHT - 1; i >= 0; i--) {
while (next[i]) {
ret = skip_read_item(sb, blkno, next[i], &bh, &item);
if (ret)
goto out;
*cmp = scoutfs_key_cmp(key, &item->key);
if (*cmp <= 0) {
brelse(bh);
break;
}
next = item->skip_next;
if (path->bh[i])
brelse(path->bh[i]);
path->bh[i] = bh;
}
path->next[i] = &next[i];
}
out:
return ret;
}
static void skip_release_path(struct skip_path *path)
{
int i;
if (path->root_bh)
brelse(path->root_bh);
for (i = 0; i < SCOUTFS_SKIP_HEIGHT; i++) {
if (path->bh[i]) {
brelse(path->bh[i]);
path->bh[i] = NULL;
}
}
}
/*
* We want heights with a distribution of 1 / (2^h). Half the items
* have a height of 1, a quarter have 2, an eighth have 3, etc.
*
* Finding the lowest set bit in a random number achieves this
* nicely. ffs() even counts the bits from 1 so it matches our height.
*
* But ffs() returns 0 if no bits are set. We prevent a 0 height and
* limit the max height returned by ORing in the bit for our max height.
*/
u8 scoutfs_skip_random_height(void)
{
return ffs(get_random_int() | (1 << (SCOUTFS_SKIP_HEIGHT - 1)));
}
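/*
 * Worked examples of the height calculation above (illustration): with
 * SCOUTFS_SKIP_HEIGHT == 16 the OR always sets bit 15, so ffs() never sees
 * zero and never returns more than 16.  A random value ending in ...0001
 * gives height 1, ...0100 gives height 3, and a value with no low bits set
 * falls through to the forced bit and gets the maximum height of 16.
 */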
/*
* Insert a new item in the item block's skip list. The caller provides
* an initialized item, particularly its skip height and key, and
* the byte offset in the log segment of the item struct.
*/
int scoutfs_skip_insert(struct super_block *sb, u64 blkno,
struct scoutfs_item *item, u32 off)
{
DECLARE_SKIP_PATH(path);
int cmp;
int ret;
int i;
if (WARN_ON_ONCE(invalid_item_off(off)) ||
WARN_ON_ONCE(item->skip_height > SCOUTFS_SKIP_HEIGHT))
return -EINVAL;
scoutfs_inc_counter(sb, skip_insert);
ret = skip_search(sb, blkno, &path, &item->key, &cmp);
if (ret == 0) {
if (cmp == 0) {
ret = -EEXIST;
} else {
for (i = 0; i < item->skip_height; i++) {
item->skip_next[i] = *path.next[i];
*path.next[i] = cpu_to_le32(off);
}
}
}
skip_release_path(&path);
return ret;
}
static int skip_lookup(struct super_block *sb, u64 blkno,
struct scoutfs_key *key, struct buffer_head **bh,
struct scoutfs_item **item, bool exact)
{
DECLARE_SKIP_PATH(path);
int cmp;
int ret;
ret = skip_search(sb, blkno, &path, key, &cmp);
if (ret == 0) {
if ((exact && cmp) || *path.next[0] == 0) {
ret = -ENOENT;
} else {
ret = skip_read_item(sb, blkno, *path.next[0],
bh, item);
}
}
skip_release_path(&path);
return ret;
}
/*
* Find the item at the given key in the skip list.
*/
int scoutfs_skip_lookup(struct super_block *sb, u64 blkno,
struct scoutfs_key *key, struct buffer_head **bh,
struct scoutfs_item **item)
{
scoutfs_inc_counter(sb, skip_lookup);
return skip_lookup(sb, blkno, key, bh, item, true);
}
/*
 * Find the first item with a key at or after the given key in the skip list.
 */
int scoutfs_skip_search(struct super_block *sb, u64 blkno,
struct scoutfs_key *key, struct buffer_head **bh,
struct scoutfs_item **item)
{
scoutfs_inc_counter(sb, skip_search);
return skip_lookup(sb, blkno, key, bh, item, false);
}
int scoutfs_skip_delete(struct super_block *sb, u64 blkno,
struct scoutfs_key *key)
{
struct scoutfs_item *item;
DECLARE_SKIP_PATH(path);
struct buffer_head *bh;
int cmp;
int ret;
int i;
scoutfs_inc_counter(sb, skip_delete);
ret = skip_search(sb, blkno, &path, key, &cmp);
if (ret == 0) {
		if (!*path.next[0] || cmp) {
ret = -ENOENT;
} else {
ret = skip_read_item(sb, blkno, *path.next[0],
&bh, &item);
if (!ret) {
for (i = 0; i < item->skip_height; i++)
*path.next[i] = item->skip_next[i];
brelse(bh);
}
}
}
skip_release_path(&path);
return ret;
}
/*
* The caller has found a valid item with search or lookup. We can use
* the lowest level links to advance through the rest of the items. The
* caller has made sure that this is safe.
*/
int scoutfs_skip_next(struct super_block *sb, u64 blkno,
struct buffer_head **bh, struct scoutfs_item **item)
{
__le32 next;
if (!(*bh))
return -ENOENT;
scoutfs_inc_counter(sb, skip_next);
next = (*item)->skip_next[0];
brelse(*bh);
if (!next) {
*bh = NULL;
*item = NULL;
return -ENOENT;
}
return skip_read_item(sb, blkno, next, bh, item);
}
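A sketch of how a caller might string these helpers together to walk a segment's items in key order: search for the first item at or after a starting key, then call scoutfs_skip_next() until it returns -ENOENT. The function name is hypothetical and locking, key setup, and error reporting are elided; it only combines the interfaces declared in skip.h below.

/* illustrative only: count the items at or after 'first' in one segment */
static unsigned long example_count_items(struct super_block *sb, u64 blkno,
					 struct scoutfs_key *first)
{
	struct scoutfs_item *item;
	struct buffer_head *bh;
	unsigned long nr = 0;
	int ret;

	ret = scoutfs_skip_search(sb, blkno, first, &bh, &item);
	while (ret == 0) {
		nr++;
		ret = scoutfs_skip_next(sb, blkno, &bh, &item);
	}

	/* by the time an error comes back the helpers have released their bh */
	return nr;
}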

View File

@@ -1,18 +0,0 @@
#ifndef _SCOUTFS_SKIP_H_
#define _SCOUTFS_SKIP_H_
u8 scoutfs_skip_random_height(void);
int scoutfs_skip_insert(struct super_block *sb, u64 blkno,
struct scoutfs_item *item, u32 off);
int scoutfs_skip_lookup(struct super_block *sb, u64 blkno,
struct scoutfs_key *key, struct buffer_head **bh,
struct scoutfs_item **item);
int scoutfs_skip_search(struct super_block *sb, u64 blkno,
struct scoutfs_key *key, struct buffer_head **bh,
struct scoutfs_item **item);
int scoutfs_skip_delete(struct super_block *sb, u64 blkno,
struct scoutfs_key *key);
int scoutfs_skip_next(struct super_block *sb, u64 blkno,
struct buffer_head **bh, struct scoutfs_item **item);
#endif

View File

@@ -24,95 +24,33 @@
#include "dir.h"
#include "msg.h"
#include "block.h"
#include "manifest.h"
#include "ring.h"
#include "segment.h"
#include "counters.h"
#include "scoutfs_trace.h"
/* only for giant rbtree hack */
#include <linux/rbtree.h>
#include "ival.h"
static struct kset *scoutfs_kset;
static const struct super_operations scoutfs_super_ops = {
.alloc_inode = scoutfs_alloc_inode,
.destroy_inode = scoutfs_destroy_inode,
.sync_fs = scoutfs_sync_fs,
};
/*
 * The caller advances the block number and sequence number in the super
 * each time it dirties it; the super is eventually written out so that
 * it references the dirty data that has already been written.
 */
int scoutfs_advance_dirty_super(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
u64 blkno;
blkno = le64_to_cpu(super->hdr.blkno) - SCOUTFS_SUPER_BLKNO;
if (++blkno == SCOUTFS_SUPER_NR)
blkno = 0;
super->hdr.blkno = cpu_to_le64(SCOUTFS_SUPER_BLKNO + blkno);
le64_add_cpu(&super->hdr.seq, 1);
trace_scoutfs_dirty_super(super);
return 0;
}
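The arithmetic above just rotates the super's block number through the SCOUTFS_SUPER_NR copies while bumping its sequence number, so the newest copy is the one with the highest seq. A standalone sketch of that rotation, using made-up stand-ins for SCOUTFS_SUPER_BLKNO and SCOUTFS_SUPER_NR (the real values live in format.h):

#include <stdio.h>
#include <stdint.h>

#define SUPER_BLKNO 16			/* stand-in for SCOUTFS_SUPER_BLKNO */
#define SUPER_NR    2			/* stand-in for SCOUTFS_SUPER_NR */

int main(void)
{
	uint64_t blkno = SUPER_BLKNO;	/* hdr.blkno of the last written super */
	uint64_t seq = 1;		/* hdr.seq of the last written super */
	int i;

	for (i = 0; i < 4; i++) {
		/* same steps as scoutfs_advance_dirty_super() */
		uint64_t idx = blkno - SUPER_BLKNO;

		if (++idx == SUPER_NR)
			idx = 0;
		blkno = SUPER_BLKNO + idx;
		seq++;

		printf("next super write: blkno %llu seq %llu\n",
		       (unsigned long long)blkno, (unsigned long long)seq);
	}
	return 0;
}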
/*
 * We've been modifying the super copy in the sb_info as we made changes.
 * Write that copy out now to finalize them.
 */
int scoutfs_write_dirty_super(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct buffer_head *bh;
size_t sz;
int ret;
bh = scoutfs_new_block(sb, le64_to_cpu(super->hdr.blkno));
if (!bh)
return -ENOMEM;
sz = sizeof(struct scoutfs_super_block);
memcpy(bh->b_data, super, sz);
memset(bh->b_data + sz, 0, SCOUTFS_BLOCK_SIZE - sz);
scoutfs_calc_hdr_crc(bh);
mark_buffer_dirty(bh);
trace_scoutfs_write_super(super);
ret = sync_dirty_buffer(bh);
brelse(bh);
return ret;
}
static int read_supers(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super;
struct buffer_head *bh = NULL;
unsigned long bytes;
struct scoutfs_block *bl = NULL;
int found = -1;
int i;
for (i = 0; i < SCOUTFS_SUPER_NR; i++) {
if (bh)
brelse(bh);
bh = scoutfs_read_block(sb, SCOUTFS_SUPER_BLKNO + i);
if (!bh) {
scoutfs_put_block(bl);
bl = scoutfs_read_block(sb, SCOUTFS_SUPER_BLKNO + i);
if (IS_ERR(bl)) {
scoutfs_warn(sb, "couldn't read super block %u", i);
continue;
}
super = (void *)bh->b_data;
super = bl->data;
if (super->id != cpu_to_le64(SCOUTFS_SUPER_ID)) {
scoutfs_warn(sb, "super block %u has invalid id %llx",
@@ -128,8 +66,7 @@ static int read_supers(struct super_block *sb)
}
}
if (bh)
brelse(bh);
scoutfs_put_block(bl);
if (found < 0) {
scoutfs_err(sb, "unable to read valid super block");
@@ -145,17 +82,6 @@ static int read_supers(struct super_block *sb)
atomic64_set(&sbi->next_ino, SCOUTFS_ROOT_INO + 1);
atomic64_set(&sbi->next_blkno, 2);
/* Initialize all the sb info fields which depend on the supers. */
bytes = DIV_ROUND_UP(le64_to_cpu(sbi->super.total_chunks), 64) *
sizeof(u64);
sbi->chunk_alloc_bits = vmalloc(bytes);
if (!sbi->chunk_alloc_bits)
return -ENOMEM;
/* the alloc bits default to all free then ring entries update them */
memset(sbi->chunk_alloc_bits, 0xff, bytes);
return 0;
}
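For reference, the chunk_alloc_bits setup removed here sized the allocator bitmap at one bit per chunk, rounded up to whole 64-bit words, and set every bit so all chunks start free. A userspace sketch of that sizing plus claiming a single chunk, with an arbitrary chunk count; the kernel code uses __le64 words and the kernel bitop helpers, which this deliberately glosses over.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>

#define DIV_ROUND_UP(n, d) (((n) + (d) - 1) / (d))

int main(void)
{
	uint64_t total_chunks = 1000;		/* arbitrary example */
	size_t bytes = DIV_ROUND_UP(total_chunks, 64) * sizeof(uint64_t);
	uint64_t *bits = malloc(bytes);
	uint64_t chunk = 123;

	if (!bits)
		return 1;

	/* every chunk starts free, matching the memset(0xff) above */
	memset(bits, 0xff, bytes);
	printf("%llu chunks -> %zu bytes of bitmap\n",
	       (unsigned long long)total_chunks, bytes);

	/* claim chunk 123 by clearing its bit if it is still set */
	if (bits[chunk / 64] & (1ULL << (chunk % 64))) {
		bits[chunk / 64] &= ~(1ULL << (chunk % 64));
		printf("allocated chunk %llu\n", (unsigned long long)chunk);
	}

	free(bits);
	return 0;
}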
@@ -174,16 +100,9 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent)
if (!sbi)
return -ENOMEM;
spin_lock_init(&sbi->item_lock);
sbi->item_root = RB_ROOT;
sbi->dirty_item_root = RB_ROOT;
spin_lock_init(&sbi->chunk_alloc_lock);
mutex_init(&sbi->dirty_mutex);
if (!sb_set_blocksize(sb, SCOUTFS_BLOCK_SIZE)) {
printk(KERN_ERR "couldn't set blocksize\n");
return -EINVAL;
}
spin_lock_init(&sbi->block_lock);
INIT_RADIX_TREE(&sbi->block_radix, GFP_NOFS);
init_waitqueue_head(&sbi->block_wq);
/* XXX can have multiple mounts of a device, need mount id */
sbi->kset = kset_create_and_add(sb->s_id, NULL, &scoutfs_kset->kobj);
@@ -191,9 +110,7 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent)
return -ENOMEM;
ret = scoutfs_setup_counters(sb) ?:
read_supers(sb) ?:
scoutfs_setup_manifest(sb) ?:
scoutfs_replay_ring(sb);
read_supers(sb);
if (ret)
return ret;
@@ -205,8 +122,6 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent)
if (!sb->s_root)
return -ENOMEM;
scoutfs_advance_dirty_super(sb);
return 0;
}
@@ -222,9 +137,6 @@ static void scoutfs_kill_sb(struct super_block *sb)
kill_block_super(sb);
if (sbi) {
/* kill block super should have synced */
WARN_ON_ONCE(sbi->dirty_blkno);
scoutfs_destroy_manifest(sb);
scoutfs_destroy_counters(sb);
if (sbi->kset)
kset_unregister(sbi->kset);
@@ -253,8 +165,6 @@ static int __init scoutfs_module_init(void)
{
int ret;
giant_rbtree_hack_build_bugs();
scoutfs_init_counters();
scoutfs_kset = kset_create_and_add("scoutfs", NULL, fs_kobj);

View File

@@ -1,38 +1,23 @@
#ifndef _SCOUTFS_SUPER_H_
#define _SCOUTFS_SUPER_H_
#include <linux/fs.h>
#include <linux/rbtree.h>
#include "format.h"
struct scoutfs_manifest;
struct scoutfs_counters;
struct scoutfs_sb_info {
struct scoutfs_super_block super;
spinlock_t block_lock;
struct radix_tree_root block_radix;
wait_queue_head_t block_wq;
atomic64_t next_ino;
atomic64_t next_blkno;
spinlock_t item_lock;
struct rb_root item_root;
struct rb_root dirty_item_root;
struct scoutfs_manifest *mani;
spinlock_t chunk_alloc_lock;
__le64 *chunk_alloc_bits;
/* pinned dirty ring block during commit */
struct buffer_head *dirty_ring_bh;
struct scoutfs_ring_entry *dirty_ring_ent;
unsigned int dirty_ring_ent_avail;
/* pinned log segment during fs modifications */
struct mutex dirty_mutex;
u64 dirty_blkno;
int dirty_item_off;
int dirty_val_off;
/* $sysfs/fs/scoutfs/$id/ */
struct kset *kset;
@@ -44,7 +29,4 @@ static inline struct scoutfs_sb_info *SCOUTFS_SB(struct super_block *sb)
return sb->s_fs_info;
}
int scoutfs_advance_dirty_super(struct super_block *sb);
int scoutfs_write_dirty_super(struct super_block *sb);
#endif