diff --git a/kmod/src/format.h b/kmod/src/format.h index 23bd959c..6251c8f8 100644 --- a/kmod/src/format.h +++ b/kmod/src/format.h @@ -506,6 +506,19 @@ enum { #define SCOUTFS_XATTR_MAX_PARTS \ DIV_ROUND_UP(SCOUTFS_XATTR_MAX_SIZE, SCOUTFS_XATTR_PART_SIZE) +/* + * structures used by dlm + */ +struct scoutfs_lock_name { + __u8 zone; + __u8 type; + __le64 first; + __le64 second; +} __packed; + +#define SCOUTFS_LOCK_INODE_GROUP_NR 1024 +#define SCOUTFS_LOCK_INODE_GROUP_MASK (SCOUTFS_LOCK_INODE_GROUP_NR - 1) +#define SCOUTFS_LOCK_INODE_GROUP_OFFSET (~0ULL) /* * messages over the wire. diff --git a/kmod/src/lock.c b/kmod/src/lock.c index b6ee02fc..77336cbc 100644 --- a/kmod/src/lock.c +++ b/kmod/src/lock.c @@ -14,16 +14,19 @@ #include #include #include +#include #include "super.h" #include "lock.h" #include "item.h" #include "scoutfs_trace.h" #include "msg.h" +#include "cmp.h" -#include "../dlm/interval_tree_generic.h" - -#include "linux/dlm.h" +#define LN_FMT "%u.%u.%llu.%llu" +#define LN_ARG(name) \ + (name)->zone, (name)->type, le64_to_cpu((name)->first), \ + le64_to_cpu((name)->second) /* * allocated per-super, freed on unmount. @@ -45,21 +48,11 @@ struct lock_info { unsigned long long lru_nr; }; -#define RANGE_LOCK_RESOURCE "fs_range" -#define RANGE_LOCK_RESOURCE_LEN (strlen(RANGE_LOCK_RESOURCE)) - #define DECLARE_LOCK_INFO(sb, name) \ struct lock_info *name = SCOUTFS_SB(sb)->lock_info static void scoutfs_downconvert_func(struct work_struct *work); -#define START(lock) ((lock)->start) -#define LAST(lock) ((lock)->end) -KEYED_INTERVAL_TREE_DEFINE(struct scoutfs_lock, interval_node, - struct scoutfs_key_buf *, subtree_last, START, LAST, - scoutfs_key_compare, static, scoutfs_lock); - - /* * Invalidate caches on this because another node wants a lock * with the a lock with the given mode and range. 
We always have to @@ -86,9 +79,11 @@ static int invalidate_caches(struct super_block *sb, int mode, static void free_scoutfs_lock(struct scoutfs_lock *lock) { - kfree(lock->start); - kfree(lock->end); - kfree(lock); + if (lock) { + scoutfs_key_free(lock->sb, lock->start); + scoutfs_key_free(lock->sb, lock->end); + kfree(lock); + } } static void put_scoutfs_lock(struct super_block *sb, struct scoutfs_lock *lock) @@ -103,7 +98,7 @@ static void put_scoutfs_lock(struct super_block *sb, struct scoutfs_lock *lock) if (!refs) { BUG_ON(lock->holders); BUG_ON(delayed_work_pending(&lock->dc_work)); - scoutfs_lock_remove(lock, &linfo->lock_tree); + rb_erase(&lock->node, &linfo->lock_tree); list_del(&lock->lru_entry); spin_unlock(&linfo->lock); free_scoutfs_lock(lock); @@ -113,85 +108,93 @@ static void put_scoutfs_lock(struct super_block *sb, struct scoutfs_lock *lock) } } -static void init_scoutfs_lock(struct super_block *sb, struct scoutfs_lock *lock, - struct scoutfs_key_buf *start, - struct scoutfs_key_buf *end) -{ - DECLARE_LOCK_INFO(sb, linfo); - - RB_CLEAR_NODE(&lock->interval_node); - lock->sb = sb; - lock->mode = DLM_LOCK_IV; - INIT_DELAYED_WORK(&lock->dc_work, scoutfs_downconvert_func); - INIT_LIST_HEAD(&lock->lru_entry); - - if (start) { - lock->start = start; - lock->dlm_start.val = start->data; - lock->dlm_start.len = start->key_len; - } - if (end) { - lock->end = end; - lock->dlm_end.val = end->data; - lock->dlm_end.len = end->key_len; - } - - spin_lock(&linfo->lock); - lock->sequence = ++linfo->seq_cnt; - spin_unlock(&linfo->lock); -} - static struct scoutfs_lock *alloc_scoutfs_lock(struct super_block *sb, + struct scoutfs_lock_name *lock_name, struct scoutfs_key_buf *start, struct scoutfs_key_buf *end) { - struct scoutfs_key_buf *s, *e; struct scoutfs_lock *lock; - s = scoutfs_key_dup(sb, start); - if (!s) - return NULL; - e = scoutfs_key_dup(sb, end); - if (!e) { - kfree(s); - return NULL; - } lock = kzalloc(sizeof(struct scoutfs_lock), GFP_NOFS); - if (!lock) 
{ - kfree(e); - kfree(s); + if (lock) { + lock->start = scoutfs_key_dup(sb, start); + lock->end = scoutfs_key_dup(sb, end); + if (!lock->start || !lock->end) { + free_scoutfs_lock(lock); + lock = NULL; + } else { + RB_CLEAR_NODE(&lock->node); + lock->sb = sb; + lock->lock_name = *lock_name; + lock->mode = DLM_LOCK_IV; + INIT_DELAYED_WORK(&lock->dc_work, + scoutfs_downconvert_func); + INIT_LIST_HEAD(&lock->lru_entry); + } } - init_scoutfs_lock(sb, lock, s, e); return lock; } +static int cmp_lock_names(struct scoutfs_lock_name *a, + struct scoutfs_lock_name *b) +{ + return (int)a->zone - (int)b->zone ?: + (int)a->type - (int)b->type ?: + scoutfs_cmp_u64s(le64_to_cpu(a->first), le64_to_cpu(b->first)) ?: + scoutfs_cmp_u64s(le64_to_cpu(a->second), le64_to_cpu(b->second)); +} + static struct scoutfs_lock *find_alloc_scoutfs_lock(struct super_block *sb, - struct scoutfs_key_buf *start, - struct scoutfs_key_buf *end) + struct scoutfs_lock_name *lock_name, + struct scoutfs_key_buf *start, + struct scoutfs_key_buf *end) { DECLARE_LOCK_INFO(sb, linfo); - struct scoutfs_lock *found, *new; + struct scoutfs_lock *new = NULL; + struct scoutfs_lock *found; + struct scoutfs_lock *lock; + struct rb_node *parent; + struct rb_node **node; + int cmp; - new = NULL; - spin_lock(&linfo->lock); search: - found = scoutfs_lock_iter_first(&linfo->lock_tree, start, end); + spin_lock(&linfo->lock); + node = &linfo->lock_tree.rb_node; + parent = NULL; + found = NULL; + while (*node) { + parent = *node; + lock = container_of(*node, struct scoutfs_lock, node); + + cmp = cmp_lock_names(lock_name, &lock->lock_name); + if (cmp < 0) { + node = &(*node)->rb_left; + } else if (cmp > 0) { + node = &(*node)->rb_right; + } else { + found = lock; + break; + } + lock = NULL; + } + if (!found) { if (!new) { spin_unlock(&linfo->lock); - new = alloc_scoutfs_lock(sb, start, end); + new = alloc_scoutfs_lock(sb, lock_name, start, end); if (!new) return NULL; - spin_lock(&linfo->lock); goto search; } - new->refcnt
= 1; /* Freed by shrinker or on umount */ - scoutfs_lock_insert(new, &linfo->lock_tree); found = new; new = NULL; + found->refcnt = 1; /* Freed by shrinker or on umount */ + found->sequence = ++linfo->seq_cnt; + rb_link_node(&found->node, parent, node); + rb_insert_color(&found->node, &linfo->lock_tree); } found->refcnt++; if (!list_empty(&found->lru_entry)) { @@ -227,7 +230,7 @@ static int shrink_lock_tree(struct shrinker *shrink, struct shrink_control *sc) WARN_ON(lock->refcnt != 1); WARN_ON(lock->flags & SCOUTFS_LOCK_QUEUED); - scoutfs_lock_remove(lock, &linfo->lock_tree); + rb_erase(&lock->node, &linfo->lock_tree); list_del(&lock->lru_entry); list_add_tail(&lock->lru_entry, &list); linfo->lru_nr--; @@ -251,7 +254,7 @@ static void free_lock_tree(struct super_block *sb) while (node) { struct scoutfs_lock *lock; - lock = rb_entry(node, struct scoutfs_lock, interval_node); + lock = rb_entry(node, struct scoutfs_lock, node); node = rb_next(node); put_scoutfs_lock(sb, lock); } @@ -296,13 +299,12 @@ static void set_lock_blocking(struct lock_info *linfo, struct scoutfs_lock *lock queue_blocking_work(linfo, lock, seconds); } -static void scoutfs_rbast(void *astarg, int mode, - struct dlm_key *start, struct dlm_key *end) +static void scoutfs_bast(void *astarg, int mode) { struct scoutfs_lock *lock = astarg; struct lock_info *linfo = SCOUTFS_SB(lock->sb)->lock_info; - trace_scoutfs_rbast(lock->sb, lock); + trace_scoutfs_bast(lock->sb, lock); spin_lock(&linfo->lock); set_lock_blocking(linfo, lock, 0); @@ -341,20 +343,21 @@ static int lock_blocking(struct lock_info *linfo, struct scoutfs_lock *lock) * The caller provides the opaque lock structure used for storage and * their start and end pointers will be accessed while the lock is held. 
*/ -int scoutfs_lock_range(struct super_block *sb, int mode, - struct scoutfs_key_buf *start, - struct scoutfs_key_buf *end, - struct scoutfs_lock **ret_lock) +static int lock_name_keys(struct super_block *sb, int mode, + struct scoutfs_lock_name *lock_name, + struct scoutfs_key_buf *start, + struct scoutfs_key_buf *end, + struct scoutfs_lock **ret_lock) { DECLARE_LOCK_INFO(sb, linfo); struct scoutfs_lock *lock; int ret; - lock = find_alloc_scoutfs_lock(sb, start, end); + lock = find_alloc_scoutfs_lock(sb, lock_name, start, end); if (!lock) return -ENOMEM; - trace_scoutfs_lock_range(sb, lock); + trace_scoutfs_lock_resource(sb, lock); check_lock_state: spin_lock(&linfo->lock); @@ -391,13 +394,12 @@ check_lock_state: lock->holders++; spin_unlock(&linfo->lock); - ret = dlm_lock_range(linfo->ls, mode, &lock->dlm_start, &lock->dlm_end, - &lock->lksb, DLM_LKF_NOORDER, RANGE_LOCK_RESOURCE, - RANGE_LOCK_RESOURCE_LEN, 0, scoutfs_ast, lock, - scoutfs_rbast); + ret = dlm_lock(linfo->ls, mode, &lock->lksb, DLM_LKF_NOORDER, + &lock->lock_name, sizeof(struct scoutfs_lock_name), + 0, scoutfs_ast, lock, scoutfs_bast); if (ret) { - scoutfs_err(sb, "Error %d locking %s\n", ret, - RANGE_LOCK_RESOURCE); + scoutfs_err(sb, "Error %d locking "LN_FMT, ret, + LN_ARG(&lock->lock_name)); put_scoutfs_lock(sb, lock); return ret; } @@ -408,12 +410,41 @@ out: return 0; } -void scoutfs_unlock_range(struct super_block *sb, struct scoutfs_lock *lock) +int scoutfs_lock_ino_group(struct super_block *sb, int mode, u64 ino, + struct scoutfs_lock **ret_lock) +{ + struct scoutfs_lock_name lock_name; + struct scoutfs_inode_key start_ikey; + struct scoutfs_inode_key end_ikey; + struct scoutfs_key_buf start; + struct scoutfs_key_buf end; + + ino &= ~(u64)SCOUTFS_LOCK_INODE_GROUP_MASK; + + lock_name.zone = SCOUTFS_FS_ZONE; + lock_name.type = SCOUTFS_INODE_TYPE; + lock_name.first = cpu_to_le64(ino); + lock_name.second = 0; + + start_ikey.zone = SCOUTFS_FS_ZONE; + start_ikey.ino = cpu_to_be64(ino); + 
start_ikey.type = 0; + scoutfs_key_init(&start, &start_ikey, sizeof(start_ikey)); + + end_ikey.zone = SCOUTFS_FS_ZONE; + end_ikey.ino = cpu_to_be64(ino + SCOUTFS_LOCK_INODE_GROUP_NR - 1); + end_ikey.type = ~0; + scoutfs_key_init(&end, &end_ikey, sizeof(end_ikey)); + + return lock_name_keys(sb, mode, &lock_name, &start, &end, ret_lock); +} + +void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock) { DECLARE_LOCK_INFO(sb, linfo); unsigned int seconds = 60; - trace_scoutfs_unlock_range(sb, lock); + trace_scoutfs_unlock(sb, lock); spin_lock(&linfo->lock); lock->holders--; @@ -432,7 +463,7 @@ static void unlock_range(struct super_block *sb, struct scoutfs_lock *lock) DECLARE_LOCK_INFO(sb, linfo); int ret; - trace_scoutfs_unlock_range(sb, lock); + trace_scoutfs_unlock(sb, lock); BUG_ON(!lock->sequence); @@ -441,8 +472,8 @@ static void unlock_range(struct super_block *sb, struct scoutfs_lock *lock) spin_unlock(&linfo->lock); ret = dlm_unlock(linfo->ls, lock->lksb.sb_lkid, 0, &lock->lksb, lock); if (ret) { - scoutfs_err(sb, "Error %d unlocking %s\n", ret, - RANGE_LOCK_RESOURCE); + scoutfs_err(sb, "Error %d unlocking "LN_FMT, ret, + LN_ARG(&lock->lock_name)); goto out; } diff --git a/kmod/src/lock.h b/kmod/src/lock.h index 38eba1e7..e08bcfd2 100644 --- a/kmod/src/lock.h +++ b/kmod/src/lock.h @@ -1,23 +1,22 @@ #ifndef _SCOUTFS_LOCK_H_ #define _SCOUTFS_LOCK_H_ -#include "../dlm/include/linux/dlm.h" +#include <linux/dlm.h> +#include "key.h" #define SCOUTFS_LOCK_BLOCKING 0x01 /* Blocking another lock request */ #define SCOUTFS_LOCK_QUEUED 0x02 /* Put on drop workqueue */ struct scoutfs_lock { struct super_block *sb; + struct scoutfs_lock_name lock_name; struct scoutfs_key_buf *start; struct scoutfs_key_buf *end; int mode; int rqmode; struct dlm_lksb lksb; - struct dlm_key dlm_start; - struct dlm_key dlm_end; unsigned int sequence; /* for debugging and sanity checks */ - struct rb_node interval_node; - struct scoutfs_key_buf *subtree_last; + struct rb_node node; struct
list_head lru_entry; unsigned int refcnt; unsigned int holders; /* Tracks active users of this lock */ @@ -25,11 +24,9 @@ struct scoutfs_lock { struct delayed_work dc_work; }; -int scoutfs_lock_range(struct super_block *sb, int mode, - struct scoutfs_key_buf *start, - struct scoutfs_key_buf *end, - struct scoutfs_lock **ret_lock); -void scoutfs_unlock_range(struct super_block *sb, struct scoutfs_lock *lock); +int scoutfs_lock_ino_group(struct super_block *sb, int mode, u64 ino, + struct scoutfs_lock **ret_lock); +void scoutfs_unlock(struct super_block *sb, struct scoutfs_lock *lock); int scoutfs_lock_addr(struct super_block *sb, int wanted_mode, void *caller_lvb, unsigned lvb_len); diff --git a/kmod/src/net.c b/kmod/src/net.c index 0a741631..e35cea27 100644 --- a/kmod/src/net.c +++ b/kmod/src/net.c @@ -1272,7 +1272,7 @@ static void scoutfs_net_shutdown_func(struct work_struct *work) scoutfs_err(sb, "Non-fatal error %d while writing server " "address\n", ret); - scoutfs_unlock_range(sb, sinf->listen_lck); + scoutfs_unlock(sb, sinf->listen_lck); queue_delayed_work(nti->proc_wq, &nti->server_work, 0); } if (sinf == nti->connected_sinf) { @@ -2083,8 +2083,7 @@ static void scoutfs_net_server_func(struct work_struct *work) INIT_WORK(&sinf->listen_work, scoutfs_net_listen_func); INIT_WORK(&sinf->accept_work, scoutfs_net_accept_func); - ret = scoutfs_lock_range(sb, DLM_LOCK_EX, &listen_key, - &listen_key, &sinf->listen_lck); + ret = scoutfs_lock_ino_group(sb, DLM_LOCK_EX, 0, &sinf->listen_lck); if (ret) { kfree(sinf); goto out; diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h index 4ab822d4..6f36eada 100644 --- a/kmod/src/scoutfs_trace.h +++ b/kmod/src/scoutfs_trace.h @@ -298,39 +298,42 @@ DECLARE_EVENT_CLASS(scoutfs_lock_class, TP_PROTO(struct super_block *sb, struct scoutfs_lock *lck), TP_ARGS(sb, lck), TP_STRUCT__entry( + __field(u8, name_zone) + __field(u8, name_type) + __field(u64, name_first) + __field(u64, name_second) __field(int, mode) __field(int, 
rqmode) __field(unsigned int, seq) - __dynamic_array(char, start, scoutfs_key_str(NULL, lck->start)) - __dynamic_array(char, end, scoutfs_key_str(NULL, lck->end)) __field(unsigned int, flags) __field(unsigned int, refcnt) __field(unsigned int, holders) ), TP_fast_assign( + __entry->name_zone = lck->lock_name.zone; + __entry->name_type = lck->lock_name.type; + __entry->name_first = le64_to_cpu(lck->lock_name.first); + __entry->name_second = le64_to_cpu(lck->lock_name.second); __entry->mode = lck->mode; __entry->rqmode = lck->rqmode; __entry->seq = lck->sequence; __entry->flags = lck->flags; __entry->refcnt = lck->refcnt; __entry->holders = lck->holders; - scoutfs_key_str(__get_dynamic_array(start), lck->start); - scoutfs_key_str(__get_dynamic_array(end), lck->end); ), - TP_printk("seq %u refs %d holders %d mode %s rqmode %s flags 0x%x " - "start %s end %s", - __entry->seq, __entry->refcnt, __entry->holders, - lock_mode(__entry->mode), lock_mode(__entry->rqmode), - __entry->flags, __get_str(start), - __get_str(end)) + TP_printk("name %u.%u.%llu.%llu seq %u refs %d holders %d mode %s rqmode %s flags 0x%x", + __entry->name_zone, __entry->name_type, __entry->name_first, + __entry->name_second, __entry->seq, + __entry->refcnt, __entry->holders, lock_mode(__entry->mode), + lock_mode(__entry->rqmode), __entry->flags) ); -DEFINE_EVENT(scoutfs_lock_class, scoutfs_lock_range, +DEFINE_EVENT(scoutfs_lock_class, scoutfs_lock_resource, TP_PROTO(struct super_block *sb, struct scoutfs_lock *lck), TP_ARGS(sb, lck) ); -DEFINE_EVENT(scoutfs_lock_class, scoutfs_unlock_range, +DEFINE_EVENT(scoutfs_lock_class, scoutfs_unlock, TP_PROTO(struct super_block *sb, struct scoutfs_lock *lck), TP_ARGS(sb, lck) ); @@ -340,7 +343,7 @@ DEFINE_EVENT(scoutfs_lock_class, scoutfs_ast, TP_ARGS(sb, lck) ); -DEFINE_EVENT(scoutfs_lock_class, scoutfs_rbast, +DEFINE_EVENT(scoutfs_lock_class, scoutfs_bast, TP_PROTO(struct super_block *sb, struct scoutfs_lock *lck), TP_ARGS(sb, lck) ); diff --git 
a/kmod/src/xattr.c b/kmod/src/xattr.c index df9923ac..29aba33d 100644 --- a/kmod/src/xattr.c +++ b/kmod/src/xattr.c @@ -178,7 +178,7 @@ ssize_t scoutfs_getxattr(struct dentry *dentry, const char *name, void *buffer, goto out; } - ret = scoutfs_lock_range(sb, DLM_LOCK_PR, key, last, &lck); + ret = scoutfs_lock_ino_group(sb, DLM_LOCK_PR, scoutfs_ino(inode), &lck); if (ret) goto out; @@ -229,7 +229,7 @@ ssize_t scoutfs_getxattr(struct dentry *dentry, const char *name, void *buffer, ret = -ERANGE; up_read(&si->xattr_rwsem); - scoutfs_unlock_range(sb, lck); + scoutfs_unlock(sb, lck); out: scoutfs_key_free(sb, key); @@ -289,7 +289,7 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name, goto out; } - ret = scoutfs_lock_range(sb, DLM_LOCK_EX, key, last, &lck); + ret = scoutfs_lock_ino_group(sb, DLM_LOCK_EX, scoutfs_ino(inode), &lck); if (ret) goto out; @@ -336,7 +336,7 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name, scoutfs_release_trans(sb); unlock: - scoutfs_unlock_range(sb, lck); + scoutfs_unlock(sb, lck); out: scoutfs_item_free_batch(sb, &list); @@ -386,7 +386,7 @@ ssize_t scoutfs_listxattr(struct dentry *dentry, char *buffer, size_t size) xkey = key->data; xkey->name[0] = '\0'; - ret = scoutfs_lock_range(sb, DLM_LOCK_PR, key, last, &lck); + ret = scoutfs_lock_ino_group(sb, DLM_LOCK_PR, scoutfs_ino(inode), &lck); if (ret) goto out; @@ -436,7 +436,7 @@ ssize_t scoutfs_listxattr(struct dentry *dentry, char *buffer, size_t size) } up_read(&si->xattr_rwsem); - scoutfs_unlock_range(sb, lck); + scoutfs_unlock(sb, lck); out: scoutfs_key_free(sb, key); scoutfs_key_free(sb, last); @@ -469,7 +469,7 @@ int scoutfs_xattr_drop(struct super_block *sb, u64 ino) } /* while we read to delete we need to writeback others */ - ret = scoutfs_lock_range(sb, DLM_LOCK_EX, key, last, &lck); + ret = scoutfs_lock_ino_group(sb, DLM_LOCK_EX, ino, &lck); if (ret) goto out; @@ -490,7 +490,7 @@ int scoutfs_xattr_drop(struct super_block *sb, u64 ino) /* 
don't need to increment past deleted key */ } - scoutfs_unlock_range(sb, lck); + scoutfs_unlock(sb, lck); out: scoutfs_key_free(sb, key);