From e6f3b3ca8fbb10530357f37021b2507bb04ff72e Mon Sep 17 00:00:00 2001 From: Mark Fasheh Date: Mon, 19 Jun 2017 15:26:57 -0500 Subject: [PATCH] scoutfs: add lock caching We refcount our locks and hold them across system calls. If another node wants access to a given lock we'll mark it as blocking in the bast and queue a work item so that the lock can later be released. Otherwise locks are free'd under memory pressure, unmount or after a timer fires. Signed-off-by: Mark Fasheh --- kmod/src/lock.c | 366 +++++++++++++++++++++++++++++++++++---- kmod/src/lock.h | 17 +- kmod/src/net.c | 4 +- kmod/src/scoutfs_trace.h | 32 +++- kmod/src/xattr.c | 16 +- 5 files changed, 386 insertions(+), 49 deletions(-) diff --git a/kmod/src/lock.c b/kmod/src/lock.c index 379983be..ea915df8 100644 --- a/kmod/src/lock.c +++ b/kmod/src/lock.c @@ -21,6 +21,8 @@ #include "scoutfs_trace.h" #include "msg.h" +#include "../dlm/interval_tree_generic.h" + #include "linux/dlm.h" /* @@ -28,10 +30,15 @@ * the same fsid. Freed as the last super unmounts. */ struct held_locks { + struct super_block *sb; spinlock_t lock; - struct list_head list; unsigned int seq_cnt; wait_queue_head_t waitq; + struct rb_root lock_tree; + struct workqueue_struct *downconvert_wq; + struct shrinker shrinker; + struct list_head lru_list; + unsigned long long lru_nr; }; /* @@ -51,10 +58,18 @@ struct lock_info { #define RANGE_LOCK_RESOURCE "fs_range" #define RANGE_LOCK_RESOURCE_LEN (strlen(RANGE_LOCK_RESOURCE)) - #define DECLARE_LOCK_INFO(sb, name) \ struct lock_info *name = SCOUTFS_SB(sb)->lock_info +static void scoutfs_downconvert_func(struct work_struct *work); + +#define START(lck) ((lck)->start) +#define LAST(lck) ((lck)->end) +KEYED_INTERVAL_TREE_DEFINE(struct scoutfs_lock, interval_node, + struct scoutfs_key_buf *, subtree_last, START, LAST, + scoutfs_key_compare, static, scoutfs_lock); + + /* * Invalidate caches on this because another node wants a lock * with the a lock with the given mode and range. We always have to @@ -79,14 +94,34 @@ static int invalidate_caches(struct super_block *sb, int mode, return ret; } -static void uninit_scoutfs_lock(struct held_locks *held, - struct scoutfs_lock *lck) +static void free_scoutfs_lock(struct scoutfs_lock *lck) { - spin_lock(&held->lock); - lck->rqmode = SCOUTFS_LOCK_MODE_IV; - list_del_init(&lck->head); - spin_unlock(&held->lock); - lck->sequence = 0; + kfree(lck->start); + kfree(lck->end); + kfree(lck); +} + +static void put_scoutfs_lock(struct super_block *sb, struct scoutfs_lock *lck) +{ + DECLARE_LOCK_INFO(sb, linfo); + struct held_locks *held = linfo->held; + unsigned int refs; + + if (lck) { + spin_lock(&held->lock); + BUG_ON(!lck->refcnt); + refs = --lck->refcnt; + if (!refs) { + BUG_ON(lck->holders); + BUG_ON(delayed_work_pending(&lck->dc_work)); + scoutfs_lock_remove(lck, &held->lock_tree); + list_del(&lck->lru_entry); + spin_unlock(&held->lock); + free_scoutfs_lock(lck); + return; + } + spin_unlock(&held->lock); + } } static void init_scoutfs_lock(struct super_block *sb, struct scoutfs_lock *lck, @@ -96,10 +131,11 @@ static void init_scoutfs_lock(struct super_block *sb, struct scoutfs_lock *lck, DECLARE_LOCK_INFO(sb, linfo); struct held_locks *held = linfo->held; - memset(lck, 0, sizeof(*lck)); - INIT_LIST_HEAD(&lck->head); + RB_CLEAR_NODE(&lck->interval_node); lck->sb = sb; lck->mode = SCOUTFS_LOCK_MODE_IV; + INIT_DELAYED_WORK(&lck->dc_work, scoutfs_downconvert_func); + INIT_LIST_HEAD(&lck->lru_entry); if (start) { lck->start = start; @@ -117,6 +153,124 @@ static void init_scoutfs_lock(struct super_block *sb, struct scoutfs_lock *lck, spin_unlock(&held->lock); } +static struct scoutfs_lock *alloc_scoutfs_lock(struct super_block *sb, + struct scoutfs_key_buf *start, + struct scoutfs_key_buf *end) + +{ + struct scoutfs_key_buf *s, *e; + struct scoutfs_lock *lck; + + s = scoutfs_key_dup(sb, start); + if (!s) + return NULL; + e = scoutfs_key_dup(sb, end); + if (!e) { + kfree(s); + return NULL; + } + lck = kzalloc(sizeof(struct scoutfs_lock), GFP_NOFS); + if (!lck) { + kfree(e); + kfree(s); + } + + init_scoutfs_lock(sb, lck, s, e); + return lck; +} + +static struct scoutfs_lock *find_alloc_scoutfs_lock(struct super_block *sb, + struct scoutfs_key_buf *start, + struct scoutfs_key_buf *end) +{ + DECLARE_LOCK_INFO(sb, linfo); + struct held_locks *held = linfo->held; + struct scoutfs_lock *found, *new; + + new = NULL; + spin_lock(&held->lock); +search: + found = scoutfs_lock_iter_first(&held->lock_tree, start, end); + if (!found) { + if (!new) { + spin_unlock(&held->lock); + new = alloc_scoutfs_lock(sb, start, end); + if (!new) + return NULL; + + spin_lock(&held->lock); + goto search; + } + new->refcnt = 1; /* Freed by shrinker or on umount */ + scoutfs_lock_insert(new, &held->lock_tree); + found = new; + new = NULL; + } + found->refcnt++; + if (!list_empty(&found->lru_entry)) { + list_del_init(&found->lru_entry); + held->lru_nr--; + } + spin_unlock(&held->lock); + + kfree(new); + return found; +} + +static int shrink_lock_tree(struct shrinker *shrink, struct shrink_control *sc) +{ + struct held_locks *held = container_of(shrink, struct held_locks, + shrinker); + struct scoutfs_lock *lck; + struct scoutfs_lock *tmp; + unsigned long flags; + unsigned long nr; + LIST_HEAD(list); + + nr = sc->nr_to_scan; + if (!nr) + goto out; + + spin_lock_irqsave(&held->lock, flags); + list_for_each_entry_safe(lck, tmp, &held->lru_list, lru_entry) { + if (nr-- == 0) + break; + + WARN_ON(lck->holders); + WARN_ON(lck->refcnt != 1); + WARN_ON(lck->flags & SCOUTFS_LOCK_QUEUED); + + scoutfs_lock_remove(lck, &held->lock_tree); + list_del(&lck->lru_entry); + list_add_tail(&lck->lru_entry, &list); + held->lru_nr--; + } + spin_unlock_irqrestore(&held->lock, flags); + + list_for_each_entry_safe(lck, tmp, &list, lru_entry) { + trace_shrink_lock_tree(held->sb, lck); + list_del(&lck->lru_entry); + free_scoutfs_lock(lck); + } +out: + return min_t(unsigned long, held->lru_nr, INT_MAX); +} + +static void free_lock_tree(struct super_block *sb) +{ + DECLARE_LOCK_INFO(sb, linfo); + struct held_locks *held = linfo->held; + struct rb_node *node = rb_first(&held->lock_tree); + + while (node) { + struct scoutfs_lock *lck; + + lck = rb_entry(node, struct scoutfs_lock, interval_node); + node = rb_next(node); + put_scoutfs_lock(sb, lck); + } +} + static void scoutfs_ast(void *astarg) { struct scoutfs_lock *lck = astarg; @@ -127,16 +281,48 @@ static void scoutfs_ast(void *astarg) spin_lock(&held->lock); lck->mode = lck->rqmode; - lck->rqmode = SCOUTFS_LOCK_MODE_IV; + /* Clear blocking flag when we are granted an unlock request */ + if (lck->rqmode == DLM_LOCK_IV) + lck->flags &= ~SCOUTFS_LOCK_BLOCKING; + lck->rqmode = DLM_LOCK_IV; spin_unlock(&held->lock); wake_up(&held->waitq); } +static void queue_blocking_work(struct held_locks *held, + struct scoutfs_lock *lck, unsigned int seconds) +{ + assert_spin_locked(&held->lock); + if (!(lck->flags & SCOUTFS_LOCK_QUEUED)) { + /* Take a ref for the workqueue */ + lck->flags |= SCOUTFS_LOCK_QUEUED; + lck->refcnt++; + } + mod_delayed_work(held->downconvert_wq, &lck->dc_work, seconds * HZ); +} + +static void set_lock_blocking(struct held_locks *held, + struct scoutfs_lock *lck, unsigned int seconds) +{ + assert_spin_locked(&held->lock); + lck->flags |= SCOUTFS_LOCK_BLOCKING; + if (lck->holders == 0) + queue_blocking_work(held, lck, seconds); +} + static void scoutfs_rbast(void *astarg, int mode, struct dlm_key *start, struct dlm_key *end) { + struct scoutfs_lock *lck = astarg; + struct lock_info *linfo = SCOUTFS_SB(lck->sb)->lock_info; + struct held_locks *held = linfo->held; + trace_scoutfs_rbast(lck->sb, lck); + + spin_lock(&held->lock); + set_lock_blocking(held, lck, 0); + spin_unlock(&held->lock); } static int lock_granted(struct held_locks *held, struct scoutfs_lock *lck, @@ -151,6 +337,17 @@ static int lock_granted(struct held_locks *held, struct scoutfs_lock *lck, return ret; } +static int lock_blocking(struct held_locks *held, struct scoutfs_lock *lck) +{ + int ret; + + spin_lock(&held->lock); + ret = !!(lck->flags & SCOUTFS_LOCK_BLOCKING); + spin_unlock(&held->lock); + + return ret; +} + /* * Acquire a coherent lock on the given range of keys. While the lock * is held other lockers are serialized. Cache coherency is maintained @@ -163,26 +360,54 @@ static int lock_granted(struct held_locks *held, struct scoutfs_lock *lck, int scoutfs_lock_range(struct super_block *sb, int mode, struct scoutfs_key_buf *start, struct scoutfs_key_buf *end, - struct scoutfs_lock *lck) + struct scoutfs_lock **ret_lck) { DECLARE_LOCK_INFO(sb, linfo); struct held_locks *held = linfo->held; + struct scoutfs_lock *lck; int ret; - init_scoutfs_lock(sb, lck, start, end); + lck = find_alloc_scoutfs_lock(sb, start, end); + if (!lck) + return -ENOMEM; trace_scoutfs_lock_range(sb, lck); +check_lock_state: spin_lock(&held->lock); if (linfo->shutdown) { spin_unlock(&held->lock); + put_scoutfs_lock(sb, lck); return -ESHUTDOWN; } - list_add(&lck->head, &held->list); - spin_unlock(&held->lock); + if (lck->flags & SCOUTFS_LOCK_BLOCKING) { + spin_unlock(&held->lock); + wait_event(held->waitq, !lock_blocking(held, lck)); + goto check_lock_state; + } + + if (lck->mode > DLM_LOCK_IV) { + if (lck->mode < mode) { + /* + * We already have the lock but at a mode which is not + * compatible with what the caller wants. Set the lock + * blocking to let the downconvert thread do it's work + * so we can reacquire at the correct mode. + */ + set_lock_blocking(held, lck, 0); + spin_unlock(&held->lock); + goto check_lock_state; + } + lck->holders++; + spin_unlock(&held->lock); + goto out; + } lck->rqmode = mode; + lck->holders++; + spin_unlock(&held->lock); + ret = dlm_lock_range(linfo->ls, mode, &lck->dlm_start, &lck->dlm_end, &lck->lksb, DLM_LKF_NOORDER, RANGE_LOCK_RESOURCE, RANGE_LOCK_RESOURCE_LEN, 0, scoutfs_ast, lck, @@ -190,16 +415,37 @@ int scoutfs_lock_range(struct super_block *sb, int mode, if (ret) { scoutfs_err(sb, "Error %d locking %s\n", ret, RANGE_LOCK_RESOURCE); - uninit_scoutfs_lock(held, lck); + put_scoutfs_lock(sb, lck); return ret; } wait_event(held->waitq, lock_granted(held, lck, mode)); - +out: + *ret_lck = lck; return 0; } void scoutfs_unlock_range(struct super_block *sb, struct scoutfs_lock *lck) +{ + DECLARE_LOCK_INFO(sb, linfo); + struct held_locks *held = linfo->held; + unsigned int seconds = 60; + + trace_scoutfs_unlock_range(sb, lck); + + spin_lock(&held->lock); + lck->holders--; + if (lck->holders == 0) { + if (lck->flags & SCOUTFS_LOCK_BLOCKING) + seconds = 0; + queue_blocking_work(held, lck, seconds); + } + spin_unlock(&held->lock); + + put_scoutfs_lock(sb, lck); +} + +static void unlock_range(struct super_block *sb, struct scoutfs_lock *lck) { DECLARE_LOCK_INFO(sb, linfo); struct held_locks *held = linfo->held; @@ -209,15 +455,9 @@ void scoutfs_unlock_range(struct super_block *sb, struct scoutfs_lock *lck) BUG_ON(!lck->sequence); - /* - * Use write mode to invalidate all since we are completely - * dropping the lock. Once we keep the locks around then we - * can invalidate based on what level we're downconverting to - * (PR, NL). - */ - invalidate_caches(sb, SCOUTFS_LOCK_MODE_WRITE, lck->start, lck->end); - + spin_lock(&held->lock); lck->rqmode = DLM_LOCK_IV; + spin_unlock(&held->lock); ret = dlm_unlock(linfo->ls, lck->lksb.sb_lkid, 0, &lck->lksb, lck); if (ret) { scoutfs_err(sb, "Error %d unlocking %s\n", ret, @@ -227,11 +467,58 @@ void scoutfs_unlock_range(struct super_block *sb, struct scoutfs_lock *lck) wait_event(held->waitq, lock_granted(held, lck, DLM_LOCK_IV)); out: - uninit_scoutfs_lock(held, lck); - /* lock was removed from held list, wake up umount process */ + /* lock was removed from tree, wake up umount process */ wake_up(&held->waitq); } +static void scoutfs_downconvert_func(struct work_struct *work) +{ + struct scoutfs_lock *lck = container_of(work, struct scoutfs_lock, + dc_work.work); + struct super_block *sb = lck->sb; + DECLARE_LOCK_INFO(sb, linfo); + struct held_locks *held = linfo->held; + + trace_scoutfs_downconvert_func(sb, lck); + + spin_lock(&held->lock); + lck->flags &= ~SCOUTFS_LOCK_QUEUED; + if (lck->holders) + goto out; /* scoutfs_unlock_range will requeue for us */ + + spin_unlock(&held->lock); + + WARN_ON_ONCE(lck->holders); + WARN_ON_ONCE(lck->refcnt == 0); + /* + * Use write mode to invalidate all since we are completely + * dropping the lock. Once we are dowconverting, we can + * invalidate based on what level we're downconverting to (PR, + * NL). + */ + invalidate_caches(sb, SCOUTFS_LOCK_MODE_WRITE, lck->start, lck->end); + unlock_range(sb, lck); + + spin_lock(&held->lock); + /* Check whether we can add the lock to the LRU list: + * + * First, check mode to be sure that the lock wasn't reacquired + * while we slept in unlock_range(). + * + * Next, check refs. refcnt == 1 means the only holder is the + * lock tree so in particular we have nobody in + * scoutfs_lock_range concurrently trying to acquire a lock. + */ + if (lck->mode == SCOUTFS_LOCK_MODE_IV && lck->refcnt == 1 && + list_empty(&lck->lru_entry)) { + list_add_tail(&lck->lru_entry, &held->lru_list); + held->lru_nr++; + } +out: + spin_unlock(&held->lock); + put_scoutfs_lock(sb, lck); +} + /* * The moment this is done we can have other mounts start asking * us to write back and invalidate, so do this very very late. @@ -253,8 +540,12 @@ static int init_lock_info(struct super_block *sb) } spin_lock_init(&held->lock); - INIT_LIST_HEAD(&held->list); init_waitqueue_head(&held->waitq); + INIT_LIST_HEAD(&held->lru_list); + held->shrinker.shrink = shrink_lock_tree; + held->shrinker.seeks = DEFAULT_SEEKS; + register_shrinker(&held->shrinker); + held->sb = sb; linfo->sb = sb; linfo->shutdown = false; @@ -278,7 +569,7 @@ static int can_complete_shutdown(struct held_locks *held) int ret; spin_lock(&held->lock); - ret = !!list_empty(&held->list); + ret = !!RB_EMPTY_ROOT(&held->lock_tree); spin_unlock(&held->lock); return ret; } @@ -313,13 +604,16 @@ void scoutfs_lock_destroy(struct super_block *sb) if (linfo) { held = linfo->held; - wait_event(held->waitq, can_complete_shutdown(held)); + destroy_workqueue(held->downconvert_wq); + unregister_shrinker(&held->shrinker); ret = dlm_release_lockspace(linfo->ls, 2); if (ret) scoutfs_info(sb, "Error %d releasing lockspace %s\n", ret, linfo->ls_name); + free_lock_tree(sb); + sbi->lock_info = NULL; trace_printk("sb %p id %016llx freeing linfo %p held %p\n", @@ -332,6 +626,7 @@ void scoutfs_lock_destroy(struct super_block *sb) int scoutfs_lock_setup(struct super_block *sb) { + struct held_locks *held; struct lock_info *linfo; struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); int ret; @@ -341,6 +636,15 @@ int scoutfs_lock_setup(struct super_block *sb) return ret; linfo = sbi->lock_info; + held = linfo->held; + held->downconvert_wq = alloc_workqueue("scoutfs_dc", + WQ_UNBOUND|WQ_HIGHPRI, 0); + if (!held->downconvert_wq) { + kfree(held); + kfree(linfo); + return -ENOMEM; + } + /* * Open coded '64' here is for lvb_len. We never use the LVB * flag so this doesn't matter, but the dlm needs a non-zero diff --git a/kmod/src/lock.h b/kmod/src/lock.h index 89e820e9..08459e07 100644 --- a/kmod/src/lock.h +++ b/kmod/src/lock.h @@ -3,8 +3,10 @@ #include "../dlm/include/linux/dlm.h" +#define SCOUTFS_LOCK_BLOCKING 0x01 /* Blocking another lock request */ +#define SCOUTFS_LOCK_QUEUED 0x02 /* Put on drop workqueue */ + struct scoutfs_lock { - struct list_head head; struct super_block *sb; struct scoutfs_key_buf *start; struct scoutfs_key_buf *end; @@ -14,6 +16,13 @@ struct scoutfs_lock { struct dlm_key dlm_start; struct dlm_key dlm_end; unsigned int sequence; /* for debugging and sanity checks */ + struct rb_node interval_node; + struct scoutfs_key_buf *subtree_last; + struct list_head lru_entry; + unsigned int refcnt; + unsigned int holders; /* Tracks active users of this lock */ + unsigned int flags; + struct delayed_work dc_work; }; enum { @@ -23,9 +32,9 @@ enum { }; int scoutfs_lock_range(struct super_block *sb, int mode, - struct scoutfs_key_buf *start, - struct scoutfs_key_buf *end, - struct scoutfs_lock *lck); + struct scoutfs_key_buf *start, + struct scoutfs_key_buf *end, + struct scoutfs_lock **ret_lck); void scoutfs_unlock_range(struct super_block *sb, struct scoutfs_lock *lck); int scoutfs_lock_addr(struct super_block *sb, int wanted_mode, diff --git a/kmod/src/net.c b/kmod/src/net.c index 91189304..d971bb36 100644 --- a/kmod/src/net.c +++ b/kmod/src/net.c @@ -144,7 +144,7 @@ struct sock_info { struct list_head have_sent; struct list_head active_rbufs; - struct scoutfs_lock listen_lck; + struct scoutfs_lock *listen_lck; struct scoutfs_inet_addr addr; struct work_struct listen_work; @@ -1245,7 +1245,7 @@ static void scoutfs_net_shutdown_func(struct work_struct *work) scoutfs_err(sb, "Non-fatal error %d while writing server " "address\n", ret); - scoutfs_unlock_range(sb, &sinf->listen_lck); + scoutfs_unlock_range(sb, sinf->listen_lck); queue_delayed_work(nti->proc_wq, &nti->server_work, 0); } if (sinf == nti->connected_sinf) { diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h index 175fe3e6..7b1ecff4 100644 --- a/kmod/src/scoutfs_trace.h +++ b/kmod/src/scoutfs_trace.h @@ -309,17 +309,26 @@ DECLARE_EVENT_CLASS(scoutfs_lock_class, __field(unsigned int, seq) __dynamic_array(char, start, scoutfs_key_str(NULL, lck->start)) __dynamic_array(char, end, scoutfs_key_str(NULL, lck->end)) - ), + __field(unsigned int, flags) + __field(unsigned int, refcnt) + __field(unsigned int, holders) + ), TP_fast_assign( __entry->mode = lck->mode; __entry->rqmode = lck->rqmode; __entry->seq = lck->sequence; + __entry->flags = lck->flags; + __entry->refcnt = lck->refcnt; + __entry->holders = lck->holders; scoutfs_key_str(__get_dynamic_array(start), lck->start); scoutfs_key_str(__get_dynamic_array(end), lck->end); ), - TP_printk("seq %u mode %s rqmode %s start %s end %s", - __entry->seq, lock_mode(__entry->mode), - lock_mode(__entry->rqmode), __get_str(start), __get_str(end)) + TP_printk("seq %u refs %d holders %d mode %s rqmode %s flags 0x%x " + "start %s end %s", + __entry->seq, __entry->refcnt, __entry->holders, + lock_mode(__entry->mode), lock_mode(__entry->rqmode), + __entry->flags, __get_str(start), + __get_str(end)) ); DEFINE_EVENT(scoutfs_lock_class, scoutfs_lock_range, @@ -337,6 +346,21 @@ DEFINE_EVENT(scoutfs_lock_class, scoutfs_ast, TP_ARGS(sb, lck) ); +DEFINE_EVENT(scoutfs_lock_class, scoutfs_rbast, + TP_PROTO(struct super_block *sb, struct scoutfs_lock *lck), + TP_ARGS(sb, lck) +); + +DEFINE_EVENT(scoutfs_lock_class, scoutfs_downconvert_func, + TP_PROTO(struct super_block *sb, struct scoutfs_lock *lck), + TP_ARGS(sb, lck) +); + +DEFINE_EVENT(scoutfs_lock_class, shrink_lock_tree, + TP_PROTO(struct super_block *sb, struct scoutfs_lock *lck), + TP_ARGS(sb, lck) +); + TRACE_EVENT(scoutfs_lock_invalidate_sb, TP_PROTO(struct super_block *sb, int mode, struct scoutfs_key_buf *start, struct scoutfs_key_buf *end), diff --git a/kmod/src/xattr.c b/kmod/src/xattr.c index 2bd066d4..fc8a4af7 100644 --- a/kmod/src/xattr.c +++ b/kmod/src/xattr.c @@ -151,7 +151,7 @@ ssize_t scoutfs_getxattr(struct dentry *dentry, const char *name, void *buffer, struct scoutfs_key_buf *key = NULL; struct scoutfs_key_buf *last = NULL; SCOUTFS_DECLARE_KVEC(val); - struct scoutfs_lock lck; + struct scoutfs_lock *lck; unsigned int total; unsigned int bytes; unsigned int off; @@ -228,7 +228,7 @@ ssize_t scoutfs_getxattr(struct dentry *dentry, const char *name, void *buffer, ret = -ERANGE; up_read(&si->xattr_rwsem); - scoutfs_unlock_range(sb, &lck); + scoutfs_unlock_range(sb, lck); out: scoutfs_key_free(sb, key); @@ -263,7 +263,7 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name, size_t name_len = strlen(name); SCOUTFS_DECLARE_KVEC(val); DECLARE_ITEM_COUNT(cnt); - struct scoutfs_lock lck; + struct scoutfs_lock *lck; unsigned int bytes; unsigned int off; LIST_HEAD(list); @@ -335,7 +335,7 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name, scoutfs_release_trans(sb); unlock: - scoutfs_unlock_range(sb, &lck); + scoutfs_unlock_range(sb, lck); out: scoutfs_item_free_batch(sb, &list); @@ -368,7 +368,7 @@ ssize_t scoutfs_listxattr(struct dentry *dentry, char *buffer, size_t size) struct scoutfs_xattr_key *xkey; struct scoutfs_key_buf *key; struct scoutfs_key_buf *last; - struct scoutfs_lock lck; + struct scoutfs_lock *lck; ssize_t total; int name_len; int ret; @@ -435,7 +435,7 @@ ssize_t scoutfs_listxattr(struct dentry *dentry, char *buffer, size_t size) } up_read(&si->xattr_rwsem); - scoutfs_unlock_range(sb, &lck); + scoutfs_unlock_range(sb, lck); out: scoutfs_key_free(sb, key); scoutfs_key_free(sb, last); @@ -456,7 +456,7 @@ int scoutfs_xattr_drop(struct super_block *sb, u64 ino) { struct scoutfs_key_buf *key; struct scoutfs_key_buf *last; - struct scoutfs_lock lck; + struct scoutfs_lock *lck; int ret; key = alloc_xattr_key(sb, ino, NULL, SCOUTFS_XATTR_MAX_NAME_LEN, 0); @@ -489,7 +489,7 @@ int scoutfs_xattr_drop(struct super_block *sb, u64 ino) /* don't need to increment past deleted key */ } - scoutfs_unlock_range(sb, &lck); + scoutfs_unlock_range(sb, lck); out: scoutfs_key_free(sb, key);