diff --git a/kmod/src/lock.c b/kmod/src/lock.c
index 4636b475..b560d3b3 100644
--- a/kmod/src/lock.c
+++ b/kmod/src/lock.c
@@ -50,11 +50,14 @@ struct lock_info {
 	struct shrinker shrinker;
 	struct list_head lru_list;
 	unsigned long long lru_nr;
+	struct workqueue_struct *lock_reclaim_wq;
 };
 
 #define DECLARE_LOCK_INFO(sb, name) \
 	struct lock_info *name = SCOUTFS_SB(sb)->lock_info
 
+static void scoutfs_lock_reclaim(struct work_struct *work);
+
 /*
  * Invalidate caches on this node because another node wants a lock
  * with the given mode and range.  We always have to
@@ -104,7 +107,6 @@ static void free_scoutfs_lock(struct scoutfs_lock *lock)
 
 	if (lock) {
 		linfo = SCOUTFS_SB(lock->sb)->lock_info;
-		ocfs2_simple_drop_lockres(&linfo->dlmglue, &lock->lockres);
 		scoutfs_key_free(lock->sb, lock->start);
 		scoutfs_key_free(lock->sb, lock->end);
 		kfree(lock);
@@ -124,6 +126,8 @@ static void put_scoutfs_lock(struct super_block *sb, struct scoutfs_lock *lock)
 			rb_erase(&lock->node, &linfo->lock_tree);
 			list_del(&lock->lru_entry);
 			spin_unlock(&linfo->lock);
+			ocfs2_simple_drop_lockres(&linfo->dlmglue,
+						  &lock->lockres);
 			free_scoutfs_lock(lock);
 			return;
 		}
@@ -131,6 +135,19 @@ static void put_scoutfs_lock(struct super_block *sb, struct scoutfs_lock *lock)
 	}
 }
 
+static void dec_lock_users(struct scoutfs_lock *lock)
+{
+	DECLARE_LOCK_INFO(lock->sb, linfo);
+
+	spin_lock(&linfo->lock);
+	lock->users--;
+	if (list_empty(&lock->lru_entry) && lock->users == 0) {
+		list_add_tail(&lock->lru_entry, &linfo->lru_list);
+		linfo->lru_nr++;
+	}
+	spin_unlock(&linfo->lock);
+}
+
 static struct ocfs2_super *get_ino_lock_osb(struct ocfs2_lock_res *lockres)
 {
 	struct scoutfs_lock *lock = lockres->l_priv;
@@ -209,6 +226,8 @@ static struct scoutfs_lock *alloc_scoutfs_lock(struct super_block *sb,
 	memcpy(&lock->lockres.l_name[0], &lock->lock_name,
 	       sizeof(struct scoutfs_lock_name));
 	ocfs2_lock_res_init_common(&linfo->dlmglue, &lock->lockres, type, lock);
+	INIT_WORK(&lock->reclaim_work, scoutfs_lock_reclaim);
+	init_waitqueue_head(&lock->waitq);
 
 	return lock;
 }
@@ -276,16 +295,47 @@ search:
 		rb_insert_color(&found->node, &linfo->lock_tree);
 	}
 	found->refcnt++;
+	if (test_bit(SCOUTFS_LOCK_RECLAIM, &found->flags)) {
+		spin_unlock(&linfo->lock);
+		wait_event(found->waitq,
+			   test_bit(SCOUTFS_LOCK_DROPPED, &found->flags));
+		put_scoutfs_lock(sb, found);
+		goto search;
+	}
+
 	if (!list_empty(&found->lru_entry)) {
 		list_del_init(&found->lru_entry);
 		linfo->lru_nr--;
 	}
+	found->users++;
 	spin_unlock(&linfo->lock);
 
 	kfree(new);
 	return found;
 }
 
+static void scoutfs_lock_reclaim(struct work_struct *work)
+{
+	struct scoutfs_lock *lock = container_of(work, struct scoutfs_lock,
+						 reclaim_work);
+	struct lock_info *linfo = SCOUTFS_SB(lock->sb)->lock_info;
+
+	trace_scoutfs_lock_reclaim(lock->sb, lock);
+
+	/*
+	 * Drop the last ref on our lock here, allowing us to clean up
+	 * the dlm lock.  We might race with another process in
+	 * find_alloc_scoutfs_lock(), hence the dropped flag telling
+	 * those processes to go ahead and drop the lock ref as well.
+	 */
+	BUG_ON(lock->users);
+
+	set_bit(SCOUTFS_LOCK_DROPPED, &lock->flags);
+	wake_up(&lock->waitq);
+
+	put_scoutfs_lock(linfo->sb, lock);
+}
+
 static int shrink_lock_tree(struct shrinker *shrink, struct shrink_control *sc)
 {
 	struct lock_info *linfo = container_of(shrink, struct lock_info,
@@ -294,7 +344,6 @@ static int shrink_lock_tree(struct shrinker *shrink, struct shrink_control *sc)
 	struct scoutfs_lock *tmp;
 	unsigned long flags;
 	unsigned long nr;
-	LIST_HEAD(list);
 
 	nr = sc->nr_to_scan;
 	if (!nr)
@@ -305,20 +354,18 @@ static int shrink_lock_tree(struct shrinker *shrink, struct shrink_control *sc)
 		if (nr-- == 0)
 			break;
 
-		WARN_ON(lock->refcnt != 1);
+		trace_shrink_lock_tree(linfo->sb, lock);
 
-		rb_erase(&lock->node, &linfo->lock_tree);
-		list_del(&lock->lru_entry);
-		list_add_tail(&lock->lru_entry, &list);
+		WARN_ON(lock->users);
+
+		set_bit(SCOUTFS_LOCK_RECLAIM, &lock->flags);
+		list_del_init(&lock->lru_entry);
 		linfo->lru_nr--;
+
+		queue_work(linfo->lock_reclaim_wq, &lock->reclaim_work);
 	}
 	spin_unlock_irqrestore(&linfo->lock, flags);
 
-	list_for_each_entry_safe(lock, tmp, &list, lru_entry) {
-		trace_shrink_lock_tree(linfo->sb, lock);
-		list_del(&lock->lru_entry);
-		free_scoutfs_lock(lock);
-	}
 out:
 	return min_t(unsigned long, linfo->lru_nr, INT_MAX);
 }
@@ -374,10 +421,11 @@ static int lock_name_keys(struct super_block *sb, int mode, int flags,
 
 	ret = ocfs2_cluster_lock(&linfo->dlmglue, &lock->lockres, mode,
				 lkm_flags, 0);
-	if (ret)
-		return ret;
-
-	*ret_lock = lock;
+	if (ret) {
+		dec_lock_users(lock);
+		put_scoutfs_lock(sb, lock);
+	} else
+		*ret_lock = lock;
 
-	return 0;
+	return ret;
 }
@@ -627,6 +675,8 @@ void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock,
 
 	ocfs2_cluster_unlock(&linfo->dlmglue, &lock->lockres, level);
 
+	dec_lock_users(lock);
+
 	put_scoutfs_lock(sb, lock);
 }
 
@@ -675,9 +725,15 @@ void scoutfs_lock_destroy(struct super_block *sb)
 	DECLARE_LOCK_INFO(sb, linfo);
 
 	if (linfo) {
-		free_lock_tree(sb); /* Do this before uninitializing the dlm. */
-
 		unregister_shrinker(&linfo->shrinker);
+		if (linfo->lock_reclaim_wq)
+			destroy_workqueue(linfo->lock_reclaim_wq);
+		/*
+		 * Do this before uninitializing the dlm and after
+		 * draining the reclaim workqueue.
+		 */
+		free_lock_tree(sb);
+
 		if (linfo->dlmglue_online) {
 			ocfs2_dlm_shutdown(&linfo->dlmglue, 0);
 			ocfs2_uninit_super(&linfo->dlmglue);
@@ -703,6 +759,13 @@ int scoutfs_lock_setup(struct super_block *sb)
 		return ret;
 
 	linfo = sbi->lock_info;
+	linfo->lock_reclaim_wq = alloc_workqueue("scoutfs_reclaim",
+						 WQ_UNBOUND|WQ_HIGHPRI, 0);
+	if (!linfo->lock_reclaim_wq) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
 	ret = ocfs2_dlm_init(&linfo->dlmglue, "null", sbi->opts.cluster_name,
 			     linfo->ls_name, sbi->debug_root);
 	if (ret)
diff --git a/kmod/src/lock.h b/kmod/src/lock.h
index 6c5c1aee..65c0a4f2 100644
--- a/kmod/src/lock.h
+++ b/kmod/src/lock.h
@@ -8,6 +8,12 @@
 #define SCOUTFS_LKF_REFRESH_INODE 0x01 /* update stale inode from item */
 #define SCOUTFS_LKF_TRYLOCK 0x02 /* EAGAIN if contention */
 
+/* flags for scoutfs_lock->flags */
+enum {
+	SCOUTFS_LOCK_RECLAIM = 0,	/* lock is queued for reclaim */
+	SCOUTFS_LOCK_DROPPED,		/* lock is going away, drop reference */
+};
+
 struct scoutfs_lock {
 	struct super_block *sb;
 	struct scoutfs_lock_name lock_name;
@@ -16,9 +22,13 @@ struct scoutfs_lock {
 	struct dlm_lksb lksb;
 	unsigned int sequence; /* for debugging and sanity checks */
 	struct rb_node node;
-	struct list_head lru_entry;
 	unsigned int refcnt;
 	struct ocfs2_lock_res lockres;
+	struct list_head lru_entry;
+	struct work_struct reclaim_work;
+	unsigned int users; /* Tracks active users of this lock */
+	unsigned long flags;
+	wait_queue_head_t waitq;
 };
 
 u64 scoutfs_lock_refresh_gen(struct scoutfs_lock *lock);
diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h
index 2147ce2a..b30504c9 100644
--- a/kmod/src/scoutfs_trace.h
+++ b/kmod/src/scoutfs_trace.h
@@ -306,6 +306,7 @@ DECLARE_EVENT_CLASS(scoutfs_lock_class,
 		__field(u64, name_second)
 		__field(unsigned int, seq)
 		__field(unsigned int, refcnt)
+		__field(unsigned int, users)
 	),
 	TP_fast_assign(
 		__entry->name_scope = lck->lock_name.scope;
@@ -315,11 +316,12 @@ DECLARE_EVENT_CLASS(scoutfs_lock_class,
 		__entry->name_second = le64_to_cpu(lck->lock_name.second);
 		__entry->seq = lck->sequence;
 		__entry->refcnt = lck->refcnt;
+		__entry->users = lck->users;
 	),
-	TP_printk("name %u.%u.%u.%llu.%llu seq %u refs %d",
+	TP_printk("name %u.%u.%u.%llu.%llu seq %u refs %d users %d",
 		  __entry->name_scope, __entry->name_zone, __entry->name_type,
 		  __entry->name_first, __entry->name_second, __entry->seq,
-		  __entry->refcnt)
+		  __entry->refcnt, __entry->users)
 );
 
 DEFINE_EVENT(scoutfs_lock_class, scoutfs_lock_resource,
 	TP_PROTO(struct super_block *sb, struct scoutfs_lock *lck),
 	TP_ARGS(sb, lck)
 );
@@ -342,7 +344,7 @@ DEFINE_EVENT(scoutfs_lock_class, scoutfs_bast,
 	TP_PROTO(struct super_block *sb, struct scoutfs_lock *lck),
 	TP_ARGS(sb, lck)
 );
 
-DEFINE_EVENT(scoutfs_lock_class, scoutfs_downconvert_func,
+DEFINE_EVENT(scoutfs_lock_class, scoutfs_lock_reclaim,
 	TP_PROTO(struct super_block *sb, struct scoutfs_lock *lck),
 	TP_ARGS(sb, lck)
 );
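
For reviewers following the reclaim handshake above: the patch's central invariant is that a lock found with SCOUTFS_LOCK_RECLAIM set must never be reused. The finder waits for SCOUTFS_LOCK_DROPPED, puts its own reference, and restarts the search, while the reclaim worker sets the flag, wakes waiters, and puts the final reference. Below is a minimal userspace sketch of that protocol, with pthreads standing in for the kernel spinlock and waitqueue; the fake_lock type, reclaim_worker(), and lookup_must_retry() are invented names for illustration, not scoutfs code.

/*
 * Userspace model of the RECLAIM/DROPPED handshake.  Build with:
 *   cc -pthread sketch.c
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct fake_lock {
	pthread_mutex_t mutex;	/* stands in for linfo->lock */
	pthread_cond_t cond;	/* stands in for lock->waitq */
	int refcnt;
	bool reclaim;		/* SCOUTFS_LOCK_RECLAIM */
	bool dropped;		/* SCOUTFS_LOCK_DROPPED */
};

/* stands in for scoutfs_lock_reclaim(): mark dropped, wake, put last ref */
static void *reclaim_worker(void *arg)
{
	struct fake_lock *lk = arg;

	pthread_mutex_lock(&lk->mutex);
	lk->dropped = true;
	lk->refcnt--;			/* drop the shrinker's reference */
	pthread_cond_broadcast(&lk->cond);
	pthread_mutex_unlock(&lk->mutex);
	return NULL;
}

/* stands in for the RECLAIM check in find_alloc_scoutfs_lock() */
static bool lookup_must_retry(struct fake_lock *lk)
{
	bool retry = false;

	pthread_mutex_lock(&lk->mutex);
	lk->refcnt++;			/* found in the tree, take a ref */
	if (lk->reclaim) {
		while (!lk->dropped)	/* models wait_event(lock->waitq, ...) */
			pthread_cond_wait(&lk->cond, &lk->mutex);
		lk->refcnt--;		/* put our ref and search again */
		retry = true;
	}
	pthread_mutex_unlock(&lk->mutex);
	return retry;
}

int main(void)
{
	struct fake_lock lk = {
		.mutex = PTHREAD_MUTEX_INITIALIZER,
		.cond = PTHREAD_COND_INITIALIZER,
		.refcnt = 1,		/* the reference the shrinker holds */
		.reclaim = true,	/* shrinker already flagged this lock */
	};
	pthread_t worker;

	pthread_create(&worker, NULL, reclaim_worker, &lk);
	if (lookup_must_retry(&lk))
		printf("lookup raced with reclaim: ref dropped, retrying\n");
	pthread_join(worker, NULL);
	return 0;
}

The condition-variable wait mirrors wait_event() on lock->waitq: a racing finder never resurrects a lock once the shrinker has flagged it; it only helps drop the reference and then starts the lookup over, which is exactly why the patch can move ocfs2_simple_drop_lockres() out of free_scoutfs_lock() and into the final put.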