scoutfs: don't lose block wakeups

The block end_io path could lose wakeups.  Both the bio submission
task and a bio's end_io completion could see an io_count > 1, so
neither would set the block uptodate before dropping its io_count
and waking.

It got into this mess because readers were waiting for io_count to
drop to 0.  We add an io_busy bit which indicates that io is still
in flight, and waiters now wait on that instead.  This gives the
final io_count drop a chance to finish its work, clear io_busy, and
drop its reference before waking.
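
To make the race concrete: with the old ordering, suppose io_count == 2
(the submitter's guard plus one bio) and both droppers run the end_io
sequence concurrently.  A minimal sketch of the interleaving,
illustrative only and not actual scoutfs code:

	/* io_count == 2: submitter's guard ref plus one bio in flight.
	 * Both the submitter and the bio's end_io run this concurrently:
	 */
	if (atomic_read(&bp->io_count) == 1)            /* both read 2 */
		set_bit(BLOCK_BIT_UPTODATE, &bp->bits); /* neither runs this */
	if (atomic_dec_and_test(&bp->io_count))         /* second drop: 1 -> 0 */
		wake_up(&binf->waitq);  /* waiter sees 0, UPTODATE never set */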

Signed-off-by: Zach Brown <zab@versity.com>

@@ -63,6 +63,7 @@ enum {
 	BLOCK_BIT_UPTODATE = 0,	/* contents consistent with media */
 	BLOCK_BIT_NEW,		/* newly allocated, contents undefined */
 	BLOCK_BIT_DIRTY,	/* dirty, writer will write */
+	BLOCK_BIT_IO_BUSY,	/* bios are in flight */
 	BLOCK_BIT_ERROR,	/* saw IO error */
 	BLOCK_BIT_DELETED,	/* has been deleted from rbtree */
 	BLOCK_BIT_PAGE_ALLOC,	/* page (possibly high order) allocation */
@@ -354,6 +355,12 @@ static void block_remove(struct super_block *sb, struct block_private *bp)
 	}
 }
 
+static bool io_busy(struct block_private *bp)
+{
+	smp_rmb(); /* test after adding to wait queue */
+	return test_bit(BLOCK_BIT_IO_BUSY, &bp->bits);
+}
+
 /*
  * Called during shutdown with no other users.
  */
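
The smp_rmb() in io_busy() pairs with the smp_mb() that the final
io_count drop issues after clearing the bit (see the block_end_io hunk
below): either the waker sees the waiter queued, or the waiter sees the
bit already clear.  Roughly, a sketch of what the new wait_event()
calls expand to (assumed open-coding, for illustration only):

	DEFINE_WAIT(wait);

	for (;;) {
		/* queue the task before io_busy() tests the bit; its
		 * smp_rmb() orders the test after the queueing */
		prepare_to_wait(&binf->waitq, &wait, TASK_UNINTERRUPTIBLE);
		if (!io_busy(bp))
			break;
		schedule();
	}
	finish_wait(&binf->waitq, &wait);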
@@ -366,7 +373,7 @@ static void block_remove_all(struct super_block *sb)
 	for (node = rb_first(&binf->root); node; ) {
 		bp = container_of(node, struct block_private, node);
 		node = rb_next(node);
-		wait_event(binf->waitq, atomic_read(&bp->io_count) == 0);
+		wait_event(binf->waitq, !io_busy(bp));
 		block_remove(sb, bp);
 	}
@@ -395,17 +402,19 @@ static void block_end_io(struct super_block *sb, int rw,
 		set_bit(BLOCK_BIT_ERROR, &bp->bits);
 	}
 
-	/* update bits before waiters see io_count == 0 */
-	if (atomic_read(&bp->io_count) == 1) {
-		if (is_read && !test_bit(BLOCK_BIT_ERROR, &bp->bits))
-			set_bit(BLOCK_BIT_UPTODATE, &bp->bits);
-	}
+	if (!atomic_dec_and_test(&bp->io_count))
+		return;
 
-	/* make sure bits are visible to woken */
-	smp_mb__after_atomic();
+	if (is_read && !test_bit(BLOCK_BIT_ERROR, &bp->bits))
+		set_bit(BLOCK_BIT_UPTODATE, &bp->bits);
+	clear_bit(BLOCK_BIT_IO_BUSY, &bp->bits);
 
-	/* then wake */
-	if (atomic_dec_and_test(&bp->io_count))
+	block_put(sb, bp);
+
+	/* make sure set and cleared bits are visible to woken */
+	smp_mb();
+
+	if (waitqueue_active(&binf->waitq))
 		wake_up(&binf->waitq);
 }
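
This turns end_io into the usual last-reference pattern: every
completion decrements, and only the task whose atomic_dec_and_test()
reaches zero does the finish-up work, so the old window between
checking the count and decrementing it is gone.  As a generic sketch
(hypothetical struct foo and bit names, not scoutfs code):

	static void last_drop_finishes(struct foo *f)
	{
		/* every dropper decrements; only the final one continues */
		if (!atomic_dec_and_test(&f->io_count))
			return;

		/* exactly one task gets here, after all io has finished */
		set_bit(FOO_UPTODATE, &f->bits);
		clear_bit(FOO_IO_BUSY, &f->bits);

		/* publish the bit updates before testing for waiters */
		smp_mb();
		if (waitqueue_active(&f->waitq))
			wake_up(&f->waitq);
	}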
@@ -414,11 +423,9 @@ static void block_bio_end_io(struct bio *bio, int err)
 	struct block_private *bp = bio->bi_private;
 	struct super_block *sb = bp->sb;
 
+	TRACE_BLOCK(end_io, bp);
 	block_end_io(sb, bio->bi_rw, bp, err);
 	bio_put(bio);
-
-	TRACE_BLOCK(end_io, bp);
-	block_put(sb, bp);
 }
/*
@@ -441,6 +448,8 @@ static int block_submit_bio(struct super_block *sb, struct block_private *bp,
 	/* don't let racing end_io during submission think block is complete */
 	atomic_inc(&bp->io_count);
+	set_bit(BLOCK_BIT_IO_BUSY, &bp->bits);
+	atomic_inc(&bp->refcount);
 
 	blk_start_plug(&plug);
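
Submission now takes, up front, everything the final io_count drop
will release: the io_busy bit that waiters test and the reference that
block_end_io()'s block_put() drops.  Both must be in place before the
first bio can possibly complete.  A sketch of the intended ordering,
assuming (as the commit message implies) that the submitter drops its
guard io_count through the same end_io path after the loop:

	atomic_inc(&bp->io_count);              /* guard: count can't hit 0 mid-submit */
	set_bit(BLOCK_BIT_IO_BUSY, &bp->bits);  /* waiters block from this point */
	atomic_inc(&bp->refcount);              /* dropped by the final end_io */

	/* submit the bios, each taking its own io_count */

	block_end_io(sb, rw, bp, 0);            /* drop the guard; may be the last */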
@@ -457,7 +466,6 @@ static int block_submit_bio(struct super_block *sb, struct block_private *bp,
 		bio->bi_end_io = block_bio_end_io;
 		bio->bi_private = bp;
 
-		atomic_inc(&bp->refcount);
 		atomic_inc(&bp->io_count);
 
 		TRACE_BLOCK(submit, bp);
@@ -732,7 +740,7 @@ int scoutfs_block_writer_write(struct super_block *sb,
 	list_for_each_entry(bp, &wri->dirty_list, dirty_entry) {
 		/* XXX should this be interruptible? */
-		wait_event(binf->waitq, atomic_read(&bp->io_count) == 0);
+		wait_event(binf->waitq, !io_busy(bp));
 
 		if (ret == 0 && test_bit(BLOCK_BIT_ERROR, &bp->bits)) {
 			clear_bit(BLOCK_BIT_ERROR, &bp->bits);
 			ret = -EIO;