scoutfs/kmod/src/srch.c
Zach Brown c7e97eeb1f Allow srch compaction from _SAFE_BYTES
Compacting sorted srch files can take multiple transactions because they
can be very large.  Each transaction resumes at a byte offset in a block
where the previous transaction stopped.

The resuming code tests that the byte offsets are sane, but the test
of the offset to skip to had a mistake: it returned an error if the
compaction resumed from the last possible safe offset for decoding
entries.

If a system is unlucky enough to have a compaction transaction stop at
just this offset then compaction stops making forward progress as each
attempt to resume returns an error.

The fix allows continuation from this last safe offset while returning
errors for attempts to continue *past* that offset.  This matches all
the encoding code which allows encoding the last entry in the block at
this offset.

Signed-off-by: Zach Brown <zab@versity.com>
2023-11-07 12:34:00 -08:00


/*
* Copyright (C) 2020 Versity Software, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License v2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/slab.h>
#include <linux/crc32c.h>
#include <linux/random.h>
#include <linux/pagemap.h>
#include <linux/vmalloc.h>
#include <linux/sort.h>
#include "super.h"
#include "format.h"
#include "counters.h"
#include "block.h"
#include "alloc.h"
#include "srch.h"
#include "btree.h"
#include "spbm.h"
#include "client.h"
#include "counters.h"
#include "scoutfs_trace.h"
#include "triggers.h"
/*
* This srch subsystem gives us a way to find inodes that have a given
* tagged xattr set. It's designed for an xattr population that is
* orders of magnitude larger than the file population, is updated much
* more frequently than it is searched, and can have slightly relaxed
* consistency requirements so that searches don't have to serialize
* with updates through locking.
*
* A srch entry is logged every time a .srch. xattr is created or
* deleted. Commits append entries to a growing srch log file along
* with the item btree and allocator block structures they're modifying.
*
* The server regularly rotates these growing log files so that they
* don't exceed a given size. Once there are enough log files they're
* all read and their sorted entries are written to a larger sorted
* file. Once there are enough sorted files they're all read and their
* combined sorted entries are written to a larger file, and so on.
*
* Searches combine all the entries read from unsorted log files and
* binary searches of larger sorted files to come up with the candidate
* inodes that probably contain the given named .srch. xattr.
*
* Searches read rotated log files and sorted files which have been
* committed. There is nothing protecting their blocks from being
* re-allocated and re-written. Search can restart by checking the
* btree for the current set of files. Compaction reads log files which
* are protected from other compactions by the persistent busy items
* created by the server. Compaction won't see its blocks reused out
* from under it, but it can encounter stale cached blocks that need to
* be invalidated.
*/
struct srch_info {
struct super_block *sb;
atomic_t shutdown;
struct workqueue_struct *workq;
struct delayed_work compact_dwork;
};
#define DECLARE_SRCH_INFO(sb, name) \
struct srch_info *name = SCOUTFS_SB(sb)->srch_info
#define SRE_FMT "%016llx.%llu.%llu"
#define SRE_ARG(sre) \
le64_to_cpu((sre)->hash), le64_to_cpu((sre)->ino), \
le64_to_cpu((sre)->id)
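/*
* Illustrative use of the format helpers above (an editorial example,
* not a call site from the original file): given a
* struct scoutfs_srch_entry *sre, an entry could be printed with
* pr_debug("sre "SRE_FMT"\n", SRE_ARG(sre)).
*/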
/*
* Compactions dirty radix allocator blocks, file radix parent blocks,
* and especially srch file blocks. The files can get enormous and we
* can't have compactions OOM the box but they're meant to be large
* streaming operations, so we only stop and write out dirty blocks in
* large chunks.
*/
#define SRCH_COMPACT_DIRTY_LIMIT_BYTES (32 * 1024 * 1024)
static int sre_cmp(const struct scoutfs_srch_entry *a,
const struct scoutfs_srch_entry *b)
{
return scoutfs_cmp_u64s(le64_to_cpu(a->hash), le64_to_cpu(b->hash)) ?:
scoutfs_cmp_u64s(le64_to_cpu(a->ino), le64_to_cpu(b->ino)) ?:
scoutfs_cmp_u64s(le64_to_cpu(a->id), le64_to_cpu(b->id));
}
static void sre_inc(struct scoutfs_srch_entry *sre)
{
le64_add_cpu(&sre->id, 1);
if (sre->id != 0)
return;
le64_add_cpu(&sre->ino, 1);
if (sre->ino != 0)
return;
le64_add_cpu(&sre->hash, 1);
}
static void sre_dec(struct scoutfs_srch_entry *sre)
{
le64_add_cpu(&sre->id, -1);
if (sre->id != cpu_to_le64(U64_MAX))
return;
le64_add_cpu(&sre->ino, -1);
if (sre->ino != cpu_to_le64(U64_MAX))
return;
le64_add_cpu(&sre->hash, -1);
}
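/*
* Editorial example of the helpers above: entries order as the
* (hash, ino, id) tuple, so incrementing an entry whose id is U64_MAX
* carries into the ino, e.g. (hash, 5, U64_MAX) becomes (hash, 6, 0),
* and sre_dec() walks the same carry chain in reverse.
*/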
/*
* srch items are first grouped by type and we have log files, sorted
* files, and busy compactions.
*/
static void init_srch_key(struct scoutfs_key *key, int type,
u64 major, u64 minor)
{
*key = (struct scoutfs_key) {
.sk_zone = SCOUTFS_SRCH_ZONE,
.sk_type = type,
._sk_second = cpu_to_le64(major),
._sk_third = cpu_to_le64(minor),
};
}
/*
* The caller has ensured that there is space for a full word at the
* buf. Only the set low order bytes will be used. The clear high
* order bytes will be overwritten in the future and ignored in the
* final encoding in the block.
*/
static int encode_u64(__le64 *buf, u64 val)
{
int bytes;
val = (val << 1) ^ ((s64)val >> 63); /* shift sign extend */
bytes = (fls64(val) + 7) >> 3;
put_unaligned_le64(val, buf);
return bytes;
}
/* shifting by width is undefined :/ */
#define BYTE_MASK(b) ((1ULL << (b << 3)) - 1)
static u64 byte_masks[] = {
0, BYTE_MASK(1), BYTE_MASK(2), BYTE_MASK(3),
BYTE_MASK(4), BYTE_MASK(5), BYTE_MASK(6), BYTE_MASK(7), U64_MAX,
};
static u64 decode_u64(void *buf, int bytes)
{
u64 val = get_unaligned_le64(buf) & byte_masks[bytes];
return (val >> 1) ^ (-(val & 1));
}
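/*
* A minimal editorial sketch of the encoding round trip above, kept
* under #if 0 like the other non-compiled example code in this file.
* The zigzag step folds the sign into the low bit so small magnitudes
* of either sign encode in few bytes: -3 maps to 5 and fits in one
* byte, 300 maps to 600 and needs two, and decode_u64() undoes both
* the masking and the zigzag.
*/
#if 0
static void srch_zigzag_example(void)
{
__le64 buf;
int bytes;
bytes = encode_u64(&buf, (u64)-3); /* zigzag 5, one byte */
WARN_ON(decode_u64(&buf, bytes) != (u64)-3);
bytes = encode_u64(&buf, 300); /* zigzag 600, two bytes */
WARN_ON(decode_u64(&buf, bytes) != 300);
}
#endif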
/*
* Encode an entry at the offset in the block. Leave room for the
* lengths short, encode the diffs of this entry from the previous
* entry, then update the lengths short with the length of each
* encoded diff. The caller ensures that there's room for a full size
* entry at the position in the block.
*/
static int encode_entry(void *buf, struct scoutfs_srch_entry *sre,
struct scoutfs_srch_entry *prev)
{
u64 diffs[] = {
le64_to_cpu(sre->hash) - le64_to_cpu(prev->hash),
le64_to_cpu(sre->ino) - le64_to_cpu(prev->ino),
le64_to_cpu(sre->id) - le64_to_cpu(prev->id),
};
u16 lengths = 0;
int bytes;
int tot = 2;
int i;
for (i = 0; i < ARRAY_SIZE(diffs); i++) {
bytes = encode_u64(buf + tot, diffs[i]);
lengths |= bytes << (i << 2);
tot += bytes;
}
put_unaligned_le16(lengths, buf);
return tot;
}
/*
* Decode an entry from the offset of the block. Load the length short
* and decode the bytes of diffs and apply them to the previous entry.
* The caller ensures that we won't read off the end of block if we were
* to try and decode a full size set of diffs.
*/
static int decode_entry(void *buf, struct scoutfs_srch_entry *sre,
struct scoutfs_srch_entry *prev)
{
u64 diffs[3];
u16 lengths;
int bytes;
int tot;
int i;
lengths = get_unaligned_le16(buf);
tot = 2;
for (i = 0; i < ARRAY_SIZE(diffs); i++) {
bytes = min_t(int, 8, lengths & 15);
diffs[i] = decode_u64(buf + tot, bytes);
tot += bytes;
lengths >>= 4;
}
sre->hash = cpu_to_le64(le64_to_cpu(prev->hash) + diffs[0]);
sre->ino = cpu_to_le64(le64_to_cpu(prev->ino) + diffs[1]);
sre->id = cpu_to_le64(le64_to_cpu(prev->id) + diffs[2]);
return tot;
}
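/*
* Editorial worked example of the layout used by encode_entry() and
* decode_entry(): diffs of 1, 1, and 2 bytes pack the nibbles 1, 1,
* and 2 into the leading lengths short (0x211) and are followed by 4
* bytes of zigzag diffs, 6 bytes total. The worst case is
* 2 + 8 + 8 + 8 = 26 bytes, which is why callers reserve room for a
* full size entry before encoding or decoding at an offset.
*/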
/* return refs ind to traverse through parent at level to blk */
static int calc_ref_ind(u64 blk, int level)
{
int ind;
int i;
BUG_ON(level < 1);
for (i = 1; i <= level; i++)
blk = div_u64_rem(blk, SCOUTFS_SRCH_PARENT_REFS, &ind);
return ind;
}
static u8 height_for_blk(u64 blk)
{
u64 total = SCOUTFS_SRCH_PARENT_REFS;
int hei = 2;
if (blk == 0)
return 1;
while (blk >= total) {
hei++;
total *= SCOUTFS_SRCH_PARENT_REFS;
}
return hei;
}
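/*
* Editorial example of the radix math above, writing R for
* SCOUTFS_SRCH_PARENT_REFS: calc_ref_ind() returns
* (blk / R^(level - 1)) % R, the ref slot to follow at a given parent
* level, and height_for_blk() returns 1 for blk 0, 2 for blk < R,
* 3 for blk < R^2, and so on.
*/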
static inline u32 srch_level_magic(int level)
{
return level ? SCOUTFS_BLOCK_MAGIC_SRCH_PARENT : SCOUTFS_BLOCK_MAGIC_SRCH_BLOCK;
}
/*
* This is operating on behalf of writers writing into private files and
* readers who could see stale blocks. We can find stale cached blocks
* and should retry the read ourselves after invalidating, but if we hit
* stale blocks on disk then we have to return to the caller who can
* decide to return errors or retry.
*/
static int read_srch_block(struct super_block *sb,
struct scoutfs_block_writer *wri, int level,
struct scoutfs_block_ref *ref,
struct scoutfs_block **bl_ret)
{
u32 magic = srch_level_magic(level);
int ret;
ret = scoutfs_block_read_ref(sb, ref, magic, bl_ret);
if (ret == -ESTALE)
scoutfs_inc_counter(sb, srch_read_stale);
return ret;
}
/*
* Give the caller a read-only reference to the block along the path to
* the logical block at the given level. This shouldn't be called on an
* empty root.
*/
static int read_path_block(struct super_block *sb,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_file *sfl,
u64 blk, int at_level,
struct scoutfs_block **bl_ret)
{
struct scoutfs_block *bl = NULL;
struct scoutfs_srch_parent *srp;
struct scoutfs_block_ref ref;
int level;
int ind;
int ret;
if (WARN_ON_ONCE(at_level < 0 || at_level >= sfl->height))
return -EINVAL;
level = sfl->height;
ref = sfl->ref;
while (level--) {
if (ref.blkno == 0) {
ret = -ENOENT;
break;
}
ret = read_srch_block(sb, wri, level, &ref, &bl);
if (ret < 0)
break;
if (level == at_level) {
ret = 0;
break;
}
srp = bl->data;
ind = calc_ref_ind(blk, level);
ref = srp->refs[ind];
scoutfs_block_put(sb, bl);
bl = NULL;
}
if (ret < 0)
scoutfs_block_put(sb, bl);
else
*bl_ret = bl;
return ret;
}
/*
* Walk radix blocks to find the logical file block and return the
* reference to the caller. Flags determine if we cow new dirty blocks,
* allocate new blocks, or return errors for missing blocks (files are
* never sparse, this won't happen).
*/
enum gfb_flags {
GFB_INSERT = (1 << 0),
GFB_DIRTY = (1 << 1),
};
static int get_file_block(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_file *sfl,
int flags, u64 blk, struct scoutfs_block **bl_ret)
{
struct scoutfs_block *parent = NULL;
struct scoutfs_block_header *hdr;
struct scoutfs_block *bl = NULL;
struct scoutfs_srch_parent *srp;
struct scoutfs_block_ref new_root_ref;
struct scoutfs_block_ref *ref;
int level;
int ind;
int ret;
u8 hei;
/* see if we need to grow to insert a new largest blk */
hei = height_for_blk(blk);
while (sfl->height < hei) {
if (!(flags & GFB_INSERT)) {
ret = -ENOENT;
goto out;
}
memset(&new_root_ref, 0, sizeof(new_root_ref));
level = sfl->height;
ret = scoutfs_block_dirty_ref(sb, alloc, wri, &new_root_ref,
srch_level_magic(level), &bl, 0, NULL);
if (ret < 0)
goto out;
if (level) {
srp = bl->data;
srp->refs[0] = sfl->ref;
}
hdr = bl->data;
sfl->ref = new_root_ref;
sfl->height++;
scoutfs_block_put(sb, bl);
bl = NULL;
}
/* walk file and parent block references to the leaf blocks */
level = sfl->height;
ref = &sfl->ref;
while (level--) {
/* searching an unused part of the tree */
if (!ref->blkno && !(flags & GFB_INSERT)) {
ret = -ENOENT;
goto out;
}
if (flags & GFB_DIRTY)
ret = scoutfs_block_dirty_ref(sb, alloc, wri, ref, srch_level_magic(level),
&bl, 0, NULL);
else
ret = scoutfs_block_read_ref(sb, ref, srch_level_magic(level), &bl);
if (ret < 0)
goto out;
if (level == 0) {
ret = 0;
break;
}
srp = bl->data;
ind = calc_ref_ind(blk, level);
ref = &srp->refs[ind];
scoutfs_block_put(sb, parent);
parent = bl;
bl = NULL;
}
ret = 0;
out:
scoutfs_block_put(sb, parent);
if (ret < 0) {
scoutfs_block_put(sb, bl);
bl = NULL;
}
/* record that we successfully grew the file */
if (ret == 0 && (flags & GFB_INSERT) && blk >= le64_to_cpu(sfl->blocks))
sfl->blocks = cpu_to_le64(blk + 1);
*bl_ret = bl;
return ret;
}
int scoutfs_srch_add(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_file *sfl,
struct scoutfs_block **bl_ret,
u64 hash, u64 ino, u64 id)
{
struct scoutfs_srch_block *srb;
struct scoutfs_block *bl = NULL;
u64 blk;
int ret;
struct scoutfs_srch_entry sre = {
.hash = cpu_to_le64(hash),
.ino = cpu_to_le64(ino),
.id = cpu_to_le64(id),
};
/* start with a new block or the last existing block */
if (le64_to_cpu(sfl->blocks) > 1)
blk = le64_to_cpu(sfl->blocks) - 1;
else
blk = 0;
bl = *bl_ret;
get_last_block:
if (bl == NULL) {
ret = get_file_block(sb, alloc, wri, sfl,
GFB_INSERT | GFB_DIRTY, blk, &bl);
if (ret < 0) {
/* writing into a private file, shouldn't happen */
WARN_ON_ONCE(ret == -ESTALE);
goto out;
}
}
srb = bl->data;
/* stop encoding once we might overflow the block */
if (le32_to_cpu(srb->entry_bytes) > SCOUTFS_SRCH_BLOCK_SAFE_BYTES) {
scoutfs_block_put(sb, bl);
bl = NULL;
blk++;
goto get_last_block;
}
ret = encode_entry(srb->entries + le32_to_cpu(srb->entry_bytes),
&sre, &srb->tail);
if (ret > 0) {
if (srb->entry_bytes == 0) {
if (blk == 0) {
sfl->first = sre;
sfl->last = sre;
}
srb->first = sre;
srb->last = sre;
} else {
if (sre_cmp(&sre, &sfl->first) < 0)
sfl->first = sre;
else if (sre_cmp(&sre, &sfl->last) > 0)
sfl->last = sre;
if (sre_cmp(&sre, &srb->first) < 0)
srb->first = sre;
else if (sre_cmp(&sre, &srb->last) > 0)
srb->last = sre;
}
srb->tail = sre;
le32_add_cpu(&srb->entry_nr, 1);
le32_add_cpu(&srb->entry_bytes, ret);
le64_add_cpu(&sfl->entries, 1);
ret = 0;
scoutfs_inc_counter(sb, srch_add_entry);
}
out:
if (ret < 0) {
scoutfs_block_put(sb, bl);
bl = NULL;
}
*bl_ret = bl;
return ret;
}
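/*
* Editorial note on the append path above: a block keeps taking
* entries until entry_bytes exceeds SCOUTFS_SRCH_BLOCK_SAFE_BYTES, so
* the final entry in a block can legitimately begin exactly at the
* _SAFE_BYTES offset. Readers and compactions that resume from a
* saved byte offset therefore have to accept _SAFE_BYTES itself as a
* valid position, which is what the skip checks in kway_get_reader()
* below allow.
*/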
/*
* Padded entries are encoded in pairs after an existing entry. Each
* pair cancels itself out for all readers (the second encoding looks
* like a deletion), so the padding doesn't affect the first/last
* bounds of the block or file.
*/
static int append_padded_entry(struct scoutfs_srch_file *sfl, u64 blk,
struct scoutfs_srch_block *srb, struct scoutfs_srch_entry *sre)
{
int ret;
ret = encode_entry(srb->entries + le32_to_cpu(srb->entry_bytes),
sre, &srb->tail);
if (ret > 0) {
srb->tail = *sre;
le32_add_cpu(&srb->entry_nr, 1);
le32_add_cpu(&srb->entry_bytes, ret);
le64_add_cpu(&sfl->entries, 1);
ret = 0;
}
return ret;
}
/*
* This is called by a testing trigger to create a very specific case of
* encoded entry offsets. We want the last entry in the block to start
* precisely at the _SAFE_BYTES offset.
*
* This is called when there is a single existing entry in the block.
* We have the entire block to work with. We encode pairs of matching
* entries. This hides them from readers (both searches and merging) as
* they're interpreted as creation and deletion and are deleted. We use
* the existing hash value of the first entry in the block but then set
* the inode to an impossibly large number so it doesn't interfere with
* anything.
*
* To hit the specific offset we carefully manage the number of bytes
* that change between fields of the entry. We know that if we change
* all the bytes of the ino and id we end up with a 20 byte (2+8+8,2)
* encoding of the pair of entries. To have the last entry start at
* the _SAFE_BYTES offset, the final 20 byte pair encoding needs to end
* 2 bytes (the second entry's encoding) past the _SAFE_BYTES offset.
*
* So as we encode pairs we watch the delta of our current offset from
* that desired final offset of 2 past _SAFE_BYTES. If we're a multiple
* of 20 away then we encode full 20 byte pairs. If we're not, we drop
* a byte and encode 19 bytes. That slowly walks the distance back to
* a multiple of 20 while still encoding large entries.
*/
static void pad_entries_at_safe(struct scoutfs_srch_file *sfl, u64 blk,
struct scoutfs_srch_block *srb)
{
struct scoutfs_srch_entry sre;
u32 target;
s32 diff;
u64 hash;
u64 ino;
u64 id;
int ret;
hash = le64_to_cpu(srb->tail.hash);
ino = le64_to_cpu(srb->tail.ino) | (1ULL << 62);
id = le64_to_cpu(srb->tail.id);
target = SCOUTFS_SRCH_BLOCK_SAFE_BYTES + 2;
while ((diff = target - le32_to_cpu(srb->entry_bytes)) > 0) {
ino ^= 1ULL << (7 * 8);
if (diff % 20 == 0) {
id ^= 1ULL << (7 * 8);
} else {
id ^= 1ULL << (6 * 8);
}
sre.hash = cpu_to_le64(hash);
sre.ino = cpu_to_le64(ino);
sre.id = cpu_to_le64(id);
ret = append_padded_entry(sfl, blk, srb, &sre);
if (ret == 0)
ret = append_padded_entry(sfl, blk, srb, &sre);
BUG_ON(ret != 0);
diff = target - le32_to_cpu(srb->entry_bytes);
}
}
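/*
* Editorial arithmetic for the pair sizes above: the hash never
* changes, so its diff encodes in 0 bytes. Flipping the top byte of
* both the ino and the id costs 2 + 0 + 8 + 8 = 18 bytes for the first
* entry plus 2 bytes for its identical twin, 20 per pair; flipping
* only byte 6 of the id shrinks the pair to 19 bytes, which is how the
* loop nudges entry_bytes onto a multiple-of-20 distance from the
* target.
*/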
/*
* The caller is dropping an ino/id because the tracking rbtree is full.
* This loses information so we can't return any entries at or after the
* one that we dropped. Update end to the entry before the dropped
* entry if it's less than the current end.
*/
static void set_end_before(struct scoutfs_srch_entry *end, u64 ino, u64 id)
{
struct scoutfs_srch_entry sre;
sre.hash = end->hash;
sre.ino = cpu_to_le64(ino);
sre.id = cpu_to_le64(id);
sre_dec(&sre);
if (sre_cmp(&sre, end) < 0)
*end = sre;
}
/*
* Track an inode and id of an xattr hash that we found while searching.
* We'll return inos from the nodes in order to userspace when we're
* done searching. The first time we see the entry we track it, the
* second time must be a deletion so we remove it.
*
* We count the number of tracked entries here. Once we hit the limit
* we drop entries which are greater than what's tracked. If we track
* new entries which are within the set then we drop the last entry.
* When we drop entries we have to trim the range of entries that we'll
* return because we've lost data. The caller will perform the search
* again from that point, giving them another window of tracked entries
* to fill from that entry.
*/
static int track_found(struct scoutfs_srch_rb_root *sroot, u64 ino, u64 id,
unsigned long limit, struct scoutfs_srch_entry *end)
{
struct rb_node **node = &sroot->root.rb_node;
struct rb_node *parent = NULL;
struct scoutfs_srch_rb_node *snode;
int cmp = 1; /* set last for first insertion */
while (*node) {
parent = *node;
snode = container_of(*node, struct scoutfs_srch_rb_node, node);
cmp = scoutfs_cmp(ino, snode->ino) ?:
scoutfs_cmp(id, snode->id);
if (cmp < 0) {
node = &(*node)->rb_left;
} else if (cmp > 0) {
node = &(*node)->rb_right;
} else {
/* update last if removed as a dupe */
if (sroot->last == &snode->node)
sroot->last = rb_prev(sroot->last);
rb_erase(&snode->node, &sroot->root);
kfree(snode);
sroot->nr--;
return 0;
}
}
/* can't track greater while we're at the limit */
if (sroot->nr >= limit && cmp > 0 && parent == sroot->last) {
set_end_before(end, ino, id);
return 0;
}
snode = kzalloc(sizeof(*snode), GFP_NOFS);
if (!snode)
return -ENOMEM;
rb_link_node(&snode->node, parent, node);
rb_insert_color(&snode->node, &sroot->root);
/* track a newly inserted last item */
if (cmp > 0 && parent == sroot->last)
sroot->last = &snode->node;
snode->ino = ino;
snode->id = id;
sroot->nr++;
/* remove and update last if we inserted earlier at limit */
if (sroot->nr > limit && sroot->last != &snode->node) {
snode = container_of(sroot->last, struct scoutfs_srch_rb_node,
node);
sroot->last = rb_prev(sroot->last);
set_end_before(end, snode->ino, snode->id);
rb_erase(&snode->node, &sroot->root);
kfree(snode);
sroot->nr--;
}
return 0;
}
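/*
* Editorial example of the limit handling above, with limit 2 and
* tracked entries {(ino 3, id 1), (ino 7, id 2)}: finding (9, 1) is
* dropped and end is trimmed to just before it, while finding (5, 1)
* is inserted and the old last entry (7, 2) is evicted, again trimming
* end, so a later search can resume from that point.
*/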
/*
* Sweep all the unsorted entries of a log file looking for hash matches
* and tracking their xattr inos and ids. If the tracking sroot fills
* we update end but keep searching because we might find earlier
* entries.
*/
static int search_log_file(struct super_block *sb,
struct scoutfs_srch_file *sfl,
struct scoutfs_srch_rb_root *sroot,
struct scoutfs_srch_entry *start,
struct scoutfs_srch_entry *end,
unsigned long limit)
{
struct scoutfs_block *bl = NULL;
struct scoutfs_srch_entry sre;
struct scoutfs_srch_entry prev;
struct scoutfs_srch_block *srb;
int ret = 0;
u64 blk;
int pos;
int i;
for (blk = 0; blk < le64_to_cpu(sfl->blocks); blk++) {
scoutfs_block_put(sb, bl);
ret = get_file_block(sb, NULL, NULL, sfl, 0, blk, &bl);
if (ret < 0)
break;
srb = bl->data;
memset(&prev, 0, sizeof(prev));
pos = 0;
scoutfs_inc_counter(sb, srch_search_log_block);
for (i = 0; i < le32_to_cpu(srb->entry_nr); i++) {
if (pos > SCOUTFS_SRCH_BLOCK_SAFE_BYTES) {
/* can only be inconsistency :/ */
ret = -EIO;
break;
}
ret = decode_entry(srb->entries + pos, &sre, &prev);
if (ret <= 0) {
/* can only be inconsistency :/ */
ret = -EIO;
break;
}
pos += ret;
prev = sre;
if (sre_cmp(start, &sre) > 0 ||
sre_cmp(&sre, end) > 0)
continue;
ret = track_found(sroot, le64_to_cpu(sre.ino),
le64_to_cpu(sre.id), limit, end);
if (ret < 0)
break;
}
if (ret < 0)
break;
}
scoutfs_block_put(sb, bl);
return ret;
}
/*
* Search a sorted file for entries for inodes that could contain the
* xattr hash that we're looking for. The caller has checked that the
* start entry is contained in the file. We find the first block that
* could contain it and stream entries from there until we fill the
* rbtree or arrive at the end entry.
*/
static int search_sorted_file(struct super_block *sb,
struct scoutfs_srch_file *sfl,
struct scoutfs_srch_rb_root *sroot,
struct scoutfs_srch_entry *start,
struct scoutfs_srch_entry *end,
unsigned long limit)
{
DECLARE_SRCH_INFO(sb, srinf);
struct scoutfs_srch_block *srb = NULL;
struct scoutfs_srch_entry sre;
struct scoutfs_srch_entry prev;
struct scoutfs_block *bl = NULL;
int ret = 0;
int pos = 0;
s64 left;
s64 right;
u64 first;
u64 blk;
if (sfl->blocks == 0)
return 0;
/* binary search for first block in the range */
first = U64_MAX;
left = 0;
right = le64_to_cpu(sfl->blocks) - 1;
while (left <= right) {
blk = (left + right) >> 1;
ret = get_file_block(sb, NULL, NULL, sfl, 0, blk, &bl);
if (ret < 0)
goto out;
srb = bl->data;
if (sre_cmp(end, &srb->first) < 0) {
right = blk - 1;
} else if (sre_cmp(start, &srb->last) > 0) {
left = blk + 1;
} else {
first = min(blk, first);
right = blk - 1;
}
scoutfs_block_put(sb, bl);
bl = NULL;
}
/* no blocks in range */
if (first == U64_MAX) {
ret = 0;
goto out;
}
blk = first;
/* stream entries until end or we're past the full tracking rb_root */
for (;;) {
if (bl == NULL) {
/* only check on each new input block */
if (atomic_read(&srinf->shutdown)) {
ret = -ESHUTDOWN;
goto out;
}
ret = get_file_block(sb, NULL, NULL, sfl, 0, blk, &bl);
if (ret < 0)
goto out;
srb = bl->data;
memset(&prev, 0, sizeof(prev));
pos = 0;
scoutfs_inc_counter(sb, srch_search_sorted_block);
}
if (pos > SCOUTFS_SRCH_BLOCK_SAFE_BYTES) {
/* can only be inconsistency :/ */
ret = -EIO;
goto out;
}
ret = decode_entry(srb->entries + pos, &sre, &prev);
if (ret <= 0) {
/* can only be inconsistency :/ */
ret = -EIO;
goto out;
}
pos += ret;
prev = sre;
if (sre_cmp(start, &sre) > 0)
continue;
if (sre_cmp(&sre, end) > 0)
break;
ret = track_found(sroot, le64_to_cpu(sre.ino),
le64_to_cpu(sre.id), limit, end);
if (ret < 0)
goto out;
if (pos >= le32_to_cpu(srb->entry_bytes)) {
scoutfs_block_put(sb, bl);
bl = NULL;
if (++blk == le64_to_cpu(sfl->blocks))
break;
}
}
ret = 0;
out:
scoutfs_block_put(sb, bl);
return ret;
}
static int search_file(struct super_block *sb, int type,
struct scoutfs_srch_file *sfl,
struct scoutfs_srch_rb_root *sroot,
struct scoutfs_srch_entry *start,
struct scoutfs_srch_entry *end, unsigned long limit)
{
/* ignore files that don't have our hash */
if (sre_cmp(start, &sfl->last) > 0 ||
sre_cmp(end, &sfl->first) < 0)
return 0;
if (type == SCOUTFS_SRCH_LOG_TYPE) {
scoutfs_inc_counter(sb, srch_search_log);
return search_log_file(sb, sfl, sroot, start, end, limit);
} else {
scoutfs_inc_counter(sb, srch_search_sorted);
return search_sorted_file(sb, sfl, sroot, start, end, limit);
}
}
static void srch_init_rb_root(struct scoutfs_srch_rb_root *sroot)
{
sroot->root = RB_ROOT;
sroot->last = NULL;
sroot->nr = 0;
}
void scoutfs_srch_destroy_rb_root(struct scoutfs_srch_rb_root *sroot)
{
struct scoutfs_srch_rb_node *snode;
struct scoutfs_srch_rb_node *pos;
rbtree_postorder_for_each_entry_safe(snode, pos, &sroot->root, node)
kfree(snode);
srch_init_rb_root(sroot);
}
/*
* There are no constraints on the distribution of entries in log or
* sorted srch files. We limit the number of entries we track to avoid
* consuming absurd amounts of memory for very large searches. The
* larger the limit the more memory each search will take. The smaller
* this is the more searches will be necessary to find all the entries.
*/
#define SRCH_LIMIT 1000000
/*
* Search all the srch files for entries recording that inodes might
* have a given xattr.
*
* Advancing from an inode number that was returned is the only way the
* caller can make forward progress between searches. We might not find
* any inodes if we have the bad luck of pruning all the entries we
* tracked with deletions. We'll restart the search ourselves in this
* case to see if we can find an inode to return to the caller.
*/
int scoutfs_srch_search_xattrs(struct super_block *sb,
struct scoutfs_srch_rb_root *sroot,
u64 hash, u64 ino, u64 last_ino, bool *done)
{
struct scoutfs_net_roots roots;
struct scoutfs_srch_entry start;
struct scoutfs_srch_entry end;
struct scoutfs_srch_entry final;
struct scoutfs_log_trees lt;
struct scoutfs_srch_file sfl;
SCOUTFS_BTREE_ITEM_REF(iref);
DECLARE_SAVED_REFS(saved);
struct scoutfs_key key;
unsigned long limit = SRCH_LIMIT;
int ret;
scoutfs_inc_counter(sb, srch_search_xattrs);
*done = false;
srch_init_rb_root(sroot);
start.hash = cpu_to_le64(hash);
start.ino = cpu_to_le64(ino);
start.id = 0;
final.hash = cpu_to_le64(hash);
final.ino = cpu_to_le64(last_ino);
final.id = cpu_to_le64(U64_MAX);
retry:
scoutfs_srch_destroy_rb_root(sroot);
ret = scoutfs_client_get_roots(sb, &roots);
if (ret)
goto out;
end = final;
/* search intersecting sorted files, then logs */
init_srch_key(&key, SCOUTFS_SRCH_BLOCKS_TYPE, 0, 0);
for (;;) {
ret = scoutfs_btree_next(sb, &roots.srch_root, &key, &iref);
if (ret == 0) {
if (iref.key->sk_type != key.sk_type) {
ret = -ENOENT;
} else if (iref.val_len == sizeof(sfl)) {
key = *iref.key;
scoutfs_key_inc(&key);
memcpy(&sfl, iref.val, iref.val_len);
} else {
ret = -EIO;
}
scoutfs_btree_put_iref(&iref);
}
if (ret < 0) {
if (ret == -ENOENT) {
if (key.sk_type == SCOUTFS_SRCH_BLOCKS_TYPE) {
init_srch_key(&key,
SCOUTFS_SRCH_LOG_TYPE, 0, 0);
continue;
} else {
break;
}
}
goto out;
}
ret = search_file(sb, key.sk_type, &sfl, sroot,
&start, &end, limit);
if (ret < 0)
goto out;
}
/* search all the log files being written by mounts */
scoutfs_key_init_log_trees(&key, 0, 0);
for (;;) {
ret = scoutfs_btree_next(sb, &roots.logs_root, &key, &iref);
if (ret == -ENOENT)
break;
if (ret == 0) {
if (iref.val_len == sizeof(lt)) {
key = *iref.key;
scoutfs_key_inc(&key);
memcpy(&lt, iref.val, iref.val_len);
} else {
ret = -EIO;
}
scoutfs_btree_put_iref(&iref);
}
if (ret < 0)
goto out;
ret = search_file(sb, SCOUTFS_SRCH_LOG_TYPE, &lt.srch_file,
sroot, &start, &end, limit);
if (ret < 0)
goto out;
}
/* keep searching if we didn't find any entries in the limit */
if (sroot->nr == 0 && sre_cmp(&end, &final) < 0) {
start = end;
sre_inc(&start);
scoutfs_inc_counter(sb, srch_search_retry_empty);
goto retry;
}
/* let the caller know our search was exhaustive */
*done = sre_cmp(&end, &final) == 0;
ret = 0;
out:
ret = scoutfs_block_check_stale(sb, ret, &saved, &roots.srch_root.ref,
&roots.logs_root.ref);
if (ret == -ESTALE)
goto retry;
return ret;
}
/*
* Running in the server, rotate the client's log file as it commits if
* the file is large enough.
*/
int scoutfs_srch_rotate_log(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_btree_root *root,
struct scoutfs_srch_file *sfl, bool force)
{
struct scoutfs_key key;
int ret;
if (sfl->ref.blkno && !force && scoutfs_trigger(sb, SRCH_FORCE_LOG_ROTATE))
force = true;
if (sfl->ref.blkno == 0 ||
(!force && le64_to_cpu(sfl->blocks) < SCOUTFS_SRCH_LOG_BLOCK_LIMIT))
return 0;
init_srch_key(&key, SCOUTFS_SRCH_LOG_TYPE,
le64_to_cpu(sfl->ref.blkno), 0);
ret = scoutfs_btree_insert(sb, alloc, wri, root, &key,
sfl, sizeof(*sfl));
/*
* While it's harmless to replay moving the client's logging srch
* file to the core btree item, server commits should keep that
* from happening, so we warn if we see it. This warning can
* be removed eventually.
*/
if (WARN_ON_ONCE(ret == -EEXIST))
ret = 0;
if (ret == 0) {
memset(sfl, 0, sizeof(*sfl));
scoutfs_inc_counter(sb, srch_rotate_log);
}
return ret;
}
/*
* Running in the server, get a compaction operation to send to the
* client. We first see if there are any pending operations to continue
* working on. If not, we see if any tier has enough files waiting for
* a compaction. We first search log files and then each greater size
* tier. We skip input files which are currently being read by busy
* compaction items.
*/
int scoutfs_srch_get_compact(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_btree_root *root,
u64 rid, struct scoutfs_srch_compact *sc)
{
struct scoutfs_srch_file sfl;
SCOUTFS_BTREE_ITEM_REF(iref);
struct scoutfs_spbm busy;
struct scoutfs_key key;
int cur_order = -1;
int order;
int type;
int ret;
int err;
int i;
/*
* Search for pending or busy items. If we find a pending item
* we move it to busy and return it. We build up a bitmap of
* input files which are in busy items.
*/
scoutfs_spbm_init(&busy);
for (init_srch_key(&key, SCOUTFS_SRCH_PENDING_TYPE, 0, 0); ;
scoutfs_key_inc(&key)) {
/* _PENDING_ and _BUSY_ are last, _next won't see other types */
ret = scoutfs_btree_next(sb, root, &key, &iref);
if (ret == -ENOENT)
break;
if (ret == 0) {
if (iref.val_len == sizeof(*sc)) {
key = *iref.key;
memcpy(sc, iref.val, iref.val_len);
} else {
ret = -EIO;
}
scoutfs_btree_put_iref(&iref);
}
if (ret < 0)
goto out;
/* record all the busy input files */
if (key.sk_type == SCOUTFS_SRCH_BUSY_TYPE) {
for (i = 0; i < sc->nr; i++) {
ret = scoutfs_spbm_set(&busy,
le64_to_cpu(sc->in[i].sfl.ref.blkno));
if (ret < 0)
goto out;
}
continue;
}
/* or move the first pending to busy and return it */
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid,
le64_to_cpu(sc->id));
ret = scoutfs_btree_insert(sb, alloc, wri, root, &key,
sc, sizeof(*sc));
if (ret < 0)
goto out;
init_srch_key(&key, SCOUTFS_SRCH_PENDING_TYPE,
le64_to_cpu(sc->id), 0);
ret = scoutfs_btree_delete(sb, alloc, wri, root, &key);
if (ret < 0) {
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid,
le64_to_cpu(sc->id));
err = scoutfs_btree_delete(sb, alloc, wri, root, &key);
BUG_ON(err); /* XXX both pending and busy :/ */
goto out;
}
/* found one */
ret = 0;
goto out;
}
/* no pending, look for sufficient files to start a new compaction */
memset(sc, 0, sizeof(struct scoutfs_srch_compact));
/* first look for unsorted log files */
type = SCOUTFS_SRCH_LOG_TYPE;
init_srch_key(&key, type, 0, 0);
for (;;scoutfs_key_inc(&key)) {
ret = scoutfs_btree_next(sb, root, &key, &iref);
if (ret == 0) {
if (iref.key->sk_type != type) {
ret = -ENOENT;
} else if (iref.val_len == sizeof(sfl)) {
key = *iref.key;
memcpy(&sfl, iref.val, iref.val_len);
} else {
ret = -EIO;
}
scoutfs_btree_put_iref(&iref);
}
if (ret < 0) {
/* see if we ran out of log files or files entirely */
if (ret == -ENOENT) {
sc->nr = 0;
if (type == SCOUTFS_SRCH_LOG_TYPE) {
type = SCOUTFS_SRCH_BLOCKS_TYPE;
init_srch_key(&key, type, 0, 0);
continue;
} else {
ret = 0;
}
}
goto out;
}
/* skip any files already being compacted */
if (scoutfs_spbm_test(&busy, le64_to_cpu(sfl.ref.blkno)))
continue;
/* reset if we iterated into the next size category */
if (type == SCOUTFS_SRCH_BLOCKS_TYPE) {
order = fls64(le64_to_cpu(sfl.blocks)) /
SCOUTFS_SRCH_COMPACT_ORDER;
if (order != cur_order) {
cur_order = order;
sc->nr = 0;
}
}
sc->in[sc->nr++].sfl = sfl;
if (sc->nr == SCOUTFS_SRCH_COMPACT_NR)
break;
scoutfs_key_inc(&key);
}
if (type == SCOUTFS_SRCH_LOG_TYPE)
sc->flags = SCOUTFS_SRCH_COMPACT_FLAG_LOG;
else
sc->flags = SCOUTFS_SRCH_COMPACT_FLAG_SORTED;
/* record that our client has a compaction in process */
sc->id = sc->in[0].sfl.ref.blkno;
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid, le64_to_cpu(sc->id));
ret = scoutfs_btree_insert(sb, alloc, wri, root, &key,
sc, sizeof(*sc));
out:
scoutfs_spbm_destroy(&busy);
if (ret < 0)
sc->nr = 0;
if (sc->nr < SCOUTFS_SRCH_COMPACT_NR)
memset(&sc->in[sc->nr], 0,
(SCOUTFS_SRCH_COMPACT_NR - sc->nr) * sizeof(sc->in[0]));
return ret;
}
/*
* get_ previously created a busy item to reserve the files for a compaction.
* The caller has finished the input struct and we can update the persistent
* copy.
*/
int scoutfs_srch_update_compact(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_btree_root *root, u64 rid,
struct scoutfs_srch_compact *sc)
{
struct scoutfs_key key;
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid, le64_to_cpu(sc->id));
return scoutfs_btree_update(sb, alloc, wri, root, &key,
sc, sizeof(struct scoutfs_srch_compact));
}
static void init_file_key(struct scoutfs_key *key, int type,
struct scoutfs_srch_file *sfl)
{
if (type == SCOUTFS_SRCH_LOG_TYPE)
init_srch_key(key, type, le64_to_cpu(sfl->ref.blkno), 0);
else
init_srch_key(key, type, le64_to_cpu(sfl->blocks),
le64_to_cpu(sfl->ref.blkno));
}
/*
* A compaction has completed so we remove the input file reference
* items and add the output file, if it has contents. If this returns
* an error then the file items were not changed.
*/
static int commit_files(struct super_block *sb, struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_btree_root *root,
struct scoutfs_srch_compact *sc)
{
struct scoutfs_srch_file *sfl;
struct scoutfs_key key;
int type;
int ret;
int err;
int i;
if (sc->flags & SCOUTFS_SRCH_COMPACT_FLAG_LOG)
type = SCOUTFS_SRCH_LOG_TYPE;
else
type = SCOUTFS_SRCH_BLOCKS_TYPE;
if (sc->out.blocks != 0) {
sfl = &sc->out;
init_file_key(&key, SCOUTFS_SRCH_BLOCKS_TYPE, sfl);
ret = scoutfs_btree_insert(sb, alloc, wri, root, &key,
sfl, sizeof(*sfl));
if (ret < 0)
goto out;
}
for (i = 0; i < sc->nr; i++) {
sfl = &sc->in[i].sfl;
init_file_key(&key, type, sfl);
ret = scoutfs_btree_delete(sb, alloc, wri, root, &key);
if (ret < 0) {
while (--i >= 0) {
sfl = &sc->in[i].sfl;
init_file_key(&key, type, sfl);
err = scoutfs_btree_insert(sb, alloc, wri,
root, &key,
sfl, sizeof(*sfl));
BUG_ON(err); /* lost srch file */
}
if (sc->out.blocks != 0) {
sfl = &sc->out;
init_file_key(&key, SCOUTFS_SRCH_BLOCKS_TYPE,
sfl);
err = scoutfs_btree_delete(sb, alloc, wri,
root, &key);
BUG_ON(err); /* duplicate srch files data */
}
goto out;
}
}
ret = 0;
out:
return ret;
}
/*
* Running in the server: commit the result of a compaction. Given the
* response id, find the compaction's busy item. The busy item is
* returned to a pending item or is advanced depending on the result.
* If the compaction completed then we replace the input files with the
* output files and transition the compaction to delete the input files.
* Once the input files are deleted we can remove the compaction item.
*/
int scoutfs_srch_commit_compact(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_btree_root *root, u64 rid,
struct scoutfs_srch_compact *res,
struct scoutfs_alloc_list_head *av,
struct scoutfs_alloc_list_head *fr)
{
struct scoutfs_srch_compact *pending = NULL;
struct scoutfs_srch_compact *busy;
SCOUTFS_BTREE_ITEM_REF(iref);
struct scoutfs_key key;
int ret;
int err;
int i;
/* only free allocators when we finish deleting */
memset(av, 0, sizeof(struct scoutfs_alloc_list_head));
memset(fr, 0, sizeof(struct scoutfs_alloc_list_head));
busy = kzalloc(sizeof(struct scoutfs_srch_compact), GFP_NOFS);
if (busy == NULL) {
ret = -ENOMEM;
goto out;
}
/* find the record of our compaction */
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid, le64_to_cpu(res->id));
ret = scoutfs_btree_lookup(sb, root, &key, &iref);
if (ret == 0) {
if (iref.val_len == sizeof(struct scoutfs_srch_compact))
memcpy(busy, iref.val, iref.val_len);
else
ret = -EIO;
scoutfs_btree_put_iref(&iref);
}
if (ret < 0) /* XXX leaks allocators */
goto out;
/* restore busy to pending if the operation failed */
if (res->flags & SCOUTFS_SRCH_COMPACT_FLAG_ERROR) {
pending = busy;
ret = 0;
goto update;
}
/* store result as pending if it isn't done */
if (!(res->flags & SCOUTFS_SRCH_COMPACT_FLAG_DONE)) {
pending = res;
ret = 0;
goto update;
}
/* update file references if we finished compaction (!deleting) */
if (!(res->flags & SCOUTFS_SRCH_COMPACT_FLAG_DELETE)) {
ret = commit_files(sb, alloc, wri, root, res);
if (ret < 0) {
/* XXX we can't commit, shutdown? */
goto out;
}
/* transition flags for deleting input files */
for (i = 0; i < res->nr; i++) {
res->in[i].blk = 0;
res->in[i].pos = 0;
}
res->flags &= ~(SCOUTFS_SRCH_COMPACT_FLAG_DONE |
SCOUTFS_SRCH_COMPACT_FLAG_LOG |
SCOUTFS_SRCH_COMPACT_FLAG_SORTED);
res->flags |= SCOUTFS_SRCH_COMPACT_FLAG_DELETE;
pending = res;
ret = 0;
goto update;
}
/* ok, finished deleting, reclaim allocs and delete busy */
*av = res->meta_avail;
*fr = res->meta_freed;
pending = NULL;
ret = 0;
update:
if (pending) {
init_srch_key(&key, SCOUTFS_SRCH_PENDING_TYPE,
le64_to_cpu(pending->id), 0);
ret = scoutfs_btree_insert(sb, alloc, wri, root, &key,
pending, sizeof(*pending));
if (ret < 0)
goto out;
}
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid, le64_to_cpu(res->id));
ret = scoutfs_btree_delete(sb, alloc, wri, root, &key);
if (ret < 0 && pending) {
init_srch_key(&key, SCOUTFS_SRCH_PENDING_TYPE,
le64_to_cpu(pending->id), 0);
err = scoutfs_btree_delete(sb, alloc, wri, root, &key);
BUG_ON(err); /* both busy and pending present */
}
out:
WARN_ON_ONCE(ret < 0); /* XXX inconsistency */
kfree(busy);
return ret;
}
/*
* Remove a busy item for the given client and give the caller its
* allocators. Returns -ENOENT when there are no more items.
*/
int scoutfs_srch_cancel_compact(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_btree_root *root, u64 rid,
struct scoutfs_alloc_list_head *av,
struct scoutfs_alloc_list_head *fr)
{
struct scoutfs_srch_compact *sc;
SCOUTFS_BTREE_ITEM_REF(iref);
struct scoutfs_key key;
struct scoutfs_key last;
int ret;
init_srch_key(&key, SCOUTFS_SRCH_BUSY_TYPE, rid, 0);
init_srch_key(&last, SCOUTFS_SRCH_BUSY_TYPE, rid, U64_MAX);
ret = scoutfs_btree_next(sb, root, &key, &iref);
if (ret == 0) {
if (scoutfs_key_compare(iref.key, &last) > 0) {
ret = -ENOENT;
} else if (iref.val_len != sizeof(*sc)) {
ret = -EIO;
} else {
key = *iref.key;
sc = iref.val;
*av = sc->meta_avail;
*fr = sc->meta_freed;
}
scoutfs_btree_put_iref(&iref);
}
if (ret < 0)
goto out;
ret = scoutfs_btree_delete(sb, alloc, wri, root, &key);
out:
return ret;
}
/*
* We should commit our progress when we have sufficient dirty blocks or
* don't have enough metadata alloc space for our caller's operations.
*/
static bool should_commit(struct super_block *sb, struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri, u32 nr)
{
return (scoutfs_block_writer_dirty_bytes(sb, wri) >=
SRCH_COMPACT_DIRTY_LIMIT_BYTES) ||
scoutfs_alloc_meta_low(sb, alloc, nr);
}
struct tourn_node {
struct scoutfs_srch_entry sre;
int ind;
};
static void tourn_update(struct tourn_node *tnodes, struct tourn_node *tn)
{
struct tourn_node *sib;
struct tourn_node *par;
size_t ind;
/* root is at [1] */
while (tn != &tnodes[1]) {
ind = tn - tnodes;
sib = &tnodes[ind ^ 1];
par = &tnodes[ind >> 1];
*par = sre_cmp(&tn->sre, &sib->sre) < 0 ? *tn : *sib;
tn = par;
}
}
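/*
* Editorial example of the index math above: with the root at
* tnodes[1], a node at index i has its sibling at i ^ 1 and its parent
* at i >> 1. For four leaves stored at indexes 4..7, the winner of
* (4,5) lands in slot 2, the winner of (6,7) in slot 3, and the
* overall minimum in slot 1 once tourn_update() has walked the changed
* leaf up to the root.
*/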
/* return the entry at the current position, can return enoent if done */
typedef int (*kway_get_t)(struct super_block *sb,
struct scoutfs_srch_entry *sre_ret, void *arg);
/* only called after _get returns 0, advances to next entry for _get */
typedef void (*kway_advance_t)(struct super_block *sb, void *arg);
static int kway_merge(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_file *sfl,
kway_get_t kway_get, kway_advance_t kway_adv,
void **args, int nr, bool logs_input)
{
DECLARE_SRCH_INFO(sb, srinf);
struct scoutfs_srch_block *srb = NULL;
struct scoutfs_srch_entry last_tail;
struct scoutfs_block *bl = NULL;
struct tourn_node *tnodes;
struct tourn_node *leaves;
struct tourn_node *root;
struct tourn_node *tn;
int last_bytes = 0;
int nr_parents;
int nr_nodes;
int empty = 0;
int ret = 0;
int diff;
u64 blk;
int ind;
int i;
if (WARN_ON_ONCE(nr <= 0))
return -EINVAL;
/* always at least one parent for single leaf */
nr_parents = max_t(unsigned long, 1, roundup_pow_of_two(nr) - 1);
/* root at [1] for easy sib/parent index calc, final pad for odd sib */
nr_nodes = 1 + nr_parents + nr + 1;
tnodes = __vmalloc(nr_nodes * sizeof(struct tourn_node),
GFP_NOFS, PAGE_KERNEL);
if (!tnodes)
return -ENOMEM;
memset(tnodes, 0xff, nr_nodes * sizeof(struct tourn_node));
root = &tnodes[1];
leaves = &root[nr_parents];
/* initialize tournament leaves */
for (i = 0; i < nr; i++) {
tn = &leaves[i];
tn->ind = i;
ret = kway_get(sb, &tn->sre, args[i]);
if (ret == 0) {
tourn_update(tnodes, &leaves[i]);
} else if (ret == -ENOENT) {
empty++;
} else {
goto out;
}
}
/* always append new blocks */
blk = le64_to_cpu(sfl->blocks);
while (empty < nr) {
if (bl == NULL) {
if (atomic_read(&srinf->shutdown)) {
ret = -ESHUTDOWN;
goto out;
}
/* could grow and dirty to a leaf */
if (should_commit(sb, alloc, wri, sfl->height + 1)) {
ret = 0;
goto out;
}
ret = get_file_block(sb, alloc, wri, sfl,
GFB_INSERT | GFB_DIRTY, blk, &bl);
if (ret < 0)
goto out;
srb = bl->data;
scoutfs_inc_counter(sb, srch_compact_dirty_block);
}
if (sre_cmp(&root->sre, &srb->last) != 0) {
last_bytes = le32_to_cpu(srb->entry_bytes);
last_tail = srb->last;
ret = encode_entry(srb->entries +
le32_to_cpu(srb->entry_bytes),
&root->sre, &srb->tail);
if (WARN_ON_ONCE(ret <= 0)) {
/* shouldn't happen */
ret = -EIO;
goto out;
}
if (srb->entry_bytes == 0) {
if (blk == 0)
sfl->first = root->sre;
srb->first = root->sre;
}
le32_add_cpu(&srb->entry_nr, 1);
le32_add_cpu(&srb->entry_bytes, ret);
srb->last = root->sre;
srb->tail = root->sre;
sfl->last = root->sre;
le64_add_cpu(&sfl->entries, 1);
ret = 0;
if (le32_to_cpu(srb->entry_bytes) >
SCOUTFS_SRCH_BLOCK_SAFE_BYTES) {
scoutfs_block_put(sb, bl);
bl = NULL;
blk++;
}
/* end sorted block on _SAFE offset for testing */
if (bl && le32_to_cpu(srb->entry_nr) == 1 && logs_input &&
scoutfs_trigger(sb, SRCH_COMPACT_LOGS_PAD_SAFE)) {
pad_entries_at_safe(sfl, blk, srb);
scoutfs_block_put(sb, bl);
bl = NULL;
blk++;
}
scoutfs_inc_counter(sb, srch_compact_entry);
} else {
/*
* Duplicate entries indicate deletion so we
* undo the previously encoded entry and ignore
* this entry. This only happens within each
* block. Deletions can span block boundaries
* and will be filtered out by search and
* hopefully removed in future compactions.
*/
diff = le32_to_cpu(srb->entry_bytes) - last_bytes;
if (diff) {
memset(srb->entries + last_bytes, 0, diff);
if (last_bytes == 0) {
/* last_tail will be 0 */
if (blk == 0)
sfl->first = last_tail;
srb->first = last_tail;
}
le32_add_cpu(&srb->entry_nr, -1);
srb->entry_bytes = cpu_to_le32(last_bytes);
srb->last = last_tail;
srb->tail = last_tail;
sfl->last = last_tail;
le64_add_cpu(&sfl->entries, -1);
}
scoutfs_inc_counter(sb, srch_compact_removed_entry);
}
/* get the next */
ind = root->ind;
tn = &leaves[ind];
kway_adv(sb, args[ind]);
ret = kway_get(sb, &tn->sre, args[ind]);
if (ret == -ENOENT) {
/* this index is done */
memset(&tn->sre, 0xff, sizeof(tn->sre));
empty++;
ret = 0;
} else if (ret < 0) {
if (ret == -ENOANO) /* just testing trigger */
ret = 0;
goto out;
}
/* update the tourney and carry on */
tourn_update(tnodes, tn);
#if 0
/* would be worth it if we have uneven key distribution */
if (ind < nr - 1) {
/* order doesn't matter, fill hole */
swap(args[ind], args[nr - 1]);
swap(tn->sre, leaves[nr - 1].sre);
}
/* drop a level of the tree when we shrink to a power of 2 */
if (nr > 0 && is_power_of_two(nr)) {
memcpy(leaves - nr, leaves, nr * sizeof(*tn));
leaves -= nr;
for (i = 0; i < nr; i += 2)
tourn_update(least, leaves[i]);
}
#endif
}
/* could stream a final index.. arguably a small portion of work */
out:
scoutfs_block_put(sb, bl);
vfree(tnodes);
return ret;
}
#define SRES_PER_PAGE (PAGE_SIZE / sizeof(struct scoutfs_srch_entry))
static struct scoutfs_srch_entry *page_priv_sre(struct page *page)
{
return (struct scoutfs_srch_entry *)page_address(page) + page->private;
}
static int kway_get_page(struct super_block *sb,
struct scoutfs_srch_entry *sre_ret, void *arg)
{
struct page *page = arg;
struct scoutfs_srch_entry *sre = page_priv_sre(page);
if (page->private >= SRES_PER_PAGE || sre->ino == 0)
return -ENOENT;
*sre_ret = *sre;
return 0;
}
static void kway_adv_page(struct super_block *sb, void *arg)
{
struct page *page = arg;
page->private++;
}
static int cmp_page_sre(const void *A, const void *B)
{
const struct scoutfs_srch_entry *a = A;
const struct scoutfs_srch_entry *b = B;
return sre_cmp(a, b);
}
static void swap_page_sre(void *A, void *B, int size)
{
struct scoutfs_srch_entry *a = A;
struct scoutfs_srch_entry *b = B;
swap(*a, *b);
}
/*
* Compact a set of log files by sorting all their entries and writing
* them to a sorted output file. We decode all the files' entries into
* pages, sort the contents of each page, and then stream a k-way merge
* of the entries in the pages into an output file. While the input log
* files aren't sorted, their entries are delta-encoded, so decoding
* them can take quite a bit more memory in pages than the files took
* in blocks on disk (~2x typically, ~10x worst case).
*
* Because we read and sort all the input files we must perform the full
* compaction in one operation. The server must have given us
* sufficiently large avail/freed lists, otherwise we'll return ENOSPC.
*/
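/*
* Editorial sizing note for the expansion estimates above: a decoded
* entry occupies sizeof(struct scoutfs_srch_entry) bytes in the pages,
* while its encoded form in a log block can be far smaller, down to
* just a few bytes when it barely differs from its neighbor, which is
* where the ~2x typical and ~10x worst case figures come from.
*/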
static int compact_logs(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_compact *sc)
{
DECLARE_SRCH_INFO(sb, srinf);
struct scoutfs_srch_block *srb = NULL;
struct scoutfs_srch_entry *sre;
struct scoutfs_srch_entry prev;
struct scoutfs_block *bl = NULL;
struct scoutfs_srch_file *sfl;
struct page *page = NULL;
struct page *tmp;
void **args = NULL;
int nr_pages = 0;
LIST_HEAD(pages);
int sfl_ind;
u64 blk = 0;
int pos = 0;
int ret;
int i;
if (sc->nr <= 1) {
ret = -EINVAL;
goto out;
}
memset(&prev, 0, sizeof(prev));
/* decode all the log file's block's entries into pages */
for (sfl_ind = 0, sfl = &sc->in[0].sfl; sfl_ind < sc->nr; ) {
if (bl == NULL) {
/* only check on each new input block */
if (atomic_read(&srinf->shutdown)) {
ret = -ESHUTDOWN;
goto out;
}
ret = get_file_block(sb, NULL, NULL, sfl, 0, blk, &bl);
if (ret < 0)
goto out;
srb = bl->data;
}
if (page == NULL) {
page = alloc_page(GFP_NOFS);
if (!page) {
ret = -ENOMEM;
goto out;
}
page->private = 0;
list_add_tail(&page->lru, &pages);
nr_pages++;
scoutfs_inc_counter(sb, srch_compact_log_page);
}
sre = page_priv_sre(page);
if (pos > SCOUTFS_SRCH_BLOCK_SAFE_BYTES) {
/* can only be inconsistency :/ */
ret = -EIO;
goto out;
}
ret = decode_entry(srb->entries + pos, sre, &prev);
if (ret <= 0) {
/* can only be inconsistency :/ */
ret = -EIO;
goto out;
}
prev = *sre;
pos += ret;
if (pos >= le32_to_cpu(srb->entry_bytes)) {
scoutfs_block_put(sb, bl);
bl = NULL;
memset(&prev, 0, sizeof(prev));
pos = 0;
if (++blk == le64_to_cpu(sfl->blocks)) {
blk = 0;
sfl_ind++;
sfl = &sc->in[sfl_ind].sfl;
}
}
if (++page->private == SRES_PER_PAGE)
page = NULL;
}
/* add a terminal entry to the last partial page */
if (page) {
sre = page_priv_sre(page);
sre->ino = 0;
}
/* allocate args array for k-way merge */
args = vmalloc(nr_pages * sizeof(struct page *));
if (!args) {
ret = -ENOMEM;
goto out;
}
/* sort page entries and reset private for _next */
i = 0;
list_for_each_entry(page, &pages, lru) {
args[i++] = page;
if (atomic_read(&srinf->shutdown)) {
ret = -ESHUTDOWN;
goto out;
}
sort(page_address(page), page->private,
sizeof(struct scoutfs_srch_entry), cmp_page_sre,
swap_page_sre);
page->private = 0;
}
ret = kway_merge(sb, alloc, wri, &sc->out, kway_get_page, kway_adv_page,
args, nr_pages, true);
if (ret < 0)
goto out;
/* make sure we finished all the pages */
list_for_each_entry(page, &pages, lru) {
sre = page_priv_sre(page);
if (page->private < SRES_PER_PAGE && sre->ino != 0) {
ret = -ENOSPC;
goto out;
}
}
sc->flags |= SCOUTFS_SRCH_COMPACT_FLAG_DONE;
ret = 0;
out:
scoutfs_block_put(sb, bl);
vfree(args);
list_for_each_entry_safe(page, tmp, &pages, lru) {
list_del(&page->lru);
__free_page(page);
}
return ret;
}
struct kway_file_reader {
struct scoutfs_srch_file *sfl;
struct scoutfs_block *bl;
struct scoutfs_srch_entry prev;
struct scoutfs_srch_entry decoded_sre;
u64 blk;
u32 skip;
u32 pos;
int decoded_bytes;
};
static int kway_get_reader(struct super_block *sb,
struct scoutfs_srch_entry *sre_ret, void *arg)
{
struct kway_file_reader *rdr = arg;
struct scoutfs_srch_block *srb;
int ret;
if (rdr->blk == le64_to_cpu(rdr->sfl->blocks))
return -ENOENT;
if (rdr->bl == NULL) {
ret = get_file_block(sb, NULL, NULL, rdr->sfl, 0, rdr->blk,
&rdr->bl);
if (ret < 0)
return ret;
memset(&rdr->prev, 0, sizeof(rdr->prev));
}
srb = rdr->bl->data;
if (rdr->pos > SCOUTFS_SRCH_BLOCK_SAFE_BYTES ||
rdr->skip > SCOUTFS_SRCH_BLOCK_SAFE_BYTES ||
rdr->skip >= le32_to_cpu(srb->entry_bytes)) {
/* XXX inconsistency */
return -EIO;
}
if (rdr->decoded_bytes == 0 && rdr->pos == SCOUTFS_SRCH_BLOCK_SAFE_BYTES &&
scoutfs_trigger(sb, SRCH_MERGE_STOP_SAFE)) {
/* only used in testing */
return -ENOANO;
}
/* decode entry, possibly skipping start of the block */
while (rdr->decoded_bytes == 0 || rdr->pos < rdr->skip) {
ret = decode_entry(srb->entries + rdr->pos,
&rdr->decoded_sre, &rdr->prev);
if (ret <= 0) {
/* XXX inconsistency */
return -EIO;
}
rdr->decoded_bytes = ret;
if (rdr->pos < rdr->skip) {
rdr->prev = rdr->decoded_sre;
rdr->pos += ret;
if (rdr->pos >= rdr->skip)
rdr->skip = 0;
rdr->decoded_bytes = 0;
}
}
*sre_ret = rdr->decoded_sre;
return 0;
}
static void kway_adv_reader(struct super_block *sb, void *arg)
{
struct kway_file_reader *rdr = arg;
struct scoutfs_srch_block *srb;
/* _get must have set */
BUG_ON(rdr->bl == NULL);
BUG_ON(rdr->decoded_bytes == 0);
rdr->prev = rdr->decoded_sre;
rdr->pos += rdr->decoded_bytes;
rdr->decoded_bytes = 0;
srb = rdr->bl->data;
if (rdr->pos >= le32_to_cpu(srb->entry_bytes)) {
rdr->pos = 0;
scoutfs_block_put(sb, rdr->bl);
rdr->bl = NULL;
rdr->blk++;
}
}
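/*
* Editorial resume example for the reader above: a compaction that
* stopped partway through a block records the block and byte offset in
* sc->in[], and kway_get_reader() re-decodes entries from the start of
* that block until pos catches up with skip. Per the checks above,
* skip may equal SCOUTFS_SRCH_BLOCK_SAFE_BYTES, the last offset at
* which an entry may begin, but never exceed it.
*/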
/*
* Compact a set of sorted files by performing a k-way merge of the files
* into an output sorted file. The k-way merge works with an iterator
* which reads blocks and decodes entries.
*/
static int compact_sorted(struct super_block *sb,
struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_compact *sc)
{
struct kway_file_reader *rdrs = NULL;
void **args = NULL;
int ret;
int nr;
int i;
if (WARN_ON_ONCE(sc->nr <= 1))
return -EINVAL;
nr = sc->nr;
/* allocate args array for k-way merge */
rdrs = kmalloc_array(nr, sizeof(rdrs[0]), __GFP_ZERO | GFP_NOFS);
args = kmalloc_array(nr, sizeof(args[0]), GFP_NOFS);
if (!rdrs || !args) {
ret = -ENOMEM;
goto out;
}
for (i = 0; i < nr; i++) {
if (le64_to_cpu(sc->in[i].blk) >
le64_to_cpu(sc->in[i].sfl.blocks)) {
ret = -EINVAL;
goto out;
}
rdrs[i].sfl = &sc->in[i].sfl;
rdrs[i].blk = le64_to_cpu(sc->in[i].blk);
rdrs[i].skip = le64_to_cpu(sc->in[i].pos);
args[i] = &rdrs[i];
}
ret = kway_merge(sb, alloc, wri, &sc->out, kway_get_reader,
kway_adv_reader, args, nr, false);
sc->flags |= SCOUTFS_SRCH_COMPACT_FLAG_DONE;
for (i = 0; i < nr; i++) {
sc->in[i].blk = cpu_to_le64(rdrs[i].blk);
sc->in[i].pos = cpu_to_le64(rdrs[i].pos);
if (rdrs[i].blk < le64_to_cpu(sc->in[i].sfl.blocks))
sc->flags &= ~SCOUTFS_SRCH_COMPACT_FLAG_DONE;
}
out:
for (i = 0; rdrs && i < nr; i++)
scoutfs_block_put(sb, rdrs[i].bl);
kfree(rdrs);
kfree(args);
return ret;
}
/*
* Delete a file that has been compacted and is no longer referenced by
* items in the srch_root. The server protects the input file from
* other compactions while we're working, but other readers could still
* be trying to read it while searching.
*
* We don't modify the blocks to avoid the cost of allocating and
* freeing dirty parent metadata blocks, and we want to avoid triggering
* stale reads in racing readers. We free blocks from leaf parents
* upwards and from left to right. Once we've freed a block we never
* visit it again. We record our walk position in each file's compact
* input so that it can be saved in pending items as progress is made
* over multiple operations.
*/
static int delete_file(struct super_block *sb, struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_compact_input *in)
{
struct scoutfs_block *bl = NULL;
struct scoutfs_srch_parent *srp;
u64 blkno;
u64 blk;
u64 inc;
int level;
int ret;
int i;
blk = le64_to_cpu(in->blk);
level = max(le64_to_cpu(in->pos), 1ULL);
if (level > in->sfl.height) {
ret = 0;
goto out;
}
for (; level < in->sfl.height; level++) {
for (inc = 1, i = 2; i <= level; i++)
inc *= SCOUTFS_SRCH_PARENT_REFS;
while (blk < le64_to_cpu(in->sfl.blocks)) {
ret = read_path_block(sb, wri, &in->sfl, blk, level,
&bl);
if (ret < 0)
goto out;
srp = bl->data;
for (i = calc_ref_ind(blk, level);
i < SCOUTFS_SRCH_PARENT_REFS &&
blk < le64_to_cpu(in->sfl.blocks);
i++, blk += inc) {
blkno = le64_to_cpu(srp->refs[i].blkno);
if (!blkno)
continue;
/* free below, then final root block */
if (should_commit(sb, alloc, wri, 2)) {
ret = 0;
goto out;
}
ret = scoutfs_free_meta(sb, alloc, wri, blkno);
if (ret < 0)
goto out;
}
scoutfs_block_put(sb, bl);
bl = NULL;
}
blk = 0;
}
if (level == in->sfl.height) {
ret = scoutfs_free_meta(sb, alloc, wri,
le64_to_cpu(in->sfl.ref.blkno));
if (ret < 0)
goto out;
level++;
}
ret = 0;
out:
in->blk = cpu_to_le64(blk);
in->pos = cpu_to_le64(level);
scoutfs_block_put(sb, bl);
return ret;
}
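/*
* Editorial example of the walk above, writing R for
* SCOUTFS_SRCH_PARENT_REFS: at level 1 the walk steps blk by 1 and
* frees every leaf ref, at level 2 it steps blk by R so each step
* advances one ref slot in a level 2 parent and frees one level 1
* parent, and so on until only the root block is left to free.
*/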
static int delete_files(struct super_block *sb, struct scoutfs_alloc *alloc,
struct scoutfs_block_writer *wri,
struct scoutfs_srch_compact *sc)
{
int ret = 0;
int i;
for (i = 0; i < sc->nr; i++) {
ret = delete_file(sb, alloc, wri, &sc->in[i]);
if (ret < 0 ||
(le64_to_cpu(sc->in[i].pos) <= sc->in[i].sfl.height))
break;
}
if (i == sc->nr)
sc->flags |= SCOUTFS_SRCH_COMPACT_FLAG_DONE;
return ret;
}
/* wait 10s between compact attempts on error, immediate after success */
#define SRCH_COMPACT_DELAY_MS (10 * MSEC_PER_SEC)
/*
* Get a compaction operation from the server, sort the entries from the
* input files as they're read, and stream the remaining sorted entries
* into a newly written output file. The server is protecting the input
* files from other compactions, so they will be stable. The server gives
* us a populated allocator that should be enough to write a new file
* and delete the old file blocks. We'll regularly write out dirty
* blocks as we hit a dirty limit threshold so there will be some cow
* overhead of repeatedly dirtying, say, parent allocator and file radix
* blocks. We don't reclaim freed blocks in the allocator after each
* write so the initial allocator pool has to account for that cow
* overhead.
*
* All of our modifications are written into free blocks from the
* filesystem's perspective. If anything goes wrong we return an error
* and the server will ignore all our work and reclaim the initial
* allocator they gave us.
*/
static void scoutfs_srch_compact_worker(struct work_struct *work)
{
struct srch_info *srinf = container_of(work, struct srch_info,
compact_dwork.work);
struct scoutfs_srch_compact *sc = NULL;
struct super_block *sb = srinf->sb;
struct scoutfs_block_writer wri;
struct scoutfs_alloc alloc;
unsigned long delay;
int ret;
int err;
sc = kmalloc(sizeof(struct scoutfs_srch_compact), GFP_NOFS);
if (sc == NULL) {
ret = -ENOMEM;
goto out;
}
scoutfs_block_writer_init(sb, &wri);
ret = scoutfs_client_srch_get_compact(sb, sc);
if (ret < 0 || sc->nr == 0)
goto out;
scoutfs_alloc_init(&alloc, &sc->meta_avail, &sc->meta_freed);
if (sc->flags & SCOUTFS_SRCH_COMPACT_FLAG_LOG) {
ret = compact_logs(sb, &alloc, &wri, sc);
} else if (sc->flags & SCOUTFS_SRCH_COMPACT_FLAG_SORTED) {
ret = compact_sorted(sb, &alloc, &wri, sc);
} else if (sc->flags & SCOUTFS_SRCH_COMPACT_FLAG_DELETE) {
ret = delete_files(sb, &alloc, &wri, sc);
} else {
ret = -EINVAL;
}
if (ret < 0)
goto commit;
ret = scoutfs_alloc_prepare_commit(sb, &alloc, &wri) ?:
scoutfs_block_writer_write(sb, &wri);
commit:
/* the server won't use our partial compact if _ERROR is set */
sc->meta_avail = alloc.avail;
sc->meta_freed = alloc.freed;
sc->flags |= ret < 0 ? SCOUTFS_SRCH_COMPACT_FLAG_ERROR : 0;
err = scoutfs_client_srch_commit_compact(sb, sc);
if (err < 0 && ret == 0)
ret = err;
out:
/* our allocators and files should be stable */
WARN_ON_ONCE(ret == -ESTALE);
if (ret < 0)
scoutfs_inc_counter(sb, srch_compact_error);
scoutfs_block_writer_forget_all(sb, &wri);
if (!atomic_read(&srinf->shutdown)) {
delay = (sc->nr > 0 && ret == 0) ? 0 : msecs_to_jiffies(SRCH_COMPACT_DELAY_MS);
queue_delayed_work(srinf->workq, &srinf->compact_dwork, delay);
}
kfree(sc);
}
void scoutfs_srch_destroy(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
DECLARE_SRCH_INFO(sb, srinf);
if (!srinf)
return;
if (srinf->workq) {
/* pending grace work queues normal work */
atomic_set(&srinf->shutdown, 1);
cancel_delayed_work_sync(&srinf->compact_dwork);
flush_workqueue(srinf->workq);
destroy_workqueue(srinf->workq);
}
kfree(srinf);
sbi->srch_info = NULL;
}
int scoutfs_srch_setup(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct srch_info *srinf;
int ret;
srinf = kzalloc(sizeof(struct srch_info), GFP_KERNEL);
if (!srinf)
return -ENOMEM;
srinf->sb = sb;
atomic_set(&srinf->shutdown, 0);
INIT_DELAYED_WORK(&srinf->compact_dwork, scoutfs_srch_compact_worker);
sbi->srch_info = srinf;
srinf->workq = alloc_workqueue("scoutfs_srch_compact",
WQ_NON_REENTRANT | WQ_UNBOUND |
WQ_HIGHPRI, 0);
if (!srinf->workq) {
ret = -ENOMEM;
goto out;
}
queue_delayed_work(srinf->workq, &srinf->compact_dwork,
msecs_to_jiffies(SRCH_COMPACT_DELAY_MS));
ret = 0;
out:
if (ret)
scoutfs_srch_destroy(sb);
return ret;
}