Merge pull request #82 from versity/zab/server_alloc_reservation

Zab/server alloc reservation
Zach Brown, 2022-04-01 17:36:22 -07:00 (committed by GitHub)
7 changed files with 355 additions and 101 deletions

@@ -1318,6 +1318,17 @@ bool scoutfs_alloc_meta_low(struct super_block *sb,
return lo;
}
void scoutfs_alloc_meta_remaining(struct scoutfs_alloc *alloc, u32 *avail_total, u32 *freed_space)
{
unsigned int seq;
do {
seq = read_seqbegin(&alloc->seqlock);
*avail_total = le32_to_cpu(alloc->avail.first_nr);
*freed_space = list_block_space(alloc->freed.first_nr);
} while (read_seqretry(&alloc->seqlock, seq));
}
bool scoutfs_alloc_test_flag(struct super_block *sb,
struct scoutfs_alloc *alloc, u32 flag)
{

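The seqlock retry loop above only gives a consistent snapshot if every writer bumps the same seqlock around updates to both counters. The write side is not part of this hunk; a minimal sketch of the pairing it assumes, with a hypothetical helper name:

/* Hypothetical write-side pairing (not from this patch): updating both
 * counters inside write_seqlock() guarantees that the retry loop in
 * scoutfs_alloc_meta_remaining() never returns a torn pair. */
static void example_set_counters(struct scoutfs_alloc *alloc,
				 __le32 avail_nr, __le32 freed_nr)
{
	write_seqlock(&alloc->seqlock);
	alloc->avail.first_nr = avail_nr;
	alloc->freed.first_nr = freed_nr;
	write_sequnlock(&alloc->seqlock);
}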
@@ -158,6 +158,7 @@ int scoutfs_alloc_splice_list(struct super_block *sb,
bool scoutfs_alloc_meta_low(struct super_block *sb,
struct scoutfs_alloc *alloc, u32 nr);
void scoutfs_alloc_meta_remaining(struct scoutfs_alloc *alloc, u32 *avail_total, u32 *freed_space);
bool scoutfs_alloc_test_flag(struct super_block *sb,
struct scoutfs_alloc *alloc, u32 flag);

@@ -2449,7 +2449,7 @@ int scoutfs_btree_free_blocks(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_key *key,
struct scoutfs_btree_root *root, int alloc_low)
struct scoutfs_btree_root *root, int free_budget)
{
u64 blknos[SCOUTFS_BTREE_MAX_HEIGHT];
struct scoutfs_block *bl = NULL;
@@ -2459,11 +2459,15 @@ int scoutfs_btree_free_blocks(struct super_block *sb,
struct scoutfs_avl_node *node;
struct scoutfs_avl_node *next;
struct scoutfs_key par_next;
int nr_freed = 0;
int nr_par;
int level;
int ret;
int i;
if (WARN_ON_ONCE(free_budget <= 0))
return -EINVAL;
if (WARN_ON_ONCE(root->height > ARRAY_SIZE(blknos)))
return -EIO; /* XXX corruption */
@@ -2538,8 +2542,7 @@ int scoutfs_btree_free_blocks(struct super_block *sb,
while (node) {
/* make sure we can always free parents after leaves */
if (scoutfs_alloc_meta_low(sb, alloc,
alloc_low + nr_par + 1)) {
if ((nr_freed + 1 + nr_par) > free_budget) {
ret = 0;
goto out;
}
@@ -2553,6 +2556,7 @@ int scoutfs_btree_free_blocks(struct super_block *sb,
le64_to_cpu(ref.blkno));
if (ret < 0)
goto out;
nr_freed++;
node = scoutfs_avl_next(&bt->item_root, node);
if (node) {
@@ -2568,6 +2572,7 @@ int scoutfs_btree_free_blocks(struct super_block *sb,
blknos[i]);
ret = scoutfs_free_meta(sb, alloc, wri, blknos[i]);
BUG_ON(ret); /* checked meta low, freed should fit */
nr_freed++;
}
/* restart walk past the subtree we just freed */

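This conversion replaces the indirect "is the allocator running low?" test with explicit accounting against a caller-supplied budget. The loop's guard, restated as a hypothetical helper for clarity:

/* Illustrative only: one more leaf may be freed when that leaf plus
 * the nr_par parent blocks still queued in blknos[] all fit within
 * free_budget, so parents can always be freed after their leaves. */
static inline bool can_free_one_more(int nr_freed, int nr_par, int free_budget)
{
	return nr_freed + 1 + nr_par <= free_budget;
}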
@@ -125,7 +125,7 @@ int scoutfs_btree_free_blocks(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_key *key,
struct scoutfs_btree_root *root, int alloc_low);
struct scoutfs_btree_root *root, int free_budget);
void scoutfs_btree_put_iref(struct scoutfs_btree_item_ref *iref);

@@ -1843,6 +1843,53 @@ DEFINE_EVENT(scoutfs_server_client_count_class, scoutfs_server_client_down,
TP_ARGS(sb, rid, nr_clients)
);
DECLARE_EVENT_CLASS(scoutfs_server_commit_users_class,
TP_PROTO(struct super_block *sb, int holding, int applying, int nr_holders,
u32 avail_before, u32 freed_before, int exceeded),
TP_ARGS(sb, holding, applying, nr_holders, avail_before, freed_before, exceeded),
TP_STRUCT__entry(
SCSB_TRACE_FIELDS
__field(int, holding)
__field(int, applying)
__field(int, nr_holders)
__field(__u32, avail_before)
__field(__u32, freed_before)
__field(int, exceeded)
),
TP_fast_assign(
SCSB_TRACE_ASSIGN(sb);
__entry->holding = !!holding;
__entry->applying = !!applying;
__entry->nr_holders = nr_holders;
__entry->avail_before = avail_before;
__entry->freed_before = freed_before;
__entry->exceeded = !!exceeded;
),
TP_printk(SCSBF" holding %u applying %u nr %u avail_before %u freed_before %u exceeded %u",
SCSB_TRACE_ARGS, __entry->holding, __entry->applying, __entry->nr_holders,
__entry->avail_before, __entry->freed_before, __entry->exceeded)
);
DEFINE_EVENT(scoutfs_server_commit_users_class, scoutfs_server_commit_hold,
TP_PROTO(struct super_block *sb, int holding, int applying, int nr_holders,
u32 avail_before, u32 freed_before, int exceeded),
TP_ARGS(sb, holding, applying, nr_holders, avail_before, freed_before, exceeded)
);
DEFINE_EVENT(scoutfs_server_commit_users_class, scoutfs_server_commit_apply,
TP_PROTO(struct super_block *sb, int holding, int applying, int nr_holders,
u32 avail_before, u32 freed_before, int exceeded),
TP_ARGS(sb, holding, applying, nr_holders, avail_before, freed_before, exceeded)
);
DEFINE_EVENT(scoutfs_server_commit_users_class, scoutfs_server_commit_start,
TP_PROTO(struct super_block *sb, int holding, int applying, int nr_holders,
u32 avail_before, u32 freed_before, int exceeded),
TP_ARGS(sb, holding, applying, nr_holders, avail_before, freed_before, exceeded)
);
DEFINE_EVENT(scoutfs_server_commit_users_class, scoutfs_server_commit_end,
TP_PROTO(struct super_block *sb, int holding, int applying, int nr_holders,
u32 avail_before, u32 freed_before, int exceeded),
TP_ARGS(sb, holding, applying, nr_holders, avail_before, freed_before, exceeded)
);
#define slt_symbolic(mode) \
__print_symbolic(mode, \
{ SLT_CLIENT, "client" }, \

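Each DEFINE_EVENT() above generates a trace_scoutfs_server_commit_*() function with the class prototype. The server wraps the calls in a TRACE_COMMIT_USERS() macro (see server.c below), which expands to roughly this shape:

/* Sketch of one generated tracepoint being invoked; the argument list
 * mirrors the TP_PROTO() of the event class above. */
trace_scoutfs_server_commit_hold(sb, !list_empty(&cusers->holding),
				 !list_empty(&cusers->applying),
				 cusers->nr_holders, cusers->avail_before,
				 cusers->freed_before, cusers->exceeded);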
@@ -52,6 +52,41 @@
* mount will become the leader and have less trouble.
*/
/*
* Tracks all the holders and commit work that are operating on server
* commits. It synchronizes holders modifying the blocks in the commit
* and the commit work writing dirty blocks that make up a consistent
* commit. It limits the number of active holders so that they don't
* fully consume the allocation resources prepared for a commit.
*/
struct commit_users {
wait_queue_head_t waitq;
spinlock_t lock;
struct list_head holding;
struct list_head applying;
unsigned int nr_holders;
u32 avail_before;
u32 freed_before;
bool exceeded;
};
static void init_commit_users(struct commit_users *cusers)
{
memset(cusers, 0, sizeof(struct commit_users));
init_waitqueue_head(&cusers->waitq);
spin_lock_init(&cusers->lock);
INIT_LIST_HEAD(&cusers->holding);
INIT_LIST_HEAD(&cusers->applying);
}
#define TRACE_COMMIT_USERS(sb, cusers, which) \
do { \
__typeof__(cusers) _cusers = (cusers); \
trace_scoutfs_server_commit_##which(sb, !list_empty(&_cusers->holding), \
!list_empty(&_cusers->applying), _cusers->nr_holders, _cusers->avail_before, \
_cusers->freed_before, _cusers->exceeded); \
} while (0)
struct server_info {
struct super_block *sb;
spinlock_t lock;
@@ -67,8 +102,7 @@ struct server_info {
atomic64_t seq_atomic;
/* request processing coordinates shared commits */
struct rw_semaphore commit_rwsem;
struct llist_head commit_waiters;
struct commit_users cusers;
struct work_struct commit_work;
struct list_head clients;
@@ -210,68 +244,223 @@ static void server_down(struct server_info *server)
cmpxchg(&server->status, was, SERVER_DOWN);
}
struct commit_waiter {
struct completion comp;
struct llist_node node;
/*
* The per-holder allocation block use budget balances batching
* efficiency and concurrency. We can easily have a few holders per
* client trying to make concurrent updates in a commit.
*/
#define COMMIT_HOLD_ALLOC_BUDGET 250
struct commit_hold {
struct list_head entry;
ktime_t start;
u32 avail;
u32 freed;
int ret;
bool exceeded;
};
#define COMMIT_HOLD(name) \
struct commit_hold name = { .entry = LIST_HEAD_INIT(name.entry) }
/*
* Hold the shared rwsem that lets multiple holders modify blocks in the
* current commit and prevents the commit worker from acquiring the
* exclusive write lock to write the commit.
*
* This is exported for server components isolated in their own files
* (lock_server) and which are not called directly by the server core
* (async timeout work).
* See if the currently active holders have, all together, consumed more
* allocation resources than they were allowed. We don't have
* per-holder allocation consumption tracking. The best we can do is
* flag all the current holders so that as they release we can see
* everyone involved in crossing the limit.
*/
void scoutfs_server_hold_commit(struct super_block *sb)
static void check_holder_budget(struct super_block *sb, struct server_info *server,
struct commit_users *cusers)
{
static bool exceeded_once = false;
struct commit_hold *hold;
struct timespec ts;
u32 avail_used;
u32 freed_used;
u32 avail_now;
u32 freed_now;
u32 budget;
assert_spin_locked(&cusers->lock);
if (cusers->exceeded || cusers->nr_holders == 0 || exceeded_once)
return;
scoutfs_alloc_meta_remaining(&server->alloc, &avail_now, &freed_now);
avail_used = cusers->avail_before - avail_now;
freed_used = cusers->freed_before - freed_now;
budget = cusers->nr_holders * COMMIT_HOLD_ALLOC_BUDGET;
if (avail_used <= budget && freed_used <= budget)
return;
exceeded_once = true;
cusers->exceeded = cusers->nr_holders;
scoutfs_err(sb, "%u holders exceeded alloc budget av: bef %u now %u, fr: bef %u now %u",
cusers->nr_holders, cusers->avail_before, avail_now,
cusers->freed_before, freed_now);
list_for_each_entry(hold, &cusers->holding, entry) {
ts = ktime_to_timespec(hold->start);
scoutfs_err(sb, "exceeding hold start %llu.%09llu av %u fr %u",
(u64)ts.tv_sec, (u64)ts.tv_nsec, hold->avail, hold->freed);
hold->exceeded = true;
}
}
/*
* We don't have per-holder consumption. We allow commit holders as
* long as the total budget of all the holders doesn't exceed the alloc
* resources that were available.
*/
static bool commit_alloc_has_room(struct server_info *server, struct commit_users *cusers,
unsigned int more_holders)
{
u32 avail_before;
u32 freed_before;
u32 budget;
if (cusers->nr_holders > 0) {
avail_before = cusers->avail_before;
freed_before = cusers->freed_before;
} else {
scoutfs_alloc_meta_remaining(&server->alloc, &avail_before, &freed_before);
}
budget = (cusers->nr_holders + more_holders) * COMMIT_HOLD_ALLOC_BUDGET;
return avail_before >= budget && freed_before >= budget;
}
static bool hold_commit(struct super_block *sb, struct server_info *server,
struct commit_users *cusers, struct commit_hold *hold)
{
bool held = false;
spin_lock(&cusers->lock);
TRACE_COMMIT_USERS(sb, cusers, hold);
check_holder_budget(sb, server, cusers);
/* +2 for our additional hold and then for the final commit work the server does */
if (list_empty(&cusers->applying) && commit_alloc_has_room(server, cusers, 2)) {
scoutfs_alloc_meta_remaining(&server->alloc, &hold->avail, &hold->freed);
if (cusers->nr_holders == 0) {
cusers->avail_before = hold->avail;
cusers->freed_before = hold->freed;
cusers->exceeded = false;
}
hold->exceeded = false;
hold->start = ktime_get();
list_add_tail(&hold->entry, &cusers->holding);
cusers->nr_holders++;
held = true;
}
spin_unlock(&cusers->lock);
return held;
}
/*
* Hold the server commit so that we can make a consistent change to the
* dirty blocks in the commit. The commit won't be written while we
* hold it.
*/
static void server_hold_commit(struct super_block *sb, struct commit_hold *hold)
{
DECLARE_SERVER_INFO(sb, server);
struct commit_users *cusers = &server->cusers;
BUG_ON(!list_empty(&hold->entry));
scoutfs_inc_counter(sb, server_commit_hold);
down_read(&server->commit_rwsem);
wait_event(cusers->waitq, hold_commit(sb, server, cusers, hold));
}
/*
* This is called while holding the commit and returns once the commit
* is successfully written. Many holders can all wait for all holders
* to drain before their shared commit is applied and they're all woken.
*
* It's important to realize that our commit_waiter list node might be
* serviced by a currently executing commit work that is blocked waiting
* for the holders to release the commit_rwsem. This caller can return
* from wait_for_commit() while another future commit_work is still
* queued.
*
* This could queue delayed work but we're first trying to have batching
* work by having concurrent modification line up behind a commit in
* flight. Once the commit finishes it'll unlock and hopefully everyone
* will race to make their changes and they'll all be applied by the
* next commit after that.
*/
int scoutfs_server_apply_commit(struct super_block *sb, int err)
static int server_apply_commit(struct super_block *sb, struct commit_hold *hold, int err)
{
DECLARE_SERVER_INFO(sb, server);
struct commit_waiter cw;
struct commit_users *cusers = &server->cusers;
struct timespec ts;
bool start_commit;
spin_lock(&cusers->lock);
TRACE_COMMIT_USERS(sb, cusers, apply);
check_holder_budget(sb, server, cusers);
if (hold->exceeded) {
ts = ktime_to_timespec(hold->start);
scoutfs_err(sb, "exceeding hold start %llu.%09llu stack:",
(u64)ts.tv_sec, (u64)ts.tv_nsec);
dump_stack();
}
if (err == 0) {
cw.ret = 0;
init_completion(&cw.comp);
llist_add(&cw.node, &server->commit_waiters);
scoutfs_inc_counter(sb, server_commit_queue);
list_move_tail(&hold->entry, &cusers->applying);
} else {
list_del_init(&hold->entry);
hold->ret = err;
}
cusers->nr_holders--;
start_commit = cusers->nr_holders == 0 && !list_empty(&cusers->applying);
spin_unlock(&cusers->lock);
if (start_commit)
queue_work(server->wq, &server->commit_work);
}
up_read(&server->commit_rwsem);
wait_event(cusers->waitq, list_empty_careful(&hold->entry));
smp_rmb(); /* entry load before ret */
return hold->ret;
}
if (err == 0) {
wait_for_completion(&cw.comp);
err = cw.ret;
}
/*
* Start a commit from the commit work. We should only have been queued
* while a holder is waiting to apply after all active holders have
* finished.
*/
static int commit_start(struct super_block *sb, struct commit_users *cusers)
{
int ret = 0;
return err;
/* make sure holders held off once commit started */
spin_lock(&cusers->lock);
TRACE_COMMIT_USERS(sb, cusers, start);
if (WARN_ON_ONCE(list_empty(&cusers->applying) || cusers->nr_holders != 0))
ret = -EINVAL;
spin_unlock(&cusers->lock);
return ret;
}
/*
* Finish a commit from the commit work. Give the result to all the
* holders who are waiting for the commit to be applied.
*/
static void commit_end(struct super_block *sb, struct commit_users *cusers, int ret)
{
struct commit_hold *hold;
struct commit_hold *tmp;
spin_lock(&cusers->lock);
TRACE_COMMIT_USERS(sb, cusers, end);
list_for_each_entry(hold, &cusers->applying, entry)
hold->ret = ret;
smp_wmb(); /* ret stores before list updates */
list_for_each_entry_safe(hold, tmp, &cusers->applying, entry)
list_del_init(&hold->entry);
spin_unlock(&cusers->lock);
wake_up(&cusers->waitq);
}
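Putting the pieces together: with COMMIT_HOLD_ALLOC_BUDGET at 250, a hold attempt with three holders already active needs (3 + 2) * 250 = 1250 blocks of both avail and freed headroom, the +2 covering the new hold and the final commit the server writes. The holder lifecycle, condensed from the request handlers later in this file:

/* Condensed sketch of the new interface from a handler's perspective;
 * the real handlers interleave their own locking and error handling. */
static int example_handler(struct super_block *sb)
{
	COMMIT_HOLD(hold);
	int ret;

	server_hold_commit(sb, &hold);	/* blocks until the budget allows */
	ret = 0;			/* ... modify blocks in the commit ... */
	return server_apply_commit(sb, &hold, ret); /* waits for commit_end() */
}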
static void get_roots(struct super_block *sb,
@@ -333,19 +522,17 @@ static void set_roots(struct server_info *server,
* Concurrent request processing dirties blocks in a commit and makes
* the modifications persistent before replying. We'd like to batch
* these commits as much as is reasonable so that we don't degrade to a
* few IO round trips per request.
* few synchronous IOs per request.
*
* Getting that batching right is bound up in the concurrency of request
* processing so a clear way to implement the batched commits is to
* implement commits with a single pending work func like the
* processing.
* implement commits with a single pending work func.
*
* Processing paths acquire the rwsem for reading while they're making
* multiple dependent changes. When they're done and want it persistent
* they add themselves to the list of waiters and queue the commit work.
* This work runs, acquires the lock to exclude other writers, and
* performs the commit. Readers can run concurrently with these
* commits.
* Processing paths hold the commit while they're making multiple
* dependent changes. When they're done and want it persistent they
* queue the commit work. This work runs, performs the commit, and
* wakes all the applying waiters with the result. Readers can run
* concurrently with these commits.
*/
static void scoutfs_server_commit_func(struct work_struct *work)
{
@@ -353,15 +540,15 @@ static void scoutfs_server_commit_func(struct work_struct *work)
commit_work);
struct super_block *sb = server->sb;
struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
struct commit_waiter *cw;
struct commit_waiter *pos;
struct llist_node *node;
struct commit_users *cusers = &server->cusers;
int ret;
trace_scoutfs_server_commit_work_enter(sb, 0, 0);
scoutfs_inc_counter(sb, server_commit_worker);
down_write(&server->commit_rwsem);
ret = commit_start(sb, cusers);
if (ret < 0)
goto out;
if (scoutfs_forcing_unmount(sb)) {
ret = -EIO;
@@ -438,15 +625,8 @@ static void scoutfs_server_commit_func(struct work_struct *work)
ret = 0;
out:
node = llist_del_all(&server->commit_waiters);
commit_end(sb, cusers, ret);
/* waiters always wait on completion, cw could be free after complete */
llist_for_each_entry_safe(cw, pos, node, node) {
cw->ret = ret;
complete(&cw->comp);
}
up_write(&server->commit_rwsem);
trace_scoutfs_server_commit_work_exit(sb, 0, ret);
}
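The worker's flow after this change, condensed from the hunk above:

/* Sketch: validate that holders have drained, write the commit, then
 * hand the result to every applying holder via commit_end(). */
ret = commit_start(sb, cusers);
if (ret == 0) {
	/* ... write dirty blocks and advance the persistent roots ... */
}
commit_end(sb, cusers, ret);	/* wakes waiters in server_apply_commit() */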
@@ -457,6 +637,7 @@ static int server_alloc_inodes(struct super_block *sb,
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_net_inode_alloc ial = { 0, };
COMMIT_HOLD(hold);
__le64 lecount;
u64 ino;
u64 nr;
@@ -469,7 +650,7 @@ static int server_alloc_inodes(struct super_block *sb,
memcpy(&lecount, arg, arg_len);
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, &hold);
spin_lock(&sbi->next_ino_lock);
ino = le64_to_cpu(super->next_ino);
@@ -477,7 +658,7 @@ static int server_alloc_inodes(struct super_block *sb,
le64_add_cpu(&super->next_ino, nr);
spin_unlock(&sbi->next_ino_lock);
ret = scoutfs_server_apply_commit(sb, 0);
ret = server_apply_commit(sb, &hold, 0);
if (ret == 0) {
ial.ino = cpu_to_le64(ino);
ial.nr = cpu_to_le64(nr);
@@ -855,7 +1036,7 @@ static int next_log_merge_item(struct super_block *sb,
#define FINALIZE_POLL_MS (11)
#define FINALIZE_TIMEOUT_MS (MSEC_PER_SEC / 2)
static int finalize_and_start_log_merge(struct super_block *sb, struct scoutfs_log_trees *lt,
u64 rid)
u64 rid, struct commit_hold *hold)
{
struct server_info *server = SCOUTFS_SB(sb)->server_info;
struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
@@ -981,13 +1162,13 @@ static int finalize_and_start_log_merge(struct super_block *sb, struct scoutfs_l
/* wait a bit for mounts to arrive */
if (others_active) {
mutex_unlock(&server->logs_mutex);
ret = scoutfs_server_apply_commit(sb, 0);
ret = server_apply_commit(sb, hold, 0);
if (ret < 0)
err_str = "applying commit before waiting for finalized";
msleep(FINALIZE_POLL_MS);
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, hold);
mutex_lock(&server->logs_mutex);
/* done if we timed out */
@@ -1080,6 +1261,7 @@ static int server_get_log_trees(struct super_block *sb,
struct scoutfs_log_trees lt;
struct scoutfs_key key;
bool unlock_alloc = false;
COMMIT_HOLD(hold);
u64 data_zone_blocks;
char *err_str = NULL;
u64 nr;
@@ -1090,7 +1272,7 @@ static int server_get_log_trees(struct super_block *sb,
goto out;
}
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, &hold);
mutex_lock(&server->logs_mutex);
@@ -1128,7 +1310,7 @@ static int server_get_log_trees(struct super_block *sb,
}
/* drops and re-acquires the mutex and commit if it has to wait */
ret = finalize_and_start_log_merge(sb, &lt, rid);
ret = finalize_and_start_log_merge(sb, &lt, rid, &hold);
if (ret < 0)
goto unlock;
@@ -1223,7 +1405,7 @@ unlock:
mutex_unlock(&server->alloc_mutex);
mutex_unlock(&server->logs_mutex);
ret = scoutfs_server_apply_commit(sb, ret);
ret = server_apply_commit(sb, &hold, ret);
out:
if (ret < 0)
scoutfs_err(sb, "error %d getting log trees for rid %016llx: %s",
@@ -1249,6 +1431,7 @@ static int server_commit_log_trees(struct super_block *sb,
struct scoutfs_log_trees *exist;
struct scoutfs_log_trees lt;
struct scoutfs_key key;
COMMIT_HOLD(hold);
char *err_str = NULL;
bool committed = false;
int ret;
@@ -1267,7 +1450,7 @@ static int server_commit_log_trees(struct super_block *sb,
goto out;
}
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, &hold);
mutex_lock(&server->logs_mutex);
@@ -1316,7 +1499,7 @@ static int server_commit_log_trees(struct super_block *sb,
unlock:
mutex_unlock(&server->logs_mutex);
ret = scoutfs_server_apply_commit(sb, ret);
ret = server_apply_commit(sb, &hold, ret);
if (ret < 0)
scoutfs_err(sb, "server error %d committing client logs for rid %016llx: %s",
ret, rid, err_str);
@@ -1625,6 +1808,7 @@ static int server_srch_get_compact(struct super_block *sb,
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_srch_compact *sc = NULL;
COMMIT_HOLD(hold);
int ret;
if (arg_len != 0) {
@@ -1638,7 +1822,7 @@ static int server_srch_get_compact(struct super_block *sb,
goto out;
}
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, &hold);
mutex_lock(&server->srch_mutex);
ret = scoutfs_srch_get_compact(sb, &server->alloc, &server->wri,
@@ -1666,7 +1850,7 @@ static int server_srch_get_compact(struct super_block *sb,
mutex_unlock(&server->srch_mutex);
apply:
ret = scoutfs_server_apply_commit(sb, ret);
ret = server_apply_commit(sb, &hold, ret);
WARN_ON_ONCE(ret < 0 && ret != -ENOENT); /* XXX leaked busy item */
out:
ret = scoutfs_net_response(sb, conn, cmd, id, ret,
@@ -1692,6 +1876,7 @@ static int server_srch_commit_compact(struct super_block *sb,
struct scoutfs_srch_compact *sc;
struct scoutfs_alloc_list_head av;
struct scoutfs_alloc_list_head fr;
COMMIT_HOLD(hold);
int ret;
if (arg_len != sizeof(struct scoutfs_srch_compact)) {
@@ -1700,7 +1885,7 @@ static int server_srch_commit_compact(struct super_block *sb,
}
sc = arg;
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, &hold);
mutex_lock(&server->srch_mutex);
ret = scoutfs_srch_commit_compact(sb, &server->alloc, &server->wri,
@@ -1718,7 +1903,7 @@ static int server_srch_commit_compact(struct super_block *sb,
server->other_freed, &fr);
mutex_unlock(&server->alloc_mutex);
apply:
ret = scoutfs_server_apply_commit(sb, ret);
ret = server_apply_commit(sb, &hold, ret);
out:
WARN_ON(ret < 0); /* XXX leaks allocators */
return scoutfs_net_response(sb, conn, cmd, id, ret, NULL, 0);
@@ -2083,13 +2268,14 @@ static void server_log_merge_free_work(struct work_struct *work)
struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
struct scoutfs_log_merge_freeing fr;
struct scoutfs_key key;
COMMIT_HOLD(hold);
char *err_str = NULL;
bool commit = false;
int ret = 0;
while (!server_is_stopping(server)) {
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, &hold);
mutex_lock(&server->logs_mutex);
commit = true;
@@ -2119,7 +2305,7 @@ static void server_log_merge_free_work(struct work_struct *work)
ret = scoutfs_btree_free_blocks(sb, &server->alloc,
&server->wri, &fr.key,
&fr.root, 10);
&fr.root, COMMIT_HOLD_ALLOC_BUDGET / 2);
if (ret < 0) {
err_str = "freeing log btree";
break;
@@ -2139,7 +2325,7 @@ static void server_log_merge_free_work(struct work_struct *work)
BUG_ON(ret < 0);
mutex_unlock(&server->logs_mutex);
ret = scoutfs_server_apply_commit(sb, ret);
ret = server_apply_commit(sb, &hold, ret);
commit = false;
if (ret < 0) {
err_str = "looping commit del/upd freeing item";
@@ -2149,7 +2335,7 @@ static void server_log_merge_free_work(struct work_struct *work)
if (commit) {
mutex_unlock(&server->logs_mutex);
ret = scoutfs_server_apply_commit(sb, ret);
ret = server_apply_commit(sb, &hold, ret);
if (ret < 0)
err_str = "final commit del/upd freeing item";
}
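Worked budget note: the free work's per-pass limit replaces the old hard-coded 10 with COMMIT_HOLD_ALLOC_BUDGET / 2 = 125 btree blocks, presumably leaving the other half of the hold's budget for the surrounding freeing-item updates in the same commit.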
@@ -2181,6 +2367,7 @@ static int server_get_log_merge(struct super_block *sb,
struct scoutfs_key par_end;
struct scoutfs_key next_key;
struct scoutfs_key key;
COMMIT_HOLD(hold);
char *err_str = NULL;
bool ins_rng;
bool del_remain;
@@ -2194,7 +2381,7 @@ static int server_get_log_merge(struct super_block *sb,
if (arg_len != 0)
return -EINVAL;
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, &hold);
mutex_lock(&server->logs_mutex);
restart:
@@ -2437,7 +2624,7 @@ out:
}
mutex_unlock(&server->logs_mutex);
ret = scoutfs_server_apply_commit(sb, ret);
ret = server_apply_commit(sb, &hold, ret);
return scoutfs_net_response(sb, conn, cmd, id, ret, &req, sizeof(req));
}
@@ -2461,6 +2648,7 @@ static int server_commit_log_merge(struct super_block *sb,
struct scoutfs_log_merge_status stat;
struct scoutfs_log_merge_range rng;
struct scoutfs_key key;
COMMIT_HOLD(hold);
char *err_str = NULL;
bool deleted = false;
int ret = 0;
@@ -2478,7 +2666,7 @@ static int server_commit_log_merge(struct super_block *sb,
le64_to_cpu(comp->seq),
le64_to_cpu(comp->flags));
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, &hold);
mutex_lock(&server->logs_mutex);
/* find the status of the current log merge */
@@ -2571,7 +2759,7 @@ out:
if (ret < 0 && err_str)
scoutfs_err(sb, "error %d committing log merge: %s", ret, err_str);
err = scoutfs_server_apply_commit(sb, ret);
err = server_apply_commit(sb, &hold, ret);
BUG_ON(ret < 0 && deleted); /* inconsistent */
if (ret == 0)
@@ -2691,6 +2879,7 @@ static int server_set_volopt(struct super_block *sb, struct scoutfs_net_connecti
DECLARE_SERVER_INFO(sb, server);
struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
struct scoutfs_volume_options *volopt;
COMMIT_HOLD(hold);
u64 opt;
u64 nr;
int ret = 0;
@@ -2708,7 +2897,7 @@ static int server_set_volopt(struct super_block *sb, struct scoutfs_net_connecti
mutex_lock(&server->volopt_mutex);
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, &hold);
if (le64_to_cpu(volopt->set_bits) & SCOUTFS_VOLOPT_DATA_ALLOC_ZONE_BLOCKS_BIT) {
opt = le64_to_cpu(volopt->data_alloc_zone_blocks);
@@ -2739,7 +2928,7 @@ static int server_set_volopt(struct super_block *sb, struct scoutfs_net_connecti
}
apply:
ret = scoutfs_server_apply_commit(sb, ret);
ret = server_apply_commit(sb, &hold, ret);
write_seqcount_begin(&server->volopt_seqcount);
if (ret == 0)
@@ -2759,6 +2948,7 @@ static int server_clear_volopt(struct super_block *sb, struct scoutfs_net_connec
DECLARE_SERVER_INFO(sb, server);
struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
struct scoutfs_volume_options *volopt;
COMMIT_HOLD(hold);
__le64 *opt;
u64 bit;
int ret = 0;
@@ -2777,7 +2967,7 @@ static int server_clear_volopt(struct super_block *sb, struct scoutfs_net_connec
mutex_lock(&server->volopt_mutex);
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, &hold);
for (i = 0, bit = 1, opt = first_valopt(&super->volopt); i < 64; i++, bit <<= 1, opt++) {
if (le64_to_cpu(volopt->set_bits) & bit) {
@@ -2786,7 +2976,7 @@ static int server_clear_volopt(struct super_block *sb, struct scoutfs_net_connec
}
}
ret = scoutfs_server_apply_commit(sb, ret);
ret = server_apply_commit(sb, &hold, ret);
write_seqcount_begin(&server->volopt_seqcount);
if (ret == 0)
@@ -2812,6 +3002,7 @@ static int server_resize_devices(struct super_block *sb, struct scoutfs_net_conn
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
struct scoutfs_net_resize_devices *nrd;
COMMIT_HOLD(hold);
u64 meta_tot;
u64 meta_start;
u64 meta_len;
@@ -2830,7 +3021,7 @@ static int server_resize_devices(struct super_block *sb, struct scoutfs_net_conn
meta_tot = le64_to_cpu(nrd->new_total_meta_blocks);
data_tot = le64_to_cpu(nrd->new_total_data_blocks);
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, &hold);
mutex_lock(&server->alloc_mutex);
if (meta_tot == le64_to_cpu(super->total_meta_blocks))
@@ -2892,7 +3083,7 @@ static int server_resize_devices(struct super_block *sb, struct scoutfs_net_conn
ret = 0;
unlock:
mutex_unlock(&server->alloc_mutex);
ret = scoutfs_server_apply_commit(sb, ret);
ret = server_apply_commit(sb, &hold, ret);
out:
return scoutfs_net_response(sb, conn, cmd, id, ret, NULL, 0);
};
@@ -3246,6 +3437,7 @@ static int server_greeting(struct super_block *sb,
struct scoutfs_net_greeting *gr = arg;
struct scoutfs_net_greeting greet;
DECLARE_SERVER_INFO(sb, server);
COMMIT_HOLD(hold);
bool reconnecting;
bool first_contact;
bool farewell;
@@ -3273,12 +3465,12 @@ static int server_greeting(struct super_block *sb,
}
if (gr->server_term == 0) {
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, &hold);
ret = insert_mounted_client(sb, le64_to_cpu(gr->rid), le64_to_cpu(gr->flags),
&conn->peername);
ret = scoutfs_server_apply_commit(sb, ret);
ret = server_apply_commit(sb, &hold, ret);
queue_work(server->wq, &server->farewell_work);
if (ret < 0)
goto send_err;
@@ -3344,9 +3536,10 @@ struct farewell_request {
*/
static int reclaim_rid(struct super_block *sb, u64 rid)
{
COMMIT_HOLD(hold);
int ret;
scoutfs_server_hold_commit(sb);
server_hold_commit(sb, &hold);
/* delete mounted client last, recovery looks for it */
ret = scoutfs_lock_server_farewell(sb, rid) ?:
@@ -3356,7 +3549,7 @@ static int reclaim_rid(struct super_block *sb, u64 rid)
scoutfs_omap_remove_rid(sb, rid) ?:
delete_mounted_client(sb, rid);
return scoutfs_server_apply_commit(sb, ret);
return server_apply_commit(sb, &hold, ret);
}
/*
@@ -4062,8 +4255,7 @@ int scoutfs_server_setup(struct super_block *sb)
init_waitqueue_head(&server->waitq);
INIT_WORK(&server->work, scoutfs_server_worker);
server->status = SERVER_DOWN;
init_rwsem(&server->commit_rwsem);
init_llist_head(&server->commit_waiters);
init_commit_users(&server->cusers);
INIT_WORK(&server->commit_work, scoutfs_server_commit_func);
INIT_LIST_HEAD(&server->clients);
spin_lock_init(&server->farewell_lock);

@@ -64,8 +64,6 @@ int scoutfs_server_lock_response(struct super_block *sb, u64 rid, u64 id,
struct scoutfs_net_lock *nl);
int scoutfs_server_lock_recover_request(struct super_block *sb, u64 rid,
struct scoutfs_key *key);
void scoutfs_server_hold_commit(struct super_block *sb);
int scoutfs_server_apply_commit(struct super_block *sb, int err);
void scoutfs_server_recov_finish(struct super_block *sb, u64 rid, int which);
int scoutfs_server_send_omap_request(struct super_block *sb, u64 rid,