scoutfs: Use LRU for locks

The move to dlmglue necessitated removing the old LRU code. That's
fine, it didn't work anyway.

We can't drop locks in the shrinker directly, so instead we have the
shrinker put them on a workqueue where they are dropped.
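
The resulting shrinker has roughly this shape (condensed from
shrink_lock_tree() in the diff below; the irqsave locking and the
nr_to_scan bookkeeping are trimmed for brevity):

static int shrink_lock_tree(struct shrinker *shrink,
                            struct shrink_control *sc)
{
        struct lock_info *linfo = container_of(shrink, struct lock_info,
                                               shrinker);
        struct scoutfs_lock *lock;
        struct scoutfs_lock *tmp;
        unsigned long nr = sc->nr_to_scan;

        spin_lock(&linfo->lock);
        list_for_each_entry_safe(lock, tmp, &linfo->lru_list, lru_entry) {
                if (nr-- == 0)
                        break;
                /* flag it so concurrent lookups know to wait */
                set_bit(SCOUTFS_LOCK_RECLAIM, &lock->flags);
                list_del_init(&lock->lru_entry);
                linfo->lru_nr--;
                /* the blocking unlock runs later from the workqueue */
                queue_work(linfo->lock_reclaim_wq, &lock->reclaim_work);
        }
        spin_unlock(&linfo->lock);

        return min_t(unsigned long, linfo->lru_nr, INT_MAX);
}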

The rules for the LRU are simple. Locks get a users count. Any process
that holds the lock or is in the process of acquiring the lock
increments this count. When unlock is called, the count is decremented.
We use the value of this count to manage the LRU: scoutfs_unlock()
puts locks on the LRU when the count reaches zero, and
lock_name_keys() always takes them off.
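
Both halves of that rule are a few lines under the linfo spinlock;
condensed from the diff below:

/* release side, called from scoutfs_unlock() */
static void dec_lock_users(struct scoutfs_lock *lock)
{
        DECLARE_LOCK_INFO(lock->sb, linfo);

        spin_lock(&linfo->lock);
        lock->users--;
        /* last user gone: the lock becomes a reclaim candidate */
        if (list_empty(&lock->lru_entry) && lock->users == 0) {
                list_add_tail(&lock->lru_entry, &linfo->lru_list);
                linfo->lru_nr++;
        }
        spin_unlock(&linfo->lock);
}

/* acquire side, in find_alloc_scoutfs_lock() under the same lock */
if (!list_empty(&found->lru_entry)) {
        list_del_init(&found->lru_entry);
        linfo->lru_nr--;
}
found->users++;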

If a lock is selected for reclaim, callers wanting to use it need to
wait. We achieve this with a pair of flags. SCOUTFS_LOCK_RECLAIM
indicates that the lock is now queued for reclaim. Once the lock is
ready to be destroyed, we set the SCOUTFS_LOCK_DROPPED flag, telling
waiting callers to put their lock reference and retry their rbtree
search.
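
The handshake between lookup and reclaim then looks like this,
condensed from find_alloc_scoutfs_lock() and scoutfs_lock_reclaim()
in the diff below:

/* lookup: back off while reclaim owns the lock */
found->refcnt++;
if (test_bit(SCOUTFS_LOCK_RECLAIM, &found->flags)) {
        spin_unlock(&linfo->lock);
        wait_event(found->waitq,
                   test_bit(SCOUTFS_LOCK_DROPPED, &found->flags));
        put_scoutfs_lock(sb, found);    /* drop the ref we took */
        goto search;                    /* start over with a new lock */
}

/* reclaim work: release any waiters, then drop the last ref */
set_bit(SCOUTFS_LOCK_DROPPED, &lock->flags);
wake_up(&lock->waitq);
put_scoutfs_lock(linfo->sb, lock);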

Signed-off-by: Mark Fasheh <mfasheh@versity.com>
Author:    Mark Fasheh
Date:      2017-09-06 18:13:54 -05:00
Committer: Zach Brown
Parent:    4bb5cadaf0
Commit:    9461104f8e

3 changed files with 96 additions and 21 deletions


@@ -50,11 +50,14 @@ struct lock_info {
struct shrinker shrinker;
struct list_head lru_list;
unsigned long long lru_nr;
struct workqueue_struct *lock_reclaim_wq;
};
#define DECLARE_LOCK_INFO(sb, name) \
struct lock_info *name = SCOUTFS_SB(sb)->lock_info
static void scoutfs_lock_reclaim(struct work_struct *work);
/*
* Invalidate caches on this node because another node wants a lock
* with the given mode and range. We always have to
@@ -104,7 +107,6 @@ static void free_scoutfs_lock(struct scoutfs_lock *lock)
if (lock) {
linfo = SCOUTFS_SB(lock->sb)->lock_info;
ocfs2_simple_drop_lockres(&linfo->dlmglue, &lock->lockres);
scoutfs_key_free(lock->sb, lock->start);
scoutfs_key_free(lock->sb, lock->end);
kfree(lock);
@@ -124,6 +126,8 @@ static void put_scoutfs_lock(struct super_block *sb, struct scoutfs_lock *lock)
rb_erase(&lock->node, &linfo->lock_tree);
list_del(&lock->lru_entry);
spin_unlock(&linfo->lock);
ocfs2_simple_drop_lockres(&linfo->dlmglue,
&lock->lockres);
free_scoutfs_lock(lock);
return;
}
@@ -131,6 +135,19 @@ static void put_scoutfs_lock(struct super_block *sb, struct scoutfs_lock *lock)
}
}
static void dec_lock_users(struct scoutfs_lock *lock)
{
DECLARE_LOCK_INFO(lock->sb, linfo);
spin_lock(&linfo->lock);
lock->users--;
if (list_empty(&lock->lru_entry) && lock->users == 0) {
list_add_tail(&lock->lru_entry, &linfo->lru_list);
linfo->lru_nr++;
}
spin_unlock(&linfo->lock);
}
static struct ocfs2_super *get_ino_lock_osb(struct ocfs2_lock_res *lockres)
{
struct scoutfs_lock *lock = lockres->l_priv;
@@ -209,6 +226,8 @@ static struct scoutfs_lock *alloc_scoutfs_lock(struct super_block *sb,
memcpy(&lock->lockres.l_name[0], &lock->lock_name,
sizeof(struct scoutfs_lock_name));
ocfs2_lock_res_init_common(&linfo->dlmglue, &lock->lockres, type, lock);
INIT_WORK(&lock->reclaim_work, scoutfs_lock_reclaim);
init_waitqueue_head(&lock->waitq);
return lock;
}
@@ -276,16 +295,47 @@ search:
rb_insert_color(&found->node, &linfo->lock_tree);
}
found->refcnt++;
if (test_bit(SCOUTFS_LOCK_RECLAIM, &found->flags)) {
spin_unlock(&linfo->lock);
wait_event(found->waitq,
test_bit(SCOUTFS_LOCK_DROPPED, &found->flags));
put_scoutfs_lock(sb, found);
goto search;
}
if (!list_empty(&found->lru_entry)) {
list_del_init(&found->lru_entry);
linfo->lru_nr--;
}
found->users++;
spin_unlock(&linfo->lock);
kfree(new);
return found;
}
static void scoutfs_lock_reclaim(struct work_struct *work)
{
struct scoutfs_lock *lock = container_of(work, struct scoutfs_lock,
reclaim_work);
struct lock_info *linfo = SCOUTFS_SB(lock->sb)->lock_info;
trace_scoutfs_lock_reclaim(lock->sb, lock);
/*
* Drop the last ref on our lock here, allowing us to clean up
* the dlm lock. We might race with another process in
* find_alloc_scoutfs_lock(), hence the dropped flag telling
* those processes to go ahead and drop the lock ref as well.
*/
BUG_ON(lock->users);
set_bit(SCOUTFS_LOCK_DROPPED, &lock->flags);
wake_up(&lock->waitq);
put_scoutfs_lock(linfo->sb, lock);
}
static int shrink_lock_tree(struct shrinker *shrink, struct shrink_control *sc)
{
struct lock_info *linfo = container_of(shrink, struct lock_info,
@@ -294,7 +344,6 @@ static int shrink_lock_tree(struct shrinker *shrink, struct shrink_control *sc)
struct scoutfs_lock *tmp;
unsigned long flags;
unsigned long nr;
LIST_HEAD(list);
nr = sc->nr_to_scan;
if (!nr)
@@ -305,20 +354,18 @@ static int shrink_lock_tree(struct shrinker *shrink, struct shrink_control *sc)
if (nr-- == 0)
break;
WARN_ON(lock->refcnt != 1);
trace_shrink_lock_tree(linfo->sb, lock);
rb_erase(&lock->node, &linfo->lock_tree);
list_del(&lock->lru_entry);
list_add_tail(&lock->lru_entry, &list);
WARN_ON(lock->users);
set_bit(SCOUTFS_LOCK_RECLAIM, &lock->flags);
list_del_init(&lock->lru_entry);
linfo->lru_nr--;
queue_work(linfo->lock_reclaim_wq, &lock->reclaim_work);
}
spin_unlock_irqrestore(&linfo->lock, flags);
list_for_each_entry_safe(lock, tmp, &list, lru_entry) {
trace_shrink_lock_tree(linfo->sb, lock);
list_del(&lock->lru_entry);
free_scoutfs_lock(lock);
}
out:
return min_t(unsigned long, linfo->lru_nr, INT_MAX);
}
@@ -374,10 +421,11 @@ static int lock_name_keys(struct super_block *sb, int mode, int flags,
ret = ocfs2_cluster_lock(&linfo->dlmglue, &lock->lockres, mode,
lkm_flags, 0);
if (ret)
return ret;
*ret_lock = lock;
if (ret) {
dec_lock_users(lock);
put_scoutfs_lock(sb, lock);
} else
*ret_lock = lock;
return ret;
}
@@ -627,6 +675,8 @@ void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock,
ocfs2_cluster_unlock(&linfo->dlmglue, &lock->lockres, level);
dec_lock_users(lock);
put_scoutfs_lock(sb, lock);
}
@@ -675,9 +725,15 @@ void scoutfs_lock_destroy(struct super_block *sb)
DECLARE_LOCK_INFO(sb, linfo);
if (linfo) {
free_lock_tree(sb); /* Do this before uninitializing the dlm. */
unregister_shrinker(&linfo->shrinker);
if (linfo->lock_reclaim_wq)
destroy_workqueue(linfo->lock_reclaim_wq);
/*
* Do this before uninitializing the dlm and after
* draining the reclaim workqueue.
*/
free_lock_tree(sb);
if (linfo->dlmglue_online) {
ocfs2_dlm_shutdown(&linfo->dlmglue, 0);
ocfs2_uninit_super(&linfo->dlmglue);
@@ -703,6 +759,13 @@ int scoutfs_lock_setup(struct super_block *sb)
return ret;
linfo = sbi->lock_info;
linfo->lock_reclaim_wq = alloc_workqueue("scoutfs_reclaim",
WQ_UNBOUND|WQ_HIGHPRI, 0);
if (!linfo->lock_reclaim_wq) {
ret = -ENOMEM;
goto out;
}
ret = ocfs2_dlm_init(&linfo->dlmglue, "null", sbi->opts.cluster_name,
linfo->ls_name, sbi->debug_root);
if (ret)


@@ -8,6 +8,12 @@
#define SCOUTFS_LKF_REFRESH_INODE 0x01 /* update stale inode from item */
#define SCOUTFS_LKF_TRYLOCK 0x02 /* EAGAIN if contention */
/* flags for scoutfs_lock->flags */
enum {
SCOUTFS_LOCK_RECLAIM = 0, /* lock is queued for reclaim */
SCOUTFS_LOCK_DROPPED, /* lock is going away, drop reference */
};
struct scoutfs_lock {
struct super_block *sb;
struct scoutfs_lock_name lock_name;
@@ -16,9 +22,13 @@ struct scoutfs_lock {
struct dlm_lksb lksb;
unsigned int sequence; /* for debugging and sanity checks */
struct rb_node node;
struct list_head lru_entry;
unsigned int refcnt;
struct ocfs2_lock_res lockres;
struct list_head lru_entry;
struct work_struct reclaim_work;
unsigned int users; /* Tracks active users of this lock */
unsigned long flags;
wait_queue_head_t waitq;
};
u64 scoutfs_lock_refresh_gen(struct scoutfs_lock *lock);


@@ -306,6 +306,7 @@ DECLARE_EVENT_CLASS(scoutfs_lock_class,
__field(u64, name_second)
__field(unsigned int, seq)
__field(unsigned int, refcnt)
__field(unsigned int, users)
),
TP_fast_assign(
__entry->name_scope = lck->lock_name.scope;
@@ -315,11 +316,12 @@ DECLARE_EVENT_CLASS(scoutfs_lock_class,
__entry->name_second = le64_to_cpu(lck->lock_name.second);
__entry->seq = lck->sequence;
__entry->refcnt = lck->refcnt;
__entry->users = lck->users;
),
TP_printk("name %u.%u.%u.%llu.%llu seq %u refs %d",
TP_printk("name %u.%u.%u.%llu.%llu seq %u refs %d users %d",
__entry->name_scope, __entry->name_zone, __entry->name_type,
__entry->name_first, __entry->name_second, __entry->seq,
__entry->refcnt)
__entry->refcnt, __entry->users)
);
DEFINE_EVENT(scoutfs_lock_class, scoutfs_lock_resource,
@@ -342,7 +344,7 @@ DEFINE_EVENT(scoutfs_lock_class, scoutfs_bast,
TP_ARGS(sb, lck)
);
DEFINE_EVENT(scoutfs_lock_class, scoutfs_downconvert_func,
DEFINE_EVENT(scoutfs_lock_class, scoutfs_lock_reclaim,
TP_PROTO(struct super_block *sb, struct scoutfs_lock *lck),
TP_ARGS(sb, lck)
);