Use core seq for lock write_seq

Rename the write_version lock field to write_seq and get it from the
core seq in the super block.

We're doing this to create a relationship between a client transaction's
seq and a lock's write_seq.  New transactions will have a greater seq
than all previously granted write locks, and new write locks will have a
greater seq than all open transactions.  This will be used to resolve
ambiguities in item merging as transaction seqs are written out of order
and write locks span transactions.

Signed-off-by: Zach Brown <zab@versity.com>

commit 3c69861c03
parent 05ae756b74
Author: Zach Brown <zab@versity.com>
Date:   2020-12-08 14:59:27 -08:00

8 changed files with 27 additions and 38 deletions

@@ -943,7 +943,7 @@ struct scoutfs_net_roots {
 struct scoutfs_net_lock {
 	struct scoutfs_key key;
-	__le64 write_version;
+	__le64 write_seq;
 	__u8 old_mode;
 	__u8 new_mode;
 	__u8 __pad[6];

@@ -1816,7 +1816,7 @@ int scoutfs_item_dirty(struct super_block *sb, struct scoutfs_key *key,
 		ret = -ENOENT;
 	} else {
 		mark_item_dirty(sb, cinf, pg, NULL, item);
-		item->liv.vers = cpu_to_le64(lock->write_version);
+		item->liv.vers = cpu_to_le64(lock->write_seq);
 		ret = 0;
 	}
@@ -1836,7 +1836,7 @@ static int item_create(struct super_block *sb, struct scoutfs_key *key,
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
 	struct scoutfs_log_item_value liv = {
-		.vers = cpu_to_le64(lock->write_version),
+		.vers = cpu_to_le64(lock->write_seq),
 	};
 	struct cached_item *found;
 	struct cached_item *item;
@@ -1911,7 +1911,7 @@ int scoutfs_item_update(struct super_block *sb, struct scoutfs_key *key,
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
 	struct scoutfs_log_item_value liv = {
-		.vers = cpu_to_le64(lock->write_version),
+		.vers = cpu_to_le64(lock->write_seq),
 	};
 	struct cached_item *item;
 	struct cached_item *found;
@@ -1978,7 +1978,7 @@ static int item_delete(struct super_block *sb, struct scoutfs_key *key,
 {
 	DECLARE_ITEM_CACHE_INFO(sb, cinf);
 	struct scoutfs_log_item_value liv = {
-		.vers = cpu_to_le64(lock->write_version),
+		.vers = cpu_to_le64(lock->write_seq),
 	};
 	struct cached_item *item;
 	struct cached_page *pg;
@@ -2020,7 +2020,7 @@ static int item_delete(struct super_block *sb, struct scoutfs_key *key,
 		erase_item(pg, item);
 	} else {
 		/* must emit deletion to clobber old persistent item */
-		item->liv.vers = cpu_to_le64(lock->write_version);
+		item->liv.vers = cpu_to_le64(lock->write_seq);
 		item->liv.flags |= SCOUTFS_LOG_ITEM_FLAG_DELETION;
 		item->deletion = 1;
 		pg->erased_bytes += item->val_len;

@@ -730,7 +730,7 @@ static void lock_grant_worker(struct work_struct *work)
 		lock->request_pending = 0;
 		lock->mode = nl->new_mode;
-		lock->write_version = le64_to_cpu(nl->write_version);
+		lock->write_seq = le64_to_cpu(nl->write_seq);
 		if (lock_count_match_exists(nl->new_mode, lock->waiters))
 			extend_grace(sb, lock);
@@ -988,7 +988,7 @@ int scoutfs_lock_recover_request(struct super_block *sb, u64 net_id,
 	for (i = 0; lock && i < SCOUTFS_NET_LOCK_MAX_RECOVER_NR; i++) {
 		nlr->locks[i].key = lock->start;
-		nlr->locks[i].write_version = cpu_to_le64(lock->write_version);
+		nlr->locks[i].write_seq = cpu_to_le64(lock->write_seq);
 		nlr->locks[i].old_mode = lock->mode;
 		nlr->locks[i].new_mode = lock->mode;

@@ -13,7 +13,7 @@
 struct scoutfs_omap_lock;
 /*
- * A few fields (start, end, refresh_gen, write_version, granted_mode)
+ * A few fields (start, end, refresh_gen, write_seq, granted_mode)
  * are referenced by code outside lock.c.
  */
 struct scoutfs_lock {
@@ -23,7 +23,7 @@ struct scoutfs_lock {
 	struct rb_node node;
 	struct rb_node range_node;
 	u64 refresh_gen;
-	u64 write_version;
+	u64 write_seq;
 	u64 dirty_trans_seq;
 	struct list_head lru_head;
 	wait_queue_head_t waitq;

@@ -81,8 +81,6 @@ struct lock_server_info {
 	struct scoutfs_alloc *alloc;
 	struct scoutfs_block_writer *wri;
-	atomic64_t write_version;
 };
 #define DECLARE_LOCK_SERVER_INFO(sb, name) \
@@ -479,7 +477,7 @@ static int process_waiting_requests(struct super_block *sb,
 	struct client_lock_entry *req_tmp;
 	struct client_lock_entry *gr;
 	struct client_lock_entry *gr_tmp;
-	u64 wv;
+	u64 seq;
 	int ret;
 	BUG_ON(!mutex_is_locked(&snode->mutex));
@@ -532,8 +530,9 @@ static int process_waiting_requests(struct super_block *sb,
 		if (nl.new_mode == SCOUTFS_LOCK_WRITE ||
 		    nl.new_mode == SCOUTFS_LOCK_WRITE_ONLY) {
-			wv = atomic64_inc_return(&inf->write_version);
-			nl.write_version = cpu_to_le64(wv);
+			/* doesn't commit seq update, recovered with locks */
+			seq = scoutfs_server_next_seq(sb);
+			nl.write_seq = cpu_to_le64(seq);
 		}
 		ret = scoutfs_server_lock_response(sb, req->rid,
@@ -609,14 +608,6 @@ int scoutfs_lock_server_finished_recovery(struct super_block *sb)
 	return ret;
 }
-static void set_max_write_version(struct lock_server_info *inf, u64 new)
-{
-	u64 old;
-	while (new > (old = atomic64_read(&inf->write_version)) &&
-	       (atomic64_cmpxchg(&inf->write_version, old, new) != old));
-}
 /*
  * We sent a lock recover request to the client when we received its
  * greeting while in recovery. Here we instantiate all the locks it
@@ -680,9 +671,9 @@ int scoutfs_lock_server_recover_response(struct super_block *sb, u64 rid,
 		put_server_lock(inf, snode);
-		/* make sure next write lock is greater than all recovered */
-		set_max_write_version(inf,
-				le64_to_cpu(nlr->locks[i].write_version));
+		/* make sure next core seq is greater than all lock write seq */
+		scoutfs_server_set_seq_if_greater(sb,
+				le64_to_cpu(nlr->locks[i].write_seq));
 	}
 	/* send request for next batch of keys */
@@ -800,7 +791,7 @@ static void lock_server_tseq_show(struct seq_file *m,
  */
 int scoutfs_lock_server_setup(struct super_block *sb,
 			      struct scoutfs_alloc *alloc,
-			      struct scoutfs_block_writer *wri, u64 max_vers)
+			      struct scoutfs_block_writer *wri)
 {
 	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
 	struct lock_server_info *inf;
@@ -815,7 +806,6 @@ int scoutfs_lock_server_setup(struct super_block *sb,
 	scoutfs_tseq_tree_init(&inf->tseq_tree, lock_server_tseq_show);
 	inf->alloc = alloc;
 	inf->wri = wri;
-	atomic64_set(&inf->write_version, max_vers); /* inc_return gives +1 */
 	inf->tseq_dentry = scoutfs_tseq_create("server_locks", sbi->debug_root,
 					       &inf->tseq_tree);
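
The removed set_max_write_version() helper is effectively replaced by
scoutfs_server_set_seq_if_greater() acting on the shared core seq. A
minimal sketch of that semantic, mirroring the removed cmpxchg loop
(the real helper lives outside this diff; the atomic64_t stand-in for
the super block's core seq is an assumption):

	/* raise *seq to at least new, never moving it backwards */
	static void seq_set_if_greater(atomic64_t *seq, u64 new)
	{
		u64 old;

		while (new > (old = atomic64_read(seq)) &&
		       atomic64_cmpxchg(seq, old, new) != old)
			;
	}

Grants call scoutfs_server_next_seq() without committing the update, so
a restarted server could otherwise hand out stale seqs; recovery closes
that hole by raising the core seq above every recovered lock's
write_seq before new write locks are granted.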

@@ -13,7 +13,7 @@ int scoutfs_lock_server_farewell(struct super_block *sb, u64 rid);
 int scoutfs_lock_server_setup(struct super_block *sb,
 			      struct scoutfs_alloc *alloc,
-			      struct scoutfs_block_writer *wri, u64 max_vers);
+			      struct scoutfs_block_writer *wri);
 void scoutfs_lock_server_destroy(struct super_block *sb);
 #endif

@@ -137,11 +137,10 @@ struct omap_request {
 /*
  * In each inode group cluster lock we store data to track the open ino
  * map which tracks all the inodes that the cluster lock covers. When
- * the version shows that the map is stale we send a request to update
- * it.
+ * the seq shows that the map is stale we send a request to update it.
  */
 struct scoutfs_omap_lock_data {
-	u64 version;
+	u64 seq;
 	bool req_in_flight;
 	wait_queue_head_t waitq;
 	struct scoutfs_open_ino_map map;
@@ -833,8 +832,7 @@ static bool omap_req_in_flight(struct scoutfs_lock *lock, struct scoutfs_omap_lo
 /*
  * Make sure the map covered by the cluster lock is current. The caller
  * holds the cluster lock so once we store lock_data on the cluster lock
- * it won't be freed and the write_version in the cluster lock won't
- * change.
+ * it won't be freed and the write_seq in the cluster lock won't change.
  *
  * The omap_spinlock protects the omap_data in the cluster lock. We
  * have to drop it if we have to block to allocate lock_data, send a
@@ -861,7 +859,7 @@ static int get_current_lock_data(struct super_block *sb, struct scoutfs_lock *lo
 	}
 	if (lock->omap_data == NULL) {
-		ldata->version = lock->write_version - 1; /* ensure refresh */
+		ldata->seq = lock->write_seq - 1; /* ensure refresh */
 		init_waitqueue_head(&ldata->waitq);
 		lock->omap_data = ldata;
@@ -871,7 +869,7 @@ static int get_current_lock_data(struct super_block *sb, struct scoutfs_lock *lo
 		}
 	}
-	while (ldata->version != lock->write_version) {
+	while (ldata->seq != lock->write_seq) {
 		/* only one waiter sends a request at a time */
 		if (!ldata->req_in_flight) {
 			ldata->req_in_flight = true;
@@ -891,7 +889,7 @@ static int get_current_lock_data(struct super_block *sb, struct scoutfs_lock *lo
 	if (send_req) {
 		ldata->req_in_flight = false;
 		if (ret == 0)
-			ldata->version = lock->write_version;
+			ldata->seq = lock->write_seq;
 		wake_up(&ldata->waitq);
 		if (ret < 0)
 			goto out;
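
Taken together, these omap hunks form a small freshness protocol: the
cached map is current only while lock_data's seq matches the cluster
lock's write_seq. A condensed view (illustrative; the real function's
locking and error handling are elided):

	/* freshly allocated lock_data must always look stale */
	ldata->seq = lock->write_seq - 1;

	while (ldata->seq != lock->write_seq) {
		/* one waiter sends the omap request, the rest wait */
	}

	/* a successful response makes the map current for this write_seq */
	ldata->seq = lock->write_seq;

The caller holds the cluster lock, so write_seq can't change underneath
the loop and it terminates once a request succeeds.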

@@ -2319,8 +2319,9 @@ static void scoutfs_server_worker(struct work_struct *work)
 		scoutfs_err(sb, "server couldn't find max item vers: %d", ret);
 		goto shutdown;
 	}
+	scoutfs_server_set_seq_if_greater(sb, max_vers);
-	ret = scoutfs_lock_server_setup(sb, &server->alloc, &server->wri, max_vers) ?:
+	ret = scoutfs_lock_server_setup(sb, &server->alloc, &server->wri) ?:
 		start_recovery(sb);
 	if (ret)
 		goto shutdown;