diff --git a/kmod/src/block.c b/kmod/src/block.c
index 7b199c9f..6d93e116 100644
--- a/kmod/src/block.c
+++ b/kmod/src/block.c
@@ -275,16 +275,15 @@ static const struct rhashtable_params block_ht_params = {
 
 /*
  * Insert a new block into the hash table. Once it is inserted in the
- * hash table readers can start getting references. The caller only has
- * its initial ref so inserted can't be set and there can be no other
- * references.
+ * hash table, readers can start getting references. The caller may have
+ * multiple refs but the block can't already be inserted.
  */
 static int block_insert(struct super_block *sb, struct block_private *bp)
 {
 	DECLARE_BLOCK_INFO(sb, binf);
 	int ret;
 
-	WARN_ON_ONCE(atomic_read(&bp->refcount) != 1);
+	WARN_ON_ONCE(atomic_read(&bp->refcount) & BLOCK_REF_INSERTED);
 
 	atomic_add(BLOCK_REF_INSERTED, &bp->refcount);
 	ret = rhashtable_insert_fast(&binf->ht, &bp->ht_head, block_ht_params);
@@ -520,6 +519,20 @@ static int block_submit_bio(struct super_block *sb, struct block_private *bp,
 	return ret;
 }
 
+static struct block_private *block_lookup(struct super_block *sb, u64 blkno)
+{
+	DECLARE_BLOCK_INFO(sb, binf);
+	struct block_private *bp;
+
+	rcu_read_lock();
+	bp = rhashtable_lookup(&binf->ht, &blkno, block_ht_params);
+	if (bp)
+		bp = block_get_if_inserted(bp);
+	rcu_read_unlock();
+
+	return bp;
+}
+
 /*
  * Return a reference to a cached block found in the hash table. If one
  * isn't found then we try and allocate and insert a new one. Its
@@ -533,16 +546,11 @@ static int block_submit_bio(struct super_block *sb, struct block_private *bp,
 static struct block_private *block_lookup_create(struct super_block *sb,
 						 u64 blkno)
 {
-	DECLARE_BLOCK_INFO(sb, binf);
 	struct block_private *bp;
 	int ret;
 
 restart:
-	rcu_read_lock();
-	bp = rhashtable_lookup(&binf->ht, &blkno, block_ht_params);
-	if (bp)
-		bp = block_get_if_inserted(bp);
-	rcu_read_unlock();
+	bp = block_lookup(sb, blkno);
 
 	/* drop failed reads that interrupted waiters abandoned */
 	if (bp && (test_bit(BLOCK_BIT_ERROR, &bp->bits) &&
@@ -581,24 +589,6 @@ out:
 	return bp;
 }
 
-/*
- * Return a cached block or a newly allocated block whose contents are
- * undefined. The caller is going to initialize the block contents.
- */
-struct scoutfs_block *scoutfs_block_create(struct super_block *sb, u64 blkno)
-{
-	struct block_private *bp;
-
-	bp = block_lookup_create(sb, blkno);
-	if (IS_ERR(bp))
-		return ERR_CAST(bp);
-
-	set_bit(BLOCK_BIT_UPTODATE, &bp->bits);
-	set_bit(BLOCK_BIT_CRC_VALID, &bp->bits);
-
-	return &bp->bl;
-}
-
 static bool uptodate_or_error(struct block_private *bp)
 {
 	smp_rmb(); /* test after adding to wait queue */
@@ -606,6 +596,11 @@ static bool uptodate_or_error(struct block_private *bp)
 		test_bit(BLOCK_BIT_ERROR, &bp->bits);
 }
 
+static bool block_is_dirty(struct block_private *bp)
+{
+	return test_bit(BLOCK_BIT_DIRTY, &bp->bits) != 0;
+}
+
 static struct block_private *block_read(struct super_block *sb, u64 blkno)
 {
 	DECLARE_BLOCK_INFO(sb, binf);
@@ -639,13 +634,18 @@ out:
 }
 
 /*
- * Btree blocks don't have rigid cache consistency. We can be following
- * block references into cached blocks that are now stale or can be
- * following a stale root into blocks that have been overwritten. If we
- * hit a block that looks stale we first invalidate the cache and retry,
- * returning -ESTALE if it still looks wrong. The caller can retry the
- * read from a more current root or decide that this is a persistent
- * error.
+ * Read a referenced metadata block.
+ *
+ * The caller may be following a stale reference to a block location
+ * that has since been rewritten. We check that the destination block
+ * header fields match the reference. If they don't, we return -ESTALE
+ * and the caller can choose to retry with newer references or return an
+ * error. -ESTALE can be a sign of block corruption when the refs are
+ * current.
+ *
+ * Once the caller has the cached block it won't be modified. A writer
+ * trying to dirty a block at that location will remove our existing
+ * block and insert a new block with modified headers.
  */
 int scoutfs_block_read_ref(struct super_block *sb, struct scoutfs_block_ref *ref,
 			   u32 magic, struct scoutfs_block **bl_ret)
@@ -682,7 +682,7 @@ retry:
 	ret = 0;
 out:
-	if (ret == -ESTALE && !retried) {
+	if (ret == -ESTALE && !retried && !block_is_dirty(bp)) {
 		retried = true;
 		scoutfs_inc_counter(sb, block_cache_remove_stale);
 		block_remove(sb, bp);
@@ -736,11 +736,6 @@ static void block_mark_dirty(struct super_block *sb, struct scoutfs_block_writer
 	}
 }
 
-static bool block_is_dirty(struct block_private *bp)
-{
-	return test_bit(BLOCK_BIT_DIRTY, &bp->bits) != 0;
-}
-
 /*
  * Give the caller a dirty block that is pointed to by their ref.
  *
@@ -754,6 +749,15 @@ static bool block_is_dirty(struct block_private *bp)
  * If a new blkno is allocated then the ref is updated and any existing
  * blkno is freed.
  *
+ * A newly allocated block that we insert into the cache and return
+ * might already have an old stale copy inserted in the cache and it
+ * might be actively in use by readers. Future readers may also try to
+ * read their old block from our newly allocated block. We always
+ * remove any existing blocks and insert our new block only after
+ * modifying its headers and marking it dirty. Readers will never have
+ * their blocks modified and they can always identify new mismatched
+ * cached blocks.
+ *
  * The dirty_blkno and ref_blkno arguments are used by the metadata
  * allocator to avoid recursing into itself. dirty_blkno provides the
  * blkno of the new dirty block to avoid calling _alloc_meta and
@@ -768,6 +772,8 @@ int scoutfs_block_dirty_ref(struct super_block *sb, struct scoutfs_alloc *alloc,
 	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
 	struct scoutfs_block *cow_bl = NULL;
 	struct scoutfs_block *bl = NULL;
+	struct block_private *exist_bp = NULL;
+	struct block_private *cow_bp = NULL;
 	struct block_private *bp = NULL;
 	struct scoutfs_block_header *hdr;
 	bool undo_alloc = false;
@@ -775,6 +781,7 @@ int scoutfs_block_dirty_ref(struct super_block *sb, struct scoutfs_alloc *alloc,
 	int ret;
 	int err;
 
+	/* read existing referenced block, if any */
 	blkno = le64_to_cpu(ref->blkno);
 	if (blkno) {
 		ret = scoutfs_block_read_ref(sb, ref, magic, bl_ret);
@@ -789,6 +796,7 @@ int scoutfs_block_dirty_ref(struct super_block *sb, struct scoutfs_alloc *alloc,
 		}
 	}
 
+	/* allocate new blkno if caller didn't give us one */
 	if (dirty_blkno == 0) {
 		ret = scoutfs_alloc_meta(sb, alloc, wri, &dirty_blkno);
 		if (ret < 0)
@@ -796,12 +804,18 @@ int scoutfs_block_dirty_ref(struct super_block *sb, struct scoutfs_alloc *alloc,
 		undo_alloc = true;
 	}
 
-	cow_bl = scoutfs_block_create(sb, dirty_blkno);
-	if (IS_ERR(cow_bl)) {
-		ret = PTR_ERR(cow_bl);
+	/* allocate new cached block */
+	cow_bp = block_alloc(sb, dirty_blkno);
+	if (cow_bp == NULL) {
+		ret = -ENOMEM;
 		goto out;
 	}
 
+	cow_bl = &cow_bp->bl;
+	set_bit(BLOCK_BIT_UPTODATE, &cow_bp->bits);
+	set_bit(BLOCK_BIT_CRC_VALID, &cow_bp->bits);
+
+	/* free original referenced blkno, or give it to the caller to deal with */
 	if (ref_blkno) {
 		*ref_blkno = blkno;
 	} else if (blkno) {
@@ -810,6 +824,7 @@ int scoutfs_block_dirty_ref(struct super_block *sb, struct scoutfs_alloc *alloc,
 			goto out;
 	}
 
+	/* copy original block contents, or initialize */
 	if (bl)
 		memcpy(cow_bl->data, bl->data, SCOUTFS_BLOCK_LG_SIZE);
 	else
@@ -817,6 +832,8 @@ int scoutfs_block_dirty_ref(struct super_block *sb, struct scoutfs_alloc *alloc,
 	scoutfs_block_put(sb, bl);
 	bl = cow_bl;
 	cow_bl = NULL;
+	bp = cow_bp;
+	cow_bp = NULL;
 
 	hdr = bl->data;
 	hdr->magic = cpu_to_le32(magic);
@@ -827,10 +844,24 @@ int scoutfs_block_dirty_ref(struct super_block *sb, struct scoutfs_alloc *alloc,
 	trace_scoutfs_block_dirty_ref(sb, le64_to_cpu(ref->blkno), le64_to_cpu(ref->seq),
 				      le64_to_cpu(hdr->blkno), le64_to_cpu(hdr->seq));
 
+	/* mark the block dirty before it's visible */
+	block_mark_dirty(sb, wri, bl);
+
+	/* insert the new block, maybe removing any existing blocks */
+	while ((ret = block_insert(sb, bp)) == -EEXIST) {
+		exist_bp = block_lookup(sb, dirty_blkno);
+		if (exist_bp) {
+			block_remove(sb, exist_bp);
+			block_put(sb, exist_bp);
+		}
+	}
+	if (ret < 0)
+		goto out;
+
+	/* set the ref now that the block is visible */
 	ref->blkno = hdr->blkno;
 	ref->seq = hdr->seq;
-	block_mark_dirty(sb, wri, bl);
 
 	ret = 0;
 out:
diff --git a/kmod/src/block.h b/kmod/src/block.h
index 8278f3d3..93d88731 100644
--- a/kmod/src/block.h
+++ b/kmod/src/block.h
@@ -13,7 +13,6 @@ struct scoutfs_block {
 	void *priv;
 };
 
-struct scoutfs_block *scoutfs_block_create(struct super_block *sb, u64 blkno);
 int scoutfs_block_read_ref(struct super_block *sb, struct scoutfs_block_ref *ref,
 			   u32 magic, struct scoutfs_block **bl_ret);
 void scoutfs_block_put(struct super_block *sb, struct scoutfs_block *bl);
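
A hypothetical caller-side sketch (not part of this patch) of the retry behavior the scoutfs_block_read_ref() comment describes: on -ESTALE the caller refreshes its ref from a more current root and retries before treating the error as persistent. refresh_ref_from_root() is an assumed stand-in for whatever mechanism the caller uses to reload its ref.

static int read_ref_retrying(struct super_block *sb,
			     struct scoutfs_block_ref *ref, u32 magic,
			     struct scoutfs_block **bl_ret)
{
	int ret;

	ret = scoutfs_block_read_ref(sb, ref, magic, bl_ret);
	if (ret == -ESTALE) {
		/* assumed helper: reload the ref from a newer root */
		ret = refresh_ref_from_root(sb, ref);
		if (ret == 0)
			ret = scoutfs_block_read_ref(sb, ref, magic, bl_ret);
	}

	return ret;
}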