From 4e8a088cc53acdc22324b12b046176c92b252ce5 Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Mon, 21 Mar 2022 15:53:39 -0700 Subject: [PATCH] Don't use vmalloc in get/set xattr Extended attribute values can be larger than a reasonable maximum size for our btree items so we store xattrs in many items. The first pass at this code used vmalloc to make it relatively easy to work with a contiguous buffer that was cut up into multiple items. The problem, of course, is that vmalloc() is expensive. Well, the problem is that I always forget just how expensive it can be and use it when I shouldn't. We had loads on high cpu count machines that were catastrophically cpu bound on all the contentious work that vmalloc does to maintain a coherent global address space. This removes the use of vmalloc and only allocates a small buffer for the first compound item. The later items directly reference regions of value buffer rather than copying it to and from the large intermediate vmalloced buffer. Signed-off-by: Zach Brown --- kmod/src/xattr.c | 277 +++++++++++++++++++++++++++-------------------- 1 file changed, 162 insertions(+), 115 deletions(-) diff --git a/kmod/src/xattr.c b/kmod/src/xattr.c index 3de6ee7d..b7623f68 100644 --- a/kmod/src/xattr.c +++ b/kmod/src/xattr.c @@ -57,12 +57,6 @@ static u32 xattr_names_equal(const char *a_name, unsigned int a_len, return a_len == b_len && memcmp(a_name, b_name, a_len) == 0; } -static unsigned int xattr_full_bytes(struct scoutfs_xattr *xat) -{ - return offsetof(struct scoutfs_xattr, - name[xat->name_len + le16_to_cpu(xat->val_len)]); -} - static unsigned int xattr_nr_parts(struct scoutfs_xattr *xat) { return SCOUTFS_XATTR_NR_PARTS(xat->name_len, @@ -137,12 +131,29 @@ int scoutfs_xattr_parse_tags(const char *name, unsigned int name_len, } /* - * Find the next xattr and copy the key, xattr header, and as much of - * the name and value into the callers buffer as we can. Returns the - * number of bytes copied which include the header, name, and value and - * can be limited by the xattr length or the callers buffer. The caller - * is responsible for comparing their lengths, the header, and the - * returned length before safely using the xattr. + * xattrs are stored in multiple items. The first item is a + * concatenation of an initial header, the name, and then as much of the + * value as fits in the remainder of the first item. This return the + * size of the first item that'd store an xattr with the given name + * length and value payload size. + */ +static int first_item_bytes(int name_len, size_t size) +{ + if (WARN_ON_ONCE(name_len <= 0) || + WARN_ON_ONCE(name_len > SCOUTFS_XATTR_MAX_NAME_LEN)) + return 0; + + return min_t(int, sizeof(struct scoutfs_xattr) + name_len + size, + SCOUTFS_XATTR_MAX_PART_SIZE); +} + +/* + * Find the next xattr, set the caller's key, and copy as much of the + * first item into the callers buffer as we can. Returns the number of + * bytes copied which can include the header, name, and start of the + * value from the first item. The caller is responsible for comparing + * their lengths, the header, and the returned length before safely + * using the buffer. * * If a name is provided then we'll iterate over items with a matching * name_hash until we find a matching name. If we don't find a matching @@ -154,20 +165,17 @@ int scoutfs_xattr_parse_tags(const char *name, unsigned int name_len, * Returns -ENOENT if it didn't find a next item. */ static int get_next_xattr(struct inode *inode, struct scoutfs_key *key, - struct scoutfs_xattr *xat, unsigned int bytes, + struct scoutfs_xattr *xat, unsigned int xat_bytes, const char *name, unsigned int name_len, u64 name_hash, u64 id, struct scoutfs_lock *lock) { struct super_block *sb = inode->i_sb; struct scoutfs_key last; - u8 last_part; - int total; - u8 part; int ret; /* need to be able to see the name we're looking for */ - if (WARN_ON_ONCE(name_len > 0 && bytes < offsetof(struct scoutfs_xattr, - name[name_len]))) + if (WARN_ON_ONCE(name_len > 0 && + xat_bytes < offsetof(struct scoutfs_xattr, name[name_len]))) return -EINVAL; if (name_len) @@ -176,26 +184,15 @@ static int get_next_xattr(struct inode *inode, struct scoutfs_key *key, init_xattr_key(key, scoutfs_ino(inode), name_hash, id); init_xattr_key(&last, scoutfs_ino(inode), U32_MAX, U64_MAX); - last_part = 0; - part = 0; - total = 0; - for (;;) { - key->skx_part = part; - ret = scoutfs_item_next(sb, key, &last, - (void *)xat + total, bytes - total, - lock); - if (ret < 0) { - /* XXX corruption, ran out of parts */ - if (ret == -ENOENT && part > 0) - ret = -EIO; + ret = scoutfs_item_next(sb, key, &last, xat, xat_bytes, lock); + if (ret < 0) break; - } trace_scoutfs_xattr_get_next_key(sb, key); /* XXX corruption */ - if (key->skx_part != part) { + if (key->skx_part != 0) { ret = -EIO; break; } @@ -205,8 +202,7 @@ static int get_next_xattr(struct inode *inode, struct scoutfs_key *key, * the first part and if the next xattr name fits in our * buffer then the item must have included it. */ - if (part == 0 && - (ret < sizeof(struct scoutfs_xattr) || + if ((ret < sizeof(struct scoutfs_xattr) || (xat->name_len <= name_len && ret < offsetof(struct scoutfs_xattr, name[xat->name_len])) || @@ -216,7 +212,7 @@ static int get_next_xattr(struct inode *inode, struct scoutfs_key *key, break; } - if (part == 0 && name_len) { + if (name_len > 0) { /* ran out of names that could match */ if (le64_to_cpu(key->skx_name_hash) != name_hash) { ret = -ENOENT; @@ -224,64 +220,126 @@ static int get_next_xattr(struct inode *inode, struct scoutfs_key *key, } /* keep looking for our name */ - if (!xattr_names_equal(name, name_len, - xat->name, xat->name_len)) { - part = 0; + if (!xattr_names_equal(name, name_len, xat->name, xat->name_len)) { le64_add_cpu(&key->skx_id, 1); continue; } - - /* use the matching name we found */ - last_part = xattr_nr_parts(xat) - 1; } - total += ret; - if (total == bytes || part == last_part) { - /* copied as much as we could */ - ret = total; - break; - } - part++; + /* found next name */ + break; } return ret; } +/* + * The caller has already read and verified the xattr's first item. + * Copy the value from the tail of the first item and from any future + * items into the destination buffer. + */ +static int copy_xattr_value(struct super_block *sb, struct scoutfs_key *xat_key, + struct scoutfs_xattr *xat, int xat_bytes, + char *buffer, size_t size, + struct scoutfs_lock *lock) +{ + struct scoutfs_key key; + size_t copied = 0; + int val_tail; + int bytes; + int ret; + int i; + + /* must have first item up to value */ + if (WARN_ON_ONCE(xat_bytes < sizeof(struct scoutfs_xattr)) || + WARN_ON_ONCE(xat_bytes < offsetof(struct scoutfs_xattr, name[xat->name_len]))) + return -EINVAL; + + /* only ever copy up to the full value */ + size = min_t(size_t, size, le16_to_cpu(xat->val_len)); + + /* must have full first item if caller needs value from second item */ + val_tail = SCOUTFS_XATTR_MAX_PART_SIZE - + offsetof(struct scoutfs_xattr, name[xat->name_len]); + if (WARN_ON_ONCE(size > val_tail && xat_bytes != SCOUTFS_XATTR_MAX_PART_SIZE)) + return -EINVAL; + + /* copy from tail of first item */ + bytes = min_t(unsigned int, size, val_tail); + if (bytes > 0) { + memcpy(buffer, &xat->name[xat->name_len], bytes); + copied += bytes; + } + + key = *xat_key; + for (i = 1; copied < size; i++) { + key.skx_part = i; + bytes = min_t(unsigned int, size - copied, SCOUTFS_XATTR_MAX_PART_SIZE); + + ret = scoutfs_item_lookup(sb, &key, buffer + copied, bytes, lock); + if (ret >= 0 && ret != bytes) + ret = -EIO; + if (ret < 0) + return ret; + + copied += ret; + } + + return copied; +} + +/* + * The caller is working with items that are either in the allocated + * first compound item or further items that are offsets into a value + * buffer. Give them a pointer and length of the start of the item. + */ +static void xattr_item_part_buffer(void **buf, int *len, int part, + struct scoutfs_xattr *xat, unsigned int xat_bytes, + const char *value, size_t size) +{ + int off; + + if (part == 0) { + *buf = xat; + *len = xat_bytes; + } else { + off = (part * SCOUTFS_XATTR_MAX_PART_SIZE) - + offsetof(struct scoutfs_xattr, name[xat->name_len]); + BUG_ON(off >= size); /* calls limited by number of parts */ + *buf = (void *)value + off; + *len = min_t(size_t, size - off, SCOUTFS_XATTR_MAX_PART_SIZE); + } +} + /* * Create all the items associated with the given xattr. If this * returns an error it will have already cleaned up any items it created * before seeing the error. */ -static int create_xattr_items(struct inode *inode, u64 id, - struct scoutfs_xattr *xat, unsigned int bytes, +static int create_xattr_items(struct inode *inode, u64 id, struct scoutfs_xattr *xat, + int xat_bytes, const char *value, size_t size, u8 new_parts, struct scoutfs_lock *lock) { struct super_block *sb = inode->i_sb; struct scoutfs_key key; - unsigned int part_bytes; - unsigned int total; - int ret; + int ret = 0; + void *buf; + int len; + int i; init_xattr_key(&key, scoutfs_ino(inode), xattr_name_hash(xat->name, xat->name_len), id); - total = 0; - ret = 0; - while (total < bytes) { - part_bytes = min_t(unsigned int, bytes - total, - SCOUTFS_XATTR_MAX_PART_SIZE); + for (i = 0; i < new_parts; i++) { + key.skx_part = i; + xattr_item_part_buffer(&buf, &len, i, xat, xat_bytes, value, size); - ret = scoutfs_item_create(sb, &key, - (void *)xat + total, part_bytes, - lock); - if (ret) { + ret = scoutfs_item_create(sb, &key, buf, len, lock); + if (ret < 0) { while (key.skx_part-- > 0) scoutfs_item_delete(sb, &key, lock); break; } - - total += part_bytes; - key.skx_part++; } return ret; @@ -329,20 +387,20 @@ out: * deleted items. */ static int change_xattr_items(struct inode *inode, u64 id, - struct scoutfs_xattr *new_xat, - unsigned int new_bytes, u8 new_parts, - u8 old_parts, struct scoutfs_lock *lock) + struct scoutfs_xattr *xat, int xat_bytes, + const char *value, size_t size, + u8 new_parts, u8 old_parts, struct scoutfs_lock *lock) { struct super_block *sb = inode->i_sb; struct scoutfs_key key; int last_created = -1; - int bytes; - int off; + void *buf; + int len; int i; int ret; init_xattr_key(&key, scoutfs_ino(inode), - xattr_name_hash(new_xat->name, new_xat->name_len), id); + xattr_name_hash(xat->name, xat->name_len), id); /* dirty existing old items */ for (i = 0; i < old_parts; i++) { @@ -354,13 +412,10 @@ static int change_xattr_items(struct inode *inode, u64 id, /* create any new items past the old */ for (i = old_parts; i < new_parts; i++) { - off = i * SCOUTFS_XATTR_MAX_PART_SIZE; - bytes = min_t(unsigned int, new_bytes - off, - SCOUTFS_XATTR_MAX_PART_SIZE); - key.skx_part = i; - ret = scoutfs_item_create(sb, &key, (void *)new_xat + off, - bytes, lock); + xattr_item_part_buffer(&buf, &len, i, xat, xat_bytes, value, size); + + ret = scoutfs_item_create(sb, &key, buf, len, lock); if (ret) goto out; @@ -369,13 +424,10 @@ static int change_xattr_items(struct inode *inode, u64 id, /* update dirtied overlapping existing items, last partial first */ for (i = min(old_parts, new_parts) - 1; i >= 0; i--) { - off = i * SCOUTFS_XATTR_MAX_PART_SIZE; - bytes = min_t(unsigned int, new_bytes - off, - SCOUTFS_XATTR_MAX_PART_SIZE); - key.skx_part = i; - ret = scoutfs_item_update(sb, &key, (void *)new_xat + off, - bytes, lock); + xattr_item_part_buffer(&buf, &len, i, xat, xat_bytes, value, size); + + ret = scoutfs_item_update(sb, &key, buf, len, lock); /* only last partial can fail, then we unwind created */ if (ret < 0) goto out; @@ -412,7 +464,7 @@ ssize_t scoutfs_getxattr(struct dentry *dentry, const char *name, void *buffer, struct scoutfs_xattr *xat = NULL; struct scoutfs_lock *lck = NULL; struct scoutfs_key key; - unsigned int bytes; + unsigned int xat_bytes; size_t name_len; int ret; @@ -423,9 +475,8 @@ ssize_t scoutfs_getxattr(struct dentry *dentry, const char *name, void *buffer, if (name_len > SCOUTFS_XATTR_MAX_NAME_LEN) return -ENODATA; - /* only need enough for caller's name and value sizes */ - bytes = sizeof(struct scoutfs_xattr) + name_len + size; - xat = __vmalloc(bytes, GFP_NOFS, PAGE_KERNEL); + xat_bytes = first_item_bytes(name_len, size); + xat = kmalloc(xat_bytes, GFP_NOFS); if (!xat) return -ENOMEM; @@ -435,8 +486,7 @@ ssize_t scoutfs_getxattr(struct dentry *dentry, const char *name, void *buffer, down_read(&si->xattr_rwsem); - ret = get_next_xattr(inode, &key, xat, bytes, - name, name_len, 0, 0, lck); + ret = get_next_xattr(inode, &key, xat, xat_bytes, name, name_len, 0, 0, lck); up_read(&si->xattr_rwsem); scoutfs_unlock(sb, lck, SCOUTFS_LOCK_READ); @@ -459,16 +509,9 @@ ssize_t scoutfs_getxattr(struct dentry *dentry, const char *name, void *buffer, goto out; } - /* XXX corruption, the items didn't match the header */ - if (ret < xattr_full_bytes(xat)) { - ret = -EIO; - goto out; - } - - ret = le16_to_cpu(xat->val_len); - memcpy(buffer, &xat->name[xat->name_len], ret); + ret = copy_xattr_value(sb, &key, xat, xat_bytes, buffer, size, lck); out: - vfree(xat); + kfree(xat); return ret; } @@ -596,7 +639,8 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name, bool undo_totl = false; LIST_HEAD(ind_locks); u8 found_parts; - unsigned int bytes; + unsigned int xat_bytes_totl; + unsigned int xat_bytes; unsigned int val_len; u64 ind_seq; u64 total; @@ -629,9 +673,12 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name, if (tgs.totl && ((ret = parse_totl_key(&totl_key, name, name_len)) != 0)) return ret; - bytes = sizeof(struct scoutfs_xattr) + name_len + size; - /* alloc enough to read old totl value */ - xat = __vmalloc(bytes + SCOUTFS_XATTR_MAX_TOTL_U64, GFP_NOFS, PAGE_KERNEL); + /* allocate enough to always read an existing xattr's totl */ + xat_bytes_totl = first_item_bytes(name_len, + max_t(size_t, size, SCOUTFS_XATTR_MAX_TOTL_U64)); + /* but store partial first item that only includes the new xattr's value */ + xat_bytes = first_item_bytes(name_len, size); + xat = kmalloc(xat_bytes_totl, GFP_NOFS); if (!xat) { ret = -ENOMEM; goto out; @@ -645,9 +692,7 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name, down_write(&si->xattr_rwsem); /* find an existing xattr to delete, including possible totl value */ - ret = get_next_xattr(inode, &key, xat, - sizeof(struct scoutfs_xattr) + name_len + SCOUTFS_XATTR_MAX_TOTL_U64, - name, name_len, 0, 0, lck); + ret = get_next_xattr(inode, &key, xat, xat_bytes_totl, name, name_len, 0, 0, lck); if (ret < 0 && ret != -ENOENT) goto unlock; @@ -683,7 +728,7 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name, le64_add_cpu(&tval.total, -total); } - /* prepare our xattr */ + /* prepare the xattr header, name, and start of value in first item */ if (value) { if (found_parts) id = le64_to_cpu(key.skx_id); @@ -693,7 +738,9 @@ static int scoutfs_xattr_set(struct dentry *dentry, const char *name, xat->val_len = cpu_to_le16(size); memset(xat->__pad, 0, sizeof(xat->__pad)); memcpy(xat->name, name, name_len); - memcpy(&xat->name[xat->name_len], value, size); + memcpy(&xat->name[name_len], value, + min(size, SCOUTFS_XATTR_MAX_PART_SIZE - + offsetof(struct scoutfs_xattr, name[name_len]))); if (tgs.totl) { ret = parse_totl_u64(value, size, &total); @@ -741,14 +788,15 @@ retry: } if (found_parts && value) - ret = change_xattr_items(inode, id, xat, bytes, + ret = change_xattr_items(inode, id, xat, xat_bytes, value, size, xattr_nr_parts(xat), found_parts, lck); else if (found_parts) ret = delete_xattr_items(inode, le64_to_cpu(key.skx_name_hash), le64_to_cpu(key.skx_id), found_parts, lck); else - ret = create_xattr_items(inode, id, xat, bytes, lck); + ret = create_xattr_items(inode, id, xat, xat_bytes, value, size, + xattr_nr_parts(xat), lck); if (ret < 0) goto release; @@ -778,7 +826,7 @@ unlock: scoutfs_unlock(sb, lck, SCOUTFS_LOCK_WRITE); scoutfs_unlock(sb, totl_lock, SCOUTFS_LOCK_WRITE_ONLY); out: - vfree(xat); + kfree(xat); return ret; } @@ -807,7 +855,7 @@ ssize_t scoutfs_list_xattrs(struct inode *inode, char *buffer, struct scoutfs_xattr *xat = NULL; struct scoutfs_lock *lck = NULL; struct scoutfs_key key; - unsigned int bytes; + unsigned int xat_bytes; ssize_t total = 0; u32 name_hash = 0; bool is_hidden; @@ -820,8 +868,8 @@ ssize_t scoutfs_list_xattrs(struct inode *inode, char *buffer, id = *id_pos; /* need a buffer large enough for all possible names */ - bytes = sizeof(struct scoutfs_xattr) + SCOUTFS_XATTR_MAX_NAME_LEN; - xat = kmalloc(bytes, GFP_NOFS); + xat_bytes = first_item_bytes(SCOUTFS_XATTR_MAX_NAME_LEN, 0); + xat = kmalloc(xat_bytes, GFP_NOFS); if (!xat) { ret = -ENOMEM; goto out; @@ -834,8 +882,7 @@ ssize_t scoutfs_list_xattrs(struct inode *inode, char *buffer, down_read(&si->xattr_rwsem); for (;;) { - ret = get_next_xattr(inode, &key, xat, bytes, - NULL, 0, name_hash, id, lck); + ret = get_next_xattr(inode, &key, xat, xat_bytes, NULL, 0, name_hash, id, lck); if (ret < 0) { if (ret == -ENOENT) ret = total;