read_xattr_totls ioctl uses weak item cache

Change the read_xattr_totls ioctl to use the weak item cache instead of
manually reading and merging the fs items for the xattr totals on every
call.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2023-08-16 14:56:06 -07:00
parent 45cd62974a
commit 4091e2cc55

View File

@@ -43,6 +43,7 @@
#include "server.h"
#include "counters.h"
#include "totl.h"
#include "wkic.h"
#include "scoutfs_trace.h"
/*
@@ -1036,104 +1037,32 @@ out:
return ret;
}
struct xattr_total_entry {
struct rb_node node;
struct scoutfs_ioctl_xattr_total xt;
struct scoutfs_totl_merging merg;
struct read_xattr_total_iter_cb_args {
struct scoutfs_ioctl_xattr_total *xt;
unsigned int copied;
unsigned int total;
};
static int cmp_xt_entry_name(const struct xattr_total_entry *a,
const struct xattr_total_entry *b)
{
return scoutfs_cmp_u64s(a->xt.name[0], b->xt.name[0]) ?:
scoutfs_cmp_u64s(a->xt.name[1], b->xt.name[1]) ?:
scoutfs_cmp_u64s(a->xt.name[2], b->xt.name[2]);
}
/*
* Record the contribution of the three classes of logged items we can
* see: the item in the fs_root, items from finalized log btrees, and
* items from active log btrees. Once we have the full set the caller
* can decide which of the items contribute to the total it sends to the
* user.
* This is called under an RCU read lock so it can't copy to userspace.
*/
static int read_xattr_total_item(struct super_block *sb, struct scoutfs_key *key,
u64 seq, u8 flags, void *val, int val_len, int fic, void *arg)
static int read_xattr_total_iter_cb(struct scoutfs_key *key, void *val, unsigned int val_len,
void *cb_arg)
{
struct xattr_total_entry *ent;
struct xattr_total_entry rd;
struct rb_root *root = arg;
struct rb_node *parent;
struct rb_node **node;
int cmp;
struct read_xattr_total_iter_cb_args *cba = cb_arg;
struct scoutfs_xattr_totl_val *tval = val;
struct scoutfs_ioctl_xattr_total *xt = &cba->xt[cba->copied];
rd.xt.name[0] = le64_to_cpu(key->skxt_a);
rd.xt.name[1] = le64_to_cpu(key->skxt_b);
rd.xt.name[2] = le64_to_cpu(key->skxt_c);
xt->name[0] = le64_to_cpu(key->skxt_a);
xt->name[1] = le64_to_cpu(key->skxt_b);
xt->name[2] = le64_to_cpu(key->skxt_c);
xt->total = le64_to_cpu(tval->total);
xt->count = le64_to_cpu(tval->count);
/* find entry matching name */
node = &root->rb_node;
parent = NULL;
cmp = -1;
while (*node) {
parent = *node;
ent = container_of(*node, struct xattr_total_entry, node);
/* sort merge items by key then newest to oldest */
cmp = cmp_xt_entry_name(&rd, ent);
if (cmp < 0)
node = &(*node)->rb_left;
else if (cmp > 0)
node = &(*node)->rb_right;
else
break;
}
/* allocate and insert new node if we need to */
if (cmp != 0) {
ent = kzalloc(sizeof(*ent), GFP_KERNEL);
if (!ent)
return -ENOMEM;
memcpy(&ent->xt.name, &rd.xt.name, sizeof(ent->xt.name));
scoutfs_totl_merge_init(&ent->merg);
rb_link_node(&ent->node, parent, node);
rb_insert_color(&ent->node, root);
}
scoutfs_totl_merge_contribute(&ent->merg, seq, flags, val, val_len, fic);
scoutfs_inc_counter(sb, totl_read_item);
return 0;
}
/* these are always _safe, node stores next */
#define for_each_xt_ent(ent, node, root) \
for (node = rb_first(root); \
node && (ent = rb_entry(node, struct xattr_total_entry, node), \
node = rb_next(node), 1); )
#define for_each_xt_ent_reverse(ent, node, root) \
for (node = rb_last(root); \
node && (ent = rb_entry(node, struct xattr_total_entry, node), \
node = rb_prev(node), 1); )
static void free_xt_ent(struct rb_root *root, struct xattr_total_entry *ent)
{
rb_erase(&ent->node, root);
kfree(ent);
}
static void free_all_xt_ents(struct rb_root *root)
{
struct xattr_total_entry *ent;
struct rb_node *node;
for_each_xt_ent(ent, node, root)
free_xt_ent(root, ent);
if (++cba->copied < cba->total)
return -EAGAIN;
else
return 0;
}
/*
@@ -1150,14 +1079,13 @@ static long scoutfs_ioc_read_xattr_totals(struct file *file, unsigned long arg)
struct scoutfs_ioctl_read_xattr_totals __user *urxt = (void __user *)arg;
struct scoutfs_ioctl_read_xattr_totals rxt;
struct scoutfs_ioctl_xattr_total __user *uxt;
struct xattr_total_entry *ent;
struct read_xattr_total_iter_cb_args cba = {NULL, };
struct scoutfs_key range_start;
struct scoutfs_key range_end;
struct scoutfs_key key;
struct scoutfs_key bloom_key;
struct scoutfs_key start;
struct scoutfs_key end;
struct rb_root root = RB_ROOT;
struct rb_node *node;
int count = 0;
unsigned int copied = 0;
unsigned int total;
unsigned int ready;
int ret;
if (!(file->f_mode & FMODE_READ)) {
@@ -1170,6 +1098,13 @@ static long scoutfs_ioc_read_xattr_totals(struct file *file, unsigned long arg)
goto out;
}
cba.xt = (void *)__get_free_page(GFP_KERNEL);
if (!cba.xt) {
ret = -ENOMEM;
goto out;
}
cba.total = PAGE_SIZE / sizeof(struct scoutfs_ioctl_xattr_total);
if (copy_from_user(&rxt, urxt, sizeof(rxt))) {
ret = -EFAULT;
goto out;
@@ -1182,81 +1117,40 @@ static long scoutfs_ioc_read_xattr_totals(struct file *file, unsigned long arg)
goto out;
}
scoutfs_key_set_zeros(&bloom_key);
bloom_key.sk_zone = SCOUTFS_XATTR_TOTL_ZONE;
scoutfs_xattr_init_totl_key(&start, rxt.pos_name);
total = div_u64(min_t(u64, rxt.totals_bytes, INT_MAX),
sizeof(struct scoutfs_ioctl_xattr_total));
while (rxt.totals_bytes >= sizeof(struct scoutfs_ioctl_xattr_total)) {
scoutfs_totl_set_range(&range_start, &range_end);
scoutfs_xattr_init_totl_key(&key, rxt.pos_name);
scoutfs_key_set_ones(&end);
end.sk_zone = SCOUTFS_XATTR_TOTL_ZONE;
if (scoutfs_key_compare(&start, &end) > 0)
while (copied < total) {
cba.copied = 0;
ret = scoutfs_wkic_iterate(sb, &key, &range_end, &range_start, &range_end,
read_xattr_total_iter_cb, &cba);
if (ret < 0)
goto out;
if (cba.copied == 0)
break;
key = start;
ret = scoutfs_forest_read_items(sb, &key, &bloom_key, &start, &end,
read_xattr_total_item, &root);
if (ret < 0) {
if (ret == -ESTALE) {
free_all_xt_ents(&root);
continue;
}
ready = min(total - copied, cba.copied);
if (copy_to_user(&uxt[copied], cba.xt, ready * sizeof(cba.xt[0]))) {
ret = -EFAULT;
goto out;
}
if (RB_EMPTY_ROOT(&root))
break;
/* trim totals that fall outside of the consistent range */
for_each_xt_ent(ent, node, &root) {
scoutfs_xattr_init_totl_key(&key, ent->xt.name);
if (scoutfs_key_compare(&key, &start) < 0) {
free_xt_ent(&root, ent);
} else {
break;
}
}
for_each_xt_ent_reverse(ent, node, &root) {
scoutfs_xattr_init_totl_key(&key, ent->xt.name);
if (scoutfs_key_compare(&key, &end) > 0) {
free_xt_ent(&root, ent);
} else {
break;
}
}
/* copy resulting unique non-zero totals to userspace */
for_each_xt_ent(ent, node, &root) {
if (rxt.totals_bytes < sizeof(ent->xt))
break;
scoutfs_totl_merge_resolve(&ent->merg, &ent->xt.total, &ent->xt.count);
if (ent->xt.total != 0 || ent->xt.count != 0) {
if (copy_to_user(uxt, &ent->xt, sizeof(ent->xt))) {
ret = -EFAULT;
goto out;
}
uxt++;
rxt.totals_bytes -= sizeof(ent->xt);
count++;
scoutfs_inc_counter(sb, totl_read_copied);
}
free_xt_ent(&root, ent);
}
/* continue after the last possible key read */
start = end;
scoutfs_key_inc(&start);
scoutfs_xattr_init_totl_key(&key, cba.xt[ready - 1].name);
scoutfs_key_inc(&key);
copied += ready;
}
ret = 0;
out:
free_all_xt_ents(&root);
if (cba.xt)
free_page((long)cba.xt);
return ret ?: count;
return ret ?: copied;
}
static long scoutfs_ioc_get_allocated_inos(struct file *file, unsigned long arg)