scoutfs: use srch to track .srch. xattrs

Using strictly coherent btree items to map the hash of xattr names to
inode numbers proved the value of the functionality, but it was too
expensive.  We now have the more efficient srch infrastructure to use.

We change from the .indx. to the .srch. tag, and change the ioctl from
find_xattrs to search_xattrs.  The idea is to communicate that these are
accelerated searches, not precise index lookups, and that they are
relatively expensive.

Rather than maintaining btree items, xattr setting and deleting emit
srch entries which either track the xattr or combine with the previous
tracker and remove the entry.  These are done under the lock that
protects the main xattr item, so we can remove the separate locking of
the previous index items.

The semantics of the search ioctl need to change a bit.  Because
searches are so expensive we now return a flag to indicate that the
search completed.  While we're there, we also allow a last_ino parameter
so that searches can be divided up and run in parallel.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2020-06-23 09:49:06 -07:00
committed by Zach Brown
parent f8e1812288
commit c415cab1e9
3 changed files with 91 additions and 113 deletions

View File

@@ -34,6 +34,7 @@
#include "trans.h"
#include "xattr.h"
#include "hash.h"
#include "srch.h"
#include "scoutfs_trace.h"
/*
@@ -759,18 +760,18 @@ out:
* but we don't check that the caller's xattr name contains the tag and
* search for it regardless.
*/
static long scoutfs_ioc_find_xattrs(struct file *file, unsigned long arg)
static long scoutfs_ioc_search_xattrs(struct file *file, unsigned long arg)
{
struct super_block *sb = file_inode(file)->i_sb;
struct scoutfs_ioctl_find_xattrs __user *ufx = (void __user *)arg;
struct scoutfs_ioctl_find_xattrs fx;
struct scoutfs_lock *lock = NULL;
struct scoutfs_key last;
struct scoutfs_key key;
struct scoutfs_ioctl_search_xattrs __user *usx = (void __user *)arg;
struct scoutfs_ioctl_search_xattrs sx;
struct scoutfs_srch_rb_root sroot;
struct scoutfs_srch_rb_node *snode;
u64 __user *uinos;
struct rb_node *node;
char *name = NULL;
int total = 0;
u64 hash;
u64 ino;
bool done = false;
u64 total = 0;
int ret;
if (!(file->f_mode & FMODE_READ)) {
@@ -783,67 +784,59 @@ static long scoutfs_ioc_find_xattrs(struct file *file, unsigned long arg)
goto out;
}
if (copy_from_user(&fx, ufx, sizeof(fx))) {
if (copy_from_user(&sx, usx, sizeof(sx))) {
ret = -EFAULT;
goto out;
}
uinos = (u64 __user *)sx.inodes_ptr;
if (fx.name_bytes > SCOUTFS_XATTR_MAX_NAME_LEN) {
if (sx.name_bytes > SCOUTFS_XATTR_MAX_NAME_LEN) {
ret = -EINVAL;
goto out;
}
name = kmalloc(fx.name_bytes, GFP_KERNEL);
if (sx.nr_inodes == 0 || sx.last_ino < sx.next_ino) {
ret = 0;
goto out;
}
name = kmalloc(sx.name_bytes, GFP_KERNEL);
if (!name) {
ret = -ENOMEM;
goto out;
}
if (copy_from_user(name, (void __user *)fx.name_ptr, fx.name_bytes)) {
if (copy_from_user(name, (void __user *)sx.name_ptr, sx.name_bytes)) {
ret = -EFAULT;
goto out;
}
hash = scoutfs_hash64(name, fx.name_bytes);
scoutfs_xattr_index_key(&key, hash, fx.next_ino, 0);
scoutfs_xattr_index_key(&last, hash, U64_MAX, U64_MAX);
ino = 0;
ret = scoutfs_lock_xattr_index(sb, SCOUTFS_LOCK_READ, 0, hash, &lock);
ret = scoutfs_srch_search_xattrs(sb, &sroot,
scoutfs_hash64(name, sx.name_bytes),
sx.next_ino, sx.last_ino, &done);
if (ret < 0)
goto out;
while (fx.nr_inodes) {
ret = scoutfs_forest_next(sb, &key, &last, NULL, lock);
if (ret < 0) {
if (ret == -ENOENT)
ret = 0;
scoutfs_srch_foreach_rb_node(snode, node, &sroot) {
if (put_user(snode->ino, uinos + total)) {
ret = -EFAULT;
break;
}
/* xattrs hashes can collide and add multiple entries */
if (le64_to_cpu(key.skxi_ino) != ino) {
ino = le64_to_cpu(key.skxi_ino);
if (put_user(ino, (u64 __user *)fx.inodes_ptr)) {
ret = -EFAULT;
break;
}
fx.inodes_ptr += sizeof(u64);
fx.nr_inodes--;
total++;
ret = 0;
}
scoutfs_key_inc(&key);
if (++total == sx.nr_inodes)
break;
}
scoutfs_unlock(sb, lock, SCOUTFS_LOCK_READ);
sx.output_flags = 0;
if (done && total == sroot.nr)
sx.output_flags |= SCOUTFS_SEARCH_XATTRS_OFLAG_END;
if (put_user(sx.output_flags, &usx->output_flags))
ret = -EFAULT;
else
ret = 0;
out:
scoutfs_srch_destroy_rb_root(&sroot);
kfree(name);
return ret ?: total;
}
@@ -887,8 +880,8 @@ long scoutfs_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
return scoutfs_ioc_setattr_more(file, arg);
case SCOUTFS_IOC_LISTXATTR_HIDDEN:
return scoutfs_ioc_listxattr_hidden(file, arg);
case SCOUTFS_IOC_FIND_XATTRS:
return scoutfs_ioc_find_xattrs(file, arg);
case SCOUTFS_IOC_SEARCH_XATTRS:
return scoutfs_ioc_search_xattrs(file, arg);
case SCOUTFS_IOC_STATFS_MORE:
return scoutfs_ioc_statfs_more(file, arg);
case SCOUTFS_IOC_DATA_WAIT_ERR:

View File

@@ -296,34 +296,57 @@ struct scoutfs_ioctl_listxattr_hidden {
/*
* Return the inode numbers of inodes which might contain the given
* named xattr. The inode may not have a set xattr with that name, the
* caller must check the returned inodes to see if they match.
* xattr. The inode may not have a set xattr with that name, the caller
* must check the returned inodes to see if they match.
*
* @next_ino: The next inode number that could be returned. Initialized
* to 0 when first searching and set to one past the last inode number
* returned to continue searching.
* @name_ptr: The address of the name of the xattr to search for. It does
* not need to be null terminated.
* @inodes_ptr: The address of the array of uint64_t inode numbers in which
* to store inode numbers that may contain the xattr. EFAULT may be returned
* if this address is not naturally aligned.
* @name_bytes: The number of non-null bytes found in the name at name_ptr.
* @last_ino: The last inode number that could be returned. U64_MAX to
* find all inodes.
* @name_ptr: The address of the name of the xattr to search for. It is
* not null terminated.
* @inodes_ptr: The address of the array of uint64_t inode numbers in
* which to store inode numbers that may contain the xattr. EFAULT may
* be returned if this address is not naturally aligned.
* @output_flags: Set as success is returned. If an error is returned
* then this field is undefined and should not be read.
* @nr_inodes: The number of elements in the array found at inodes_ptr.
* @name_bytes: The number of non-null bytes found in the name at
* name_ptr.
*
* This requires the CAP_SYS_ADMIN capability and will return -EPERM if
* it's not granted.
*
* The number of inode numbers stored in the inodes_ptr array is
* returned. If nr_inodes is 0 or last_ino is less than next_ino then 0
* will be immediately returned.
*
* Partial progress can be returned if an error is hit or if nr_inodes
* was larger than the internal limit on the number of inodes returned
* in a search pass. The _END output flag is set if all the results
* including last_ino were searched in this pass.
*
* It's valuable to provide a large inodes array so that all the results
* can be found in one search pass and _END can be set. There are
* significant constant costs for performing each search pass.
*/
struct scoutfs_ioctl_find_xattrs {
struct scoutfs_ioctl_search_xattrs {
__u64 next_ino;
__u64 last_ino;
__u64 name_ptr;
__u64 inodes_ptr;
__u64 output_flags;
__u64 nr_inodes;
__u16 name_bytes;
__u16 nr_inodes;
__u8 _pad[4];
__u8 _pad[6];
};
#define SCOUTFS_IOC_FIND_XATTRS _IOR(SCOUTFS_IOCTL_MAGIC, 9, \
struct scoutfs_ioctl_find_xattrs)
/* set in output_flags if returned inodes reached last_ino */
#define SCOUTFS_SEARCH_XATTRS_OFLAG_END (1ULL << 0)
#define SCOUTFS_IOC_SEARCH_XATTRS _IOR(SCOUTFS_IOCTL_MAGIC, 9, \
struct scoutfs_ioctl_search_xattrs)
/*
* Give the user information about the filesystem.

View File

@@ -96,11 +96,11 @@ static int unknown_prefix(const char *name)
struct prefix_tags {
unsigned long hide:1,
indx:1;
srch:1;
};
#define HIDE_TAG "hide."
#define INDX_TAG "indx."
#define SRCH_TAG "srch."
#define TAG_LEN (sizeof(HIDE_TAG) - 1)
static int parse_tags(const char *name, unsigned int name_len,
@@ -120,8 +120,8 @@ static int parse_tags(const char *name, unsigned int name_len,
if (!strncmp(name, HIDE_TAG, TAG_LEN)) {
if (++tgs->hide == 0)
return -EINVAL;
} else if (!strncmp(name, INDX_TAG, TAG_LEN)) {
if (++tgs->indx == 0)
} else if (!strncmp(name, SRCH_TAG, TAG_LEN)) {
if (++tgs->srch == 0)
return -EINVAL;
} else {
/* only reason to use scoutfs. is tags */
@@ -412,19 +412,17 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name,
struct super_block *sb = inode->i_sb;
const u64 ino = scoutfs_ino(inode);
struct scoutfs_xattr *xat = NULL;
struct scoutfs_lock *indx_lock = NULL;
struct scoutfs_lock *lck = NULL;
size_t name_len = strlen(name);
struct scoutfs_key indx_key;
struct scoutfs_key key;
struct prefix_tags tgs;
bool undo_indx = false;
bool undo_srch = false;
LIST_HEAD(ind_locks);
LIST_HEAD(saved);
u8 found_parts;
unsigned int bytes;
u64 ind_seq;
u64 hash;
u64 hash = 0;
u64 id = 0;
int ret;
int err;
@@ -447,7 +445,7 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name,
if (parse_tags(name, name_len, &tgs) != 0)
return -EINVAL;
if ((tgs.hide || tgs.indx) && !capable(CAP_SYS_ADMIN))
if ((tgs.hide || tgs.srch) && !capable(CAP_SYS_ADMIN))
return -EPERM;
bytes = sizeof(struct scoutfs_xattr) + name_len + size;
@@ -498,14 +496,6 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name,
memcpy(&xat->name[xat->name_len], value, size);
}
if (tgs.indx && !(found_parts && value)) {
hash = scoutfs_hash64(name, name_len);
ret = scoutfs_lock_xattr_index(sb, SCOUTFS_LOCK_WRITE_ONLY, 0,
hash, &indx_lock);
if (ret < 0)
goto unlock;
}
retry:
ret = scoutfs_inode_index_start(sb, &ind_seq) ?:
scoutfs_inode_index_prepare(sb, &ind_locks, inode, false) ?:
@@ -513,7 +503,7 @@ retry:
SIC_XATTR_SET(found_parts,
value != NULL,
name_len, size,
tgs.indx));
tgs.srch));
if (ret > 0)
goto retry;
if (ret)
@@ -523,20 +513,14 @@ retry:
if (ret < 0)
goto release;
if (tgs.indx && !(found_parts && value)) {
if (tgs.srch && !(found_parts && value)) {
if (found_parts)
id = le64_to_cpu(key.skx_id);
hash = scoutfs_hash64(name, name_len);
scoutfs_xattr_index_key(&indx_key, hash, ino, id);
if (value)
ret = scoutfs_forest_create_force(sb, &indx_key, NULL,
indx_lock);
else
ret = scoutfs_forest_delete_force(sb, &indx_key,
indx_lock);
ret = scoutfs_forest_srch_add(sb, hash, ino, id);
if (ret < 0)
goto release;
undo_indx = true;
undo_srch = true;
}
ret = 0;
@@ -559,13 +543,8 @@ retry:
ret = 0;
release:
if (ret < 0 && undo_indx) {
if (value)
err = scoutfs_forest_delete_force(sb, &indx_key,
indx_lock);
else
err = scoutfs_forest_create_force(sb, &indx_key, NULL,
indx_lock);
if (ret < 0 && undo_srch) {
err = scoutfs_forest_srch_add(sb, hash, ino, id);
BUG_ON(err);
}
@@ -573,7 +552,6 @@ release:
scoutfs_inode_index_unlock(sb, &ind_locks);
unlock:
up_write(&si->xattr_rwsem);
scoutfs_unlock(sb, indx_lock, SCOUTFS_LOCK_WRITE_ONLY);
scoutfs_unlock(sb, lck, SCOUTFS_LOCK_WRITE);
out:
kfree(xat);
@@ -693,9 +671,7 @@ ssize_t scoutfs_listxattr(struct dentry *dentry, char *buffer, size_t size)
int scoutfs_xattr_drop(struct super_block *sb, u64 ino,
struct scoutfs_lock *lock)
{
struct scoutfs_lock *indx_lock = NULL;
struct scoutfs_xattr *xat = NULL;
struct scoutfs_key indx_key;
struct scoutfs_key last;
struct scoutfs_key key;
struct prefix_tags tgs;
@@ -729,17 +705,6 @@ int scoutfs_xattr_drop(struct super_block *sb, u64 ino,
parse_tags(xat->name, xat->name_len, &tgs) != 0)
memset(&tgs, 0, sizeof(tgs));
if (tgs.indx) {
hash = scoutfs_hash64(xat->name, xat->name_len);
scoutfs_xattr_index_key(&indx_key, hash, ino,
le64_to_cpu(key.skx_id));
ret = scoutfs_lock_xattr_index(sb,
SCOUTFS_LOCK_WRITE_ONLY,
0, hash, &indx_lock);
if (ret < 0)
break;
}
ret = scoutfs_hold_trans(sb, SIC_EXACT(2, 0));
if (ret < 0)
break;
@@ -749,9 +714,10 @@ int scoutfs_xattr_drop(struct super_block *sb, u64 ino,
if (ret < 0)
break;
if (tgs.indx) {
ret = scoutfs_forest_delete_force(sb, &indx_key,
indx_lock);
if (tgs.srch) {
hash = scoutfs_hash64(xat->name, xat->name_len);
ret = scoutfs_forest_srch_add(sb, hash, ino,
le64_to_cpu(key.skx_id));
if (ret < 0)
break;
}
@@ -759,15 +725,11 @@ int scoutfs_xattr_drop(struct super_block *sb, u64 ino,
scoutfs_release_trans(sb);
release = false;
scoutfs_unlock(sb, indx_lock, SCOUTFS_LOCK_WRITE_ONLY);
indx_lock = NULL;
/* don't need to inc, next won't see deleted item */
}
if (release)
scoutfs_release_trans(sb);
scoutfs_unlock(sb, indx_lock, SCOUTFS_LOCK_WRITE_ONLY);
kfree(xat);
out:
return ret;