scoutfs: add transactions and metadata writing

Add the transaction machinery that writes out dirty metadata blocks as
atomic transactions.

The block radix tracks dirty blocks with a dirty radix tag.

Blocks are written with bios whose completion marks them clean and
propogates errors through the super info.  The blocks are left tagged
during writeout so that they won't be (someday) mistaken for clean by
eviction.  Since we're modifying the radix from io completion we change
all block lock acquisitions to be interrupt safe.

All the operations that modify blocks hold and release the transaction
while they're doing their work.

sync kicks off work that waits for the transaction to be released so
that it can write out all the dirty blocks and then the new supers that
reference them.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2016-04-14 14:35:32 -07:00
parent a2f55f02a1
commit e3b308c0d0
9 changed files with 471 additions and 25 deletions

View File

@@ -3,4 +3,4 @@ obj-$(CONFIG_SCOUTFS_FS) := scoutfs.o
CFLAGS_scoutfs_trace.o = -I$(src) # define_trace.h double include
scoutfs-y += block.o btree.o counters.o crc.o dir.o filerw.o inode.o msg.o \
scoutfs_trace.o super.o treap.o
scoutfs_trace.o super.o trans.o treap.o

View File

@@ -22,6 +22,8 @@
#include "crc.h"
#include "counters.h"
#define DIRTY_RADIX_TAG 0
/*
* XXX
* - tie into reclaim
@@ -113,6 +115,38 @@ static void block_read_end_io(struct bio *bio, int err)
wake_up(&sbi->block_wq);
scoutfs_put_block(bl);
bio_put(bio);
}
/*
* Once a transaction block is persistent it's fine to drop the dirty
* tag. It's been checksummed so it can be read in again. It's seq
* will be in the current transaction so it'll simply be dirtied and
* checksummed and written out again.
*/
static void block_write_end_io(struct bio *bio, int err)
{
struct scoutfs_block *bl = bio->bi_private;
struct scoutfs_sb_info *sbi = SCOUTFS_SB(bl->sb);
unsigned long flags;
if (!err) {
spin_lock_irqsave(&sbi->block_lock, flags);
radix_tree_tag_clear(&sbi->block_radix,
bl->blkno, DIRTY_RADIX_TAG);
spin_unlock_irqrestore(&sbi->block_lock, flags);
}
/* not too worried about racing ints */
if (err && !sbi->block_write_err)
sbi->block_write_err = err;
if (atomic_dec_and_test(&sbi->block_writes))
wake_up(&sbi->block_wq);
scoutfs_put_block(bl);
bio_put(bio);
}
static int block_submit_bio(struct scoutfs_block *bl, int rw)
@@ -121,19 +155,30 @@ static int block_submit_bio(struct scoutfs_block *bl, int rw)
struct bio *bio;
int ret;
if (WARN_ON_ONCE(bl->blkno >=
i_size_read(sb->s_bdev->bd_inode) >> SCOUTFS_BLOCK_SHIFT)) {
printk("trying to read bad blkno %llu\n", bl->blkno);
}
bio = bio_alloc(GFP_NOFS, SCOUTFS_PAGES_PER_BLOCK);
if (WARN_ON_ONCE(!bio))
return -ENOMEM;
bio->bi_sector = bl->blkno << (SCOUTFS_BLOCK_SHIFT - 9);
bio->bi_bdev = sb->s_bdev;
/* XXX can we do that? */
ret = bio_add_page(bio, bl->page, SCOUTFS_BLOCK_SIZE, 0);
if (rw & WRITE)
;
else
if (rw & WRITE) {
bio->bi_end_io = block_write_end_io;
} else
bio->bi_end_io = block_read_end_io;
bio->bi_private = bl;
ret = bio_add_page(bio, bl->page, SCOUTFS_BLOCK_SIZE, 0);
if (WARN_ON_ONCE(ret != SCOUTFS_BLOCK_SIZE)) {
bio_put(bio);
return -ENOMEM;
}
atomic_inc(&bl->refcount);
submit_bio(rw, bio);
@@ -148,10 +193,11 @@ struct scoutfs_block *scoutfs_read_block(struct super_block *sb, u64 blkno)
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_block *found;
struct scoutfs_block *bl;
unsigned long flags;
int ret;
/* find an existing block, dropping if it's errored */
spin_lock(&sbi->block_lock);
spin_lock_irqsave(&sbi->block_lock, flags);
bl = radix_tree_lookup(&sbi->block_radix, blkno);
if (bl) {
@@ -164,7 +210,7 @@ struct scoutfs_block *scoutfs_read_block(struct super_block *sb, u64 blkno)
}
}
spin_unlock(&sbi->block_lock);
spin_unlock_irqrestore(&sbi->block_lock, flags);
if (bl)
goto wait;
@@ -179,7 +225,7 @@ struct scoutfs_block *scoutfs_read_block(struct super_block *sb, u64 blkno)
if (ret)
goto out;
spin_lock(&sbi->block_lock);
spin_lock_irqsave(&sbi->block_lock, flags);
found = radix_tree_lookup(&sbi->block_radix, blkno);
if (found) {
@@ -191,7 +237,7 @@ struct scoutfs_block *scoutfs_read_block(struct super_block *sb, u64 blkno)
atomic_inc(&bl->refcount);
}
spin_unlock(&sbi->block_lock);
spin_unlock_irqrestore(&sbi->block_lock, flags);
radix_tree_preload_end();
if (!found) {
@@ -237,6 +283,7 @@ struct scoutfs_block *scoutfs_read_ref(struct super_block *sb,
struct scoutfs_block_header *hdr;
struct scoutfs_block *bl;
struct scoutfs_block *found;
unsigned long flags;
bl = scoutfs_read_block(sb, le64_to_cpu(ref->blkno));
if (!IS_ERR(bl)) {
@@ -244,14 +291,14 @@ struct scoutfs_block *scoutfs_read_ref(struct super_block *sb,
if (WARN_ON_ONCE(hdr->seq != ref->seq)) {
/* XXX hack, make this a function */
spin_lock(&sbi->block_lock);
spin_lock_irqsave(&sbi->block_lock, flags);
found = radix_tree_lookup(&sbi->block_radix,
bl->blkno);
if (found == bl) {
radix_tree_delete(&sbi->block_radix, bl->blkno);
scoutfs_put_block(bl);
}
spin_unlock(&sbi->block_lock);
spin_unlock_irqrestore(&sbi->block_lock, flags);
scoutfs_put_block(bl);
bl = ERR_PTR(-EAGAIN);
@@ -261,6 +308,100 @@ struct scoutfs_block *scoutfs_read_ref(struct super_block *sb,
return bl;
}
/*
* XXX This is a gross hack for writing the super. It doesn't have
* per-block write completion indication, it just knows that it's the
* only thing that will be writing.
*/
int scoutfs_write_block(struct scoutfs_block *bl)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(bl->sb);
int ret;
BUG_ON(atomic_read(&sbi->block_writes) != 0);
atomic_inc(&sbi->block_writes);
ret = block_submit_bio(bl, WRITE);
if (ret)
atomic_dec(&sbi->block_writes);
else
wait_event(sbi->block_wq, atomic_read(&sbi->block_writes) == 0);
return ret ?: sbi->block_write_err;
}
/*
* A quick cheap test so that write dirty blocks only has to return
* success or error, not also the lack of dirty blocks.
*/
int scoutfs_has_dirty_blocks(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
return radix_tree_tagged(&sbi->block_radix, DIRTY_RADIX_TAG);
}
/*
* Write out all the currently dirty blocks. The caller has waited
* for all the dirty blocks to be consistent and has prevented further
* writes while we're working.
*
* The blocks are kept dirty so that they won't be evicted by reclaim
* while they're in flight. Reads can traverse the blocks while they're
* in flight.
*/
int scoutfs_write_dirty_blocks(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_block *blocks[16];
struct scoutfs_block *bl;
unsigned long flags;
unsigned long blkno;
int ret;
int nr;
int i;
blkno = 0;
sbi->block_write_err = 0;
ret = 0;
atomic_inc(&sbi->block_writes);
do {
/* get refs to a bunch of dirty blocks */
spin_lock_irqsave(&sbi->block_lock, flags);
nr = radix_tree_gang_lookup_tag(&sbi->block_radix,
(void **)blocks, blkno,
ARRAY_SIZE(blocks),
DIRTY_RADIX_TAG);
if (nr > 0)
blkno = blocks[nr - 1]->blkno + 1;
for (i = 0; i < nr; i++)
atomic_inc(&blocks[i]->refcount);
spin_unlock_irqrestore(&sbi->block_lock, flags);
/* submit them in order, being careful to put all on err */
for (i = 0; i < nr; i++) {
bl = blocks[i];
if (ret == 0) {
/* XXX crc could be farmed out */
scoutfs_calc_hdr_crc(bl);
atomic_inc(&sbi->block_writes);
ret = block_submit_bio(bl, WRITE);
if (ret)
atomic_dec(&sbi->block_writes);
}
scoutfs_put_block(bl);
}
} while (nr && !ret);
/* wait for all io to drain */
atomic_dec(&sbi->block_writes);
wait_event(sbi->block_wq, atomic_read(&sbi->block_writes) == 0);
return ret ?: sbi->block_write_err;
}
/*
* Give the caller a dirty block that they can safely modify. If the
* reference refers to a stable clean block then we allocate a new block
@@ -284,6 +425,7 @@ struct scoutfs_block *scoutfs_dirty_ref(struct super_block *sb,
struct scoutfs_block_header *hdr;
struct scoutfs_block *found;
struct scoutfs_block *bl;
unsigned long flags;
u64 blkno;
int ret;
@@ -301,7 +443,7 @@ struct scoutfs_block *scoutfs_dirty_ref(struct super_block *sb,
blkno = atomic64_inc_return(&sbi->next_blkno);
hdr = bl->data;
spin_lock(&sbi->block_lock);
spin_lock_irqsave(&sbi->block_lock, flags);
/* XXX don't really like this */
found = radix_tree_lookup(&sbi->block_radix, bl->blkno);
@@ -314,9 +456,10 @@ struct scoutfs_block *scoutfs_dirty_ref(struct super_block *sb,
hdr->blkno = cpu_to_le64(blkno);
hdr->seq = sbi->super.hdr.seq;
radix_tree_insert(&sbi->block_radix, blkno, bl);
radix_tree_tag_set(&sbi->block_radix, blkno, DIRTY_RADIX_TAG);
atomic_inc(&bl->refcount);
spin_unlock(&sbi->block_lock);
spin_unlock_irqrestore(&sbi->block_lock, flags);
radix_tree_preload_end();
ref->blkno = hdr->blkno;
@@ -337,6 +480,7 @@ struct scoutfs_block *scoutfs_new_block(struct super_block *sb, u64 blkno)
struct scoutfs_block_header *hdr;
struct scoutfs_block *found;
struct scoutfs_block *bl;
unsigned long flags;
int ret;
/* allocate a new block and try to insert it */
@@ -357,7 +501,7 @@ struct scoutfs_block *scoutfs_new_block(struct super_block *sb, u64 blkno)
hdr->blkno = cpu_to_le64(blkno);
hdr->seq = sbi->super.hdr.seq;
spin_lock(&sbi->block_lock);
spin_lock_irqsave(&sbi->block_lock, flags);
found = radix_tree_lookup(&sbi->block_radix, blkno);
if (found) {
radix_tree_delete(&sbi->block_radix, blkno);
@@ -365,8 +509,9 @@ struct scoutfs_block *scoutfs_new_block(struct super_block *sb, u64 blkno)
}
radix_tree_insert(&sbi->block_radix, blkno, bl);
radix_tree_tag_set(&sbi->block_radix, blkno, DIRTY_RADIX_TAG);
atomic_inc(&bl->refcount);
spin_unlock(&sbi->block_lock);
spin_unlock_irqrestore(&sbi->block_lock, flags);
radix_tree_preload_end();
ret = 0;

View File

@@ -30,6 +30,10 @@ struct scoutfs_block *scoutfs_read_ref(struct super_block *sb,
struct scoutfs_block *scoutfs_dirty_ref(struct super_block *sb,
struct scoutfs_block_ref *ref);
int scoutfs_has_dirty_blocks(struct super_block *sb);
int scoutfs_write_block(struct scoutfs_block *bl);
int scoutfs_write_dirty_blocks(struct super_block *sb);
void scoutfs_put_block(struct scoutfs_block *bl);
void scoutfs_calc_hdr_crc(struct scoutfs_block *bl);

View File

@@ -22,6 +22,7 @@
#include "key.h"
#include "super.h"
#include "btree.h"
#include "trans.h"
/*
* Directory entries are stored in entries with offsets calculated from
@@ -332,13 +333,19 @@ static int scoutfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode,
if (dentry->d_name.len > SCOUTFS_NAME_LEN)
return -ENAMETOOLONG;
ret = scoutfs_dirty_inode_item(dir);
ret = scoutfs_hold_trans(sb);
if (ret)
return ret;
ret = scoutfs_dirty_inode_item(dir);
if (ret)
goto out;
inode = scoutfs_new_inode(sb, dir, mode, rdev);
if (IS_ERR(inode))
return PTR_ERR(inode);
if (IS_ERR(inode)) {
ret = PTR_ERR(inode);
goto out;
}
bytes = dent_bytes(dentry->d_name.len);
@@ -384,6 +391,7 @@ out:
/* XXX delete the inode item here */
if (ret && !IS_ERR_OR_NULL(inode))
iput(inode);
scoutfs_release_trans(sb);
return ret;
}
@@ -419,10 +427,14 @@ static int scoutfs_unlink(struct inode *dir, struct dentry *dentry)
if (S_ISDIR(inode->i_mode) && i_size_read(inode))
return -ENOTEMPTY;
ret = scoutfs_hold_trans(sb);
if (ret)
return ret;
ret = scoutfs_dirty_inode_item(dir) ?:
scoutfs_dirty_inode_item(inode);
if (ret)
return ret;
goto out;
scoutfs_set_key(&key, scoutfs_ino(dir), SCOUTFS_DIRENT_KEY, di->hash);
@@ -444,6 +456,7 @@ static int scoutfs_unlink(struct inode *dir, struct dentry *dentry)
scoutfs_update_inode_item(dir);
out:
scoutfs_release_trans(sb);
return ret;
}

View File

@@ -18,6 +18,7 @@
#include "inode.h"
#include "key.h"
#include "filerw.h"
#include "trans.h"
#include "scoutfs_trace.h"
#include "btree.h"
@@ -138,6 +139,10 @@ static int scoutfs_writepage(struct page *page, struct writeback_control *wbc)
set_page_writeback(page);
ret = scoutfs_hold_trans(sb);
if (ret)
goto out;
for_each_data_region(&dr, page, pos) {
scoutfs_set_key(&key, scoutfs_ino(inode), SCOUTFS_DATA_KEY,
dr.item_key);
@@ -156,7 +161,8 @@ static int scoutfs_writepage(struct page *page, struct writeback_control *wbc)
}
scoutfs_btree_release(&curs);
scoutfs_release_trans(sb);
out:
if (ret) {
SetPageError(page);
mapping_set_error(&inode->i_data, ret);
@@ -191,6 +197,7 @@ static int scoutfs_write_end(struct file *file, struct address_space *mapping,
struct page *page, void *fsdata)
{
struct inode *inode = mapping->host;
struct super_block *sb = inode->i_sb;
unsigned off;
trace_scoutfs_write_end(scoutfs_ino(inode), pos, len, copied);
@@ -203,9 +210,19 @@ static int scoutfs_write_end(struct file *file, struct address_space *mapping,
if (pos + copied > inode->i_size) {
i_size_write(inode, pos + copied);
/* XXX need to think about pinning and enospc */
if (!scoutfs_dirty_inode_item(inode))
scoutfs_update_inode_item(inode);
/*
* XXX This is a crazy hack that will go away when the
* file data paths are more robust. We're barely
* holding them together with duct tape while building
* up the robust metadata support that's needed to do a
* good job with the data pats.
*/
if (!scoutfs_hold_trans(sb)) {
if (!scoutfs_dirty_inode_item(inode))
scoutfs_update_inode_item(inode);
scoutfs_release_trans(sb);
}
}
if (!PageUptodate(page))

View File

@@ -25,6 +25,7 @@
#include "msg.h"
#include "block.h"
#include "counters.h"
#include "trans.h"
#include "scoutfs_trace.h"
static struct kset *scoutfs_kset;
@@ -32,8 +33,53 @@ static struct kset *scoutfs_kset;
static const struct super_operations scoutfs_super_ops = {
.alloc_inode = scoutfs_alloc_inode,
.destroy_inode = scoutfs_destroy_inode,
.sync_fs = scoutfs_sync_fs,
};
/*
* The caller advances the block number and sequence number in the super
* every time it wants to dirty it and eventually write it to reference
* dirty data that's been written.
*/
void scoutfs_advance_dirty_super(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
le64_add_cpu(&super->hdr.blkno, 1);
if (le64_to_cpu(super->hdr.blkno) == (SCOUTFS_SUPER_BLKNO +
SCOUTFS_SUPER_NR))
super->hdr.blkno = cpu_to_le64(SCOUTFS_SUPER_BLKNO);
le64_add_cpu(&super->hdr.seq, 1);
}
/*
* The caller is responsible for setting the super header's blkno
* and seq to something reasonable.
*/
int scoutfs_write_dirty_super(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
size_t sz = sizeof(struct scoutfs_super_block);
u64 blkno = le64_to_cpu(sbi->super.hdr.blkno);
struct scoutfs_block *bl;
int ret;
/* XXX prealloc? */
bl = scoutfs_new_block(sb, blkno);
if (WARN_ON_ONCE(IS_ERR(bl)))
return PTR_ERR(bl);
memcpy(bl->data, &sbi->super, sz);
memset(bl->data + sz, 0, SCOUTFS_BLOCK_SIZE - sz);
scoutfs_calc_hdr_crc(bl);
ret = scoutfs_write_block(bl);
scoutfs_put_block(bl);
return ret;
}
static int read_supers(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
@@ -97,13 +143,20 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent)
sbi = kzalloc(sizeof(struct scoutfs_sb_info), GFP_KERNEL);
sb->s_fs_info = sbi;
sbi->sb = sb;
if (!sbi)
return -ENOMEM;
spin_lock_init(&sbi->block_lock);
INIT_RADIX_TREE(&sbi->block_radix, GFP_NOFS);
init_waitqueue_head(&sbi->block_wq);
atomic_set(&sbi->block_writes, 0);
init_rwsem(&sbi->btree_rwsem);
atomic_set(&sbi->trans_holds, 0);
init_waitqueue_head(&sbi->trans_hold_wq);
spin_lock_init(&sbi->trans_write_lock);
INIT_WORK(&sbi->trans_write_work, scoutfs_trans_write_func);
init_waitqueue_head(&sbi->trans_write_wq);
/* XXX can have multiple mounts of a device, need mount id */
sbi->kset = kset_create_and_add(sb->s_id, NULL, &scoutfs_kset->kobj);
@@ -111,10 +164,13 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent)
return -ENOMEM;
ret = scoutfs_setup_counters(sb) ?:
read_supers(sb);
read_supers(sb) ?:
scoutfs_setup_trans(sb);
if (ret)
return ret;
scoutfs_advance_dirty_super(sb);
inode = scoutfs_iget(sb, SCOUTFS_ROOT_INO);
if (IS_ERR(inode))
return PTR_ERR(inode);
@@ -138,6 +194,7 @@ static void scoutfs_kill_sb(struct super_block *sb)
kill_block_super(sb);
if (sbi) {
scoutfs_shutdown_trans(sb);
scoutfs_destroy_counters(sb);
if (sbi->kset)
kset_unregister(sbi->kset);

View File

@@ -9,11 +9,15 @@
struct scoutfs_counters;
struct scoutfs_sb_info {
struct super_block *sb;
struct scoutfs_super_block super;
spinlock_t block_lock;
struct radix_tree_root block_radix;
wait_queue_head_t block_wq;
atomic_t block_writes;
int block_write_err;
atomic64_t next_ino;
atomic64_t next_blkno;
@@ -21,6 +25,16 @@ struct scoutfs_sb_info {
/* XXX there will be a lot more of these :) */
struct rw_semaphore btree_rwsem;
atomic_t trans_holds;
wait_queue_head_t trans_hold_wq;
spinlock_t trans_write_lock;
u64 trans_write_count;
int trans_write_ret;
struct work_struct trans_write_work;
wait_queue_head_t trans_write_wq;
struct workqueue_struct *trans_write_workq;
/* $sysfs/fs/scoutfs/$id/ */
struct kset *kset;
@@ -32,4 +46,7 @@ static inline struct scoutfs_sb_info *SCOUTFS_SB(struct super_block *sb)
return sb->s_fs_info;
}
void scoutfs_advance_dirty_super(struct super_block *sb);
int scoutfs_write_dirty_super(struct super_block *sb);
#endif

180
kmod/src/trans.c Normal file
View File

@@ -0,0 +1,180 @@
/*
* Copyright (C) 2016 Versity Software, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License v2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/atomic.h>
#include "super.h"
#include "block.h"
#include "trans.h"
#include "scoutfs_trace.h"
/*
* scoutfs metadata blocks are written in atomic transactions.
*
* Writers hold transactions to dirty blocks. The transaction can't be
* written until these active writers release the transaction. We don't
* track the relationships between dirty blocks so there's only ever one
* transaction being built.
*
* The copy of the on-disk super block in the fs sb info has its header
* sequence advanced so that new dirty blocks inherit this dirty
* sequence number. It's only advanced once all those dirty blocks are
* reachable after having first written them all out and then the new
* super with that seq. It's first incremented at mount.
*
* Unfortunately writers can nest. We don't bother trying to special
* case holding a transaction that you're already holding because that
* requires per-task storage. We just let anyone hold transactions
* regardless of waiters waiting to write, which risks waiters waiting a
* very long time.
*/
/*
* It's critical that this not try to perform IO if there's nothing
* dirty. The sync at unmount can have this work scheduled after sync
* returns and the unmount path starts to tear down supers and block
* devices. We have to safely detect that there's nothing to do using
* nothing in the vfs.
*/
void scoutfs_trans_write_func(struct work_struct *work)
{
struct scoutfs_sb_info *sbi = container_of(work, struct scoutfs_sb_info,
trans_write_work);
struct super_block *sb = sbi->sb;
bool advance = false;
int ret = 0;
wait_event(sbi->trans_hold_wq,
atomic_cmpxchg(&sbi->trans_holds, 0, -1) == 0);
/* XXX probably want to write out dirty pages in inodes */
if (scoutfs_has_dirty_blocks(sb)) {
ret = scoutfs_write_dirty_blocks(sb) ?:
scoutfs_write_dirty_super(sb);
if (!ret)
advance = 1;
}
spin_lock(&sbi->trans_write_lock);
if (advance)
scoutfs_advance_dirty_super(sb);
sbi->trans_write_count++;
sbi->trans_write_ret = ret;
spin_unlock(&sbi->trans_write_lock);
wake_up(&sbi->trans_write_wq);
atomic_set(&sbi->trans_holds, 0);
wake_up(&sbi->trans_hold_wq);
}
struct write_attempt {
u64 seq;
u64 count;
int ret;
};
/* this is called as a wait_event() condition so it can't change task state */
static int write_attempted(struct scoutfs_sb_info *sbi,
struct write_attempt *attempt)
{
int done = 1;
spin_lock(&sbi->trans_write_lock);
if (le64_to_cpu(sbi->super.hdr.seq) > attempt->seq)
attempt->ret = 0;
else if (sbi->trans_write_count > attempt->count)
attempt->ret = sbi->trans_write_ret;
else
done = 0;
spin_unlock(&sbi->trans_write_lock);
return done;
}
/*
* sync records the current dirty seq and write count and waits for
* either to change. If there's nothing to write or the write returned
* an error then only the write count advances and sets the appropriate
* return code.
*/
int scoutfs_sync_fs(struct super_block *sb, int wait)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct write_attempt attempt;
int ret;
if (!wait) {
schedule_work(&sbi->trans_write_work);
return 0;
}
spin_lock(&sbi->trans_write_lock);
attempt.seq = le64_to_cpu(sbi->super.hdr.seq);
attempt.count = sbi->trans_write_count;
spin_unlock(&sbi->trans_write_lock);
schedule_work(&sbi->trans_write_work);
ret = wait_event_interruptible(sbi->trans_write_wq,
write_attempted(sbi, &attempt));
if (ret == 0)
ret = attempt.ret;
return ret;
}
int scoutfs_hold_trans(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
return wait_event_interruptible(sbi->trans_hold_wq,
atomic_add_unless(&sbi->trans_holds, 1, -1));
}
void scoutfs_release_trans(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (atomic_sub_return(1, &sbi->trans_holds) == 0)
wake_up(&sbi->trans_hold_wq);
}
int scoutfs_setup_trans(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
sbi->trans_write_workq = alloc_workqueue("scoutfs_trans", 0, 1);
if (!sbi->trans_write_workq)
return -ENOMEM;
return 0;
}
/*
* kill_sb calls sync before getting here so we know that dirty data
* should be in flight. We just have to wait for it to quiesce.
*/
void scoutfs_shutdown_trans(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
if (sbi->trans_write_workq) {
flush_work(&sbi->trans_write_work);
destroy_workqueue(sbi->trans_write_workq);
}
}

13
kmod/src/trans.h Normal file
View File

@@ -0,0 +1,13 @@
#ifndef _SCOUTFS_TRANS_H_
#define _SCOUTFS_TRANS_H_
void scoutfs_trans_write_func(struct work_struct *work);
int scoutfs_sync_fs(struct super_block *sb, int wait);
int scoutfs_hold_trans(struct super_block *sb);
void scoutfs_release_trans(struct super_block *sb);
int scoutfs_setup_trans(struct super_block *sb);
void scoutfs_shutdown_trans(struct super_block *sb);
#endif