scoutfs/kmod/src/srch.c
Auke Kok 1d150da3f0 Use page->lru instead of page->list
With v3.14-rc1-10-g34bf6ef94a83, page->list is removed. Instead,
use the union member ->lru.

Signed-off-by: Auke Kok <auke.kok@versity.com>
2023-10-09 15:35:40 -04:00

/*
* Copyright (C) 2020 Versity Software, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License v2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/slab.h>
#include <linux/crc32c.h>
#include <linux/random.h>
#include <linux/pagemap.h>
#include <linux/vmalloc.h>
#include <linux/sort.h>
#include "super.h"
#include "format.h"
#include "counters.h"
#include "block.h"
#include "alloc.h"
#include "srch.h"
#include "btree.h"
#include "spbm.h"
#include "client.h"
#include "scoutfs_trace.h"
/*
* This srch subsystem gives us a way to find inodes that have a given
* tagged xattr set. It's designed for an xattr population that is
* orders of magnitude larger than the file population, is updated much
* more frequently than it is searched, and can have slightly relaxed
* consistency requirements so that searches don't have to serialize
* with updates through locking.
*
* A srch entry is logged every time a .srch. xattr is created or
* deleted. Commits append entries to a growing srch log file along
* with the item btree and allocator block structures they're modifying.
*
* The server regularly rotates these growing log files so that they
* don't exceed a given size. Once there are enough log files they're
* all read and their sorted entries are written to a larger sorted
* file. Once there are enough sorted files they're all read and their
* combined sorted entries are written to a larger file, and so on.
*
* Searches combine all the entries read from unsorted log files and
* binary searches of larger sorted files to come up with the candidate
* inodes that probably contain the given named .srch. xattr.
*
* Searches read rotated log files and sorted files which have been
* committed. There is nothing protecting their blocks from being
* re-allocated and re-written. Search can restart by checking the
* btree for the current set of files. Compaction reads log files which
* are protected from other compactions by the persistent busy items
* created by the server. Compaction won't see its blocks reused out
* from under it, but it can encounter stale cached blocks that need to
* be invalidated.
*/
struct srch_info {
struct super_block *sb;
atomic_t shutdown;
struct workqueue_struct *workq;
struct delayed_work compact_dwork;
};
#define DECLARE_SRCH_INFO(sb, name) \
struct srch_info *name = SCOUTFS_SB(sb)->srch_info
#define SRE_FMT "%016llx.%llu.%llu"
#define SRE_ARG(sre) \
le64_to_cpu((sre)->hash), le64_to_cpu((sre)->ino), \
le64_to_cpu((sre)->id)
/*
* Compactions dirty radix allocator blocks, file radix parent blocks,
* and especially srch file blocks. The files can get enormous and we
* can't have compactions OOM the box but they're meant to be large
* streaming operations, so we only stop and write out dirty blocks in
* large chunks.
*/
#define SRCH_COMPACT_DIRTY_LIMIT_BYTES (32 * 1024 * 1024)
static int sre_cmp(const struct scoutfs_srch_entry *a,
const struct scoutfs_srch_entry *b)
{
return scoutfs_cmp_u64s(le64_to_cpu(a->hash), le64_to_cpu(b->hash)) ?:
scoutfs_cmp_u64s(le64_to_cpu(a->ino), le64_to_cpu(b->ino)) ?:
scoutfs_cmp_u64s(le64_to_cpu(a->id), le64_to_cpu(b->id));
}
static void sre_inc(struct scoutfs_srch_entry *sre)
{
le64_add_cpu(&sre->id, 1);
if (sre->id != 0)
return;
le64_add_cpu(&sre->ino, 1);
if (sre->ino != 0)
return;
le64_add_cpu(&sre->hash, 1);
}
static void sre_dec(struct scoutfs_srch_entry *sre)
{
le64_add_cpu(&sre->id, -1);
if (sre->id != cpu_to_le64(U64_MAX))
return;
le64_add_cpu(&sre->ino, -1);
if (sre->ino != cpu_to_le64(U64_MAX))
return;
le64_add_cpu(&sre->hash, -1);
}
/*
* srch items are first grouped by type: we have log files, sorted
* files, and busy compactions.
*/
static void init_srch_key(struct scoutfs_key *key, int type,
u64 major, u64 minor)
{
*key = (struct scoutfs_key) {
.sk_zone = SCOUTFS_SRCH_ZONE,
.sk_type = type,
._sk_second = cpu_to_le64(major),
._sk_third = cpu_to_le64(minor),
};
}
/*
* The caller has ensured that there is space for a full word at the
* buf. Only the set low order bytes will be used. The clear high
* order bytes will be overwritten in the future and ignored in the
* final encoding in the block.
*/
static int encode_u64(__le64 *buf, u64 val)
{
int bytes;
val = (val << 1) ^ ((s64)val >> 63); /* shift sign extend */
bytes = (fls64(val) + 7) >> 3;
put_unaligned_le64(val, buf);
return bytes;
}
/* shifting by width is undefined :/ */
#define BYTE_MASK(b) ((1ULL << (b << 3)) - 1)
static u64 byte_masks[] = {
0, BYTE_MASK(1), BYTE_MASK(2), BYTE_MASK(3),
BYTE_MASK(4), BYTE_MASK(5), BYTE_MASK(6), BYTE_MASK(7), U64_MAX,
};
static u64 decode_u64(void *buf, int bytes)
{
u64 val = get_unaligned_le64(buf) & byte_masks[bytes];
return (val >> 1) ^ (-(val & 1));
}
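/*
 * For illustration, the zig-zag step maps small magnitudes of either
 * sign to small unsigned values so that diffs near zero encode in few
 * bytes:
 *
 *   val = 0   -> zig-zag 0   -> 0 bytes, decodes back to 0
 *   val = -3  -> zig-zag 5   -> 1 byte 0x05, (5 >> 1) ^ -(5 & 1) == -3
 *   val = 130 -> zig-zag 260 -> 2 bytes 0x04 0x01 (little endian)
 *
 * decode_u64() uses byte_masks[] to drop the stale high order bytes
 * that encode_u64() wrote past the significant length.
 */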
/*
* Encode an entry at the offset in the block. Leave room for the
* lengths short, encode each field as a diff from the previous entry,
* then fill in the lengths short with the byte length of each encoded
* diff. The caller ensures that there's room for a full size entry at
* the position in the block.
*/
static int encode_entry(void *buf, struct scoutfs_srch_entry *sre,
struct scoutfs_srch_entry *prev)
{
u64 diffs[] = {
le64_to_cpu(sre->hash) - le64_to_cpu(prev->hash),
le64_to_cpu(sre->ino) - le64_to_cpu(prev->ino),
le64_to_cpu(sre->id) - le64_to_cpu(prev->id),
};
u16 lengths = 0;
int bytes;
int tot = 2;
int i;
for (i = 0; i < ARRAY_SIZE(diffs); i++) {
bytes = encode_u64(buf + tot, diffs[i]);
lengths |= bytes << (i << 2);
tot += bytes;
}
put_unaligned_le16(lengths, buf);
return tot;
}
/*
* Decode an entry from the offset of the block. Load the length short
* and decode the bytes of diffs and apply them to the previous entry.
* The caller ensures that we won't read off the end of the block if we were
* to try and decode a full size set of diffs.
*/
static int decode_entry(void *buf, struct scoutfs_srch_entry *sre,
struct scoutfs_srch_entry *prev)
{
u64 diffs[3];
u16 lengths;
int bytes;
int tot;
int i;
lengths = get_unaligned_le16(buf);
tot = 2;
for (i = 0; i < ARRAY_SIZE(diffs); i++) {
bytes = min_t(int, 8, lengths & 15);
diffs[i] = decode_u64(buf + tot, bytes);
tot += bytes;
lengths >>= 4;
}
sre->hash = cpu_to_le64(le64_to_cpu(prev->hash) + diffs[0]);
sre->ino = cpu_to_le64(le64_to_cpu(prev->ino) + diffs[1]);
sre->id = cpu_to_le64(le64_to_cpu(prev->id) + diffs[2]);
return tot;
}
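/*
 * A worked example of the encoded entry layout: the leading u16 packs
 * three 4-bit byte counts, the hash diff length in bits 0-3, the ino
 * diff in bits 4-7, and the id diff in bits 8-11.  An entry whose hash
 * and ino match the previous entry and whose id is one greater encodes
 * as the lengths short 0x0100 followed by the single id diff byte 0x02
 * (zig-zag of +1), 3 bytes in all.  The worst case is 2 + (3 * 8) = 26
 * bytes, the full size entry that callers are told to leave room for.
 */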
/* return the ref index used to traverse through the parent at level towards blk */
static int calc_ref_ind(u64 blk, int level)
{
int ind;
int i;
BUG_ON(level < 1);
for (i = 1; i <= level; i++)
blk = div_u64_rem(blk, SCOUTFS_SRCH_PARENT_REFS, &ind);
return ind;
}
static u8 height_for_blk(u64 blk)
{
u64 total = SCOUTFS_SRCH_PARENT_REFS;
int hei = 2;
if (blk == 0)
return 1;
while (blk >= total) {
hei++;
total *= SCOUTFS_SRCH_PARENT_REFS;
}
return hei;
}
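/*
 * For illustration: a file is a radix tree with SCOUTFS_SRCH_PARENT_REFS
 * refs per parent.  calc_ref_ind() extracts the base-REFS digit of blk
 * for a level, so for blk = (q * REFS) + r it returns r at level 1 and
 * (q % REFS) at level 2.  height_for_blk() gives the matching bound: a
 * tree of height h can address logical blocks 0 through REFS^(h-1) - 1,
 * with height 1 being a lone leaf that holds only blk 0.
 */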
static inline u32 srch_level_magic(int level)
{
return level ? SCOUTFS_BLOCK_MAGIC_SRCH_PARENT : SCOUTFS_BLOCK_MAGIC_SRCH_BLOCK;
}
/*
* This is operating on behalf of writers writing into private files and
* readers who could see stale blocks. We can find stale cached blocks
* and should retry the read ourselves after invalidating, but if we hit
* stale blocks on disk then we have to return to the caller who can
* decide to return errors or retry.
*/
static int read_srch_block(struct super_block *sb,
struct scoutfs_block_writer *wri, int level,
struct scoutfs_block_ref *ref,
struct scoutfs_block **bl_ret)
{
u32 magic = srch_level_magic(level);
int ret;
ret = scoutfs_block_read_ref(sb, ref, magic, bl_ret);
if (ret == -ESTALE)
scoutfs_inc_counter(sb, srch_read_stale);
return ret;
}
/*
* Give the caller a read-only reference to the block along the path to
* the logical block at the given level. This shouldn't be called on an
* empty root.
*/
static int read_path_block(struct super_block *sb,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_file *sfl,
u64 blk, int at_level,
struct scoutfs_block **bl_ret)
{
struct scoutfs_block *bl = NULL;
struct scoutfs_srch_parent *srp;
struct scoutfs_block_ref ref;
int level;
int ind;
int ret;
if (WARN_ON_ONCE(at_level < 0 || at_level >= sfl->height))
return -EINVAL;
level = sfl->height;
ref = sfl->ref;
while (level--) {
if (ref.blkno == 0) {
ret = -ENOENT;
break;
}
ret = read_srch_block(sb, wri, level, &ref, &bl);
if (ret < 0)
break;
if (level == at_level) {
ret = 0;
break;
}
srp = bl->data;
ind = calc_ref_ind(blk, level);
ref = srp->refs[ind];
scoutfs_block_put(sb, bl);
bl = NULL;
}
if (ret < 0)
scoutfs_block_put(sb, bl);
else
*bl_ret = bl;
return ret;
}
/*
* Walk radix blocks to find the logical file block and return the
* reference to the caller. Flags determine if we cow new dirty blocks,
* allocate new blocks, or return errors for missing blocks (files are
* never sparse, this won't happen).
*/
enum gfb_flags {
GFB_INSERT = (1 << 0),
GFB_DIRTY = (1 << 1),
};
static int get_file_block(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_file *sfl,
int flags, u64 blk, struct scoutfs_block **bl_ret)
{
struct scoutfs_block *parent = NULL;
struct scoutfs_block_header *hdr;
struct scoutfs_block *bl = NULL;
struct scoutfs_srch_parent *srp;
struct scoutfs_block_ref new_root_ref;
struct scoutfs_block_ref *ref;
int level;
int ind;
int ret;
u8 hei;
/* see if we need to grow to insert a new largest blk */
hei = height_for_blk(blk);
while (sfl->height < hei) {
if (!(flags & GFB_INSERT)) {
ret = -ENOENT;
goto out;
}
memset(&new_root_ref, 0, sizeof(new_root_ref));
level = sfl->height;
ret = scoutfs_block_dirty_ref(sb, alloc, wri, &new_root_ref,
srch_level_magic(level), &bl, 0, NULL);
if (ret < 0)
goto out;
if (level) {
srp = bl->data;
srp->refs[0] = sfl->ref;
}
hdr = bl->data;
sfl->ref = new_root_ref;
sfl->height++;
scoutfs_block_put(sb, bl);
bl = NULL;
}
/* walk file and parent block references to the leaf blocks */
level = sfl->height;
ref = &sfl->ref;
while (level--) {
/* searching an unused part of the tree */
if (!ref->blkno && !(flags & GFB_INSERT)) {
ret = -ENOENT;
goto out;
}
if (flags & GFB_DIRTY)
ret = scoutfs_block_dirty_ref(sb, alloc, wri, ref, srch_level_magic(level),
&bl, 0, NULL);
else
ret = scoutfs_block_read_ref(sb, ref, srch_level_magic(level), &bl);
if (ret < 0)
goto out;
if (level == 0) {
ret = 0;
break;
}
srp = bl->data;
ind = calc_ref_ind(blk, level);
ref = &srp->refs[ind];
scoutfs_block_put(sb, parent);
parent = bl;
bl = NULL;
}
ret = 0;
out:
scoutfs_block_put(sb, parent);
if (ret < 0) {
scoutfs_block_put(sb, bl);
bl = NULL;
}
/* record that we successfully grew the file */
if (ret == 0 && (flags & GFB_INSERT) && blk >= le64_to_cpu(sfl->blocks))
sfl->blocks = cpu_to_le64(blk + 1);
*bl_ret = bl;
return ret;
}
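/*
 * Append a single entry to the tail of a log file.  Each call records
 * one .srch. xattr creation or deletion as a (hash, ino, id) entry;
 * matching pairs cancel out later during compaction.  The caller keeps
 * the block reference returned in *bl_ret and passes it back in so that
 * a run of adds within one commit doesn't re-walk the radix tree for
 * every entry.
 */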
int scoutfs_srch_add(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_file *sfl,
struct scoutfs_block **bl_ret,
u64 hash, u64 ino, u64 id)
{
struct scoutfs_srch_block *srb;
struct scoutfs_block *bl = NULL;
u64 blk;
int ret;
struct scoutfs_srch_entry sre = {
.hash = cpu_to_le64(hash),
.ino = cpu_to_le64(ino),
.id = cpu_to_le64(id),
};
/* start with a new block or the last existing block */
if (le64_to_cpu(sfl->blocks) > 1)
blk = le64_to_cpu(sfl->blocks) - 1;
else
blk = 0;
bl = *bl_ret;
get_last_block:
if (bl == NULL) {
ret = get_file_block(sb, alloc, wri, sfl,
GFB_INSERT | GFB_DIRTY, blk, &bl);
if (ret < 0) {
/* writing into a private file, shouldn't happen */
WARN_ON_ONCE(ret == -ESTALE);
goto out;
}
}
srb = bl->data;
/* stop encoding once we might overflow the block */
if (le32_to_cpu(srb->entry_bytes) > SCOUTFS_SRCH_BLOCK_SAFE_BYTES) {
scoutfs_block_put(sb, bl);
bl = NULL;
blk++;
goto get_last_block;
}
ret = encode_entry(srb->entries + le32_to_cpu(srb->entry_bytes),
&sre, &srb->tail);
if (ret > 0) {
if (srb->entry_bytes == 0) {
if (blk == 0) {
sfl->first = sre;
sfl->last = sre;
}
srb->first = sre;
srb->last = sre;
} else {
if (sre_cmp(&sre, &sfl->first) < 0)
sfl->first = sre;
else if (sre_cmp(&sre, &sfl->last) > 0)
sfl->last = sre;
if (sre_cmp(&sre, &srb->first) < 0)
srb->first = sre;
else if (sre_cmp(&sre, &srb->last) > 0)
srb->last = sre;
}
srb->tail = sre;
le32_add_cpu(&srb->entry_nr, 1);
le32_add_cpu(&srb->entry_bytes, ret);
le64_add_cpu(&sfl->entries, 1);
ret = 0;
scoutfs_inc_counter(sb, srch_add_entry);
}
out:
if (ret < 0) {
scoutfs_block_put(sb, bl);
bl = NULL;
}
*bl_ret = bl;
return ret;
}
/*
* The caller is dropping an ino/id because the tracking rbtree is full.
* This loses information so we can't return any entries at or after the
* one that we dropped. Update end to the entry before the dropped
* entry if it's less than the current end.
*/
static void set_end_before(struct scoutfs_srch_entry *end, u64 ino, u64 id)
{
struct scoutfs_srch_entry sre;
sre.hash = end->hash;
sre.ino = cpu_to_le64(ino);
sre.id = cpu_to_le64(id);
sre_dec(&sre);
if (sre_cmp(&sre, end) < 0)
*end = sre;
}
/*
* Track an inode and id of an xattr hash that we found while searching.
* We'll return the tracked inos to userspace in order when we're done
* searching. The first time we see an entry we track it; the second
* time must be a deletion, so we remove it.
*
* We count the number of tracked entries here. Once we hit the limit
* we drop entries which are greater than what's tracked. If we track
* new entries which are within the set then we drop the last entry.
* When we drop entries we have to trim the range of entries that we'll
* return because we've lost data. The caller will perform the search
* again from that point, giving them another window of tracked entries
* to fill from that entry.
*/
static int track_found(struct scoutfs_srch_rb_root *sroot, u64 ino, u64 id,
unsigned long limit, struct scoutfs_srch_entry *end)
{
struct rb_node **node = &sroot->root.rb_node;
struct rb_node *parent = NULL;
struct scoutfs_srch_rb_node *snode;
int cmp = 1; /* set last for first insertion */
while (*node) {
parent = *node;
snode = container_of(*node, struct scoutfs_srch_rb_node, node);
cmp = scoutfs_cmp(ino, snode->ino) ?:
scoutfs_cmp(id, snode->id);
if (cmp < 0) {
node = &(*node)->rb_left;
} else if (cmp > 0) {
node = &(*node)->rb_right;
} else {
/* update last if removed as a dupe */
if (sroot->last == &snode->node)
sroot->last = rb_prev(sroot->last);
rb_erase(&snode->node, &sroot->root);
kfree(snode);
sroot->nr--;
return 0;
}
}
/* can't track greater while we're at the limit */
if (sroot->nr >= limit && cmp > 0 && parent == sroot->last) {
set_end_before(end, ino, id);
return 0;
}
snode = kzalloc(sizeof(*snode), GFP_NOFS);
if (!snode)
return -ENOMEM;
rb_link_node(&snode->node, parent, node);
rb_insert_color(&snode->node, &sroot->root);
/* track a newly inserted last item */
if (cmp > 0 && parent == sroot->last)
sroot->last = &snode->node;
snode->ino = ino;
snode->id = id;
sroot->nr++;
/* remove and update last if we inserted earlier at limit */
if (sroot->nr > limit && sroot->last != &snode->node) {
snode = container_of(sroot->last, struct scoutfs_srch_rb_node,
node);
sroot->last = rb_prev(sroot->last);
set_end_before(end, snode->ino, snode->id);
rb_erase(&snode->node, &sroot->root);
kfree(snode);
sroot->nr--;
}
return 0;
}
/*
* Sweep all the unsorted entries of a log file looking for hash matches
* and tracking their xattr inos and ids. If the tracking sroot fills
* we update end but keep searching because we might find earlier
* entries.
*/
static int search_log_file(struct super_block *sb,
struct scoutfs_srch_file *sfl,
struct scoutfs_srch_rb_root *sroot,
struct scoutfs_srch_entry *start,
struct scoutfs_srch_entry *end,
unsigned long limit)
{
struct scoutfs_block *bl = NULL;
struct scoutfs_srch_entry sre;
struct scoutfs_srch_entry prev;
struct scoutfs_srch_block *srb;
int ret = 0;
u64 blk;
int pos;
int i;
for (blk = 0; blk < le64_to_cpu(sfl->blocks); blk++) {
scoutfs_block_put(sb, bl);
ret = get_file_block(sb, NULL, NULL, sfl, 0, blk, &bl);
if (ret < 0)
break;
srb = bl->data;
memset(&prev, 0, sizeof(prev));
pos = 0;
scoutfs_inc_counter(sb, srch_search_log_block);
for (i = 0; i < le32_to_cpu(srb->entry_nr); i++) {
if (pos > SCOUTFS_SRCH_BLOCK_SAFE_BYTES) {
/* can only be inconsistency :/ */
ret = -EIO;
break;
}
ret = decode_entry(srb->entries + pos, &sre, &prev);
if (ret <= 0) {
/* can only be inconsistency :/ */
ret = -EIO;
break;
}
pos += ret;
prev = sre;
if (sre_cmp(start, &sre) > 0 ||
sre_cmp(&sre, end) > 0)
continue;
ret = track_found(sroot, le64_to_cpu(sre.ino),
le64_to_cpu(sre.id), limit, end);
if (ret < 0)
break;
}
if (ret < 0)
break;
ret = 0;
}
scoutfs_block_put(sb, bl);
return ret;
}
/*
* Search a sorted file for entries for inodes that could contain the
* xattr hash that we're looking for. The caller has checked that the
* start entry is contained in the file. We find the first block that
* could contain it and stream entries from there until we fill the
* rbtree or arrive at the end entry.
*/
static int search_sorted_file(struct super_block *sb,
struct scoutfs_srch_file *sfl,
struct scoutfs_srch_rb_root *sroot,
struct scoutfs_srch_entry *start,
struct scoutfs_srch_entry *end,
unsigned long limit)
{
DECLARE_SRCH_INFO(sb, srinf);
struct scoutfs_srch_block *srb = NULL;
struct scoutfs_srch_entry sre;
struct scoutfs_srch_entry prev;
struct scoutfs_block *bl = NULL;
int ret = 0;
int pos = 0;
s64 left;
s64 right;
u64 first;
u64 blk;
if (sfl->blocks == 0)
return 0;
/* binary search for first block in the range */
first = U64_MAX;
left = 0;
right = le64_to_cpu(sfl->blocks) - 1;
while (left <= right) {
blk = (left + right) >> 1;
ret = get_file_block(sb, NULL, NULL, sfl, 0, blk, &bl);
if (ret < 0)
goto out;
srb = bl->data;
if (sre_cmp(end, &srb->first) < 0) {
right = blk - 1;
} else if (sre_cmp(start, &srb->last) > 0) {
left = blk + 1;
} else {
first = min(blk, first);
right = blk - 1;
}
scoutfs_block_put(sb, bl);
bl = NULL;
}
/* no blocks in range */
if (first == U64_MAX) {
ret = 0;
goto out;
}
blk = first;
/* stream entries until end or we're past the full tracking rb_root */
for (;;) {
if (bl == NULL) {
/* only check on each new input block */
if (atomic_read(&srinf->shutdown)) {
ret = -ESHUTDOWN;
goto out;
}
ret = get_file_block(sb, NULL, NULL, sfl, 0, blk, &bl);
if (ret < 0)
goto out;
srb = bl->data;
memset(&prev, 0, sizeof(prev));
pos = 0;
scoutfs_inc_counter(sb, srch_search_sorted_block);
}
if (pos > SCOUTFS_SRCH_BLOCK_SAFE_BYTES) {
/* can only be inconsistency :/ */
ret = -EIO;
goto out;
}
ret = decode_entry(srb->entries + pos, &sre, &prev);
if (ret <= 0) {
/* can only be inconsistency :/ */
ret = -EIO;
goto out;
}
pos += ret;
prev = sre;
if (sre_cmp(start, &sre) > 0)
continue;
if (sre_cmp(&sre, end) > 0)
break;
ret = track_found(sroot, le64_to_cpu(sre.ino),
le64_to_cpu(sre.id), limit, end);
if (ret < 0)
goto out;
if (pos >= le32_to_cpu(srb->entry_bytes)) {
scoutfs_block_put(sb, bl);
bl = NULL;
if (++blk == le64_to_cpu(sfl->blocks))
break;
}
}
ret = 0;
out:
scoutfs_block_put(sb, bl);
return ret;
}
static int search_file(struct super_block *sb, int type,
struct scoutfs_srch_file *sfl,
struct scoutfs_srch_rb_root *sroot,
struct scoutfs_srch_entry *start,
struct scoutfs_srch_entry *end, unsigned long limit)
{
/* ignore files that don't have our hash */
if (sre_cmp(start, &sfl->last) > 0 ||
sre_cmp(end, &sfl->first) < 0)
return 0;
if (type == SCOUTFS_SRCH_LOG_TYPE) {
scoutfs_inc_counter(sb, srch_search_log);
return search_log_file(sb, sfl, sroot, start, end, limit);
} else {
scoutfs_inc_counter(sb, srch_search_sorted);
return search_sorted_file(sb, sfl, sroot, start, end, limit);
}
}
static void srch_init_rb_root(struct scoutfs_srch_rb_root *sroot)
{
sroot->root = RB_ROOT;
sroot->last = NULL;
sroot->nr = 0;
}
void scoutfs_srch_destroy_rb_root(struct scoutfs_srch_rb_root *sroot)
{
struct scoutfs_srch_rb_node *snode;
struct scoutfs_srch_rb_node *pos;
rbtree_postorder_for_each_entry_safe(snode, pos, &sroot->root, node)
kfree(snode);
srch_init_rb_root(sroot);
}
/*
* There are no constraints on the distribution of entries in log or
* sorted srch files. We limit the number of entries we track to avoid
* consuming absurd amounts of memory for very large searches. The
* larger the limit the more memory each search will take. The smaller
* this is the more searches will be necessary to find all the entries.
*/
#define SRCH_LIMIT 1000000
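/*
 * For scale: each tracked entry is a kzalloc()ed scoutfs_srch_rb_node,
 * so a search that fills the limit holds on the order of a million
 * small allocations (very roughly tens of MB, depending on the node
 * size and slab overhead) until the rb_root is destroyed.
 */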
/*
* Search all the srch files for entries recording that inodes might
* have a given xattr.
*
* Advancing from an inode number that was returned is the only way the
* caller can make forward progress between searches. We might not find
* any inodes if we have the bad luck of pruning all the entries we
* tracked with deletions. We'll restart the search ourselves in this
* case to see if we can find an inode to return to the caller.
*/
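/*
 * Callers paginate by re-searching with ino advanced past an inode that
 * was returned by the previous call, repeating until *done reports that
 * the range up to last_ino has been covered.
 */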
int scoutfs_srch_search_xattrs(struct super_block *sb,
struct scoutfs_srch_rb_root *sroot,
u64 hash, u64 ino, u64 last_ino, bool *done)
{
struct scoutfs_net_roots roots;
struct scoutfs_srch_entry start;
struct scoutfs_srch_entry end;
struct scoutfs_srch_entry final;
struct scoutfs_log_trees lt;
struct scoutfs_srch_file sfl;
SCOUTFS_BTREE_ITEM_REF(iref);
DECLARE_SAVED_REFS(saved);
struct scoutfs_key key;
unsigned long limit = SRCH_LIMIT;
int ret;
scoutfs_inc_counter(sb, srch_search_xattrs);
*done = false;
srch_init_rb_root(sroot);
start.hash = cpu_to_le64(hash);
start.ino = cpu_to_le64(ino);
start.id = 0;
final.hash = cpu_to_le64(hash);
final.ino = cpu_to_le64(last_ino);
final.id = cpu_to_le64(U64_MAX);
retry:
scoutfs_srch_destroy_rb_root(sroot);
ret = scoutfs_client_get_roots(sb, &roots);
if (ret)
goto out;
end = final;
/* search intersecting sorted files, then logs */
init_srch_key(&key, SCOUTFS_SRCH_BLOCKS_TYPE, 0, 0);
for (;;) {
ret = scoutfs_btree_next(sb, &roots.srch_root, &key, &iref);
if (ret == 0) {
if (iref.key->sk_type != key.sk_type) {
ret = -ENOENT;
} else if (iref.val_len == sizeof(sfl)) {
key = *iref.key;
scoutfs_key_inc(&key);
memcpy(&sfl, iref.val, iref.val_len);
} else {
ret = -EIO;
}
scoutfs_btree_put_iref(&iref);
}
if (ret < 0) {
if (ret == -ENOENT) {
if (key.sk_type == SCOUTFS_SRCH_BLOCKS_TYPE) {
init_srch_key(&key,
SCOUTFS_SRCH_LOG_TYPE, 0, 0);
continue;
} else {
break;
}
}
goto out;
}
ret = search_file(sb, key.sk_type, &sfl, sroot,
&start, &end, limit);
if (ret < 0)
goto out;
}
/* search all the log files being written by mounts */
scoutfs_key_init_log_trees(&key, 0, 0);
for (;;) {
ret = scoutfs_btree_next(sb, &roots.logs_root, &key, &iref);
if (ret == -ENOENT)
break;
if (ret == 0) {
if (iref.val_len == sizeof(lt)) {
key = *iref.key;
scoutfs_key_inc(&key);
memcpy(&lt, iref.val, iref.val_len);
} else {
ret = -EIO;
}
scoutfs_btree_put_iref(&iref);
}
if (ret < 0)
goto out;
ret = search_file(sb, SCOUTFS_SRCH_LOG_TYPE, &lt.srch_file,
sroot, &start, &end, limit);
if (ret < 0)
goto out;
}
/* keep searching if we didn't find any entries in the limit */
if (sroot->nr == 0 && sre_cmp(&end, &final) < 0) {
start = end;
sre_inc(&start);
scoutfs_inc_counter(sb, srch_search_retry_empty);
goto retry;
}
/* let the caller know our search was exhaustive */
*done = sre_cmp(&end, &final) == 0;
ret = 0;
out:
ret = scoutfs_block_check_stale(sb, ret, &saved, &roots.srch_root.ref,
&roots.logs_root.ref);
if (ret == -ESTALE)
goto retry;
return ret;
}
/*
* Running in the server, rotate a client's log file as it commits if
* the file is large enough.
*/
int scoutfs_srch_rotate_log(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_btree_root *root,
struct scoutfs_srch_file *sfl, bool force)
{
struct scoutfs_key key;
int ret;
if (sfl->ref.blkno == 0 ||
(!force && le64_to_cpu(sfl->blocks) < SCOUTFS_SRCH_LOG_BLOCK_LIMIT))
return 0;
init_srch_key(&key, SCOUTFS_SRCH_LOG_TYPE,
le64_to_cpu(sfl->ref.blkno), 0);
ret = scoutfs_btree_insert(sb, alloc, wri, root, &key,
sfl, sizeof(*sfl));
/*
* While it's fine to replay moving the client's logging srch
* file to the core btree item, server commits should keep it
* from happening. So we'll warn if we see it happen. This can
* be removed eventually.
*/
if (WARN_ON_ONCE(ret == -EEXIST))
ret = 0;
if (ret == 0) {
memset(sfl, 0, sizeof(*sfl));
scoutfs_inc_counter(sb, srch_rotate_log);
}
return ret;
}
/*
* Running in the server, get a compaction operation to send to the
* client. We first see if there are any pending operations to continue
* working on. If not, we see if any tier has enough files waiting for
* a compaction. We first search log files and then each greater size
* tier. We skip input files which are currently being read by busy
* compaction items.
*/
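/*
 * A size tier here is derived from the file's block count: sorted files
 * are bucketed by fls64(blocks) / SCOUTFS_SRCH_COMPACT_ORDER, and a new
 * compaction is only handed out once SCOUTFS_SRCH_COMPACT_NR files from
 * a single bucket (or that many rotated log files) have accumulated.
 */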
int scoutfs_srch_get_compact(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_btree_root *root,
u64 rid, struct scoutfs_srch_compact *sc)
{
struct scoutfs_srch_file sfl;
SCOUTFS_BTREE_ITEM_REF(iref);
struct scoutfs_spbm busy;
struct scoutfs_key key;
int cur_order = -1;
int order;
int type;
int ret;
int err;
int i;
/*
* Search for pending or busy items. If we find a pending item
* we move it to busy and return it. We build up a bitmap of
* input files which are in busy items.
*/
scoutfs_spbm_init(&busy);
for (init_srch_key(&key, SCOUTFS_SRCH_PENDING_TYPE, 0, 0); ;
scoutfs_key_inc(&key)) {
/* _PENDING_ and _BUSY_ are last, _next won't see other types */
ret = scoutfs_btree_next(sb, root, &key, &iref);
if (ret == -ENOENT)
break;
if (ret == 0) {
if (iref.val_len == sizeof(*sc)) {
key = *iref.key;
memcpy(sc, iref.val, iref.val_len);
} else {
ret = -EIO;
}
scoutfs_btree_put_iref(&iref);
}
if (ret < 0)
goto out;
/* record all the busy input files */
if (key.sk_type == SCOUTFS_SRCH_BUSY_TYPE) {
for (i = 0; i < sc->nr; i++) {
ret = scoutfs_spbm_set(&busy,
le64_to_cpu(sc->in[i].sfl.ref.blkno));
if (ret < 0)
goto out;
}
continue;
}
/* or move the first pending to busy and return it */
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid,
le64_to_cpu(sc->id));
ret = scoutfs_btree_insert(sb, alloc, wri, root, &key,
sc, sizeof(*sc));
if (ret < 0)
goto out;
init_srch_key(&key, SCOUTFS_SRCH_PENDING_TYPE,
le64_to_cpu(sc->id), 0);
ret = scoutfs_btree_delete(sb, alloc, wri, root, &key);
if (ret < 0) {
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid,
le64_to_cpu(sc->id));
err = scoutfs_btree_delete(sb, alloc, wri, root, &key);
BUG_ON(err); /* XXX both pending and busy :/ */
goto out;
}
/* found one */
ret = 0;
goto out;
}
/* no pending, look for sufficient files to start a new compaction */
memset(sc, 0, sizeof(struct scoutfs_srch_compact));
/* first look for unsorted log files */
type = SCOUTFS_SRCH_LOG_TYPE;
init_srch_key(&key, type, 0, 0);
for (;;scoutfs_key_inc(&key)) {
ret = scoutfs_btree_next(sb, root, &key, &iref);
if (ret == 0) {
if (iref.key->sk_type != type) {
ret = -ENOENT;
} else if (iref.val_len == sizeof(sfl)) {
key = *iref.key;
memcpy(&sfl, iref.val, iref.val_len);
} else {
ret = -EIO;
}
scoutfs_btree_put_iref(&iref);
}
if (ret < 0) {
/* see if we ran out of log files or files entirely */
if (ret == -ENOENT) {
sc->nr = 0;
if (type == SCOUTFS_SRCH_LOG_TYPE) {
type = SCOUTFS_SRCH_BLOCKS_TYPE;
init_srch_key(&key, type, 0, 0);
continue;
} else {
ret = 0;
}
}
goto out;
}
/* skip any files already being compacted */
if (scoutfs_spbm_test(&busy, le64_to_cpu(sfl.ref.blkno)))
continue;
/* reset if we iterated into the next size category */
if (type == SCOUTFS_SRCH_BLOCKS_TYPE) {
order = fls64(le64_to_cpu(sfl.blocks)) /
SCOUTFS_SRCH_COMPACT_ORDER;
if (order != cur_order) {
cur_order = order;
sc->nr = 0;
}
}
sc->in[sc->nr++].sfl = sfl;
if (sc->nr == SCOUTFS_SRCH_COMPACT_NR)
break;
scoutfs_key_inc(&key);
}
if (type == SCOUTFS_SRCH_LOG_TYPE)
sc->flags = SCOUTFS_SRCH_COMPACT_FLAG_LOG;
else
sc->flags = SCOUTFS_SRCH_COMPACT_FLAG_SORTED;
/* record that our client has a compaction in process */
sc->id = sc->in[0].sfl.ref.blkno;
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid, le64_to_cpu(sc->id));
ret = scoutfs_btree_insert(sb, alloc, wri, root, &key,
sc, sizeof(*sc));
out:
scoutfs_spbm_destroy(&busy);
if (ret < 0)
sc->nr = 0;
if (sc->nr < SCOUTFS_SRCH_COMPACT_NR)
memset(&sc->in[sc->nr], 0,
(SCOUTFS_SRCH_COMPACT_NR - sc->nr) * sizeof(sc->in[0]));
return ret;
}
/*
* get_ previously created a busy item to reserve the files for a compaction.
* The caller has finished the input struct and we can update the persistent
* copy.
*/
int scoutfs_srch_update_compact(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_btree_root *root, u64 rid,
struct scoutfs_srch_compact *sc)
{
struct scoutfs_key key;
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid, le64_to_cpu(sc->id));
return scoutfs_btree_update(sb, alloc, wri, root, &key,
sc, sizeof(struct scoutfs_srch_compact));
}
static void init_file_key(struct scoutfs_key *key, int type,
struct scoutfs_srch_file *sfl)
{
if (type == SCOUTFS_SRCH_LOG_TYPE)
init_srch_key(key, type, le64_to_cpu(sfl->ref.blkno), 0);
else
init_srch_key(key, type, le64_to_cpu(sfl->blocks),
le64_to_cpu(sfl->ref.blkno));
}
/*
* A compaction has completed so we remove the input file reference
* items and add the output file, if it has contents. If this returns
* an error then the file items were not changed.
*/
static int commit_files(struct super_block *sb, struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_btree_root *root,
struct scoutfs_srch_compact *sc)
{
struct scoutfs_srch_file *sfl;
struct scoutfs_key key;
int type;
int ret;
int err;
int i;
if (sc->flags & SCOUTFS_SRCH_COMPACT_FLAG_LOG)
type = SCOUTFS_SRCH_LOG_TYPE;
else
type = SCOUTFS_SRCH_BLOCKS_TYPE;
if (sc->out.blocks != 0) {
sfl = &sc->out;
init_file_key(&key, SCOUTFS_SRCH_BLOCKS_TYPE, sfl);
ret = scoutfs_btree_insert(sb, alloc, wri, root, &key,
sfl, sizeof(*sfl));
if (ret < 0)
goto out;
}
for (i = 0; i < sc->nr; i++) {
sfl = &sc->in[i].sfl;
init_file_key(&key, type, sfl);
ret = scoutfs_btree_delete(sb, alloc, wri, root, &key);
if (ret < 0) {
while (--i >= 0) {
sfl = &sc->in[i].sfl;
init_file_key(&key, type, sfl);
err = scoutfs_btree_insert(sb, alloc, wri,
root, &key,
sfl, sizeof(*sfl));
BUG_ON(err); /* lost srch file */
}
if (sc->out.blocks != 0) {
sfl = &sc->out;
init_file_key(&key, SCOUTFS_SRCH_BLOCKS_TYPE,
sfl);
err = scoutfs_btree_delete(sb, alloc, wri,
root, &key);
BUG_ON(err); /* duplicate srch files data */
}
goto out;
}
}
ret = 0;
out:
return ret;
}
/*
* Running in the server: commit the result of a compaction. Given the
* response id, find the compaction's busy item. The busy item is
* returned to a pending item or is advanced depending on the result.
* If the compaction completed then we replace the input files with the
* output files and transition the compaction to delete the input files.
* Once the input files are deleted we can remove the compaction item.
*/
int scoutfs_srch_commit_compact(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_btree_root *root, u64 rid,
struct scoutfs_srch_compact *res,
struct scoutfs_alloc_list_head *av,
struct scoutfs_alloc_list_head *fr)
{
struct scoutfs_srch_compact *pending = NULL;
struct scoutfs_srch_compact *busy;
SCOUTFS_BTREE_ITEM_REF(iref);
struct scoutfs_key key;
int ret;
int err;
int i;
/* only free allocators when we finish deleting */
memset(av, 0, sizeof(struct scoutfs_alloc_list_head));
memset(fr, 0, sizeof(struct scoutfs_alloc_list_head));
busy = kzalloc(sizeof(struct scoutfs_srch_compact), GFP_NOFS);
if (busy == NULL) {
ret = -ENOMEM;
goto out;
}
/* find the record of our compaction */
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid, le64_to_cpu(res->id));
ret = scoutfs_btree_lookup(sb, root, &key, &iref);
if (ret == 0) {
if (iref.val_len == sizeof(struct scoutfs_srch_compact))
memcpy(busy, iref.val, iref.val_len);
else
ret = -EIO;
scoutfs_btree_put_iref(&iref);
}
if (ret < 0) /* XXX leaks allocators */
goto out;
/* restore busy to pending if the operation failed */
if (res->flags & SCOUTFS_SRCH_COMPACT_FLAG_ERROR) {
pending = busy;
ret = 0;
goto update;
}
/* store result as pending if it isn't done */
if (!(res->flags & SCOUTFS_SRCH_COMPACT_FLAG_DONE)) {
pending = res;
ret = 0;
goto update;
}
/* update file references if we finished compaction (!deleting) */
if (!(res->flags & SCOUTFS_SRCH_COMPACT_FLAG_DELETE)) {
ret = commit_files(sb, alloc, wri, root, res);
if (ret < 0) {
/* XXX we can't commit, shutdown? */
goto out;
}
/* transition flags for deleting input files */
for (i = 0; i < res->nr; i++) {
res->in[i].blk = 0;
res->in[i].pos = 0;
}
res->flags &= ~(SCOUTFS_SRCH_COMPACT_FLAG_DONE |
SCOUTFS_SRCH_COMPACT_FLAG_LOG |
SCOUTFS_SRCH_COMPACT_FLAG_SORTED);
res->flags |= SCOUTFS_SRCH_COMPACT_FLAG_DELETE;
pending = res;
ret = 0;
goto update;
}
/* ok, finished deleting, reclaim allocs and delete busy */
*av = res->meta_avail;
*fr = res->meta_freed;
pending = NULL;
ret = 0;
update:
if (pending) {
init_srch_key(&key, SCOUTFS_SRCH_PENDING_TYPE,
le64_to_cpu(pending->id), 0);
ret = scoutfs_btree_insert(sb, alloc, wri, root, &key,
pending, sizeof(*pending));
if (ret < 0)
goto out;
}
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid, le64_to_cpu(res->id));
ret = scoutfs_btree_delete(sb, alloc, wri, root, &key);
if (ret < 0 && pending) {
init_srch_key(&key, SCOUTFS_SRCH_PENDING_TYPE,
le64_to_cpu(pending->id), 0);
err = scoutfs_btree_delete(sb, alloc, wri, root, &key);
BUG_ON(err); /* both busy and pending present */
}
out:
WARN_ON_ONCE(ret < 0); /* XXX inconsistency */
kfree(busy);
return ret;
}
/*
* Remove a busy item for the given client and give the caller its
* allocators. Returns -ENOENT when there are no more items.
*/
int scoutfs_srch_cancel_compact(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_btree_root *root, u64 rid,
struct scoutfs_alloc_list_head *av,
struct scoutfs_alloc_list_head *fr)
{
struct scoutfs_srch_compact *sc;
SCOUTFS_BTREE_ITEM_REF(iref);
struct scoutfs_key key;
struct scoutfs_key last;
int ret;
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid, 0);
init_srch_key(&last, SCOUTFS_SRCH_BUSY_TYPE, rid, U64_MAX);
ret = scoutfs_btree_next(sb, root, &key, &iref);
if (ret == 0) {
if (scoutfs_key_compare(iref.key, &last) > 0) {
ret = -ENOENT;
} else if (iref.val_len != sizeof(*sc)) {
ret = -EIO;
} else {
key = *iref.key;
sc = iref.val;
*av = sc->meta_avail;
*fr = sc->meta_freed;
}
scoutfs_btree_put_iref(&iref);
}
if (ret < 0)
goto out;
ret = scoutfs_btree_delete(sb, alloc, wri, root, &key);
out:
return ret;
}
/*
* We should commit our progress when we have sufficient dirty blocks or
* don't have enough metadata alloc space for our caller's operations.
*/
static bool should_commit(struct super_block *sb, struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri, u32 nr)
{
return (scoutfs_block_writer_dirty_bytes(sb, wri) >=
SRCH_COMPACT_DIRTY_LIMIT_BYTES) ||
scoutfs_alloc_meta_low(sb, alloc, nr);
}
struct tourn_node {
struct scoutfs_srch_entry sre;
int ind;
};
static void tourn_update(struct tourn_node *tnodes, struct tourn_node *tn)
{
struct tourn_node *sib;
struct tourn_node *par;
size_t ind;
/* root is at [1] */
while (tn != &tnodes[1]) {
ind = tn - tnodes;
sib = &tnodes[ind ^ 1];
par = &tnodes[ind >> 1];
*par = sre_cmp(&tn->sre, &sib->sre) < 0 ? *tn : *sib;
tn = par;
}
}
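/*
 * For illustration, with nr = 4 readers nr_parents rounds up to 3, so
 * the array laid out below is [unused, root, parent, parent, leaf0,
 * leaf1, leaf2, leaf3, pad].  A node at index i has its sibling at
 * (i ^ 1) and its parent at (i >> 1), which is why the root lives at
 * [1] instead of [0].  Each tourn_update() replays one leaf-to-root
 * path, so replacing the winner after emitting an entry costs O(log nr)
 * comparisons rather than re-scanning all the inputs.
 */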
/* return the entry at the current position; returns -ENOENT when done */
typedef int (*kway_get_t)(struct super_block *sb,
struct scoutfs_srch_entry *sre_ret, void *arg);
/* only called after _get returns 0, advances to next entry for _get */
typedef void (*kway_advance_t)(struct super_block *sb, void *arg);
static int kway_merge(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_file *sfl,
kway_get_t kway_get, kway_advance_t kway_adv,
void **args, int nr)
{
DECLARE_SRCH_INFO(sb, srinf);
struct scoutfs_srch_block *srb = NULL;
struct scoutfs_srch_entry last_tail;
struct scoutfs_block *bl = NULL;
struct tourn_node *tnodes;
struct tourn_node *leaves;
struct tourn_node *root;
struct tourn_node *tn;
int last_bytes = 0;
int nr_parents;
int nr_nodes;
int empty = 0;
int ret = 0;
int diff;
u64 blk;
int ind;
int i;
if (WARN_ON_ONCE(nr <= 0))
return -EINVAL;
/* always at least one parent for single leaf */
nr_parents = max_t(unsigned long, 1, roundup_pow_of_two(nr) - 1);
/* root at [1] for easy sib/parent index calc, final pad for odd sib */
nr_nodes = 1 + nr_parents + nr + 1;
tnodes = __vmalloc(nr_nodes * sizeof(struct tourn_node),
GFP_NOFS, PAGE_KERNEL);
if (!tnodes)
return -ENOMEM;
memset(tnodes, 0xff, nr_nodes * sizeof(struct tourn_node));
root = &tnodes[1];
leaves = &root[nr_parents];
/* initialize tournament leaves */
for (i = 0; i < nr; i++) {
tn = &leaves[i];
tn->ind = i;
ret = kway_get(sb, &tn->sre, args[i]);
if (ret == 0) {
tourn_update(tnodes, &leaves[i]);
} else if (ret == -ENOENT) {
empty++;
} else {
goto out;
}
}
/* always append new blocks */
blk = le64_to_cpu(sfl->blocks);
while (empty < nr) {
if (bl == NULL) {
if (atomic_read(&srinf->shutdown)) {
ret = -ESHUTDOWN;
goto out;
}
/* could grow and dirty to a leaf */
if (should_commit(sb, alloc, wri, sfl->height + 1)) {
ret = 0;
goto out;
}
ret = get_file_block(sb, alloc, wri, sfl,
GFB_INSERT | GFB_DIRTY, blk, &bl);
if (ret < 0)
goto out;
srb = bl->data;
scoutfs_inc_counter(sb, srch_compact_dirty_block);
}
if (sre_cmp(&root->sre, &srb->last) != 0) {
last_bytes = le32_to_cpu(srb->entry_bytes);
last_tail = srb->last;
ret = encode_entry(srb->entries +
le32_to_cpu(srb->entry_bytes),
&root->sre, &srb->tail);
if (WARN_ON_ONCE(ret <= 0)) {
/* shouldn't happen */
ret = -EIO;
goto out;
}
if (srb->entry_bytes == 0) {
if (blk == 0)
sfl->first = root->sre;
srb->first = root->sre;
}
le32_add_cpu(&srb->entry_nr, 1);
le32_add_cpu(&srb->entry_bytes, ret);
srb->last = root->sre;
srb->tail = root->sre;
sfl->last = root->sre;
le64_add_cpu(&sfl->entries, 1);
ret = 0;
if (le32_to_cpu(srb->entry_bytes) >
SCOUTFS_SRCH_BLOCK_SAFE_BYTES) {
scoutfs_block_put(sb, bl);
bl = NULL;
blk++;
}
scoutfs_inc_counter(sb, srch_compact_entry);
} else {
/*
* Duplicate entries indicate deletion so we
* undo the previously encoded entry and ignore
* this entry. This only happens within each
* block. Deletions can span block boundaries
* and will be filtered out by search and
* hopefully removed in future compactions.
*/
diff = le32_to_cpu(srb->entry_bytes) - last_bytes;
if (diff) {
memset(srb->entries + last_bytes, 0, diff);
if (srb->entry_bytes == 0) {
/* last_tail will be 0 */
if (blk == 0)
sfl->first = last_tail;
srb->first = last_tail;
}
le32_add_cpu(&srb->entry_nr, -1);
srb->entry_bytes = cpu_to_le32(last_bytes);
srb->last = last_tail;
srb->tail = last_tail;
sfl->last = last_tail;
le64_add_cpu(&sfl->entries, -1);
}
scoutfs_inc_counter(sb, srch_compact_removed_entry);
}
/* get the next */
ind = root->ind;
tn = &leaves[ind];
kway_adv(sb, args[ind]);
ret = kway_get(sb, &tn->sre, args[ind]);
if (ret == -ENOENT) {
/* this index is done */
memset(&tn->sre, 0xff, sizeof(tn->sre));
empty++;
ret = 0;
} else if (ret < 0) {
goto out;
}
/* update the tourney and carry on */
tourn_update(tnodes, tn);
#if 0
/* would be worth it if we have uneven key distribution */
if (ind < nr - 1) {
/* order doesn't matter, fill hole */
swap(args[ind], args[nr - 1]);
swap(tn->sre, leaves[nr - 1].sre);
}
/* drop a level of the tree when we shrink to a power of 2 */
if (nr > 0 && is_power_of_two(nr)) {
memcpy(leaves - nr, leaves, nr * sizeof(*tn));
leaves -= nr;
for (i = 0; i < nr; i += 2)
tourn_update(least, leaves[i]);
}
#endif
}
/* could stream a final index.. arguably a small portion of work */
out:
scoutfs_block_put(sb, bl);
vfree(tnodes);
return ret;
}
#define SRES_PER_PAGE (PAGE_SIZE / sizeof(struct scoutfs_srch_entry))
static struct scoutfs_srch_entry *page_priv_sre(struct page *page)
{
return (struct scoutfs_srch_entry *)page_address(page) + page->private;
}
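/*
 * Each page holds a flat array of 24 byte (3 x __le64) entries with
 * page->private indexing the current slot, so with 4KiB pages
 * SRES_PER_PAGE works out to 170 entries.  A zero ->ino in the slot at
 * ->private marks the end of a partially filled final page, which
 * presumes that valid entries never record inode 0.
 */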
static int kway_get_page(struct super_block *sb,
struct scoutfs_srch_entry *sre_ret, void *arg)
{
struct page *page = arg;
struct scoutfs_srch_entry *sre = page_priv_sre(page);
if (page->private >= SRES_PER_PAGE || sre->ino == 0)
return -ENOENT;
*sre_ret = *sre;
return 0;
}
static void kway_adv_page(struct super_block *sb, void *arg)
{
struct page *page = arg;
page->private++;
}
static int cmp_page_sre(const void *A, const void *B)
{
const struct scoutfs_srch_entry *a = A;
const struct scoutfs_srch_entry *b = B;
return sre_cmp(a, b);
}
static void swap_page_sre(void *A, void *B, int size)
{
struct scoutfs_srch_entry *a = A;
struct scoutfs_srch_entry *b = B;
swap(*a, *b);
}
/*
* Compact a set of log files by sorting all their entries and writing
* them to a sorted output file. We decode all the files' entries into
* pages, sort the contents of each page, and then stream a k-way merge
* of the entries in the pages into an output file. While not sorted,
* the input log files' entries are encoded, so we can allocate quite a
* bit more memory in pages than the files took in blocks on disk (~2x
* typically, ~10x worst case).
*
* Because we read and sort all the input files we must perform the full
* compaction in one operation. The server must have given us
* sufficiently large avail/freed lists, otherwise we'll return -ENOSPC.
*/
static int compact_logs(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_compact *sc)
{
DECLARE_SRCH_INFO(sb, srinf);
struct scoutfs_srch_block *srb = NULL;
struct scoutfs_srch_entry *sre;
struct scoutfs_srch_entry prev;
struct scoutfs_block *bl = NULL;
struct scoutfs_srch_file *sfl;
struct page *page = NULL;
struct page *tmp;
void **args = NULL;
int nr_pages = 0;
LIST_HEAD(pages);
int sfl_ind;
u64 blk = 0;
int pos = 0;
int ret;
int i;
if (sc->nr <= 1) {
ret = -EINVAL;
goto out;
}
memset(&prev, 0, sizeof(prev));
/* decode all the log file's block's entries into pages */
for (sfl_ind = 0, sfl = &sc->in[0].sfl; sfl_ind < sc->nr; ) {
if (bl == NULL) {
/* only check on each new input block */
if (atomic_read(&srinf->shutdown)) {
ret = -ESHUTDOWN;
goto out;
}
ret = get_file_block(sb, NULL, NULL, sfl, 0, blk, &bl);
if (ret < 0)
goto out;
srb = bl->data;
}
if (page == NULL) {
page = alloc_page(GFP_NOFS);
if (!page) {
ret = -ENOMEM;
goto out;
}
page->private = 0;
list_add_tail(&page->lru, &pages);
nr_pages++;
scoutfs_inc_counter(sb, srch_compact_log_page);
}
sre = page_priv_sre(page);
if (pos > SCOUTFS_SRCH_BLOCK_SAFE_BYTES) {
/* can only be inconsistency :/ */
ret = -EIO;
goto out;
}
ret = decode_entry(srb->entries + pos, sre, &prev);
if (ret <= 0) {
/* can only be inconsistency :/ */
ret = -EIO;
goto out;
}
prev = *sre;
pos += ret;
if (pos >= le32_to_cpu(srb->entry_bytes)) {
scoutfs_block_put(sb, bl);
bl = NULL;
memset(&prev, 0, sizeof(prev));
pos = 0;
if (++blk == le64_to_cpu(sfl->blocks)) {
blk = 0;
sfl_ind++;
sfl = &sc->in[sfl_ind].sfl;
}
}
if (++page->private == SRES_PER_PAGE)
page = NULL;
}
/* add a terminal entry to the last partial page */
if (page) {
sre = page_priv_sre(page);
sre->ino = 0;
}
/* allocate args array for k-way merge */
args = vmalloc(nr_pages * sizeof(struct page *));
if (!args) {
ret = -ENOMEM;
goto out;
}
/* sort page entries and reset private for _next */
i = 0;
list_for_each_entry(page, &pages, lru) {
args[i++] = page;
if (atomic_read(&srinf->shutdown)) {
ret = -ESHUTDOWN;
goto out;
}
sort(page_address(page), page->private,
sizeof(struct scoutfs_srch_entry), cmp_page_sre,
swap_page_sre);
page->private = 0;
}
ret = kway_merge(sb, alloc, wri, &sc->out, kway_get_page, kway_adv_page,
args, nr_pages);
if (ret < 0)
goto out;
/* make sure we finished all the pages */
list_for_each_entry(page, &pages, lru) {
sre = page_priv_sre(page);
if (page->private < SRES_PER_PAGE && sre->ino != 0) {
ret = -ENOSPC;
goto out;
}
}
sc->flags |= SCOUTFS_SRCH_COMPACT_FLAG_DONE;
ret = 0;
out:
scoutfs_block_put(sb, bl);
vfree(args);
list_for_each_entry_safe(page, tmp, &pages, lru) {
list_del(&page->lru);
__free_page(page);
}
return ret;
}
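/*
 * Streaming reader state for one sorted input file: blk and pos track
 * the entry currently being decoded, prev carries the delta-decoding
 * context, and skip lets a resumed compaction fast-forward past the
 * byte offset recorded in a previous partial result (sc->in[].blk and
 * .pos).
 */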
struct kway_file_reader {
struct scoutfs_srch_file *sfl;
struct scoutfs_block *bl;
struct scoutfs_srch_entry prev;
struct scoutfs_srch_entry decoded_sre;
u64 blk;
u32 skip;
u32 pos;
int decoded_bytes;
};
static int kway_get_reader(struct super_block *sb,
struct scoutfs_srch_entry *sre_ret, void *arg)
{
struct kway_file_reader *rdr = arg;
struct scoutfs_srch_block *srb;
int ret;
if (rdr->blk == le64_to_cpu(rdr->sfl->blocks))
return -ENOENT;
if (rdr->bl == NULL) {
ret = get_file_block(sb, NULL, NULL, rdr->sfl, 0, rdr->blk,
&rdr->bl);
if (ret < 0)
return ret;
memset(&rdr->prev, 0, sizeof(rdr->prev));
}
srb = rdr->bl->data;
if (rdr->pos > SCOUTFS_SRCH_BLOCK_SAFE_BYTES ||
rdr->skip >= SCOUTFS_SRCH_BLOCK_SAFE_BYTES ||
rdr->skip >= le32_to_cpu(srb->entry_bytes)) {
/* XXX inconsistency */
return -EIO;
}
/* decode entry, possibly skipping start of the block */
while (rdr->decoded_bytes == 0 || rdr->pos < rdr->skip) {
ret = decode_entry(srb->entries + rdr->pos,
&rdr->decoded_sre, &rdr->prev);
if (ret <= 0) {
/* XXX inconsistency */
return -EIO;
}
rdr->decoded_bytes = ret;
if (rdr->pos < rdr->skip) {
rdr->prev = rdr->decoded_sre;
rdr->pos += ret;
if (rdr->pos >= rdr->skip)
rdr->skip = 0;
rdr->decoded_bytes = 0;
}
}
*sre_ret = rdr->decoded_sre;
return 0;
}
static void kway_adv_reader(struct super_block *sb, void *arg)
{
struct kway_file_reader *rdr = arg;
struct scoutfs_srch_block *srb;
/* _get must have set */
BUG_ON(rdr->bl == NULL);
BUG_ON(rdr->decoded_bytes == 0);
rdr->prev = rdr->decoded_sre;
rdr->pos += rdr->decoded_bytes;
rdr->decoded_bytes = 0;
srb = rdr->bl->data;
if (rdr->pos >= le32_to_cpu(srb->entry_bytes)) {
rdr->pos = 0;
scoutfs_block_put(sb, rdr->bl);
rdr->bl = NULL;
rdr->blk++;
}
}
/*
* Compact a set of sorted files by performing a k-way merge of the files
* into an output sorted file. The k-way merge works with an iterator
* which reads blocks and decodes entries.
*/
static int compact_sorted(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_compact *sc)
{
struct kway_file_reader *rdrs = NULL;
void **args = NULL;
int ret;
int nr;
int i;
if (WARN_ON_ONCE(sc->nr <= 1))
return -EINVAL;
nr = sc->nr;
/* allocate args array for k-way merge */
rdrs = kmalloc_array(nr, sizeof(rdrs[0]), __GFP_ZERO | GFP_NOFS);
args = kmalloc_array(nr, sizeof(args[0]), GFP_NOFS);
if (!rdrs || !args) {
ret = -ENOMEM;
goto out;
}
for (i = 0; i < nr; i++) {
if (le64_to_cpu(sc->in[i].blk) >
le64_to_cpu(sc->in[i].sfl.blocks)) {
ret = -EINVAL;
goto out;
}
rdrs[i].sfl = &sc->in[i].sfl;
rdrs[i].blk = le64_to_cpu(sc->in[i].blk);
rdrs[i].skip = le64_to_cpu(sc->in[i].pos);
args[i] = &rdrs[i];
}
ret = kway_merge(sb, alloc, wri, &sc->out, kway_get_reader,
kway_adv_reader, args, nr);
sc->flags |= SCOUTFS_SRCH_COMPACT_FLAG_DONE;
for (i = 0; i < nr; i++) {
sc->in[i].blk = cpu_to_le64(rdrs[i].blk);
sc->in[i].pos = cpu_to_le64(rdrs[i].pos);
if (rdrs[i].blk < le64_to_cpu(sc->in[i].sfl.blocks))
sc->flags &= ~SCOUTFS_SRCH_COMPACT_FLAG_DONE;
}
out:
for (i = 0; rdrs && i < nr; i++)
scoutfs_block_put(sb, rdrs[i].bl);
kfree(rdrs);
kfree(args);
return ret;
}
/*
* Delete a file that has been compacted and is no longer referenced by
* items in the srch_root. The server protects the input file from
* other compactions while we're working, but other readers could still
* be trying to read it while searching.
*
* We don't modify the blocks to avoid the cost of allocating and
* freeing dirty parent metadata blocks, and we want to avoid triggering
* stale reads in racing readers. We free blocks from leaf parents
* upwards and from left to right. Once we've freed a block we never
* visit it again. We store our walk position in each file's compact
* input so that it can be carried in pending items as progress is made
* over multiple operations.
*/
static int delete_file(struct super_block *sb, struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_compact_input *in)
{
struct scoutfs_block *bl = NULL;
struct scoutfs_srch_parent *srp;
u64 blkno;
u64 blk;
u64 inc;
int level;
int ret;
int i;
blk = le64_to_cpu(in->blk);
level = max(le64_to_cpu(in->pos), 1ULL);
if (level > in->sfl.height) {
ret = 0;
goto out;
}
for (; level < in->sfl.height; level++) {
for (inc = 1, i = 2; i <= level; i++)
inc *= SCOUTFS_SRCH_PARENT_REFS;
while (blk < le64_to_cpu(in->sfl.blocks)) {
ret = read_path_block(sb, wri, &in->sfl, blk, level,
&bl);
if (ret < 0)
goto out;
srp = bl->data;
for (i = calc_ref_ind(blk, level);
i < SCOUTFS_SRCH_PARENT_REFS &&
blk < le64_to_cpu(in->sfl.blocks);
i++, blk += inc) {
blkno = le64_to_cpu(srp->refs[i].blkno);
if (!blkno)
continue;
/* free below, then final root block */
if (should_commit(sb, alloc, wri, 2)) {
ret = 0;
goto out;
}
ret = scoutfs_free_meta(sb, alloc, wri, blkno);
if (ret < 0)
goto out;
}
scoutfs_block_put(sb, bl);
bl = NULL;
}
blk = 0;
}
if (level == in->sfl.height) {
ret = scoutfs_free_meta(sb, alloc, wri,
le64_to_cpu(in->sfl.ref.blkno));
if (ret < 0)
goto out;
level++;
}
ret = 0;
out:
in->blk = cpu_to_le64(blk);
in->pos = cpu_to_le64(level);
scoutfs_block_put(sb, bl);
return ret;
}
static int delete_files(struct super_block *sb, struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_compact *sc)
{
int ret = 0;
int i;
for (i = 0; i < sc->nr; i++) {
ret = delete_file(sb, alloc, wri, &sc->in[i]);
if (ret < 0 ||
(le64_to_cpu(sc->in[i].pos) <= sc->in[i].sfl.height))
break;
}
if (i == sc->nr)
sc->flags |= SCOUTFS_SRCH_COMPACT_FLAG_DONE;
return ret;
}
/* wait 10s between compact attempts on error, immediate after success */
#define SRCH_COMPACT_DELAY_MS (10 * MSEC_PER_SEC)
/*
* Get a compaction operation from the server, sort the entries from the
* input files as they're read, and stream the remaining sorted entries
* into a newly written output file. The server is protecting the input
* files from other compactions, so they will be stable. The server gives
* us a populated allocator that should be enough to write a new file
* and delete the old file blocks. We'll regularly write out dirty
* blocks as we hit a dirty limit threshold so there will be some cow
* overhead of repeatedly dirtying, say, parent allocator and file radix
* blocks. We don't reclaim freed blocks in the allocator after each
* write so the initial allocator pool has to account for that cow
* overhead.
*
* All of our modifications are written into free blocks from the
* filesystem's perspective. If anything goes wrong we return an error
* and the server will ignore all our work and reclaim the initial
* allocator it gave us.
*/
static void scoutfs_srch_compact_worker(struct work_struct *work)
{
struct srch_info *srinf = container_of(work, struct srch_info,
compact_dwork.work);
struct scoutfs_srch_compact *sc = NULL;
struct super_block *sb = srinf->sb;
struct scoutfs_block_writer wri;
struct scoutfs_alloc alloc;
unsigned long delay;
int ret;
int err;
sc = kmalloc(sizeof(struct scoutfs_srch_compact), GFP_NOFS);
if (sc == NULL) {
ret = -ENOMEM;
goto out;
}
scoutfs_block_writer_init(sb, &wri);
ret = scoutfs_client_srch_get_compact(sb, sc);
if (ret < 0 || sc->nr == 0)
goto out;
scoutfs_alloc_init(&alloc, &sc->meta_avail, &sc->meta_freed);
if (sc->flags & SCOUTFS_SRCH_COMPACT_FLAG_LOG) {
ret = compact_logs(sb, &alloc, &wri, sc);
} else if (sc->flags & SCOUTFS_SRCH_COMPACT_FLAG_SORTED) {
ret = compact_sorted(sb, &alloc, &wri, sc);
} else if (sc->flags & SCOUTFS_SRCH_COMPACT_FLAG_DELETE) {
ret = delete_files(sb, &alloc, &wri, sc);
} else {
ret = -EINVAL;
}
if (ret < 0)
goto commit;
ret = scoutfs_alloc_prepare_commit(sb, &alloc, &wri) ?:
scoutfs_block_writer_write(sb, &wri);
commit:
/* the server won't use our partial compact if _ERROR is set */
sc->meta_avail = alloc.avail;
sc->meta_freed = alloc.freed;
sc->flags |= ret < 0 ? SCOUTFS_SRCH_COMPACT_FLAG_ERROR : 0;
err = scoutfs_client_srch_commit_compact(sb, sc);
if (err < 0 && ret == 0)
ret = err;
out:
/* our allocators and files should be stable */
WARN_ON_ONCE(ret == -ESTALE);
if (ret < 0)
scoutfs_inc_counter(sb, srch_compact_error);
scoutfs_block_writer_forget_all(sb, &wri);
if (!atomic_read(&srinf->shutdown)) {
delay = ret == 0 ? 0 : msecs_to_jiffies(SRCH_COMPACT_DELAY_MS);
queue_delayed_work(srinf->workq, &srinf->compact_dwork, delay);
}
kfree(sc);
}
void scoutfs_srch_destroy(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
DECLARE_SRCH_INFO(sb, srinf);
if (!srinf)
return;
if (srinf->workq) {
/* pending grace work queues normal work */
atomic_set(&srinf->shutdown, 1);
cancel_delayed_work_sync(&srinf->compact_dwork);
flush_workqueue(srinf->workq);
destroy_workqueue(srinf->workq);
}
kfree(srinf);
sbi->srch_info = NULL;
}
int scoutfs_srch_setup(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct srch_info *srinf;
int ret;
srinf = kzalloc(sizeof(struct srch_info), GFP_KERNEL);
if (!srinf)
return -ENOMEM;
srinf->sb = sb;
atomic_set(&srinf->shutdown, 0);
INIT_DELAYED_WORK(&srinf->compact_dwork, scoutfs_srch_compact_worker);
sbi->srch_info = srinf;
srinf->workq = alloc_workqueue("scoutfs_srch_compact",
WQ_NON_REENTRANT | WQ_UNBOUND |
WQ_HIGHPRI, 0);
if (!srinf->workq) {
ret = -ENOMEM;
goto out;
}
queue_delayed_work(srinf->workq, &srinf->compact_dwork,
msecs_to_jiffies(SRCH_COMPACT_DELAY_MS));
ret = 0;
out:
if (ret)
scoutfs_srch_destroy(sb);
return ret;
}