Don't use vmalloc in get/set xattr

Extended attribute values can be larger than a reasonable maximum size
for our btree items, so we store xattrs in many items.  The first pass at
this code used vmalloc to make it relatively easy to work with a
contiguous buffer that was cut up into multiple items.
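
(To give a sense of the split: with a hypothetical 1 KiB item cap, rather
than the real SCOUTFS_XATTR_MAX_PART_SIZE, a 100-byte name with a 10 KiB
value lands in roughly eleven items; the first holds the header, the name,
and the head of the value, and each later item holds the next chunk of the
value.)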

The problem, of course, is that vmalloc() is expensive.  Well, the
problem is that I always forget just how expensive it can be and use it
when I shouldn't.  We had loads on high cpu count machines that were
catastrophically cpu bound on all the contentious work that vmalloc does
to maintain a coherent global address space.

This removes the use of vmalloc and instead allocates only a small buffer
for the first compound item.  The later items directly reference regions
of the caller's value buffer rather than copying the value to and from a
large intermediate vmalloced buffer.
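
A minimal sketch of the new scheme, not part of the patch, mirroring the
xattr_item_part_buffer() helper added below: part_size stands in for
SCOUTFS_XATTR_MAX_PART_SIZE, hdr_name_len for
offsetof(struct scoutfs_xattr, name[name_len]), and the function name is
made up for illustration.

  /*
   * Map item part 'part' of an xattr onto a buffer.  Part 0 is the
   * small kmalloc()ed first item (header + name + head of value);
   * later parts point directly into the caller's value buffer instead
   * of into one big vmalloc()ed intermediate copy.
   */
  static void sketch_part_buffer(int part, void *first_item, int first_len,
                                 const char *value, size_t value_len,
                                 int hdr_name_len, int part_size,
                                 const void **buf, int *len)
  {
          if (part == 0) {
                  *buf = first_item;
                  *len = first_len;
          } else {
                  size_t off = (size_t)part * part_size - hdr_name_len;

                  *buf = value + off;
                  *len = (value_len - off) < (size_t)part_size ?
                         (int)(value_len - off) : part_size;
          }
  }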

Signed-off-by: Zach Brown <zab@versity.com>
Author: Zach Brown
Date:   2022-03-21 15:53:39 -07:00
parent 9c751c1197
commit 4e8a088cc5

@@ -57,12 +57,6 @@ static u32 xattr_names_equal(const char *a_name, unsigned int a_len,
return a_len == b_len && memcmp(a_name, b_name, a_len) == 0;
}
static unsigned int xattr_full_bytes(struct scoutfs_xattr *xat)
{
return offsetof(struct scoutfs_xattr,
name[xat->name_len + le16_to_cpu(xat->val_len)]);
}
static unsigned int xattr_nr_parts(struct scoutfs_xattr *xat)
{
return SCOUTFS_XATTR_NR_PARTS(xat->name_len,
@@ -137,12 +131,29 @@ int scoutfs_xattr_parse_tags(const char *name, unsigned int name_len,
}
/*
* Find the next xattr and copy the key, xattr header, and as much of
* the name and value into the callers buffer as we can. Returns the
* number of bytes copied which include the header, name, and value and
* can be limited by the xattr length or the callers buffer. The caller
* is responsible for comparing their lengths, the header, and the
* returned length before safely using the xattr.
* xattrs are stored in multiple items. The first item is a
* concatenation of an initial header, the name, and then as much of the
value as fits in the remainder of the first item. This returns the
* size of the first item that'd store an xattr with the given name
* length and value payload size.
*/
static int first_item_bytes(int name_len, size_t size)
{
if (WARN_ON_ONCE(name_len <= 0) ||
WARN_ON_ONCE(name_len > SCOUTFS_XATTR_MAX_NAME_LEN))
return 0;
return min_t(int, sizeof(struct scoutfs_xattr) + name_len + size,
SCOUTFS_XATTR_MAX_PART_SIZE);
}
/*
* Find the next xattr, set the caller's key, and copy as much of the
first item into the caller's buffer as we can. Returns the number of
* bytes copied which can include the header, name, and start of the
* value from the first item. The caller is responsible for comparing
* their lengths, the header, and the returned length before safely
* using the buffer.
*
* If a name is provided then we'll iterate over items with a matching
* name_hash until we find a matching name. If we don't find a matching
@@ -154,20 +165,17 @@ int scoutfs_xattr_parse_tags(const char *name, unsigned int name_len,
* Returns -ENOENT if it didn't find a next item.
*/
static int get_next_xattr(struct inode *inode, struct scoutfs_key *key,
struct scoutfs_xattr *xat, unsigned int bytes,
struct scoutfs_xattr *xat, unsigned int xat_bytes,
const char *name, unsigned int name_len,
u64 name_hash, u64 id, struct scoutfs_lock *lock)
{
struct super_block *sb = inode->i_sb;
struct scoutfs_key last;
u8 last_part;
int total;
u8 part;
int ret;
/* need to be able to see the name we're looking for */
if (WARN_ON_ONCE(name_len > 0 && bytes < offsetof(struct scoutfs_xattr,
name[name_len])))
if (WARN_ON_ONCE(name_len > 0 &&
xat_bytes < offsetof(struct scoutfs_xattr, name[name_len])))
return -EINVAL;
if (name_len)
@@ -176,26 +184,15 @@ static int get_next_xattr(struct inode *inode, struct scoutfs_key *key,
init_xattr_key(key, scoutfs_ino(inode), name_hash, id);
init_xattr_key(&last, scoutfs_ino(inode), U32_MAX, U64_MAX);
last_part = 0;
part = 0;
total = 0;
for (;;) {
key->skx_part = part;
ret = scoutfs_item_next(sb, key, &last,
(void *)xat + total, bytes - total,
lock);
if (ret < 0) {
/* XXX corruption, ran out of parts */
if (ret == -ENOENT && part > 0)
ret = -EIO;
ret = scoutfs_item_next(sb, key, &last, xat, xat_bytes, lock);
if (ret < 0)
break;
}
trace_scoutfs_xattr_get_next_key(sb, key);
/* XXX corruption */
if (key->skx_part != part) {
if (key->skx_part != 0) {
ret = -EIO;
break;
}
@@ -205,8 +202,7 @@ static int get_next_xattr(struct inode *inode, struct scoutfs_key *key,
* the first part and if the next xattr name fits in our
* buffer then the item must have included it.
*/
if (part == 0 &&
(ret < sizeof(struct scoutfs_xattr) ||
if ((ret < sizeof(struct scoutfs_xattr) ||
(xat->name_len <= name_len &&
ret < offsetof(struct scoutfs_xattr,
name[xat->name_len])) ||
@@ -216,7 +212,7 @@ static int get_next_xattr(struct inode *inode, struct scoutfs_key *key,
break;
}
if (part == 0 && name_len) {
if (name_len > 0) {
/* ran out of names that could match */
if (le64_to_cpu(key->skx_name_hash) != name_hash) {
ret = -ENOENT;
@@ -224,64 +220,126 @@ static int get_next_xattr(struct inode *inode, struct scoutfs_key *key,
}
/* keep looking for our name */
if (!xattr_names_equal(name, name_len,
xat->name, xat->name_len)) {
part = 0;
if (!xattr_names_equal(name, name_len, xat->name, xat->name_len)) {
le64_add_cpu(&key->skx_id, 1);
continue;
}
/* use the matching name we found */
last_part = xattr_nr_parts(xat) - 1;
}
total += ret;
if (total == bytes || part == last_part) {
/* copied as much as we could */
ret = total;
break;
}
part++;
/* found next name */
break;
}
return ret;
}
/*
* The caller has already read and verified the xattr's first item.
* Copy the value from the tail of the first item and from any future
* items into the destination buffer.
*/
static int copy_xattr_value(struct super_block *sb, struct scoutfs_key *xat_key,
struct scoutfs_xattr *xat, int xat_bytes,
char *buffer, size_t size,
struct scoutfs_lock *lock)
{
struct scoutfs_key key;
size_t copied = 0;
int val_tail;
int bytes;
int ret;
int i;
/* must have first item up to value */
if (WARN_ON_ONCE(xat_bytes < sizeof(struct scoutfs_xattr)) ||
WARN_ON_ONCE(xat_bytes < offsetof(struct scoutfs_xattr, name[xat->name_len])))
return -EINVAL;
/* only ever copy up to the full value */
size = min_t(size_t, size, le16_to_cpu(xat->val_len));
/* must have full first item if caller needs value from second item */
val_tail = SCOUTFS_XATTR_MAX_PART_SIZE -
offsetof(struct scoutfs_xattr, name[xat->name_len]);
if (WARN_ON_ONCE(size > val_tail && xat_bytes != SCOUTFS_XATTR_MAX_PART_SIZE))
return -EINVAL;
/* copy from tail of first item */
bytes = min_t(unsigned int, size, val_tail);
if (bytes > 0) {
memcpy(buffer, &xat->name[xat->name_len], bytes);
copied += bytes;
}
key = *xat_key;
for (i = 1; copied < size; i++) {
key.skx_part = i;
bytes = min_t(unsigned int, size - copied, SCOUTFS_XATTR_MAX_PART_SIZE);
ret = scoutfs_item_lookup(sb, &key, buffer + copied, bytes, lock);
if (ret >= 0 && ret != bytes)
ret = -EIO;
if (ret < 0)
return ret;
copied += ret;
}
return copied;
}
/*
The caller is working with item parts that are either the allocated
first compound item or later parts that are offsets into a value
buffer. Give them a pointer to, and the length of, the start of the part.
*/
static void xattr_item_part_buffer(void **buf, int *len, int part,
struct scoutfs_xattr *xat, unsigned int xat_bytes,
const char *value, size_t size)
{
int off;
if (part == 0) {
*buf = xat;
*len = xat_bytes;
} else {
off = (part * SCOUTFS_XATTR_MAX_PART_SIZE) -
offsetof(struct scoutfs_xattr, name[xat->name_len]);
BUG_ON(off >= size); /* calls limited by number of parts */
*buf = (void *)value + off;
*len = min_t(size_t, size - off, SCOUTFS_XATTR_MAX_PART_SIZE);
}
}
/*
* Create all the items associated with the given xattr. If this
* returns an error it will have already cleaned up any items it created
* before seeing the error.
*/
static int create_xattr_items(struct inode *inode, u64 id,
struct scoutfs_xattr *xat, unsigned int bytes,
static int create_xattr_items(struct inode *inode, u64 id, struct scoutfs_xattr *xat,
int xat_bytes, const char *value, size_t size, u8 new_parts,
struct scoutfs_lock *lock)
{
struct super_block *sb = inode->i_sb;
struct scoutfs_key key;
unsigned int part_bytes;
unsigned int total;
int ret;
int ret = 0;
void *buf;
int len;
int i;
init_xattr_key(&key, scoutfs_ino(inode),
xattr_name_hash(xat->name, xat->name_len), id);
total = 0;
ret = 0;
while (total < bytes) {
part_bytes = min_t(unsigned int, bytes - total,
SCOUTFS_XATTR_MAX_PART_SIZE);
for (i = 0; i < new_parts; i++) {
key.skx_part = i;
xattr_item_part_buffer(&buf, &len, i, xat, xat_bytes, value, size);
ret = scoutfs_item_create(sb, &key,
(void *)xat + total, part_bytes,
lock);
if (ret) {
ret = scoutfs_item_create(sb, &key, buf, len, lock);
if (ret < 0) {
while (key.skx_part-- > 0)
scoutfs_item_delete(sb, &key, lock);
break;
}
total += part_bytes;
key.skx_part++;
}
return ret;
@@ -329,20 +387,20 @@ out:
* deleted items.
*/
static int change_xattr_items(struct inode *inode, u64 id,
struct scoutfs_xattr *new_xat,
unsigned int new_bytes, u8 new_parts,
u8 old_parts, struct scoutfs_lock *lock)
struct scoutfs_xattr *xat, int xat_bytes,
const char *value, size_t size,
u8 new_parts, u8 old_parts, struct scoutfs_lock *lock)
{
struct super_block *sb = inode->i_sb;
struct scoutfs_key key;
int last_created = -1;
int bytes;
int off;
void *buf;
int len;
int i;
int ret;
init_xattr_key(&key, scoutfs_ino(inode),
xattr_name_hash(new_xat->name, new_xat->name_len), id);
xattr_name_hash(xat->name, xat->name_len), id);
/* dirty existing old items */
for (i = 0; i < old_parts; i++) {
@@ -354,13 +412,10 @@ static int change_xattr_items(struct inode *inode, u64 id,
/* create any new items past the old */
for (i = old_parts; i < new_parts; i++) {
off = i * SCOUTFS_XATTR_MAX_PART_SIZE;
bytes = min_t(unsigned int, new_bytes - off,
SCOUTFS_XATTR_MAX_PART_SIZE);
key.skx_part = i;
ret = scoutfs_item_create(sb, &key, (void *)new_xat + off,
bytes, lock);
xattr_item_part_buffer(&buf, &len, i, xat, xat_bytes, value, size);
ret = scoutfs_item_create(sb, &key, buf, len, lock);
if (ret)
goto out;
@@ -369,13 +424,10 @@ static int change_xattr_items(struct inode *inode, u64 id,
/* update dirtied overlapping existing items, last partial first */
for (i = min(old_parts, new_parts) - 1; i >= 0; i--) {
off = i * SCOUTFS_XATTR_MAX_PART_SIZE;
bytes = min_t(unsigned int, new_bytes - off,
SCOUTFS_XATTR_MAX_PART_SIZE);
key.skx_part = i;
ret = scoutfs_item_update(sb, &key, (void *)new_xat + off,
bytes, lock);
xattr_item_part_buffer(&buf, &len, i, xat, xat_bytes, value, size);
ret = scoutfs_item_update(sb, &key, buf, len, lock);
/* only last partial can fail, then we unwind created */
if (ret < 0)
goto out;
@@ -412,7 +464,7 @@ ssize_t scoutfs_getxattr(struct dentry *dentry, const char *name, void *buffer,
struct scoutfs_xattr *xat = NULL;
struct scoutfs_lock *lck = NULL;
struct scoutfs_key key;
unsigned int bytes;
unsigned int xat_bytes;
size_t name_len;
int ret;
@@ -423,9 +475,8 @@ ssize_t scoutfs_getxattr(struct dentry *dentry, const char *name, void *buffer,
if (name_len > SCOUTFS_XATTR_MAX_NAME_LEN)
return -ENODATA;
/* only need enough for caller's name and value sizes */
bytes = sizeof(struct scoutfs_xattr) + name_len + size;
xat = __vmalloc(bytes, GFP_NOFS, PAGE_KERNEL);
xat_bytes = first_item_bytes(name_len, size);
xat = kmalloc(xat_bytes, GFP_NOFS);
if (!xat)
return -ENOMEM;
@@ -435,8 +486,7 @@ ssize_t scoutfs_getxattr(struct dentry *dentry, const char *name, void *buffer,
down_read(&si->xattr_rwsem);
ret = get_next_xattr(inode, &key, xat, bytes,
name, name_len, 0, 0, lck);
ret = get_next_xattr(inode, &key, xat, xat_bytes, name, name_len, 0, 0, lck);
up_read(&si->xattr_rwsem);
scoutfs_unlock(sb, lck, SCOUTFS_LOCK_READ);
@@ -459,16 +509,9 @@ ssize_t scoutfs_getxattr(struct dentry *dentry, const char *name, void *buffer,
goto out;
}
/* XXX corruption, the items didn't match the header */
if (ret < xattr_full_bytes(xat)) {
ret = -EIO;
goto out;
}
ret = le16_to_cpu(xat->val_len);
memcpy(buffer, &xat->name[xat->name_len], ret);
ret = copy_xattr_value(sb, &key, xat, xat_bytes, buffer, size, lck);
out:
vfree(xat);
kfree(xat);
return ret;
}
@@ -596,7 +639,8 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name,
bool undo_totl = false;
LIST_HEAD(ind_locks);
u8 found_parts;
unsigned int bytes;
unsigned int xat_bytes_totl;
unsigned int xat_bytes;
unsigned int val_len;
u64 ind_seq;
u64 total;
@@ -629,9 +673,12 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name,
if (tgs.totl && ((ret = parse_totl_key(&totl_key, name, name_len)) != 0))
return ret;
bytes = sizeof(struct scoutfs_xattr) + name_len + size;
/* alloc enough to read old totl value */
xat = __vmalloc(bytes + SCOUTFS_XATTR_MAX_TOTL_U64, GFP_NOFS, PAGE_KERNEL);
/* allocate enough to always read an existing xattr's totl */
xat_bytes_totl = first_item_bytes(name_len,
max_t(size_t, size, SCOUTFS_XATTR_MAX_TOTL_U64));
/* but store partial first item that only includes the new xattr's value */
xat_bytes = first_item_bytes(name_len, size);
xat = kmalloc(xat_bytes_totl, GFP_NOFS);
if (!xat) {
ret = -ENOMEM;
goto out;
@@ -645,9 +692,7 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name,
down_write(&si->xattr_rwsem);
/* find an existing xattr to delete, including possible totl value */
ret = get_next_xattr(inode, &key, xat,
sizeof(struct scoutfs_xattr) + name_len + SCOUTFS_XATTR_MAX_TOTL_U64,
name, name_len, 0, 0, lck);
ret = get_next_xattr(inode, &key, xat, xat_bytes_totl, name, name_len, 0, 0, lck);
if (ret < 0 && ret != -ENOENT)
goto unlock;
@@ -683,7 +728,7 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name,
le64_add_cpu(&tval.total, -total);
}
/* prepare our xattr */
/* prepare the xattr header, name, and start of value in first item */
if (value) {
if (found_parts)
id = le64_to_cpu(key.skx_id);
@@ -693,7 +738,9 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name,
xat->val_len = cpu_to_le16(size);
memset(xat->__pad, 0, sizeof(xat->__pad));
memcpy(xat->name, name, name_len);
memcpy(&xat->name[xat->name_len], value, size);
memcpy(&xat->name[name_len], value,
min(size, SCOUTFS_XATTR_MAX_PART_SIZE -
offsetof(struct scoutfs_xattr, name[name_len])));
if (tgs.totl) {
ret = parse_totl_u64(value, size, &total);
@@ -741,14 +788,15 @@ retry:
}
if (found_parts && value)
ret = change_xattr_items(inode, id, xat, bytes,
ret = change_xattr_items(inode, id, xat, xat_bytes, value, size,
xattr_nr_parts(xat), found_parts, lck);
else if (found_parts)
ret = delete_xattr_items(inode, le64_to_cpu(key.skx_name_hash),
le64_to_cpu(key.skx_id), found_parts,
lck);
else
ret = create_xattr_items(inode, id, xat, bytes, lck);
ret = create_xattr_items(inode, id, xat, xat_bytes, value, size,
xattr_nr_parts(xat), lck);
if (ret < 0)
goto release;
@@ -778,7 +826,7 @@ unlock:
scoutfs_unlock(sb, lck, SCOUTFS_LOCK_WRITE);
scoutfs_unlock(sb, totl_lock, SCOUTFS_LOCK_WRITE_ONLY);
out:
vfree(xat);
kfree(xat);
return ret;
}
@@ -807,7 +855,7 @@ ssize_t scoutfs_list_xattrs(struct inode *inode, char *buffer,
struct scoutfs_xattr *xat = NULL;
struct scoutfs_lock *lck = NULL;
struct scoutfs_key key;
unsigned int bytes;
unsigned int xat_bytes;
ssize_t total = 0;
u32 name_hash = 0;
bool is_hidden;
@@ -820,8 +868,8 @@ ssize_t scoutfs_list_xattrs(struct inode *inode, char *buffer,
id = *id_pos;
/* need a buffer large enough for all possible names */
bytes = sizeof(struct scoutfs_xattr) + SCOUTFS_XATTR_MAX_NAME_LEN;
xat = kmalloc(bytes, GFP_NOFS);
xat_bytes = first_item_bytes(SCOUTFS_XATTR_MAX_NAME_LEN, 0);
xat = kmalloc(xat_bytes, GFP_NOFS);
if (!xat) {
ret = -ENOMEM;
goto out;
@@ -834,8 +882,7 @@ ssize_t scoutfs_list_xattrs(struct inode *inode, char *buffer,
down_read(&si->xattr_rwsem);
for (;;) {
ret = get_next_xattr(inode, &key, xat, bytes,
NULL, 0, name_hash, id, lck);
ret = get_next_xattr(inode, &key, xat, xat_bytes, NULL, 0, name_hash, id, lck);
if (ret < 0) {
if (ret == -ENOENT)
ret = total;