scoutfs: read and write supers as sm blocks

Back in ancient LSM times, the functions that read and write the super
block reused the bio functions that LSM segment IO used.  Each IO was
performed with privately allocated pages and bios.

When we got rid of the LSM code we got rid of those bio functions, and
it was quick and easy to transition super read/write to buffer_heads.
That introduced sharing of the super's buffer_head between readers and
writers.  First we saw concurrent readers being confused by the
uptodate bit, and added a bunch of complexity to coordinate its use.

Now we're seeing the writer copy its super for writing into the buffer
that readers are using, causing crc failures on read.  Let's not use
buffer_heads anymore (always good advice).
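
To make the failure mode concrete: nothing orders the writer's memcpy()
into the shared buffer against a reader checksumming that same buffer,
so a reader can observe a torn mix of old and new bytes whose crc
matches neither write.  A userspace analogue of the race (illustration
only, not scoutfs code; build with cc -pthread):

/* Illustration: a writer memcpy()ing new contents into a buffer that
 * readers are concurrently checksumming can expose torn reads. */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define BUF_SIZE 4096

static unsigned char shared[BUF_SIZE];	/* stands in for the shared bh->b_data */

/* trivial byte sum standing in for the real crc */
static uint32_t sum(const unsigned char *b, size_t n)
{
	uint32_t s = 0;

	while (n--)
		s += *b++;
	return s;
}

static void *writer(void *arg)
{
	unsigned char next[BUF_SIZE];
	int i;

	for (i = 0; i < 100000; i++) {
		/* build "valid" contents, then copy them into the
		 * shared buffer -- the copy is not atomic */
		memset(next, i & 0xff, BUF_SIZE);
		memcpy(shared, next, BUF_SIZE);
	}
	return NULL;
}

int main(void)
{
	pthread_t t;
	int i, torn = 0;

	pthread_create(&t, NULL, writer, NULL);
	for (i = 0; i < 100000; i++) {
		/* a uniform buffer always sums to a multiple of
		 * BUF_SIZE; anything else is a torn read mixing
		 * two different writes */
		if (sum(shared, BUF_SIZE) % BUF_SIZE != 0)
			torn++;
	}
	pthread_join(t, NULL);
	printf("torn reads observed: %d\n", torn);
	return 0;
}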

We added quick block functions to read and write small blocks with
private pages and bios.  Use those here to read and write the super so
that readers and writers operate on their own buffers again.
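
As used in the diff below, the helpers take the block number, the
caller's block header, and a length, and the read side also returns the
crc it computed so the caller can validate it.  For a sense of what
"private pages and bios" means here, a sketch of how the write half
could be built; the EX_* constants and the function name are
assumptions for illustration, the real helpers live in block.c, and
bio API details vary across kernel versions:

/* Sketch only, not the block.c implementation: write one small block
 * through a privately allocated page and bio. */
#include <linux/bio.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/string.h>
#include <linux/types.h>

#define EX_BLOCK_SM_SHIFT	12			/* assumed 4KB small blocks */
#define EX_BLOCK_SM_SIZE	(1 << EX_BLOCK_SM_SHIFT)

static int ex_block_write_sm(struct super_block *sb, u64 blkno,
			     void *data, unsigned int len)
{
	struct page *page;
	struct bio *bio;
	int ret;

	/* private page: no reader ever sees this buffer, so there is
	 * no uptodate or dirty state to coordinate */
	page = alloc_page(GFP_NOFS);
	if (!page)
		return -ENOMEM;

	memset(page_address(page), 0, PAGE_SIZE);
	memcpy(page_address(page), data, len);

	bio = bio_alloc(GFP_NOFS, 1);		/* pre-5.18 signature */
	if (!bio) {
		ret = -ENOMEM;
		goto out;
	}

	bio_set_dev(bio, sb->s_bdev);
	bio->bi_iter.bi_sector = blkno << (EX_BLOCK_SM_SHIFT - 9);
	bio->bi_opf = REQ_OP_WRITE | REQ_SYNC;
	bio_add_page(bio, page, EX_BLOCK_SM_SIZE, 0);

	ret = submit_bio_wait(bio);
	bio_put(bio);
out:
	__free_page(page);
	return ret;
}

The read half would be symmetric: read into a private page, compute the
crc over it, and copy the block out to the caller's buffer.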

Signed-off-by: Zach Brown <zab@versity.com>
commit ae9e060fbf
parent 44ac668afa
Author: Zach Brown
Date:   2020-04-07 10:18:02 -07:00 (committed by Zach Brown)


@@ -21,7 +21,6 @@
 #include <linux/sched.h>
 #include <linux/debugfs.h>
 #include <linux/percpu.h>
-#include <linux/buffer_head.h>
 
 #include "super.h"
 #include "block.h"
@@ -224,108 +223,36 @@ static const struct super_operations scoutfs_super_ops = {
  * Write the caller's super.  The caller has always read a valid super
  * before modifying and writing it.  The caller's super is modified
  * to reflect the write.
- *
- * XXX it'd be pretty easy to preallocate to avoid failure here.
  */
 int scoutfs_write_super(struct super_block *sb,
-			struct scoutfs_super_block *caller)
+			struct scoutfs_super_block *super)
 {
-	struct scoutfs_super_block *super;
-	struct buffer_head *bh;
-	int ret;
+	le64_add_cpu(&super->hdr.seq, 1);
 
-	bh = sb_getblk(sb, SCOUTFS_SUPER_BLKNO);
-	if (!bh)
-		return -ENOMEM;
-
-	le64_add_cpu(&caller->hdr.seq, 1);
-
-	memset(bh->b_data, 0, bh->b_size);
-	super = (void *)bh->b_data;
-	memcpy(super, caller, sizeof(*super));
 	super->hdr.crc = scoutfs_block_calc_crc(&super->hdr);
 
-	lock_buffer(bh);
-	set_buffer_mapped(bh);
-	set_buffer_dirty(bh);
-	unlock_buffer(bh);
-	ll_rw_block(WRITE, 1, &bh);
-	wait_on_buffer(bh);
-
-	if (!buffer_uptodate(bh))
-		ret = -EIO;
-	else
-		ret = 0;
-
-	brelse(bh);
-
-	return ret;
+	return scoutfs_block_write_sm(sb, SCOUTFS_SUPER_BLKNO, &super->hdr,
+				      sizeof(struct scoutfs_super_block));
 }
 
-enum {
-	BH_ScoutfsReadGroup = BH_PrivateStart,
-};
-
-BUFFER_FNS(ScoutfsReadGroup, scoutfs_read_group);
-
 /*
  * Read the super block.  If it's valid store it in the caller's super
  * struct.
- *
- * The caller requires a current version of the super as they call.  We
- * can't use a cached version from before they called.  We use a bit and
- * bh_private as a counter to satisfy groups of waiting readers with
- * each issued read rather than have every call issue a read.  We have
- * to be careful to serialize clearing uptodate when forcing media
- * access so that it doesn't cause blocked readers to get spurious read
- * errors.
 */
 int scoutfs_read_super(struct super_block *sb,
 		       struct scoutfs_super_block *super_res)
 {
 	struct scoutfs_super_block *super;
-	struct buffer_head *bh = NULL;
 	__le32 calc;
-	int group;
 	int ret;
 
-	bh = sb_getblk(sb, SCOUTFS_SUPER_BLKNO);
-	if (bh == NULL) {
-		ret = -ENOMEM;
-		scoutfs_err(sb, "error allocating buffer for super block: %d",
-			    ret);
+	super = kmalloc(sizeof(struct scoutfs_super_block), GFP_NOFS);
+	if (!super)
+		return -ENOMEM;
+
+	ret = scoutfs_block_read_sm(sb, SCOUTFS_SUPER_BLKNO, &super->hdr,
+				    sizeof(struct scoutfs_super_block),
+				    &calc);
+	if (ret < 0)
 		goto out;
-	}
-
-	/* wait for current group to finish */
-	group = !!buffer_scoutfs_read_group(bh);
-	lock_buffer(bh);
-	while (!!buffer_scoutfs_read_group(bh) == group &&
-	       (unsigned long)bh->b_private != 0) {
-		unlock_buffer(bh);
-		cond_resched();
-		lock_buffer(bh);
-	}
-
-	if ((unsigned long)(bh->b_private)++ == 0) {
-		/* first group locker advances group, submits, and relocks */
-		if (buffer_scoutfs_read_group(bh))
-			clear_buffer_scoutfs_read_group(bh);
-		else
-			set_buffer_scoutfs_read_group(bh);
-		clear_buffer_uptodate(bh);
-		bh->b_end_io = end_buffer_read_sync;
-		get_bh(bh);
-		submit_bh(READ | REQ_META | REQ_PRIO, bh);
-		lock_buffer(bh);
-	} else {
-		/* additional group lockers acquired after read completed */
-	}
-
-	if (!buffer_uptodate(bh)) {
-		ret = -EIO;
-		scoutfs_err(sb, "error reading super block: %d", ret);
-		goto out;
-	}
-
-	super = (void *)(bh->b_data);
 
 	if (super->hdr.magic != cpu_to_le32(SCOUTFS_BLOCK_MAGIC_SUPER)) {
 		scoutfs_err(sb, "super block has invalid magic value 0x%08x",
@@ -334,11 +261,11 @@ int scoutfs_read_super(struct super_block *sb,
 		goto out;
 	}
 
-	calc = scoutfs_block_calc_crc(&super->hdr);
 	if (calc != super->hdr.crc) {
 		scoutfs_err(sb, "super block has invalid crc 0x%08x, calculated 0x%08x",
 			    le32_to_cpu(super->hdr.crc), le32_to_cpu(calc));
 		ret = -EINVAL;
 		goto out;
 	}
 
 	if (le64_to_cpu(super->hdr.blkno) != SCOUTFS_SUPER_BLKNO) {
@@ -370,11 +297,7 @@ int scoutfs_read_super(struct super_block *sb,
 	*super_res = *super;
 	ret = 0;
 out:
-	if (bh != NULL) {
-		(unsigned long)(bh->b_private)--;
-		unlock_buffer(bh);
-		brelse(bh);
-	}
+	kfree(super);
 
 	return ret;
 }
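
With the change applied, every caller owns its buffers end to end.  A
hypothetical read-modify-write caller, following the convention the
comment on scoutfs_write_super() describes (ex_bump_super() is made up;
the signatures are the ones from the diff above):

/* Sketch: read a valid super into a private copy, modify it, and
 * write it back.  No buffer is ever shared with other readers. */
static int ex_bump_super(struct super_block *sb)
{
	struct scoutfs_super_block *super;
	int ret;

	super = kmalloc(sizeof(*super), GFP_NOFS);
	if (!super)
		return -ENOMEM;

	ret = scoutfs_read_super(sb, super);
	if (ret == 0) {
		/* ... modify fields of the private copy ... */
		ret = scoutfs_write_super(sb, super);
	}

	kfree(super);
	return ret;
}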