Harden client transaction processing

There are a few bad corner cases in the state machine that governs how
client transactions are opened, modified, and committed.

The worst problem is on the server side.   All server request handlers
need to cope with resent requests without causing bad side effects.
Both get_log_trees and commit_log_trees would try to fully processes
resent requests.  _get_log_trees() looks safe because it works with the
log_trees that was stored previously.  _commit_log_trees() is not safe
because it can rotate out the srch log file referenced by the sent
log_trees every time it's processed.  This could create extra srch
entries which would delete the first instance of entries.  Worse still,
by injecting the same block structure into the system multiple times it
ends up causing multiple frees of the blocks that make up the srch file.

The client side problems are slightly different, but related.   There
aren't strong constraints which guarantee that we'll only send a commit
request after a get request succeeds.   In crazy circumstances the
commit request in the write worker could come before the first get in
mount succeeds.   Far worse is that we can send multiple commit requests
for one transaction if it changes as we get errors during multiple
queued write attempts, particularly if we get errors from get_log_trees
after having successfully committed.

This hardens all these paths to ensure a strict sequence of
get_log_trees, transaction modification, and commit_log_trees.

On the server we add *_trans_seq fields to the log_trees struct so that
both get_ and commit_ can see that they've already prepared a commit to
send or have already committed the incoming commit, respectively.   We
can use the get_trans_seq field as the trans_seq of the open transaction
and get rid of the entire seperate mechanism we used to have for
tracking open trans seqs in the clients.  We can get the same info by
walking the log_trees and looking at their *_trans_seq fields.

In the client we have the write worker immediately return success if
mount hasn't opened the first transaction.   Then we don't have the
worker return to allow further modification until it has gotten success
from get_log_trees.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2021-10-19 14:24:51 -07:00
parent 42c4c6dd24
commit 80ee2c6d57
8 changed files with 155 additions and 276 deletions

View File

@@ -117,21 +117,6 @@ int scoutfs_client_get_roots(struct super_block *sb,
NULL, 0, roots, sizeof(*roots));
}
int scoutfs_client_advance_seq(struct super_block *sb, u64 *seq)
{
struct client_info *client = SCOUTFS_SB(sb)->client_info;
__le64 leseq;
int ret;
ret = scoutfs_net_sync_request(sb, client->conn,
SCOUTFS_NET_CMD_ADVANCE_SEQ,
NULL, 0, &leseq, sizeof(leseq));
if (ret == 0)
*seq = le64_to_cpu(leseq);
return ret;
}
int scoutfs_client_get_last_seq(struct super_block *sb, u64 *seq)
{
struct client_info *client = SCOUTFS_SB(sb)->client_info;

View File

@@ -10,7 +10,6 @@ int scoutfs_client_commit_log_trees(struct super_block *sb,
int scoutfs_client_get_roots(struct super_block *sb,
struct scoutfs_net_roots *roots);
u64 *scoutfs_client_bulk_alloc(struct super_block *sb);
int scoutfs_client_advance_seq(struct super_block *sb, u64 *seq);
int scoutfs_client_get_last_seq(struct super_block *sb, u64 *seq);
int scoutfs_client_lock_request(struct super_block *sb,
struct scoutfs_net_lock *nl);

View File

@@ -207,10 +207,6 @@ struct scoutfs_key {
#define sklt_rid _sk_first
#define sklt_nr _sk_second
/* seqs */
#define skts_trans_seq _sk_first
#define skts_rid _sk_second
/* mounted clients */
#define skmc_rid _sk_first
@@ -461,6 +457,12 @@ struct scoutfs_srch_compact {
* XXX I imagine we should rename these now that they've evolved to track
* all the btrees that clients use during a transaction. It's not just
* about item logs, it's about clients making changes to trees.
*
* @get_trans_seq, @commit_trans_seq: These pair of sequence numbers
* determine if a transaction is currently open for the mount that owns
* the log_trees struct. get_trans_seq is advanced by the server as the
* transaction is opened. The server sets comimt_trans_seq equal to
* get_ as the transaction is committed.
*/
struct scoutfs_log_trees {
struct scoutfs_alloc_list_head meta_avail;
@@ -473,6 +475,8 @@ struct scoutfs_log_trees {
__le64 data_alloc_zone_blocks;
__le64 data_alloc_zones[SCOUTFS_DATA_ALLOC_ZONE_LE64S];
__le64 inode_count_delta;
__le64 get_trans_seq;
__le64 commit_trans_seq;
__le64 max_item_seq;
__le64 finalize_seq;
__le64 rid;
@@ -586,17 +590,16 @@ struct scoutfs_log_merge_freeing {
#define SCOUTFS_LOCK_ZONE 5
/* Items only stored in server btrees */
#define SCOUTFS_LOG_TREES_ZONE 6
#define SCOUTFS_TRANS_SEQ_ZONE 7
#define SCOUTFS_MOUNTED_CLIENT_ZONE 8
#define SCOUTFS_SRCH_ZONE 9
#define SCOUTFS_FREE_EXTENT_BLKNO_ZONE 10
#define SCOUTFS_FREE_EXTENT_ORDER_ZONE 11
#define SCOUTFS_MOUNTED_CLIENT_ZONE 7
#define SCOUTFS_SRCH_ZONE 8
#define SCOUTFS_FREE_EXTENT_BLKNO_ZONE 9
#define SCOUTFS_FREE_EXTENT_ORDER_ZONE 10
/* Items only stored in log merge server btrees */
#define SCOUTFS_LOG_MERGE_STATUS_ZONE 12
#define SCOUTFS_LOG_MERGE_RANGE_ZONE 13
#define SCOUTFS_LOG_MERGE_REQUEST_ZONE 14
#define SCOUTFS_LOG_MERGE_COMPLETE_ZONE 15
#define SCOUTFS_LOG_MERGE_FREEING_ZONE 16
#define SCOUTFS_LOG_MERGE_STATUS_ZONE 11
#define SCOUTFS_LOG_MERGE_RANGE_ZONE 12
#define SCOUTFS_LOG_MERGE_REQUEST_ZONE 13
#define SCOUTFS_LOG_MERGE_COMPLETE_ZONE 14
#define SCOUTFS_LOG_MERGE_FREEING_ZONE 15
/* inode index zone */
#define SCOUTFS_INODE_INDEX_META_SEQ_TYPE 1
@@ -807,7 +810,6 @@ struct scoutfs_super_block {
struct scoutfs_btree_root fs_root;
struct scoutfs_btree_root logs_root;
struct scoutfs_btree_root log_merge;
struct scoutfs_btree_root trans_seqs;
struct scoutfs_btree_root mounted_clients;
struct scoutfs_btree_root srch_root;
struct scoutfs_volume_options volopt;
@@ -990,7 +992,6 @@ enum scoutfs_net_cmd {
SCOUTFS_NET_CMD_COMMIT_LOG_TREES,
SCOUTFS_NET_CMD_SYNC_LOG_TREES,
SCOUTFS_NET_CMD_GET_ROOTS,
SCOUTFS_NET_CMD_ADVANCE_SEQ,
SCOUTFS_NET_CMD_GET_LAST_SEQ,
SCOUTFS_NET_CMD_LOCK,
SCOUTFS_NET_CMD_LOCK_RECOVER,

View File

@@ -1980,48 +1980,6 @@ DEFINE_EVENT(scoutfs_clock_sync_class, scoutfs_recv_clock_sync,
TP_ARGS(clock_sync_id)
);
TRACE_EVENT(scoutfs_trans_seq_advance,
TP_PROTO(struct super_block *sb, u64 rid, u64 trans_seq),
TP_ARGS(sb, rid, trans_seq),
TP_STRUCT__entry(
SCSB_TRACE_FIELDS
__field(__u64, s_rid)
__field(__u64, trans_seq)
),
TP_fast_assign(
SCSB_TRACE_ASSIGN(sb);
__entry->s_rid = rid;
__entry->trans_seq = trans_seq;
),
TP_printk(SCSBF" rid %016llx trans_seq %llu\n",
SCSB_TRACE_ARGS, __entry->s_rid, __entry->trans_seq)
);
TRACE_EVENT(scoutfs_trans_seq_remove,
TP_PROTO(struct super_block *sb, u64 rid, u64 trans_seq),
TP_ARGS(sb, rid, trans_seq),
TP_STRUCT__entry(
SCSB_TRACE_FIELDS
__field(__u64, s_rid)
__field(__u64, trans_seq)
),
TP_fast_assign(
SCSB_TRACE_ASSIGN(sb);
__entry->s_rid = rid;
__entry->trans_seq = trans_seq;
),
TP_printk(SCSBF" rid %016llx trans_seq %llu",
SCSB_TRACE_ARGS, __entry->s_rid, __entry->trans_seq)
);
TRACE_EVENT(scoutfs_trans_seq_last,
TP_PROTO(struct super_block *sb, u64 rid, u64 trans_seq),

View File

@@ -73,9 +73,6 @@ struct server_info {
struct llist_head commit_waiters;
struct work_struct commit_work;
/* server tracks seq use */
struct rw_semaphore seq_rwsem;
struct list_head clients;
unsigned long nr_clients;
@@ -1023,6 +1020,16 @@ static int finalize_and_start_log_merge(struct super_block *sb, struct scoutfs_l
*
* If the committed log trees are large enough we finalize them and make
* them available to log merging.
*
* As we prepare a new transaction we get its get_trans_seq to indicate
* that it's open. The client uses this to identify its open
* transaction and we watch all the log trees to track the sequence
* numbers of transactions that clients have open. This limits the
* transaction sequence numbers that can be returned in the index of
* inodes by meta and data transaction numbers. We communicate the
* largest possible sequence number to clients via an rpc. The
* transactions are closed by setting the commit_trans_seq during commit
* or as the mount is cleaned up.
*/
static int server_get_log_trees(struct super_block *sb,
struct scoutfs_net_connection *conn,
@@ -1071,6 +1078,19 @@ static int server_get_log_trees(struct super_block *sb,
lt.nr = cpu_to_le64(nr);
}
/* the commit_trans_seq can never go past the open_trans_seq */
if (le64_to_cpu(lt.get_trans_seq) < le64_to_cpu(lt.commit_trans_seq)) {
err_str = "invalid open_trans_seq and commit_trans_seq";
ret = -EINVAL;
goto unlock;
}
/* transaction's already open, client resent get_ after server failover */
if (le64_to_cpu(lt.get_trans_seq) > le64_to_cpu(lt.commit_trans_seq)) {
ret = 0;
goto unlock;
}
/* drops and re-acquires the mutex and commit if it has to wait */
ret = finalize_and_start_log_merge(sb, &lt, rid);
if (ret < 0)
@@ -1151,6 +1171,9 @@ static int server_get_log_trees(struct super_block *sb,
lt.data_alloc_zone_blocks = cpu_to_le64(data_zone_blocks);
}
/* give the transaction a new seq (must have been ==) */
lt.get_trans_seq = cpu_to_le64(scoutfs_server_next_seq(sb));
/* update client's log tree's item */
scoutfs_key_init_log_trees(&key, le64_to_cpu(lt.rid),
le64_to_cpu(lt.nr));
@@ -1187,9 +1210,11 @@ static int server_commit_log_trees(struct super_block *sb,
const u64 rid = scoutfs_net_client_rid(conn);
DECLARE_SERVER_INFO(sb, server);
SCOUTFS_BTREE_ITEM_REF(iref);
struct scoutfs_log_trees *exist;
struct scoutfs_log_trees lt;
struct scoutfs_key key;
char *err_str = NULL;
bool committed = false;
int ret;
if (arg_len != sizeof(struct scoutfs_log_trees)) {
@@ -1214,12 +1239,26 @@ static int server_commit_log_trees(struct super_block *sb,
scoutfs_key_init_log_trees(&key, le64_to_cpu(lt.rid),
le64_to_cpu(lt.nr));
ret = scoutfs_btree_lookup(sb, &super->logs_root, &key, &iref);
if (ret < 0) {
if (ret < 0)
err_str = "finding log trees item";
goto unlock;
}
if (ret == 0)
if (ret == 0) {
if (iref.val_len == sizeof(struct scoutfs_log_trees)) {
exist = iref.val;
if (exist->get_trans_seq != lt.get_trans_seq) {
ret = -EIO;
err_str = "invalid log trees item get_trans_seq";
} else {
if (exist->commit_trans_seq == lt.get_trans_seq)
committed = true;
}
} else {
ret = -EIO;
err_str = "invalid log trees item size";
}
scoutfs_btree_put_iref(&iref);
}
if (ret < 0 || committed)
goto unlock;
/* try to rotate the srch log when big enough */
mutex_lock(&server->srch_mutex);
@@ -1231,6 +1270,8 @@ static int server_commit_log_trees(struct super_block *sb,
goto unlock;
}
lt.commit_trans_seq = lt.get_trans_seq;
ret = scoutfs_btree_update(sb, &server->alloc, &server->wri,
&super->logs_root, &key, &lt, sizeof(lt));
if (ret < 0)
@@ -1357,6 +1398,9 @@ static int reclaim_open_log_tree(struct super_block *sb, u64 rid)
alloc_move_empty(sb, &super->data_alloc, &lt.data_freed));
mutex_unlock(&server->alloc_mutex);
/* the transaction is no longer open */
lt.commit_trans_seq = lt.get_trans_seq;
/* the mount is no longer writing to the zones */
zero_data_alloc_zone_bits(&lt);
le64_add_cpu(&lt.flags, SCOUTFS_LOG_TREES_FINALIZED);
@@ -1376,140 +1420,6 @@ out:
return ret;
}
static void init_trans_seq_key(struct scoutfs_key *key, u64 seq, u64 rid)
{
*key = (struct scoutfs_key) {
.sk_zone = SCOUTFS_TRANS_SEQ_ZONE,
.skts_trans_seq = cpu_to_le64(seq),
.skts_rid = cpu_to_le64(rid),
};
}
/*
* Remove all trans_seq items owned by the client rid, the caller holds
* the seq_rwsem.
*/
static int remove_trans_seq_locked(struct super_block *sb, u64 rid)
{
DECLARE_SERVER_INFO(sb, server);
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
SCOUTFS_BTREE_ITEM_REF(iref);
struct scoutfs_key key;
int ret = 0;
init_trans_seq_key(&key, 0, 0);
for (;;) {
ret = scoutfs_btree_next(sb, &super->trans_seqs, &key, &iref);
if (ret < 0) {
if (ret == -ENOENT)
ret = 0;
break;
}
key = *iref.key;
scoutfs_btree_put_iref(&iref);
if (le64_to_cpu(key.skts_rid) == rid) {
trace_scoutfs_trans_seq_remove(sb, rid,
le64_to_cpu(key.skts_trans_seq));
ret = scoutfs_btree_delete(sb, &server->alloc,
&server->wri,
&super->trans_seqs, &key);
if (ret < 0)
break;
}
scoutfs_key_inc(&key);
}
return ret;
}
/*
* Give the client the next sequence number for the transaction that
* they're opening.
*
* We track the sequence numbers of transactions that clients have open.
* This limits the transaction sequence numbers that can be returned in
* the index of inodes by meta and data transaction numbers. We
* communicate the largest possible sequence number to clients via an
* rpc.
*
* The transaction sequence tracking is stored in a btree so it is
* shared across servers. Final entries are removed when processing a
* client's farewell or when it's removed. We can be processent a
* resent request that was committed by a previous server before the
* reply was lost. At this point the client has no transactions open
* and may or may not have just finished one. To keep it simple we
* always remove any previous seq items, if there are any, and then
* insert a new item for the client at the next greatest seq.
*/
static int server_advance_seq(struct super_block *sb,
struct scoutfs_net_connection *conn,
u8 cmd, u64 id, void *arg, u16 arg_len)
{
DECLARE_SERVER_INFO(sb, server);
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
u64 rid = scoutfs_net_client_rid(conn);
struct scoutfs_key key;
__le64 leseq = 0;
u64 seq;
int ret;
if (arg_len != 0) {
ret = -EINVAL;
goto out;
}
scoutfs_server_hold_commit(sb);
down_write(&server->seq_rwsem);
ret = remove_trans_seq_locked(sb, rid);
if (ret < 0)
goto unlock;
seq = scoutfs_server_next_seq(sb);
trace_scoutfs_trans_seq_advance(sb, rid, seq);
init_trans_seq_key(&key, seq, rid);
ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri,
&super->trans_seqs, &key, NULL, 0);
if (ret == 0)
leseq = cpu_to_le64(seq);
unlock:
up_write(&server->seq_rwsem);
ret = scoutfs_server_apply_commit(sb, ret);
out:
return scoutfs_net_response(sb, conn, cmd, id, ret,
&leseq, sizeof(leseq));
}
/*
* Remove any transaction sequences owned by the client who's sent a
* farewell They must have committed any final transaction by the time
* they get here via sending their farewell message. This can be called
* multiple times as the client's farewell is retransmitted so it's OK
* to not find any entries. This is called with the server commit rwsem
* held.
*/
static int remove_trans_seq(struct super_block *sb, u64 rid)
{
DECLARE_SERVER_INFO(sb, server);
int ret = 0;
down_write(&server->seq_rwsem);
ret = remove_trans_seq_locked(sb, rid);
up_write(&server->seq_rwsem);
return ret;
}
/*
* Give the caller the last seq before outstanding client commits. All
* seqs up to and including this are stable, new client transactions can
@@ -1521,27 +1431,41 @@ static int get_stable_trans_seq(struct super_block *sb, u64 *last_seq_ret)
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
SCOUTFS_BTREE_ITEM_REF(iref);
struct scoutfs_log_trees *lt;
struct scoutfs_key key;
u64 last_seq = 0;
int ret;
down_read(&server->seq_rwsem);
last_seq = scoutfs_server_seq(sb) - 1;
scoutfs_key_init_log_trees(&key, 0, 0);
init_trans_seq_key(&key, 0, 0);
ret = scoutfs_btree_next(sb, &super->trans_seqs, &key, &iref);
if (ret == 0) {
last_seq = le64_to_cpu(iref.key->skts_trans_seq) - 1;
scoutfs_btree_put_iref(&iref);
mutex_lock(&server->logs_mutex);
} else if (ret == -ENOENT) {
last_seq = scoutfs_server_seq(sb) - 1;
ret = 0;
for (;; scoutfs_key_inc(&key)) {
ret = scoutfs_btree_next(sb, &super->logs_root, &key, &iref);
if (ret == 0) {
if (iref.val_len == sizeof(*lt)) {
lt = iref.val;
if ((le64_to_cpu(lt->get_trans_seq) >
le64_to_cpu(lt->commit_trans_seq)) &&
le64_to_cpu(lt->get_trans_seq) <= last_seq) {
last_seq = le64_to_cpu(lt->get_trans_seq) - 1;
}
key = *iref.key;
} else {
ret = -EIO;
}
scoutfs_btree_put_iref(&iref);
}
if (ret < 0) {
if (ret == -ENOENT) {
ret = 0;
break;
}
}
}
up_read(&server->seq_rwsem);
if (ret < 0)
last_seq = 0;
mutex_unlock(&server->logs_mutex);
*last_seq_ret = last_seq;
return ret;
@@ -3374,7 +3298,6 @@ static int reclaim_rid(struct super_block *sb, u64 rid)
/* delete mounted client last, recovery looks for it */
ret = scoutfs_lock_server_farewell(sb, rid) ?:
remove_trans_seq(sb, rid) ?:
reclaim_open_log_tree(sb, rid) ?:
cancel_srch_compact(sb, rid) ?:
cancel_log_merge(sb, rid) ?:
@@ -3608,7 +3531,6 @@ static scoutfs_net_request_t server_req_funcs[] = {
[SCOUTFS_NET_CMD_GET_LOG_TREES] = server_get_log_trees,
[SCOUTFS_NET_CMD_COMMIT_LOG_TREES] = server_commit_log_trees,
[SCOUTFS_NET_CMD_GET_ROOTS] = server_get_roots,
[SCOUTFS_NET_CMD_ADVANCE_SEQ] = server_advance_seq,
[SCOUTFS_NET_CMD_GET_LAST_SEQ] = server_get_last_seq,
[SCOUTFS_NET_CMD_LOCK] = server_lock,
[SCOUTFS_NET_CMD_SRCH_GET_COMPACT] = server_srch_get_compact,
@@ -4088,7 +4010,6 @@ int scoutfs_server_setup(struct super_block *sb)
init_rwsem(&server->commit_rwsem);
init_llist_head(&server->commit_waiters);
INIT_WORK(&server->commit_work, scoutfs_server_commit_func);
init_rwsem(&server->seq_rwsem);
INIT_LIST_HEAD(&server->clients);
spin_lock_init(&server->farewell_lock);
INIT_LIST_HEAD(&server->farewell_requests);

View File

@@ -646,8 +646,7 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent)
}
/* send requests once iget progress shows we had a server */
ret = scoutfs_trans_get_log_trees(sb) ?:
scoutfs_client_advance_seq(sb, &sbi->trans_seq);
ret = scoutfs_trans_get_log_trees(sb);
if (ret)
goto out;

View File

@@ -17,6 +17,7 @@
#include <linux/atomic.h>
#include <linux/writeback.h>
#include <linux/slab.h>
#include <linux/delay.h>
#include "super.h"
#include "trans.h"
@@ -100,6 +101,7 @@ static int commit_btrees(struct super_block *sb)
*/
int scoutfs_trans_get_log_trees(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
DECLARE_TRANS_INFO(sb, tri);
struct scoutfs_log_trees lt;
int ret = 0;
@@ -112,6 +114,11 @@ int scoutfs_trans_get_log_trees(struct super_block *sb)
scoutfs_forest_init_btrees(sb, &tri->alloc, &tri->wri, &lt);
scoutfs_data_init_btrees(sb, &tri->alloc, &tri->wri, &lt);
/* first set during mount from 0 to nonzero allows commits */
spin_lock(&tri->write_lock);
sbi->trans_seq = le64_to_cpu(lt.get_trans_seq);
spin_unlock(&tri->write_lock);
}
return ret;
}
@@ -162,26 +169,22 @@ static bool drained_holders(struct trans_info *tri)
* functions that would try to hold the transaction. We record the task
* whose committing the transaction so that holding won't deadlock.
*
* Any dirty block had to have allocated a new blkno which would have
* created dirty allocator metadata blocks. We can avoid writing
* entirely if we don't have any dirty metadata blocks. This is
* important because we don't try to serialize this work during
* unmount.. we can execute as the vfs is shutting down.. we need to
* decide that nothing is dirty without calling the vfs at all.
* Once we clear the write func bit in holders then waiting holders can
* enter the transaction and continue modifying the transaction. Once
* we start writing we consider the transaction done and won't exit,
* clearing the write func bit, until get_log_trees has opened the next
* transaction. The exception is forced unmount which is allowed to
* generate errors and throw away data.
*
* We first try to sync the dirty inodes and write their dirty data blocks,
* then we write all our dirty metadata blocks, and only when those succeed
* do we write the new super that references all of these newly written blocks.
*
* If there are write errors then blocks are kept dirty in memory and will
* be written again at the next sync.
* This means that the only way fsync can return an error is if we're in
* forced unmount.
*/
void scoutfs_trans_write_func(struct work_struct *work)
{
struct trans_info *tri = container_of(work, struct trans_info, write_work.work);
struct super_block *sb = tri->sb;
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
u64 trans_seq = sbi->trans_seq;
bool retrying = false;
char *s = NULL;
int ret = 0;
@@ -192,6 +195,12 @@ void scoutfs_trans_write_func(struct work_struct *work)
wait_event(tri->hold_wq, drained_holders(tri));
/* mount hasn't opened first transaction yet, still complete sync */
if (sbi->trans_seq == 0) {
ret = 0;
goto out;
}
if (scoutfs_forcing_unmount(sb)) {
ret = -EIO;
goto out;
@@ -205,25 +214,41 @@ void scoutfs_trans_write_func(struct work_struct *work)
scoutfs_inc_counter(sb, trans_commit_written);
/* XXX this all needs serious work for dealing with errors */
ret = (s = "data submit", scoutfs_inode_walk_writeback(sb, true)) ?:
(s = "item dirty", scoutfs_item_write_dirty(sb)) ?:
(s = "data prepare", scoutfs_data_prepare_commit(sb)) ?:
(s = "alloc prepare", scoutfs_alloc_prepare_commit(sb, &tri->alloc, &tri->wri)) ?:
(s = "meta write", scoutfs_block_writer_write(sb, &tri->wri)) ?:
(s = "data wait", scoutfs_inode_walk_writeback(sb, false)) ?:
(s = "commit log trees", commit_btrees(sb)) ?: scoutfs_item_write_done(sb) ?:
(s = "get log trees", scoutfs_trans_get_log_trees(sb)) ?:
(s = "advance seq", scoutfs_client_advance_seq(sb, &trans_seq));
if (ret < 0)
scoutfs_err(sb, "critical transaction commit failure: %s, %d",
s, ret);
do {
ret = (s = "data submit", scoutfs_inode_walk_writeback(sb, true)) ?:
(s = "item dirty", scoutfs_item_write_dirty(sb)) ?:
(s = "data prepare", scoutfs_data_prepare_commit(sb)) ?:
(s = "alloc prepare", scoutfs_alloc_prepare_commit(sb, &tri->alloc,
&tri->wri)) ?:
(s = "meta write", scoutfs_block_writer_write(sb, &tri->wri)) ?:
(s = "data wait", scoutfs_inode_walk_writeback(sb, false)) ?:
(s = "commit log trees", commit_btrees(sb)) ?:
scoutfs_item_write_done(sb) ?:
(s = "get log trees", scoutfs_trans_get_log_trees(sb));
if (ret < 0) {
if (!retrying) {
scoutfs_warn(sb, "critical transaction commit failure: %s = %d, retrying",
s, ret);
retrying = true;
}
if (scoutfs_forcing_unmount(sb)) {
ret = -EIO;
break;
}
msleep(2 * MSEC_PER_SEC);
} else if (retrying) {
scoutfs_info(sb, "retried transaction commit succeeded");
}
} while (ret < 0);
out:
spin_lock(&tri->write_lock);
tri->write_count++;
tri->write_ret = ret;
sbi->trans_seq = trans_seq;
spin_unlock(&tri->write_lock);
wake_up(&tri->write_wq);
@@ -464,6 +489,7 @@ static bool holders_no_writer(struct trans_info *tri)
*/
int scoutfs_hold_trans(struct super_block *sb, bool allocing)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
DECLARE_TRANS_INFO(sb, tri);
u64 seq;
int ret;
@@ -472,6 +498,12 @@ int scoutfs_hold_trans(struct super_block *sb, bool allocing)
return 0;
for (;;) {
/* shouldn't get holders until mount finishes, (not locking for cheap test) */
if (WARN_ON_ONCE(sbi->trans_seq == 0)) {
ret = -EINVAL;
break;
}
/* if a caller already has a hold we acquire unconditionally */
if (inc_journal_info_holders()) {
atomic_inc(&tri->holders);

View File

@@ -353,15 +353,6 @@ static int print_srch_root_item(struct scoutfs_key *key, u64 seq, u8 flags, void
return 0;
}
static int print_trans_seqs_entry(struct scoutfs_key *key, u64 seq, u8 flags, void *val,
unsigned val_len, void *arg)
{
printf(" trans_seq %llu rid %016llx\n",
le64_to_cpu(key->skts_trans_seq), le64_to_cpu(key->skts_rid));
return 0;
}
static int print_mounted_client_entry(struct scoutfs_key *key, u64 seq, u8 flags, void *val,
unsigned val_len, void *arg)
{
@@ -954,7 +945,6 @@ static void print_super_block(struct scoutfs_super_block *super, u64 blkno)
" fs_root: "BTR_FMT"\n"
" logs_root: "BTR_FMT"\n"
" log_merge: "BTR_FMT"\n"
" trans_seqs: "BTR_FMT"\n"
" mounted_clients: "BTR_FMT"\n"
" srch_root: "BTR_FMT"\n",
le64_to_cpu(super->next_ino),
@@ -972,7 +962,6 @@ static void print_super_block(struct scoutfs_super_block *super, u64 blkno)
BTR_ARG(&super->fs_root),
BTR_ARG(&super->logs_root),
BTR_ARG(&super->log_merge),
BTR_ARG(&super->trans_seqs),
BTR_ARG(&super->mounted_clients),
BTR_ARG(&super->srch_root));
@@ -1024,11 +1013,6 @@ static int print_volume(int fd)
if (err && !ret)
ret = err;
err = print_btree(fd, super, "trans_seqs", &super->trans_seqs,
print_trans_seqs_entry, NULL);
if (err && !ret)
ret = err;
err = print_btree(fd, super, "log_merge", &super->log_merge,
print_log_merge_item, NULL);
if (err && !ret)