diff --git a/kmod/src/counters.h b/kmod/src/counters.h index 94d175c2..9adc1ea2 100644 --- a/kmod/src/counters.h +++ b/kmod/src/counters.h @@ -113,6 +113,7 @@ EXPAND_COUNTER(net_unknown_message) \ EXPAND_COUNTER(net_unknown_request) \ EXPAND_COUNTER(seg_alloc) \ + EXPAND_COUNTER(seg_csum_error) \ EXPAND_COUNTER(seg_free) \ EXPAND_COUNTER(seg_shrink) \ EXPAND_COUNTER(seg_stale_read) \ diff --git a/kmod/src/format.h b/kmod/src/format.h index 6f2d46f8..5a01f57b 100644 --- a/kmod/src/format.h +++ b/kmod/src/format.h @@ -271,6 +271,9 @@ struct scoutfs_segment_item { /* * Each large segment starts with a segment block that describes the * rest of the blocks that make up the segment. + * + * The crc covers the initial total_bytes of the segment but starts + * after the padding. */ struct scoutfs_segment_block { __le32 crc; diff --git a/kmod/src/seg.c b/kmod/src/seg.c index a69a226d..3b4b33a0 100644 --- a/kmod/src/seg.c +++ b/kmod/src/seg.c @@ -15,6 +15,7 @@ #include #include #include +#include #include "super.h" #include "format.h" @@ -55,6 +56,9 @@ struct segment_cache { enum { SF_END_IO = 0, + SF_CALC_CRC_STARTED, + SF_CALC_CRC_DONE, + SF_INVALID_CRC, }; static void *off_ptr(struct scoutfs_segment *seg, u32 off) @@ -161,6 +165,26 @@ static void lru_check(struct segment_cache *cac, struct scoutfs_segment *seg) } } +static __le32 calc_seg_crc(struct scoutfs_segment *seg) +{ + u32 total = scoutfs_seg_total_bytes(seg); + u32 crc = ~0; + u32 off; + u32 len; + + off = offsetof(struct scoutfs_segment_block, _padding) + + FIELD_SIZEOF(struct scoutfs_segment_block, _padding); + + while (off < total) { + len = min(total - off, + SCOUTFS_BLOCK_SIZE - (off & SCOUTFS_BLOCK_MASK)); + crc = crc32c(crc, off_ptr(seg, off), len); + off += len; + } + + return cpu_to_le32(crc); +} + /* * This always inserts the segment into the rbtree. If there's already * a segment at the given seg then it is removed and returned. The @@ -346,12 +370,20 @@ struct scoutfs_segment *scoutfs_seg_submit_read(struct super_block *sb, return seg; } +/* + * The caller has ensured that the segment won't be modified while + * it is in flight. + */ int scoutfs_seg_submit_write(struct super_block *sb, struct scoutfs_segment *seg, struct scoutfs_bio_completion *comp) { + struct scoutfs_segment_block *sblk = off_ptr(seg, 0); + trace_scoutfs_seg_submit_write(sb, seg->segno); + sblk->crc = calc_seg_crc(seg); + scoutfs_bio_submit_comp(sb, WRITE, seg->pages, segno_to_blkno(seg->segno), SCOUTFS_SEGMENT_BLOCKS, comp); @@ -360,8 +392,7 @@ int scoutfs_seg_submit_write(struct super_block *sb, } /* - * Wait for IO on the segment to complete. In the cached read fast path - * the bit is already set by the reads that populated the cache. + * Wait for IO on the segment to complete. * * The caller provides the segno and seq from their segment reference to * validate that we found the version of the segment that they were @@ -370,8 +401,9 @@ int scoutfs_seg_submit_write(struct super_block *sb, * its operation. (Typically by getting a new manifest btree root and * searching for keys in the manifest.) * - * XXX drop stale segments from the cache - * XXX none of the callers perform that retry today. + * An invalid crc can be racing to read a stale segment while it's being + * written. The caller will retry and consider it corrupt if it keeps + * getting stale reads. */ int scoutfs_seg_wait(struct super_block *sb, struct scoutfs_segment *seg, u64 segno, u64 seq) @@ -393,10 +425,28 @@ int scoutfs_seg_wait(struct super_block *sb, struct scoutfs_segment *seg, goto out; } + /* calc crc in waiting task instead of end_io */ + if (!test_bit(SF_CALC_CRC_DONE, &seg->flags) && + !test_and_set_bit(SF_CALC_CRC_STARTED, &seg->flags)) { + if (sblk->crc != calc_seg_crc(seg)) { + scoutfs_inc_counter(sb, seg_csum_error); + set_bit(SF_INVALID_CRC, &seg->flags); + } + set_bit(SF_CALC_CRC_DONE, &seg->flags); + wake_up(&cac->waitq); + } + + /* very rarely race waiting for calc to finish */ + ret = wait_event_interruptible(cac->waitq, + test_bit(SF_CALC_CRC_DONE, &seg->flags)); + if (ret) + goto out; + sblk = off_ptr(seg, 0); - if (WARN_ON_ONCE(segno != le64_to_cpu(sblk->segno)) || - WARN_ON_ONCE(seq != le64_to_cpu(sblk->seq)) || + if (test_bit(SF_INVALID_CRC, &seg->flags) || + segno != le64_to_cpu(sblk->segno) || + seq != le64_to_cpu(sblk->seq) || scoutfs_trigger(sb, SEG_STALE_READ)) { spin_lock_irqsave(&cac->lock, flags); erased = erase_seg(cac, seg);