scoutfs: throttle addition of level 0 segments

Writers can add level 0 segments much faster (~20x) than compaction can
compact them down into the lower levels.  Without a limit on the number
of level 0 segments, item reading can try to read an extraordinary
number of level 0 segments and wedge the box with nonreclaimable page
allocations.

Signed-off-by: Zach Brown <zab@versity.com>
Zach Brown
2017-06-21 20:47:11 -07:00
parent 9f545782fb
commit f7701177d2
3 changed files with 88 additions and 12 deletions
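
The mechanism is a "full" bit maintained alongside the level counts: writers sleep on a waitqueue until the bit clears, and compaction wakes them once level 0 drains. A minimal sketch of that pattern, using hypothetical names (struct throttle, throttle_count(), ...) rather than the scoutfs API, and assuming the waitqueue is initialized at setup as the commit does:

#include <linux/wait.h>
#include <linux/bitops.h>
#include <linux/types.h>

#define LEVEL0_FULL 0

struct throttle {
	u64 level0_count;	/* updated under a lock */
	unsigned long flags;	/* LEVEL0_FULL bit, tested locklessly */
	wait_queue_head_t waitq;
};

/* called under the lock as segments are added (+1) or compacted away (-1) */
static void throttle_count(struct throttle *t, s64 val)
{
	t->level0_count += val;
	if (t->level0_count)
		set_bit(LEVEL0_FULL, &t->flags);
	else
		clear_bit(LEVEL0_FULL, &t->flags);
}

/* a writer blocks until compaction drains level 0 */
static void throttle_wait(struct throttle *t)
{
	wait_event(t->waitq, !test_bit(LEVEL0_FULL, &t->flags));
}

/* compaction wakes writers once level 0 empties */
static void throttle_wake(struct throttle *t)
{
	if (!test_bit(LEVEL0_FULL, &t->flags))
		wake_up(&t->waitq);
}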


@@ -45,9 +45,13 @@ struct manifest {
/* calculated on mount, const thereafter */
u64 level_limits[SCOUTFS_MANIFEST_MAX_LEVEL + 1];
unsigned long flags;
struct scoutfs_key_buf *compact_keys[SCOUTFS_MANIFEST_MAX_LEVEL + 1];
};
#define MANI_FLAG_LEVEL0_FULL (1 << 0)
#define DECLARE_MANIFEST(sb, name) \
struct manifest *name = SCOUTFS_SB(sb)->manifest
@@ -109,6 +113,46 @@ static bool cmp_range_ment(struct scoutfs_key_buf *key,
return scoutfs_key_compare_ranges(key, end, &first, &last);
}
/*
* Change the level count under the manifest lock. We then maintain a
* bit that can be tested outside the lock to determine if the caller
* should wait for level 0 segments to drain.
*/
static void add_level_count(struct super_block *sb, int level, s64 val)
{
DECLARE_MANIFEST(sb, mani);
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
__le64 count;
int full;
le64_add_cpu(&super->manifest.level_counts[level], val);
if (level == 0) {
count = super->manifest.level_counts[level];
full = test_bit(MANI_FLAG_LEVEL0_FULL, &mani->flags);
if (count && !full)
set_bit(MANI_FLAG_LEVEL0_FULL, &mani->flags);
else if (!count && full)
clear_bit(MANI_FLAG_LEVEL0_FULL, &mani->flags);
}
}
/*
* Return whether or not level 0 is full. It's safe to use
* this as a wait_event condition because it doesn't block.
*
* Callers rely on the spin locks in wait queues to synchronize
* testing this as a sleeping condition with addition to the wait queue
* and waking of the waitqueue.
*/
bool scoutfs_manifest_level0_full(struct super_block *sb)
{
DECLARE_MANIFEST(sb, mani);
return test_bit(MANI_FLAG_LEVEL0_FULL, &mani->flags);
}
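
For illustration, this is roughly what the wait_event() call in process_record_segment() below expands to on the waiter side. prepare_to_wait() takes the waitqueue spinlock and sets the task state before the condition is re-tested, so a waker that clears the bit and then calls wake_up() on the same queue can't be missed. A sketch of the classic idiom, not the verbatim kernel macro:

	DEFINE_WAIT(wait);

	for (;;) {
		/* takes nti->waitq's spinlock, queues us, sets task state */
		prepare_to_wait(&nti->waitq, &wait, TASK_UNINTERRUPTIBLE);
		/* re-test after we're queued: a concurrent clear is seen here */
		if (!scoutfs_manifest_level0_full(sb))
			break;
		schedule();
	}
	finish_wait(&nti->waitq, &wait);
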
/*
* Insert a new manifest entry in the ring. The ring allocates a new
* node for us and we fill it.
@@ -121,8 +165,6 @@ int scoutfs_manifest_add(struct super_block *sb,
u8 level)
{
DECLARE_MANIFEST(sb, mani);
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_manifest_entry *ment;
struct scoutfs_key_buf ment_first;
struct scoutfs_key_buf ment_last;
@@ -154,7 +196,7 @@ int scoutfs_manifest_add(struct super_block *sb,
scoutfs_key_copy(&ment_last, last);
mani->nr_levels = max_t(u8, mani->nr_levels, level + 1);
le64_add_cpu(&super->manifest.level_counts[level], 1);
add_level_count(sb, level, 1);
return 0;
}
@@ -168,8 +210,6 @@ int scoutfs_manifest_add_ment(struct super_block *sb,
struct scoutfs_manifest_entry *add)
{
DECLARE_MANIFEST(sb, mani);
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_manifest_entry *ment;
struct manifest_search_key skey;
struct scoutfs_key_buf first;
@@ -195,7 +235,7 @@ int scoutfs_manifest_add_ment(struct super_block *sb,
memcpy(ment, add, bytes);
mani->nr_levels = max_t(u8, mani->nr_levels, add->level + 1);
le64_add_cpu(&super->manifest.level_counts[add->level], 1);
add_level_count(sb, add->level, 1);
return 0;
}
@@ -229,8 +269,6 @@ int scoutfs_manifest_del(struct super_block *sb, struct scoutfs_key_buf *first,
u64 seq, u8 level)
{
DECLARE_MANIFEST(sb, mani);
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_manifest_entry *ment;
struct manifest_search_key skey;
struct scoutfs_key_buf last;
@@ -248,7 +286,7 @@ int scoutfs_manifest_del(struct super_block *sb, struct scoutfs_key_buf *first,
le64_to_cpu(ment->seq), first, &last);
scoutfs_ring_delete(&mani->ring, ment);
le64_add_cpu(&super->manifest.level_counts[level], -1ULL);
add_level_count(sb, level, -1ULL);
return 0;
}


@@ -45,6 +45,8 @@ int scoutfs_manifest_add_ment_ref(struct super_block *sb,
int scoutfs_manifest_next_compact(struct super_block *sb, void *data);
bool scoutfs_manifest_level0_full(struct super_block *sb);
int scoutfs_manifest_setup(struct super_block *sb);
void scoutfs_manifest_destroy(struct super_block *sb);


@@ -84,6 +84,9 @@ struct net_info {
struct llist_head ring_commit_waiters;
struct work_struct ring_commit_work;
/* level 0 segment addition waits for it to clear */
wait_queue_head_t waitq;
/* server tracks seq use */
spinlock_t seq_lock;
struct list_head pending_seqs;
@@ -422,6 +425,20 @@ static struct send_buf *process_bulk_alloc(struct super_block *sb, void *req,
return sbuf;
}
/*
* This is new segments arriving.  They need to wait for level 0 to be
* free, but this path has relatively little visibility into the
* manifest.  We don't want it to block holding commits because that'll
* stop manifest updates from emptying level 0.
*
* Maybe the easiest way is to protect the level counts with a seqlock,
* or whatever.
*/
/*
* The sender has written their level 0 segment and has given us its
* details. We wait for there to be room in level 0 before adding it.
*/
static struct send_buf *process_record_segment(struct super_block *sb,
void *req, int req_len)
{
@@ -443,9 +460,18 @@ static struct send_buf *process_record_segment(struct super_block *sb,
goto out;
}
retry:
down_read(&nti->ring_commit_rwsem);
scoutfs_manifest_lock(sb);
if (scoutfs_manifest_level0_full(sb)) {
scoutfs_manifest_unlock(sb);
up_read(&nti->ring_commit_rwsem);
/* XXX waits indefinitely? io errors? */
wait_event(nti->waitq, !scoutfs_manifest_level0_full(sb));
goto retry;
}
ret = scoutfs_manifest_add_ment(sb, ment);
scoutfs_manifest_unlock(sb);
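
The seqlock idea mused about in the comment above would let this path sample the level counts without taking the manifest lock at all. A hypothetical sketch of that alternative, not what the commit implements:

#include <linux/seqlock.h>

static DEFINE_SEQLOCK(level_seqlock);
static u64 level_counts[SCOUTFS_MANIFEST_MAX_LEVEL + 1];

/* writer side: segment addition and compaction update under the seqlock */
static void update_level_count(int level, s64 val)
{
	write_seqlock(&level_seqlock);
	level_counts[level] += val;
	write_sequnlock(&level_seqlock);
}

/* reader side: retry until a consistent snapshot of the count is read */
static u64 read_level_count(int level)
{
	unsigned int seq;
	u64 count;

	do {
		seq = read_seqbegin(&level_seqlock);
		count = level_counts[level];
	} while (read_seqretry(&level_seqlock, seq));

	return count;
}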
@@ -1446,20 +1472,29 @@ int scoutfs_net_get_compaction(struct super_block *sb, void *curs)
* In the future we'd encode the manifest and segnos in requests sent to
* the server who'd update the manifest and allocator in request
* processing.
*
* As we finish a compaction we wake level 0 writers if it opened up
* space in level 0.
*/
int scoutfs_net_finish_compaction(struct super_block *sb, void *curs,
void *list)
{
DECLARE_NET_INFO(sb, nti);
struct commit_waiter cw;
bool level0_was_full;
int ret;
down_read(&nti->ring_commit_rwsem);
ret = scoutfs_compact_commit(sb, curs, list);
level0_was_full = scoutfs_manifest_level0_full(sb);
if (ret == 0)
ret = scoutfs_compact_commit(sb, curs, list);
if (ret == 0) {
queue_commit_work(nti, &cw);
if (level0_was_full && !scoutfs_manifest_level0_full(sb))
wake_up(&nti->waitq);
}
up_read(&nti->ring_commit_rwsem);
if (ret == 0)
@@ -2150,6 +2185,7 @@ int scoutfs_net_setup(struct super_block *sb)
init_rwsem(&nti->ring_commit_rwsem);
init_llist_head(&nti->ring_commit_waiters);
INIT_WORK(&nti->ring_commit_work, scoutfs_net_ring_commit_func);
init_waitqueue_head(&nti->waitq);
spin_lock_init(&nti->seq_lock);
INIT_LIST_HEAD(&nti->pending_seqs);
INIT_LIST_HEAD(&nti->active_socks);