Compare commits

..

2 Commits

Author SHA1 Message Date
Zach Brown
8efb30afbc No i_mutex in aio_read for data_wait_check
As a regular file reader first acquires a cluster lock it checks the
file's extents to see if the region it is reading contains offline
extents.

When this was written the extent operations didn't have their own
internal locking.  It was up to callers to use vfs mechanisms to
serialize readers and writers.  The aio_read data_wait_check extent
caller tried to use dio_count and an i_mutex acquisition to ensure that
our ioctls wouldn't modify extents.

This creates a bad inversion between the vfs i_mutex and our cluster
inode lock.  There are lots of fs methods which are called by the vfs
with i_mutex held which acquire a lock.  This read case was holding a
cluster lock and then acquiring i_mutex.

Since the data waiting was written the file data extent operations have
added their own extent_sem to protect the extent items from concurrent
callers.  We can rely on that internal locking and drop the bad i_mutex
use which causes the inversion.

Not surprisingly, this was trivial to deadlock by racing simple
utilities like cat and touch.

Signed-off-by: Zach Brown <zab@versity.com>
2021-04-27 11:20:40 -07:00
Zach Brown
df90b3eb90 Add export-lookup-evict-race test
Add a test that creates races between fh_to_dentry and eviction
triggered by lock invalidation.

Signed-off-by: Zach Brown <zab@versity.com>
2021-04-27 11:20:40 -07:00
13 changed files with 192 additions and 125 deletions

View File

@@ -112,6 +112,9 @@
EXPAND_COUNTER(item_write_dirty) \
EXPAND_COUNTER(lock_alloc) \
EXPAND_COUNTER(lock_free) \
EXPAND_COUNTER(lock_grace_extended) \
EXPAND_COUNTER(lock_grace_set) \
EXPAND_COUNTER(lock_grace_wait) \
EXPAND_COUNTER(lock_grant_request) \
EXPAND_COUNTER(lock_grant_response) \
EXPAND_COUNTER(lock_grant_work) \

View File

@@ -757,9 +757,9 @@ enum scoutfs_dentry_type {
DIV_ROUND_UP(sizeof(struct scoutfs_xattr) + name_len + val_len, \
(unsigned int)SCOUTFS_XATTR_MAX_PART_SIZE)
#define SCOUTFS_LOCK_INODE_GROUP_NR 128
#define SCOUTFS_LOCK_INODE_GROUP_NR 1024
#define SCOUTFS_LOCK_INODE_GROUP_MASK (SCOUTFS_LOCK_INODE_GROUP_NR - 1)
#define SCOUTFS_LOCK_SEQ_GROUP_MASK ((1ULL << 7) - 1)
#define SCOUTFS_LOCK_SEQ_GROUP_MASK ((1ULL << 10) - 1)
/*
* messages over the wire.
@@ -909,7 +909,6 @@ enum scoutfs_lock_trace {
SLT_INVALIDATE,
SLT_REQUEST,
SLT_RESPONSE,
SLT_NR,
};
/*
@@ -962,7 +961,7 @@ enum scoutfs_corruption_sources {
#define SC_NR_LONGS DIV_ROUND_UP(SC_NR_SOURCES, BITS_PER_LONG)
#define SCOUTFS_OPEN_INO_MAP_SHIFT 7
#define SCOUTFS_OPEN_INO_MAP_SHIFT 10
#define SCOUTFS_OPEN_INO_MAP_BITS (1 << SCOUTFS_OPEN_INO_MAP_SHIFT)
#define SCOUTFS_OPEN_INO_MAP_MASK (SCOUTFS_OPEN_INO_MAP_BITS - 1)
#define SCOUTFS_OPEN_INO_MAP_LE64S (SCOUTFS_OPEN_INO_MAP_BITS / 64)

View File

@@ -648,22 +648,12 @@ void scoutfs_inode_get_onoff(struct inode *inode, s64 *on, s64 *off)
} while (read_seqcount_retry(&si->seqcount, seq));
}
/*
* We have inversions between getting cluster locks while performing
* final deletion on a freeing inode and waiting on a freeing inode
* while holding a cluster lock.
*
* We can avoid these deadlocks by hiding freeing inodes in our hash
* lookup function. We're fine with either returning null or populating
* a new inode overlapping with eviction freeing a previous instance of
* the inode.
*/
static int scoutfs_iget_test(struct inode *inode, void *arg)
{
struct scoutfs_inode_info *si = SCOUTFS_I(inode);
u64 *ino = arg;
return (si->ino == *ino) && !(inode->i_state & I_FREEING);
return si->ino == *ino;
}
static int scoutfs_iget_set(struct inode *inode, void *arg)
@@ -682,6 +672,28 @@ struct inode *scoutfs_ilookup(struct super_block *sb, u64 ino)
return ilookup5(sb, ino, scoutfs_iget_test, &ino);
}
static int iget_test_nofreeing(struct inode *inode, void *arg)
{
return !(inode->i_state & I_FREEING) && scoutfs_iget_test(inode, arg);
}
/*
* There's a natural risk of a deadlock between lock invalidation and
* eviction. Invalidation blocks locks while looking up inodes and
* invalidating local caches. Inode eviction gets a lock to check final
* inode deletion while the inode is marked FREEING which blocks
* lookups.
*
* We have a lookup variant which doesn't return I_FREEING inodes
* instead of waiting on them. If an inode has made it to I_FREEING
* then it doesn't have any local caches that are reachable and the lock
* invalidation promise is kept.
*/
struct inode *scoutfs_ilookup_nofreeing(struct super_block *sb, u64 ino)
{
return ilookup5(sb, ino, iget_test_nofreeing, &ino);
}
struct inode *scoutfs_iget(struct super_block *sb, u64 ino)
{
struct scoutfs_lock *lock = NULL;

View File

@@ -79,6 +79,7 @@ int scoutfs_orphan_inode(struct inode *inode);
struct inode *scoutfs_iget(struct super_block *sb, u64 ino);
struct inode *scoutfs_ilookup(struct super_block *sb, u64 ino);
struct inode *scoutfs_ilookup_nofreeing(struct super_block *sb, u64 ino);
void scoutfs_inode_init_index_key(struct scoutfs_key *key, u8 type, u64 major,
u32 minor, u64 ino);

View File

@@ -66,6 +66,8 @@
* relative to that lock state we resend.
*/
#define GRACE_PERIOD_KT ms_to_ktime(10)
/*
* allocated per-super, freed on unmount.
*/
@@ -82,7 +84,7 @@ struct lock_info {
struct workqueue_struct *workq;
struct work_struct grant_work;
struct list_head grant_list;
struct work_struct inv_work;
struct delayed_work inv_dwork;
struct list_head inv_list;
struct work_struct shrink_work;
struct list_head shrink_list;
@@ -154,7 +156,9 @@ static void lock_inv_iput_worker(struct work_struct *work)
/*
* Invalidate cached data associated with an inode whose lock is going
* away.
* away. We ignore indoes with I_FREEING instead of waiting on them to
* avoid a deadlock, if they're freeing then they won't be visible to
* future lock users and we don't need to invalidate them.
*
* We try to drop cached dentries and inodes covered by the lock if they
* aren't referenced. This removes them from the mount's open map and
@@ -174,7 +178,7 @@ static void invalidate_inode(struct super_block *sb, u64 ino)
struct scoutfs_inode_info *si;
struct inode *inode;
inode = scoutfs_ilookup(sb, ino);
inode = scoutfs_ilookup_nofreeing(sb, ino);
if (inode) {
si = SCOUTFS_I(inode);
@@ -362,6 +366,23 @@ static bool lock_counts_match(int granted, unsigned int *counts)
return true;
}
/*
* Returns true if there are any mode counts that match with the desired
* mode. There can be other non-matching counts as well but we're only
* testing for the existence of any matching counts.
*/
static bool lock_count_match_exists(int desired, unsigned int *counts)
{
enum scoutfs_lock_mode mode;
for (mode = 0; mode < SCOUTFS_LOCK_NR_MODES; mode++) {
if (counts[mode] && lock_modes_match(desired, mode))
return true;
}
return false;
}
/*
* An idle lock has nothing going on. It can be present in the lru and
* can be freed by the final put when it has a null mode.
@@ -578,6 +599,24 @@ static void put_lock(struct lock_info *linfo,struct scoutfs_lock *lock)
}
}
/*
* Locks have a grace period that extends after activity and prevents
* invalidation. It's intended to let nodes do reasonable batches of
* work as locks ping pong between nodes that are doing conflicting
* work.
*/
static void extend_grace(struct super_block *sb, struct scoutfs_lock *lock)
{
ktime_t now = ktime_get();
if (ktime_after(now, lock->grace_deadline))
scoutfs_inc_counter(sb, lock_grace_set);
else
scoutfs_inc_counter(sb, lock_grace_extended);
lock->grace_deadline = ktime_add(now, GRACE_PERIOD_KT);
}
static void queue_grant_work(struct lock_info *linfo)
{
assert_spin_locked(&linfo->lock);
@@ -587,15 +626,19 @@ static void queue_grant_work(struct lock_info *linfo)
}
/*
* The caller has made a change (set a lock mode) which can let one of the
* invalidating locks make forward progress.
* We immediately queue work on the assumption that the caller might
* have made a change (set a lock mode) which can let one of the
* invalidating locks make forward progress, even if other locks are
* waiting for their grace period to elapse. It's a trade-off between
* invalidation latency and burning cpu repeatedly finding that locks
* are still in their grace period.
*/
static void queue_inv_work(struct lock_info *linfo)
{
assert_spin_locked(&linfo->lock);
if (!list_empty(&linfo->inv_list))
queue_work(linfo->workq, &linfo->inv_work);
mod_delayed_work(linfo->workq, &linfo->inv_dwork, 0);
}
/*
@@ -648,6 +691,15 @@ static void bug_on_inconsistent_grant_cache(struct super_block *sb,
* Grant responses can be reordered with incoming invalidation requests
* from the server so we have to be careful to only set the new mode
* once the old mode matches.
*
* We extend the grace period as we grant the lock if there is a waiting
* locker who can use the lock. This stops invalidation from pulling
* the granted lock out from under the requester, resulting in a lot of
* churn with no forward progress. Using the grace period avoids having
* to identify a specific waiter and give it an acquired lock. It's
* also very similar to waking up the locker and having it win the race
* against the invalidation. In that case they'd extend the grace
* period anyway as they unlock.
*/
static void lock_grant_worker(struct work_struct *work)
{
@@ -682,6 +734,9 @@ static void lock_grant_worker(struct work_struct *work)
lock->mode = nl->new_mode;
lock->write_version = le64_to_cpu(nl->write_version);
if (lock_count_match_exists(nl->new_mode, lock->waiters))
extend_grace(sb, lock);
trace_scoutfs_lock_granted(sb, lock);
list_del_init(&lock->grant_head);
wake_up(&lock->waitq);
@@ -745,6 +800,11 @@ int scoutfs_lock_grant_response(struct super_block *sb,
* invalidate once the lock mode matches what the server told us to
* invalidate.
*
* We delay invalidation processing until a grace period has elapsed
* since the last unlock. The intent is to let users do a reasonable
* batch of work before dropping the lock. Continuous unlocking can
* continuously extend the deadline.
*
* Before we start invalidating the lock we set the lock to the new
* mode, preventing further incompatible users of the old mode from
* using the lock while we're invalidating.
@@ -756,11 +816,15 @@ int scoutfs_lock_grant_response(struct super_block *sb,
*/
static void lock_invalidate_worker(struct work_struct *work)
{
struct lock_info *linfo = container_of(work, struct lock_info, inv_work);
struct lock_info *linfo = container_of(work, struct lock_info,
inv_dwork.work);
struct super_block *sb = linfo->sb;
struct scoutfs_net_lock *nl;
struct scoutfs_lock *lock;
struct scoutfs_lock *tmp;
unsigned long delay = MAX_JIFFY_OFFSET;
ktime_t now = ktime_get();
ktime_t deadline;
LIST_HEAD(ready);
u64 net_id;
int ret;
@@ -780,6 +844,15 @@ static void lock_invalidate_worker(struct work_struct *work)
if (!lock_counts_match(nl->new_mode, lock->users))
continue;
/* skip if grace hasn't elapsed, record earliest */
deadline = lock->grace_deadline;
if (!linfo->shutdown && ktime_before(now, deadline)) {
delay = min(delay,
nsecs_to_jiffies(ktime_to_ns(
ktime_sub(deadline, now))));
scoutfs_inc_counter(linfo->sb, lock_grace_wait);
continue;
}
/* set the new mode, no incompatible users during inval */
lock->mode = nl->new_mode;
@@ -790,7 +863,7 @@ static void lock_invalidate_worker(struct work_struct *work)
spin_unlock(&linfo->lock);
if (list_empty(&ready))
return;
goto out;
/* invalidate once the lock is read */
list_for_each_entry(lock, &ready, inv_head) {
@@ -823,9 +896,6 @@ static void lock_invalidate_worker(struct work_struct *work)
list_del_init(&lock->inv_head);
lock->invalidate_pending = 0;
wake_up(&lock->waitq);
} else {
/* another request filled nl/net_id, put it back on the list */
list_move_tail(&lock->inv_head, &linfo->inv_list);
}
put_lock(linfo, lock);
}
@@ -833,6 +903,11 @@ static void lock_invalidate_worker(struct work_struct *work)
/* grant might have been waiting for invalidate request */
queue_grant_work(linfo);
spin_unlock(&linfo->lock);
out:
/* queue delayed work if invalidations waiting on grace deadline */
if (delay != MAX_JIFFY_OFFSET)
queue_delayed_work(linfo->workq, &linfo->inv_dwork, delay);
}
/*
@@ -1298,6 +1373,10 @@ int scoutfs_lock_rid(struct super_block *sb, enum scoutfs_lock_mode mode, int fl
return lock_key_range(sb, mode, flags, &start, &end, lock);
}
/*
* As we unlock we always extend the grace period to give the caller
* another pass at the lock before its invalidated.
*/
void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock, enum scoutfs_lock_mode mode)
{
DECLARE_LOCK_INFO(sb, linfo);
@@ -1310,6 +1389,7 @@ void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock, enum scou
spin_lock(&linfo->lock);
lock_dec_count(lock->users, mode);
extend_grace(sb, lock);
if (lock_mode_can_write(mode))
lock->dirty_trans_seq = scoutfs_trans_sample_seq(sb);
@@ -1549,7 +1629,7 @@ void scoutfs_lock_unmount_begin(struct super_block *sb)
if (linfo) {
linfo->unmounting = true;
flush_work(&linfo->inv_work);
flush_delayed_work(&linfo->inv_dwork);
}
}
@@ -1642,6 +1722,8 @@ void scoutfs_lock_destroy(struct super_block *sb)
spin_unlock(&linfo->lock);
if (linfo->workq) {
/* pending grace work queues normal work */
flush_workqueue(linfo->workq);
/* now all work won't queue itself */
destroy_workqueue(linfo->workq);
}
@@ -1702,7 +1784,7 @@ int scoutfs_lock_setup(struct super_block *sb)
INIT_LIST_HEAD(&linfo->lru_list);
INIT_WORK(&linfo->grant_work, lock_grant_worker);
INIT_LIST_HEAD(&linfo->grant_list);
INIT_WORK(&linfo->inv_work, lock_invalidate_worker);
INIT_DELAYED_WORK(&linfo->inv_dwork, lock_invalidate_worker);
INIT_LIST_HEAD(&linfo->inv_list);
INIT_WORK(&linfo->shrink_work, lock_shrink_worker);
INIT_LIST_HEAD(&linfo->shrink_list);

View File

@@ -27,6 +27,7 @@ struct scoutfs_lock {
u64 dirty_trans_seq;
struct list_head lru_head;
wait_queue_head_t waitq;
ktime_t grace_deadline;
unsigned long request_pending:1,
invalidate_pending:1;

View File

@@ -78,8 +78,6 @@ struct lock_server_info {
struct scoutfs_tseq_tree tseq_tree;
struct dentry *tseq_dentry;
struct scoutfs_tseq_tree stats_tseq_tree;
struct dentry *stats_tseq_dentry;
struct scoutfs_alloc *alloc;
struct scoutfs_block_writer *wri;
@@ -111,9 +109,6 @@ struct server_lock_node {
struct list_head granted;
struct list_head requested;
struct list_head invalidated;
struct scoutfs_tseq_entry stats_tseq_entry;
u64 stats[SLT_NR];
};
/*
@@ -303,8 +298,6 @@ static struct server_lock_node *alloc_server_lock(struct lock_server_info *inf,
snode = get_server_lock(inf, key, ins, false);
if (snode != ins)
kfree(ins);
else
scoutfs_tseq_add(&inf->stats_tseq_tree, &snode->stats_tseq_entry);
}
}
@@ -334,10 +327,8 @@ static void put_server_lock(struct lock_server_info *inf,
mutex_unlock(&snode->mutex);
if (should_free) {
scoutfs_tseq_del(&inf->stats_tseq_tree, &snode->stats_tseq_entry);
if (should_free)
kfree(snode);
}
}
static struct client_lock_entry *find_entry(struct server_lock_node *snode,
@@ -399,8 +390,6 @@ int scoutfs_lock_server_request(struct super_block *sb, u64 rid,
goto out;
}
snode->stats[SLT_REQUEST]++;
clent->snode = snode;
add_client_entry(snode, &snode->requested, clent);
scoutfs_tseq_add(&inf->tseq_tree, &clent->tseq_entry);
@@ -441,8 +430,6 @@ int scoutfs_lock_server_response(struct super_block *sb, u64 rid,
goto out;
}
snode->stats[SLT_RESPONSE]++;
clent = find_entry(snode, &snode->invalidated, rid);
if (!clent) {
put_server_lock(inf, snode);
@@ -523,7 +510,6 @@ static int process_waiting_requests(struct super_block *sb,
trace_scoutfs_lock_message(sb, SLT_SERVER,
SLT_INVALIDATE, SLT_REQUEST,
gr->rid, 0, &nl);
snode->stats[SLT_INVALIDATE]++;
add_client_entry(snode, &snode->invalidated, gr);
}
@@ -558,7 +544,6 @@ static int process_waiting_requests(struct super_block *sb,
trace_scoutfs_lock_message(sb, SLT_SERVER, SLT_GRANT,
SLT_RESPONSE, req->rid,
req->net_id, &nl);
snode->stats[SLT_GRANT]++;
/* don't track null client locks, track all else */
if (req->mode == SCOUTFS_LOCK_NULL)
@@ -809,16 +794,6 @@ static void lock_server_tseq_show(struct seq_file *m,
clent->net_id);
}
static void stats_tseq_show(struct seq_file *m, struct scoutfs_tseq_entry *ent)
{
struct server_lock_node *snode = container_of(ent, struct server_lock_node,
stats_tseq_entry);
seq_printf(m, SK_FMT" req %llu inv %llu rsp %llu gr %llu\n",
SK_ARG(&snode->key), snode->stats[SLT_REQUEST], snode->stats[SLT_INVALIDATE],
snode->stats[SLT_RESPONSE], snode->stats[SLT_GRANT]);
}
/*
* Setup the lock server. This is called before networking can deliver
* requests.
@@ -838,7 +813,6 @@ int scoutfs_lock_server_setup(struct super_block *sb,
spin_lock_init(&inf->lock);
inf->locks_root = RB_ROOT;
scoutfs_tseq_tree_init(&inf->tseq_tree, lock_server_tseq_show);
scoutfs_tseq_tree_init(&inf->stats_tseq_tree, stats_tseq_show);
inf->alloc = alloc;
inf->wri = wri;
atomic64_set(&inf->write_version, max_vers); /* inc_return gives +1 */
@@ -850,14 +824,6 @@ int scoutfs_lock_server_setup(struct super_block *sb,
return -ENOMEM;
}
inf->stats_tseq_dentry = scoutfs_tseq_create("tmp_lock_stats", sbi->debug_root,
&inf->stats_tseq_tree);
if (!inf->stats_tseq_dentry) {
debugfs_remove(inf->tseq_dentry);
kfree(inf);
return -ENOMEM;
}
sbi->lock_server_info = inf;
return 0;
@@ -879,7 +845,6 @@ void scoutfs_lock_server_destroy(struct super_block *sb)
if (inf) {
debugfs_remove(inf->tseq_dentry);
debugfs_remove(inf->stats_tseq_dentry);
rbtree_postorder_for_each_entry_safe(snode, stmp,
&inf->locks_root, node) {

View File

@@ -0,0 +1,4 @@
== create per mount files
== time independent modification
== time concurrent independent modification
== time concurrent conflicting modification

View File

@@ -22,8 +22,8 @@ stage-multi-part.sh
stage-tmpfile.sh
basic-posix-consistency.sh
dirent-consistency.sh
mkdir-rename-rmdir.sh
lock-ex-race-processes.sh
lock-conflicting-batch-commit.sh
cross-mount-data-free.sh
persistent-item-vers.sh
setup-error-teardown.sh

View File

@@ -0,0 +1,59 @@
#
# If bulk work accidentally conflicts in the worst way we'd like to have
# it not result in catastrophic performance. Make sure that each
# instance of bulk work is given the opportunity to get as much as it
# can into the transaction under a lock before the lock is revoked
# and the transaction is committed.
#
t_require_commands setfattr
t_require_mounts 2
NR=3000
echo "== create per mount files"
for m in 0 1; do
eval dir="\$T_D${m}/dir/$m"
t_quiet mkdir -p "$dir"
for a in $(seq 1 $NR); do touch "$dir/$a"; done
done
echo "== time independent modification"
for m in 0 1; do
eval dir="\$T_D${m}/dir/$m"
START=$SECONDS
for a in $(seq 1 $NR); do
setfattr -n user.test_grace -v $a "$dir/$a"
done
echo "mount $m: $((SECONDS - START))" >> $T_TMP.log
done
echo "== time concurrent independent modification"
START=$SECONDS
for m in 0 1; do
eval dir="\$T_D${m}/dir/$m"
(for a in $(seq 1 $NR); do
setfattr -n user.test_grace -v $a "$dir/$a";
done) &
done
wait
IND="$((SECONDS - START))"
echo "ind: $IND" >> $T_TMP.log
echo "== time concurrent conflicting modification"
START=$SECONDS
for m in 0 1; do
eval dir="\$T_D${m}/dir/0"
(for a in $(seq 1 $NR); do
setfattr -n user.test_grace -v $a "$dir/$a";
done) &
done
wait
CONF="$((SECONDS - START))"
echo "conf: $CONF" >> $T_TMP.log
if [ "$CONF" -gt "$((IND * 5))" ]; then
t_fail "conflicting $CONF secs is more than 5x independent $IND secs"
fi
t_pass

View File

@@ -1,59 +0,0 @@
#
# Sequentially perform operations on a dir (mkdir; rename*2; rmdir) on
# all possible combinations of different mounts that could perform the
# operations.
#
# We're testing that the tracking of the entry key in our cached dirents
# stays consitent with the persistent entry items as they're modified
# around the cluster.
#
t_require_commands mkdir mv rmdir
NR_OPS=4
unset op_mnt
for op in $(seq 0 $NR_OPS); do
op_mnt[$op]=0
done
if [ $T_NR_MOUNTS -gt $NR_OPS ]; then
NR_MNTS=$NR_OPS
else
NR_MNTS=$T_NR_MOUNTS
fi
# test until final op mount dir wraps
while [ ${op_mnt[$NR_OPS]} == 0 ]; do
# sequentially perform each op from its mount dir
for op in $(seq 0 $((NR_OPS - 1))); do
m=${op_mnt[$op]}
eval dir="\$T_D${m}/dir"
case "$op" in
0) mkdir "$dir" ;;
1) mv "$dir" "$dir-1" ;;
2) mv "$dir-1" "$dir-2" ;;
3) rmdir "$dir-2" ;;
esac
if [ $? != 0 ]; then
t_fail "${op_mnt[*]} failed at op $op"
fi
done
# advance through mnt nrs for each op
i=0
while [ ${op_mnt[$NR_OPS]} == 0 ]; do
((op_mnt[$i]++))
if [ ${op_mnt[$i]} -ge $NR_MNTS ]; then
op_mnt[$i]=0
((i++))
else
break
fi
done
done
t_pass

View File

@@ -197,7 +197,7 @@ static int do_mkfs(struct mkfs_args *args)
memset(super, 0, SCOUTFS_BLOCK_SM_SIZE);
super->version = cpu_to_le64(SCOUTFS_INTEROP_VERSION);
uuid_generate(super->uuid);
super->next_ino = cpu_to_le64(round_up(SCOUTFS_ROOT_INO + 1, SCOUTFS_LOCK_INODE_GROUP_NR));
super->next_ino = cpu_to_le64(SCOUTFS_ROOT_INO + 1);
super->next_trans_seq = cpu_to_le64(1);
super->total_meta_blocks = cpu_to_le64(last_meta + 1);
super->first_meta_blkno = cpu_to_le64(next_meta);