scoutfs: compact using net requests

Currently compaction is only performed by one thread running in the
server.  Total metadata throughput of the system is limited by only
having one compaction operation in flight at a time.

This refactors the compaction code to have the server send compaction
requests to clients who then perform the compaction and send responses
to the server.  This spreads compaction load out amongst all the clients
and greatly increases total compaction throughput.

The manifest keeps track of compactions that are in flight at a given
level so that we maintain segment count invariants with multiple
compactions in flight.  It also uses the sparse bitmap to lock down
segments that are being used as inputs to avoid duplicating items across
two concurrent compactions.

A server thread still coordinates which segments are compacted.  The
search for a candidate compaction operation is largely unchanged.  It
now has to deal with being unable to process a compaction because its
segments are busy.  We add some logic to keep searching in a level until
we find a compaction that doesn't intersect with current compaction
requests.  If there are none at the level we move up to the next level.

The server will only issue a given number of compaction requests to a
client at a time.  When it needs to send a compaction request it rotates
through the current clients until it finds one that doesn't have the max
in flight.

If a client disconnects the server forgets the compactions it had sent
to that client.  If those compactions still need to be processed they'll
be sent to the next client.

The segnos that are allocated for compaction are not reclaimed if a
client disconnects or the server crashes.  This is a known deficiency
that will be addressed with the broader work to add crash recovery to
the multiple points in the protocol where the server and client trade
ownership of persistent state.

The server needs to block as it does work for compaction in the
notify_up and response callbacks.  We move them out from under spin
locks.

The server needs to clean up allocated segnos for a compaction request
that fails.  We let the client send a data payload along with an error
response so that it can give the server the id of the compaction that
failed.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2018-07-27 09:47:57 -07:00
committed by Zach Brown
parent 07eec357ee
commit 2cc990406a
13 changed files with 1331 additions and 345 deletions

View File

@@ -357,6 +357,63 @@ out:
}
}
/*
* Perform a compaction in the client as requested by the server. The
* server has protected the input segments and allocated the output
* segnos for us. This executes in work queued by the client's net
* connection. It only reads and write segments. The server will
* update the manifest and allocators while processing the response. An
* error response includes the compaction id so that the server can
* clean it up.
*
* If we get duplicate requests across a reconnected socket we can have
* two workers performing the same compaction simultaneously. This
* isn't particularly efficient but it's rare and won't corrupt the
* output. Our response can be lost if the socket is shutdown while
* it's in flight, the server deals with this.
*/
static int client_compact(struct super_block *sb,
struct scoutfs_net_connection *conn,
u8 cmd, u64 id, void *arg, u16 arg_len)
{
struct scoutfs_net_compact_response *resp = NULL;
struct scoutfs_net_compact_request *req;
int ret;
if (arg_len != sizeof(struct scoutfs_net_compact_request)) {
ret = -EINVAL;
goto out;
}
req = arg;
trace_scoutfs_client_compact_start(sb, le64_to_cpu(req->id),
req->last_level, req->flags);
resp = kzalloc(sizeof(struct scoutfs_net_compact_response), GFP_NOFS);
if (!resp) {
ret = -ENOMEM;
} else {
resp->id = req->id;
ret = scoutfs_compact(sb, req, resp);
}
trace_scoutfs_client_compact_stop(sb, le64_to_cpu(req->id), ret);
if (ret < 0)
ret = scoutfs_net_response(sb, conn, cmd, id, ret,
&req->id, sizeof(req->id));
else
ret = scoutfs_net_response(sb, conn, cmd, id, 0,
resp, sizeof(*resp));
kfree(resp);
out:
return ret;
}
static scoutfs_net_request_t client_req_funcs[] = {
[SCOUTFS_NET_CMD_COMPACT] = client_compact,
};
/*
* Called when either a connect attempt or established connection times
* out and fails.
@@ -405,9 +462,8 @@ int scoutfs_client_setup(struct super_block *sb)
INIT_DELAYED_WORK(&client->connect_dwork,
scoutfs_client_connect_worker);
/* client doesn't process any incoming requests yet */
client->conn = scoutfs_net_alloc_conn(sb, NULL, client_notify_down, 0,
NULL, "client");
client_req_funcs, "client");
if (!client->conn) {
ret = -ENOMEM;
goto out;

View File

@@ -13,6 +13,7 @@
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/slab.h>
#include <linux/sort.h>
#include "super.h"
#include "format.h"
@@ -30,10 +31,6 @@
* segments in each level of the lsm tree and is what merges duplicate
* and deletion keys.
*
* When the manifest is modified in a way that requires compaction it
* kicks the compaction thread. The compaction thread calls into the
* manifest to find the segments that need to be compaction.
*
* The compaction operation itself always involves a single "upper"
* segment at a given level and a limited number of "lower" segments at
* the next higher level whose key range intersects with the upper
@@ -42,23 +39,10 @@
* Compaction proceeds by iterating over the items in the upper segment
* and items in each of the lower segments in sort order. The items
* from the two input segments are copied into new output segments in
* sorted order. Item space is reclaimed as duplicate or deletion items
* are removed.
*
* Once the compaction is completed the manifest is updated to remove
* the input segments and add the output segments. Here segment space
* is reclaimed when the input items fit in fewer output segments.
* sorted order. Space is reclaimed as duplicate or deletion items are
* removed and fewer segments are written than were read.
*/
struct compact_info {
struct super_block *sb;
struct workqueue_struct *workq;
struct work_struct work;
};
#define DECLARE_COMPACT_INFO(sb, name) \
struct compact_info *name = SCOUTFS_SB(sb)->compact_info
struct compact_seg {
struct list_head entry;
@@ -72,16 +56,12 @@ struct compact_seg {
bool part_of_move;
};
/*
* A compaction request. It's filled up in scoutfs_compact_add() as
* the manifest is wlaked and it finds segments involved in the compaction.
*/
struct compact_cursor {
struct list_head csegs;
/* buffer holds allocations and our returning them */
u64 segnos[SCOUTFS_COMPACTION_MAX_UPDATE];
unsigned nr_segnos;
u64 segnos[SCOUTFS_COMPACTION_MAX_OUTPUT];
unsigned int nr_segnos;
u8 lower_level;
u8 last_level;
@@ -371,6 +351,12 @@ static int compact_segments(struct super_block *sb,
break;
}
/* didn't get enough segnos */
if (next_segno >= curs->nr_segnos) {
ret = -ENOSPC;
break;
}
cseg->segno = curs->segnos[next_segno];
curs->segnos[next_segno] = 0;
next_segno++;
@@ -435,153 +421,216 @@ static int compact_segments(struct super_block *sb,
}
/*
* Manifest walking is providing the details of the overall compaction
* operation.
* We want all the non-zero segnos sorted at the front of the array
* and the empty segnos all packed at the end. This is easily done by
* subtracting one from both then comparing as usual. All relations hold
* except that 0 becomes the greatest instead of the least.
*/
void scoutfs_compact_describe(struct super_block *sb, void *data,
u8 upper_level, u8 last_level, bool sticky)
static int sort_cmp_segnos(const void *A, const void *B)
{
struct compact_cursor *curs = data;
const u64 a = *(const u64 *)A - 1;
const u64 b = *(const u64 *)B - 1;
curs->lower_level = upper_level + 1;
curs->last_level = last_level;
curs->sticky = sticky;
return a < b ? -1 : a > b ? 1 : 0;
}
/*
* Add a segment involved in the compaction operation.
*
* XXX Today we know that the caller is always adding only one upper segment
* and is then possibly adding all the lower overlapping segments.
*/
int scoutfs_compact_add(struct super_block *sb, void *data,
struct scoutfs_manifest_entry *ment)
static void sort_swap_segnos(void *A, void *B, int size)
{
struct compact_cursor *curs = data;
struct compact_seg *cseg;
int ret;
u64 *a = A;
u64 *b = B;
cseg = alloc_cseg(sb, &ment->first, &ment->last);
if (!cseg) {
ret = -ENOMEM;
swap(*a, *b);
}
static int verify_request(struct super_block *sb,
struct scoutfs_net_compact_request *req)
{
int ret = -EINVAL;
int nr_segnos;
int nr_ents;
int i;
/* no unknown flags */
if (req->flags & ~SCOUTFS_NET_COMPACT_FLAG_STICKY)
goto out;
/* find the number of segments and entries */
for (i = 0; i < ARRAY_SIZE(req->segnos); i++) {
if (req->segnos[i] == 0)
break;
}
nr_segnos = i;
for (i = 0; i < ARRAY_SIZE(req->ents); i++) {
if (req->ents[i].segno == 0)
break;
}
nr_ents = i;
/* must have at least an upper */
if (nr_ents == 0)
goto out;
sort(req->segnos, nr_segnos, sizeof(req->segnos[i]),
sort_cmp_segnos, sort_swap_segnos);
/* segnos must be unique */
for (i = 1; i < nr_segnos; i++) {
if (req->segnos[i] == req->segnos[i - 1])
goto out;
}
list_add_tail(&cseg->entry, &curs->csegs);
/* if we have a lower it must be under upper */
if (nr_ents > 1 && (req->ents[1].level != req->ents[0].level + 1))
goto out;
cseg->segno = ment->segno;
cseg->seq = ment->seq;
cseg->level = ment->level;
/* make sure lower ents are on the same level */
for (i = 2; i < nr_ents; i++) {
if (req->ents[i].level != req->ents[i - 1].level)
goto out;
}
if (!curs->upper)
curs->upper = cseg;
else if (!curs->lower)
curs->lower = cseg;
if (curs->lower)
curs->last_lower = cseg;
for (i = 1; i < nr_ents; i++) {
/* lowers must overlap with upper */
if (scoutfs_key_compare_ranges(&req->ents[0].first,
&req->ents[0].last,
&req->ents[i].first,
&req->ents[i].last) != 0)
goto out;
/* lowers must be on the level below upper */
if (req->ents[i].level != req->ents[0].level + 1)
goto out;
}
/* last level must include lowest level */
if (req->last_level < req->ents[nr_ents - 1].level)
goto out;
for (i = 2; i < nr_ents; i++) {
/* lowers must be sorted by first key */
if (scoutfs_key_compare(&req->ents[i].first,
&req->ents[i - 1].first) <= 0)
goto out;
/* lowers must not overlap with each other */
if (scoutfs_key_compare_ranges(&req->ents[i].first,
&req->ents[i].last,
&req->ents[i - 1].first,
&req->ents[i - 1].last) == 0)
goto out;
}
ret = 0;
out:
if (WARN_ON_ONCE(ret < 0)) {
scoutfs_inc_counter(sb, compact_invalid_request);
printk("id %llu last_level %u flags 0x%x\n",
le64_to_cpu(req->id), req->last_level, req->flags);
printk("segnos: ");
for (i = 0; i < ARRAY_SIZE(req->segnos); i++)
printk("%llu ", le64_to_cpu(req->segnos[i]));
printk("\n");
printk("entries: ");
for (i = 0; i < ARRAY_SIZE(req->ents); i++) {
printk(" [%u] segno %llu seq %llu level %u first "SK_FMT" last "SK_FMT"\n",
i, le64_to_cpu(req->ents[i].segno),
le64_to_cpu(req->ents[i].seq),
req->ents[i].level,
SK_ARG(&req->ents[i].first),
SK_ARG(&req->ents[i].last));
}
printk("\n");
}
return ret;
}
/*
* Give the compaction cursor a segno to allocate from.
*/
void scoutfs_compact_add_segno(struct super_block *sb, void *data, u64 segno)
{
struct compact_cursor *curs = data;
curs->segnos[curs->nr_segnos++] = segno;
}
/*
* Commit the result of a compaction based on the state of the cursor.
* The server caller stops the manifest from being written while we're
* making changes. We lock the manifest to atomically make our changes.
* Translate the compaction request into our native structs that we use
* to perform the compaction. The caller has verified that the request
* satisfies our constraints.
*
* The erorr handling is sketchy here because calling the manifest from
* here is temporary. We should be sending a message to the server
* instead of calling the allocator and manifest.
* If we return an error the caller will clean up a partially prepared
* cursor.
*/
int scoutfs_compact_commit(struct super_block *sb, void *c, void *r)
static int prepare_curs(struct super_block *sb, struct compact_cursor *curs,
struct scoutfs_net_compact_request *req)
{
struct scoutfs_manifest_entry ment;
struct compact_cursor *curs = c;
struct list_head *results = r;
struct compact_seg *cseg;
int ret;
int ret = 0;
int i;
/* free unused segnos that were allocated for the compaction */
for (i = 0; i < curs->nr_segnos; i++) {
if (curs->segnos[i]) {
ret = scoutfs_server_free_segno(sb, curs->segnos[i]);
BUG_ON(ret);
}
curs->lower_level = req->ents[0].level + 1;
curs->last_level = req->last_level;
curs->sticky = !!(req->flags & SCOUTFS_NET_COMPACT_FLAG_STICKY);
for (i = 0; i < ARRAY_SIZE(req->segnos); i++) {
if (req->segnos[i] == 0)
break;
curs->segnos[i] = le64_to_cpu(req->segnos[i]);
}
curs->nr_segnos = i;
scoutfs_manifest_lock(sb);
for (i = 0; i < ARRAY_SIZE(req->ents); i++) {
if (req->ents[i].segno == 0)
break;
/* delete input segments, probably freeing their segnos */
list_for_each_entry(cseg, &curs->csegs, entry) {
if (!cseg->part_of_move) {
ret = scoutfs_server_free_segno(sb, cseg->segno);
BUG_ON(ret);
scoutfs_init_ment_from_net(&ment, &req->ents[i]);
cseg = alloc_cseg(sb, &ment.first, &ment.last);
if (!cseg) {
ret = -ENOMEM;
break;
}
scoutfs_manifest_init_entry(&ment, cseg->level, 0, cseg->seq,
&cseg->first, NULL);
ret = scoutfs_manifest_del(sb, &ment);
BUG_ON(ret);
list_add_tail(&cseg->entry, &curs->csegs);
cseg->segno = ment.segno;
cseg->seq = ment.seq;
cseg->level = ment.level;
if (!curs->upper)
curs->upper = cseg;
else if (!curs->lower)
curs->lower = cseg;
if (curs->lower)
curs->last_lower = cseg;
}
/* add output entries */
list_for_each_entry(cseg, results, entry) {
/* XXX moved upper segments won't have read the segment :P */
if (cseg->seg)
scoutfs_seg_init_ment(&ment, cseg->level, cseg->seg);
else
scoutfs_manifest_init_entry(&ment, cseg->level,
cseg->segno, cseg->seq,
&cseg->first, &cseg->last);
ret = scoutfs_manifest_add(sb, &ment);
BUG_ON(ret);
}
scoutfs_manifest_unlock(sb);
return 0;
return ret;
}
/*
* The compaction worker tries to make forward progress with compaction
* every time its kicked. It pretends to send a message requesting
* compaction parameters but in reality the net request function there
* is calling directly into the manifest and back into our compaction
* add routines.
* Perform a compaction by translating the incoming request into our
* working state, iterating over input segments and write output
* segments, then generating the response that describes the output
* segments.
*
* We always try to clean up everything on errors.
* The server will either commit our response or cleanup the request
* if we return an error that the caller sends in response.
*/
static void scoutfs_compact_func(struct work_struct *work)
int scoutfs_compact(struct super_block *sb,
struct scoutfs_net_compact_request *req,
struct scoutfs_net_compact_response *resp)
{
struct compact_info *ci = container_of(work, struct compact_info, work);
struct super_block *sb = ci->sb;
struct compact_cursor curs = {{NULL,}};
struct scoutfs_manifest_entry ment;
struct scoutfs_bio_completion comp;
struct compact_seg *cseg;
LIST_HEAD(results);
int ret;
int err;
int nr;
INIT_LIST_HEAD(&curs.csegs);
scoutfs_bio_init_comp(&comp);
ret = scoutfs_client_get_compaction(sb, (void *)&curs);
/* short circuit no compaction work to do */
if (ret == 0 && list_empty(&curs.csegs))
return;
ret = verify_request(sb, req) ?:
prepare_curs(sb, &curs, req);
if (ret)
goto out;
/* trace compaction ranges */
list_for_each_entry(cseg, &curs.csegs, entry) {
@@ -590,84 +639,40 @@ static void scoutfs_compact_func(struct work_struct *work)
&cseg->last);
}
if (ret == 0 && !list_empty(&curs.csegs)) {
ret = compact_segments(sb, &curs, &comp, &results);
ret = compact_segments(sb, &curs, &comp, &results);
/* always wait for io completion */
err = scoutfs_bio_wait_comp(sb, &comp);
if (!ret && err)
ret = err;
}
/* don't update manifest on error, just free segnos */
if (ret) {
list_for_each_entry(cseg, &results, entry) {
if (!cseg->part_of_move)
curs.segnos[curs.nr_segnos++] = cseg->segno;
}
free_cseg_list(sb, &curs.csegs);
free_cseg_list(sb, &results);
}
err = scoutfs_client_finish_compaction(sb, &curs, &results);
/* always wait for io completion */
err = scoutfs_bio_wait_comp(sb, &comp);
if (!ret && err)
ret = err;
if (ret)
goto out;
/* fill entries for written output segments */
nr = 0;
list_for_each_entry(cseg, &results, entry) {
/* XXX moved upper segments won't have read the segment :P */
if (cseg->seg)
scoutfs_seg_init_ment(&ment, cseg->level, cseg->seg);
else
scoutfs_manifest_init_entry(&ment, cseg->level,
cseg->segno, cseg->seq,
&cseg->first, &cseg->last);
trace_scoutfs_compact_output(sb, ment.level, ment.segno,
ment.seq, &ment.first,
&ment.last);
scoutfs_init_ment_to_net(&resp->ents[nr++], &ment);
}
ret = 0;
out:
if (ret == -ESTALE)
scoutfs_inc_counter(sb, compact_stale_error);
free_cseg_list(sb, &curs.csegs);
free_cseg_list(sb, &results);
if (ret == -ESTALE)
scoutfs_inc_counter(sb, compact_stale_error);
WARN_ON_ONCE(ret && ret != -ESTALE);
trace_scoutfs_compact_func(sb, ret);
}
void scoutfs_compact_kick(struct super_block *sb)
{
DECLARE_COMPACT_INFO(sb, ci);
queue_work(ci->workq, &ci->work);
}
int scoutfs_compact_setup(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct compact_info *ci;
ci = kzalloc(sizeof(struct compact_info), GFP_KERNEL);
if (!ci)
return -ENOMEM;
ci->sb = sb;
INIT_WORK(&ci->work, scoutfs_compact_func);
ci->workq = alloc_workqueue("scoutfs_compact", 0, 1);
if (!ci->workq) {
kfree(ci);
return -ENOMEM;
}
sbi->compact_info = ci;
return 0;
}
/*
* The system should be idle, there should not be any more manifest
* modification which would kick compaction.
*/
void scoutfs_compact_destroy(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
DECLARE_COMPACT_INFO(sb, ci);
if (ci) {
/* stop compaction from requeueing itself */
cancel_work_sync(&ci->work);
destroy_workqueue(ci->workq);
sbi->compact_info = NULL;
kfree(ci);
}
return ret;
}

View File

@@ -1,16 +1,8 @@
#ifndef _SCOUTFS_COMPACT_H_
#define _SCOUTFS_COMPACT_H_
void scoutfs_compact_kick(struct super_block *sb);
void scoutfs_compact_describe(struct super_block *sb, void *data,
u8 upper_level, u8 last_level, bool sticky);
int scoutfs_compact_add(struct super_block *sb, void *data,
struct scoutfs_manifest_entry *ment);
void scoutfs_compact_add_segno(struct super_block *sb, void *data, u64 segno);
int scoutfs_compact_commit(struct super_block *sb, void *c, void *r);
int scoutfs_compact_setup(struct super_block *sb);
void scoutfs_compact_destroy(struct super_block *sb);
int scoutfs_compact(struct super_block *sb,
struct scoutfs_net_compact_request *req,
struct scoutfs_net_compact_response *resp);
#endif

View File

@@ -15,7 +15,9 @@
EXPAND_COUNTER(btree_read_error) \
EXPAND_COUNTER(btree_stale_read) \
EXPAND_COUNTER(btree_write_error) \
EXPAND_COUNTER(compact_invalid_request) \
EXPAND_COUNTER(compact_operations) \
EXPAND_COUNTER(compact_segment_busy) \
EXPAND_COUNTER(compact_segment_moved) \
EXPAND_COUNTER(compact_segment_read) \
EXPAND_COUNTER(compact_segment_write_bytes) \

View File

@@ -369,6 +369,7 @@ struct scoutfs_super_block {
struct scoutfs_btree_ring bring;
__le64 next_seg_seq;
__le64 next_node_id;
__le64 next_compact_id;
struct scoutfs_btree_root alloc_root;
struct scoutfs_manifest manifest;
struct scoutfs_inet_addr server_addr;
@@ -548,6 +549,7 @@ enum {
SCOUTFS_NET_CMD_GET_LAST_SEQ,
SCOUTFS_NET_CMD_GET_MANIFEST_ROOT,
SCOUTFS_NET_CMD_STATFS,
SCOUTFS_NET_CMD_COMPACT,
SCOUTFS_NET_CMD_UNKNOWN,
};
@@ -623,15 +625,49 @@ struct scoutfs_net_extent_list {
/* arbitrarily makes a nice ~1k extent list payload */
#define SCOUTFS_NET_EXTENT_LIST_MAX_NR 64
/* XXX eventually we'll have net compaction and will need agents to agree */
/* one upper segment and fanout lower segments */
#define SCOUTFS_COMPACTION_MAX_INPUT (1 + SCOUTFS_MANIFEST_FANOUT)
/* sticky can add one, and so can item page alignment */
#define SCOUTFS_COMPACTION_SLOP 2
/* delete all inputs and insert all outputs (same goes for alloc|free segnos) */
#define SCOUTFS_COMPACTION_MAX_UPDATE \
(2 * (SCOUTFS_COMPACTION_MAX_INPUT + SCOUTFS_COMPACTION_SLOP))
#define SCOUTFS_COMPACTION_MAX_INPUT (1 + SCOUTFS_MANIFEST_FANOUT)
/* sticky can split the input and item alignment padding can add a lower */
#define SCOUTFS_COMPACTION_SEGNO_OVERHEAD 2
#define SCOUTFS_COMPACTION_MAX_OUTPUT \
(SCOUTFS_COMPACTION_MAX_INPUT + SCOUTFS_COMPACTION_SEGNO_OVERHEAD)
/*
* A compact request is sent by the server to the client. It provides
* the input segments and enough allocated segnos to write the results.
* The id uniquely identifies this compaction request and is included in
* the response to clean up its allocated resources.
*/
struct scoutfs_net_compact_request {
__le64 id;
__u8 last_level;
__u8 flags;
__le64 segnos[SCOUTFS_COMPACTION_MAX_OUTPUT];
struct scoutfs_net_manifest_entry ents[SCOUTFS_COMPACTION_MAX_INPUT];
} __packed;
/*
* A sticky compaction has more lower level segments that overlap with
* the end of the upper after the last lower level segment included in
* the compaction. Items left in the upper segment after the last lower
* need to be written to the upper level instead of the lower. The
* upper segment "sticks" in place instead of moving down to the lower
* level.
*/
#define SCOUTFS_NET_COMPACT_FLAG_STICKY (1 << 0)
/*
* A compact response is sent by the client to the server. It describes
* the written output segments that need to be added to the manifest.
* The server compares the response to the request to free unused
* allocated segnos and input manifest entries. An empty response is
* valid and can happen if, say, the upper input segment completely
* deleted all the items in a single overlapping lower segment.
*/
struct scoutfs_net_compact_response {
__le64 id;
struct scoutfs_net_manifest_entry ents[SCOUTFS_COMPACTION_MAX_OUTPUT];
} __packed;
/*
* Scoutfs file handle structure - this can be copied out to userspace

View File

@@ -29,6 +29,7 @@
#include "counters.h"
#include "triggers.h"
#include "client.h"
#include "spbm.h"
#include "scoutfs_trace.h"
/*
@@ -47,6 +48,8 @@ struct manifest {
/* calculated on mount, const thereafter */
u64 level_limits[SCOUTFS_MANIFEST_MAX_LEVEL + 1];
u64 compacts_pending[SCOUTFS_MANIFEST_MAX_LEVEL + 1];
struct scoutfs_spbm segno_busy;
unsigned long flags;
@@ -884,6 +887,121 @@ out:
return ret;
}
static bool level_should_compact(struct super_block *sb, int level)
{
DECLARE_MANIFEST(sb, mani);
struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
BUG_ON(!rwsem_is_locked(&mani->rwsem));
return ((s64)le64_to_cpu(super->manifest.level_counts[level]) -
(s64)mani->compacts_pending[level]) > mani->level_limits[level];
}
int scoutfs_manifest_should_compact(struct super_block *sb)
{
DECLARE_MANIFEST(sb, mani);
bool should = false;
int level;
down_read(&mani->rwsem);
for (level = mani->nr_levels - 1; level >= 0; level--) {
if (level_should_compact(sb, level)) {
should = true;
break;
}
}
up_read(&mani->rwsem);
return should;
}
/*
* Record that a compaction operation is in flight. We mark the segnos
* involved so that we don't use them as inputs for other compactions
* and assume that the compaction will delete a segment from its upper
* level when deciding what level to compact.
*/
static int start_compact_request(struct super_block *sb,
struct scoutfs_net_compact_request *req)
{
DECLARE_MANIFEST(sb, mani);
int level;
int ret = 0;
int i;
BUG_ON(!rwsem_is_locked(&mani->rwsem));
for (i = 0; i < ARRAY_SIZE(req->ents); i++) {
if (req->ents[i].segno == 0)
break;
ret = scoutfs_spbm_set(&mani->segno_busy,
le64_to_cpu(req->ents[i].segno));
if (ret) {
while (i-- > 0)
scoutfs_spbm_clear(&mani->segno_busy,
le64_to_cpu(req->ents[i].segno));
break;
}
}
if (ret == 0) {
level = req->ents[0].level;
mani->compacts_pending[level]++;
}
return ret;
}
/*
* A compaction request has completed. No longer account for it in the
* level pending counts and stop tracking all its segments.
*
* This can be called in error paths with an empty zeroed request and it
* will do nothing.
*/
void scoutfs_manifest_compact_done(struct super_block *sb,
struct scoutfs_net_compact_request *req)
{
DECLARE_MANIFEST(sb, mani);
int level;
int i;
down_write(&mani->rwsem);
for (i = 0; i < ARRAY_SIZE(req->ents); i++) {
if (req->ents[i].segno == 0)
break;
scoutfs_spbm_clear(&mani->segno_busy,
le64_to_cpu(req->ents[i].segno));
}
if (i > 0) {
level = req->ents[0].level;
mani->compacts_pending[level]--;
}
up_write(&mani->rwsem);
}
static int add_entry_unless_busy(struct super_block *sb,
struct scoutfs_net_compact_request *req,
unsigned int ind,
struct scoutfs_manifest_entry *ment)
{
DECLARE_MANIFEST(sb, mani);
if (scoutfs_spbm_test(&mani->segno_busy, ment->segno)) {
scoutfs_inc_counter(sb, compact_segment_busy);
return -EAGAIN;
}
scoutfs_init_ment_to_net(&req->ents[ind], ment);
return 0;
}
/*
* Give the caller the segments that will be involved in the next
* compaction.
@@ -898,15 +1016,19 @@ out:
* We add all the segments to the compaction caller's data and let it do
* its thing. It'll allocate and free segments and update the manifest.
*
* Returns the number of input segments or -errno.
* Returns:
* 0: no compactions were needed at the given level
* > 0: number of total imput segments in the compaction
* -EAGAIN: segments were already in a pending compaction
* -errno: fatal error
*
* XXX this will get a lot more clever:
* - ensuring concurrent compactions don't overlap
* XXX this could be more clever:
* - prioritize segments with deletion or incremental records
* - prioritize partial segments
* - maybe compact segments by age in a given level
*/
int scoutfs_manifest_next_compact(struct super_block *sb, void *data)
static int next_compact_req(struct super_block *sb, int level,
struct scoutfs_net_compact_request *req)
{
DECLARE_MANIFEST(sb, mani);
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
@@ -918,30 +1040,17 @@ int scoutfs_manifest_next_compact(struct super_block *sb, void *data)
SCOUTFS_BTREE_ITEM_REF(iref);
SCOUTFS_BTREE_ITEM_REF(over_iref);
SCOUTFS_BTREE_ITEM_REF(prev);
struct scoutfs_key zeros;
static struct scoutfs_key zeros;
bool wrapped;
bool sticky;
int level;
int ret;
int nr = 0;
int i;
scoutfs_key_set_zeros(&zeros);
memset(req, 0, sizeof(*req));
down_write(&mani->rwsem);
for (level = mani->nr_levels - 1; level >= 0; level--) {
if (le64_to_cpu(super->manifest.level_counts[level]) >
mani->level_limits[level])
break;
}
trace_scoutfs_manifest_next_compact(sb, level);
if (level < 0) {
ret = 0;
goto out;
}
BUG_ON(!rwsem_is_locked(&mani->rwsem));
/* fill ment and ret == 0 if we find an entry at the level */
if (level == 0) {
@@ -1003,9 +1112,9 @@ again:
}
/* add the upper input segment */
ret = scoutfs_compact_add(sb, data, &ment);
ret = add_entry_unless_busy(sb, req, nr, &ment);
if (ret)
goto out;
goto skip;
nr++;
/* and add a fanout's worth of lower overlapping segments */
@@ -1029,7 +1138,7 @@ again:
break;
}
ret = scoutfs_compact_add(sb, data, &over);
ret = add_entry_unless_busy(sb, req, nr, &over);
if (ret)
goto out;
nr++;
@@ -1042,16 +1151,20 @@ again:
if (ret < 0 && ret != -ENOENT)
goto out;
scoutfs_compact_describe(sb, data, level, mani->nr_levels - 1, sticky);
req->last_level = mani->nr_levels - 1;
if (sticky)
req->flags |= SCOUTFS_NET_COMPACT_FLAG_STICKY;
ret = start_compact_request(sb, req);
if (ret)
goto out;
ret = 0;
skip:
/* record the next key to start from */
mani->compact_keys[level] = ment.last;
scoutfs_key_inc(&mani->compact_keys[level]);
ret = 0;
out:
up_write(&mani->rwsem);
scoutfs_btree_put_iref(&iref);
scoutfs_btree_put_iref(&over_iref);
scoutfs_btree_put_iref(&prev);
@@ -1059,6 +1172,81 @@ out:
return ret ?: nr;
}
/*
* Find the next segment to compact into its lower overlapping segments.
* Fill out the callers request describing all the segments involved in
* the operation.
*
* First we search for a level to compact. A level needs compaction if
* it has more segments than its limit. We search from the bottom up
* because segments are written at the top when there's space. By
* compacting from the bottom we pull new segments down until there's
* space. If we compacted from the top down then we could create an
* imbalanced top-heavy structure.
*
* At each level we find the segment from a cursor and try to compact it
* into its lower segments. Any of the segments involved could already
* be part of a pending compaction and need to be skipped. In that case
* we move to the next segment at the level. All the segments at the
* level could be busy so we detect when we skip to the first value we
* skipped to and move on.
*
* If we return a filled compact request then we've tracked it. We
* assume it will delete an upper segment and have marked all its segnos
* as busy so they won't be used by future compaction requests. The
* caller must call complete_done when the compact operation completes.
*/
int scoutfs_manifest_next_compact(struct super_block *sb,
struct scoutfs_net_compact_request *req)
{
DECLARE_MANIFEST(sb, mani);
struct scoutfs_key key = {0,};
bool first;
int level;
int ret = 0;
memset(req, 0, sizeof(*req));
down_write(&mani->rwsem);
for (level = mani->nr_levels - 1; level >= 0; level--) {
if (!level_should_compact(sb, level))
continue;
first = true;
for (;;) {
ret = next_compact_req(sb, level, req);
if (ret > 0 || (ret < 0 && ret != -EAGAIN))
goto out;
if (ret == 0)
break;
/* remember first skip and keep going */
if (first) {
first = false;
key = mani->compact_keys[level];
continue;
}
/* bail if we looped around */
if (!scoutfs_key_compare(&key,
&mani->compact_keys[level])) {
ret = 0;
break;
}
}
/* continue to next level */
}
out:
up_write(&mani->rwsem);
trace_scoutfs_manifest_next_compact(sb, level, ret);
return ret;
}
int scoutfs_manifest_setup(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
@@ -1071,6 +1259,7 @@ int scoutfs_manifest_setup(struct super_block *sb)
return -ENOMEM;
init_rwsem(&mani->rwsem);
scoutfs_spbm_init(&mani->segno_busy);
for (i = 0; i < ARRAY_SIZE(mani->compact_keys); i++)
scoutfs_key_set_zeros(&mani->compact_keys[i]);
@@ -1101,6 +1290,7 @@ void scoutfs_manifest_destroy(struct super_block *sb)
struct manifest *mani = sbi->manifest;
if (mani) {
scoutfs_spbm_destroy(&mani->segno_busy);
kfree(mani);
sbi->manifest = NULL;
}

View File

@@ -38,7 +38,11 @@ int scoutfs_manifest_read_items(struct super_block *sb,
int scoutfs_manifest_next_key(struct super_block *sb, struct scoutfs_key *key,
struct scoutfs_key *next_key);
int scoutfs_manifest_next_compact(struct super_block *sb, void *data);
int scoutfs_manifest_should_compact(struct super_block *sb);
int scoutfs_manifest_next_compact(struct super_block *sb,
struct scoutfs_net_compact_request *req);
void scoutfs_manifest_compact_done(struct super_block *sb,
struct scoutfs_net_compact_request *req);
bool scoutfs_manifest_level0_full(struct super_block *sb);

View File

@@ -348,7 +348,7 @@ static int submit_send(struct super_block *sb,
WARN_ON_ONCE(flags & SCOUTFS_NET_FLAGS_UNKNOWN) ||
WARN_ON_ONCE(net_err >= SCOUTFS_NET_ERR_UNKNOWN) ||
WARN_ON_ONCE(data_len > SCOUTFS_NET_MAX_DATA_LEN) ||
WARN_ON_ONCE(data_len && (!data || net_err)) ||
WARN_ON_ONCE(data_len && data == NULL) ||
WARN_ON_ONCE(net_err && (!(flags & SCOUTFS_NET_FLAG_RESPONSE))) ||
WARN_ON_ONCE(id == 0 && (flags & SCOUTFS_NET_FLAG_RESPONSE)))
return -EINVAL;
@@ -430,12 +430,13 @@ static void saw_valid_greeting(struct scoutfs_net_connection *conn, u64 node_id)
conn->valid_greeting = 1;
conn->node_id = node_id;
if (conn->notify_up)
conn->notify_up(sb, conn, conn->info, node_id);
list_splice_tail_init(&conn->resend_queue, &conn->send_queue);
queue_work(conn->workq, &conn->send_work);
spin_unlock(&conn->lock);
if (conn->notify_up)
conn->notify_up(sb, conn, conn->info, node_id);
}
/*
@@ -589,10 +590,6 @@ static bool invalid_message(struct scoutfs_net_header *nh)
nh->error >= SCOUTFS_NET_ERR_UNKNOWN)
return true;
/* errors can't have payloads */
if (nh->data_len != 0 && nh->error != SCOUTFS_NET_ERR_NONE)
return true;
/* payloads have a limit */
if (le16_to_cpu(nh->data_len) > SCOUTFS_NET_MAX_DATA_LEN)
return true;

View File

@@ -998,21 +998,24 @@ TRACE_EVENT(scoutfs_xattr_set,
);
TRACE_EVENT(scoutfs_manifest_next_compact,
TP_PROTO(struct super_block *sb, int level),
TP_PROTO(struct super_block *sb, int level, int ret),
TP_ARGS(sb, level),
TP_ARGS(sb, level, ret),
TP_STRUCT__entry(
__field(__u64, fsid)
__field(int, level)
__field(int, ret)
),
TP_fast_assign(
__entry->fsid = FSID_ARG(sb);
__entry->level = level;
__entry->ret = ret;
),
TP_printk(FSID_FMT" level %d", __entry->fsid, __entry->level)
TP_printk(FSID_FMT" level %d ret %d", __entry->fsid, __entry->level,
__entry->ret)
);
TRACE_EVENT(scoutfs_advance_dirty_super,
@@ -1069,22 +1072,127 @@ TRACE_EVENT(scoutfs_dir_add_next_linkref,
__entry->found_dir_ino, __entry->name_len)
);
TRACE_EVENT(scoutfs_compact_func,
TP_PROTO(struct super_block *sb, int ret),
TRACE_EVENT(scoutfs_client_compact_start,
TP_PROTO(struct super_block *sb, u64 id, u8 last_level, u8 flags),
TP_ARGS(sb, ret),
TP_ARGS(sb, id, last_level, flags),
TP_STRUCT__entry(
__field(__u64, fsid)
__field(__u64, id)
__field(__u8, last_level)
__field(__u8, flags)
),
TP_fast_assign(
__entry->fsid = FSID_ARG(sb);
__entry->id = id;
__entry->last_level = last_level;
__entry->flags = flags;
),
TP_printk("fsid "FSID_FMT" id %llu last_level %u flags 0x%x",
__entry->fsid, __entry->id, __entry->last_level,
__entry->flags)
);
TRACE_EVENT(scoutfs_client_compact_stop,
TP_PROTO(struct super_block *sb, u64 id, int ret),
TP_ARGS(sb, id, ret),
TP_STRUCT__entry(
__field(__u64, fsid)
__field(__u64, id)
__field(int, ret)
),
TP_fast_assign(
__entry->fsid = FSID_ARG(sb);
__entry->id = id;
__entry->ret = ret;
),
TP_printk(FSID_FMT" ret %d", __entry->fsid, __entry->ret)
TP_printk("fsid "FSID_FMT" id %llu ret %d",
__entry->fsid, __entry->id, __entry->ret)
);
TRACE_EVENT(scoutfs_server_compact_start,
TP_PROTO(struct super_block *sb, u64 id, u8 level, u64 node_id,
unsigned long client_nr, unsigned long server_nr,
unsigned long per_client),
TP_ARGS(sb, id, level, node_id, client_nr, server_nr, per_client),
TP_STRUCT__entry(
__field(__u64, fsid)
__field(__u64, id)
__field(__u8, level)
__field(__u64, node_id)
__field(unsigned long, client_nr)
__field(unsigned long, server_nr)
__field(unsigned long, per_client)
),
TP_fast_assign(
__entry->fsid = FSID_ARG(sb);
__entry->id = id;
__entry->level = level;
__entry->node_id = node_id;
__entry->client_nr = client_nr;
__entry->server_nr = server_nr;
__entry->per_client = per_client;
),
TP_printk("fsid "FSID_FMT" id %llu level %u node_id %llu client_nr %lu server_nr %lu per_client %lu",
__entry->fsid, __entry->id, __entry->level, __entry->node_id,
__entry->client_nr, __entry->server_nr, __entry->per_client)
);
TRACE_EVENT(scoutfs_server_compact_done,
TP_PROTO(struct super_block *sb, u64 id, u64 node_id,
unsigned long server_nr),
TP_ARGS(sb, id, node_id, server_nr),
TP_STRUCT__entry(
__field(__u64, fsid)
__field(__u64, id)
__field(__u64, node_id)
__field(unsigned long, server_nr)
),
TP_fast_assign(
__entry->fsid = FSID_ARG(sb);
__entry->id = id;
__entry->node_id = node_id;
__entry->server_nr = server_nr;
),
TP_printk("fsid "FSID_FMT" id %llu node_id %llu server_nr %lu",
__entry->fsid, __entry->id, __entry->node_id,
__entry->server_nr)
);
TRACE_EVENT(scoutfs_server_compact_response,
TP_PROTO(struct super_block *sb, u64 id, int error),
TP_ARGS(sb, id, error),
TP_STRUCT__entry(
__field(__u64, fsid)
__field(__u64, id)
__field(int, error)
),
TP_fast_assign(
__entry->fsid = FSID_ARG(sb);
__entry->id = id;
__entry->error = error;
),
TP_printk("fsid "FSID_FMT" id %llu error %d",
__entry->fsid, __entry->id, __entry->error)
);
TRACE_EVENT(scoutfs_write_begin,
@@ -1277,6 +1385,12 @@ DEFINE_EVENT(scoutfs_manifest_class, scoutfs_compact_input,
TP_ARGS(sb, level, segno, seq, first, last)
);
DEFINE_EVENT(scoutfs_manifest_class, scoutfs_compact_output,
TP_PROTO(struct super_block *sb, u8 level, u64 segno, u64 seq,
struct scoutfs_key *first, struct scoutfs_key *last),
TP_ARGS(sb, level, segno, seq, first, last)
);
DEFINE_EVENT(scoutfs_manifest_class, scoutfs_read_item_segment,
TP_PROTO(struct super_block *sb, u8 level, u64 segno, u64 seq,
struct scoutfs_key *first, struct scoutfs_key *last),
@@ -1694,6 +1808,14 @@ DEFINE_EVENT(scoutfs_work_class, scoutfs_server_commit_work_exit,
TP_PROTO(struct super_block *sb, u64 data, int ret),
TP_ARGS(sb, data, ret)
);
DEFINE_EVENT(scoutfs_work_class, scoutfs_server_compact_work_enter,
TP_PROTO(struct super_block *sb, u64 data, int ret),
TP_ARGS(sb, data, ret)
);
DEFINE_EVENT(scoutfs_work_class, scoutfs_server_compact_work_exit,
TP_PROTO(struct super_block *sb, u64 data, int ret),
TP_ARGS(sb, data, ret)
);
DEFINE_EVENT(scoutfs_work_class, scoutfs_net_proc_work_enter,
TP_PROTO(struct super_block *sb, u64 data, int ret),
TP_ARGS(sb, data, ret)
@@ -2267,6 +2389,39 @@ DEFINE_EVENT(scoutfs_segno_class, scoutfs_free_segno,
TP_PROTO(struct super_block *sb, u64 segno),
TP_ARGS(sb, segno)
);
DEFINE_EVENT(scoutfs_segno_class, scoutfs_remove_segno,
TP_PROTO(struct super_block *sb, u64 segno),
TP_ARGS(sb, segno)
);
DECLARE_EVENT_CLASS(scoutfs_server_client_count_class,
TP_PROTO(struct super_block *sb, u64 node_id, unsigned long nr_clients),
TP_ARGS(sb, node_id, nr_clients),
TP_STRUCT__entry(
__field(__u64, fsid)
__field(__s64, node_id)
__field(unsigned long, nr_clients)
),
TP_fast_assign(
__entry->fsid = FSID_ARG(sb);
__entry->node_id = node_id;
__entry->nr_clients = nr_clients;
),
TP_printk("fsid "FSID_FMT" node_id %llu nr_clients %lu",
__entry->fsid, __entry->node_id, __entry->nr_clients)
);
DEFINE_EVENT(scoutfs_server_client_count_class, scoutfs_server_client_up,
TP_PROTO(struct super_block *sb, u64 node_id, unsigned long nr_clients),
TP_ARGS(sb, node_id, nr_clients)
);
DEFINE_EVENT(scoutfs_server_client_count_class, scoutfs_server_client_down,
TP_PROTO(struct super_block *sb, u64 node_id, unsigned long nr_clients),
TP_ARGS(sb, node_id, nr_clients)
);
#endif /* _TRACE_SCOUTFS_H */

View File

@@ -315,15 +315,6 @@ out:
}
/*
* This just frees the segno for the given seg. It's gross but
* symmetrical with only being able to allocate segnos by allocating a
* seg. We'll probably have to do better.
*/
int scoutfs_seg_free_segno(struct super_block *sb, struct scoutfs_segment *seg)
{
return scoutfs_server_free_segno(sb, seg->segno);
}
/*
* The bios submitted by this don't have page references themselves. If

View File

@@ -34,7 +34,6 @@ void scoutfs_seg_put(struct scoutfs_segment *seg);
int scoutfs_seg_alloc(struct super_block *sb, u64 segno,
struct scoutfs_segment **seg_ret);
int scoutfs_seg_free_segno(struct super_block *sb, struct scoutfs_segment *seg);
bool scoutfs_seg_fits_single(u32 nr_items, u32 val_bytes);
bool scoutfs_seg_append_item(struct super_block *sb, struct scoutfs_segment *seg,
struct scoutfs_key *key, struct kvec *val,

View File

@@ -34,6 +34,11 @@
#include "net.h"
#include "endian_swap.h"
/*
* XXX pre commit:
* - comments
*/
/*
* Every active mount can act as the server that listens on a net
* connection and accepts connections from all the other mounts acting
@@ -48,20 +53,19 @@
struct server_info {
struct super_block *sb;
spinlock_t lock;
wait_queue_head_t waitq;
struct workqueue_struct *wq;
struct delayed_work dwork;
struct completion shutdown_comp;
bool bind_warned;
struct scoutfs_net_connection *conn;
/* request processing coordinates committing manifest and alloc */
struct rw_semaphore commit_rwsem;
struct llist_head commit_waiters;
struct work_struct commit_work;
/* adding new segments can have to wait for compaction */
wait_queue_head_t compaction_waitq;
/* server remembers the stable manifest root for clients */
seqcount_t stable_seqcount;
struct scoutfs_btree_root stable_manifest_root;
@@ -75,6 +79,13 @@ struct server_info {
struct list_head pending_frees;
struct list_head clients;
unsigned long nr_clients;
/* track compaction in flight */
unsigned long compacts_per_client;
unsigned long nr_compacts;
struct list_head compacts;
struct work_struct compact_work;
};
#define DECLARE_SERVER_INFO(sb, name) \
@@ -86,6 +97,7 @@ struct server_info {
struct server_client_info {
u64 node_id;
struct list_head head;
unsigned long nr_compacts;
};
struct commit_waiter {
@@ -391,7 +403,7 @@ static int free_extent(struct super_block *sb, u64 start, u64 len)
* This is called by the compaction code which is running in the server.
* The server caller has held all the locks, etc.
*/
int scoutfs_server_free_segno(struct super_block *sb, u64 segno)
static int free_segno(struct super_block *sb, u64 segno)
{
scoutfs_inc_counter(sb, server_free_segno);
trace_scoutfs_free_segno(sb, segno);
@@ -459,11 +471,51 @@ out:
return ret;
}
/*
* "allocating" a segno removes an unknown segment from the allocator
* and returns it, "removing" a segno removes a specific segno from the
* allocator.
*/
static int remove_segno(struct super_block *sb, u64 segno)
{
struct server_info *server = SCOUTFS_SB(sb)->server_info;
struct scoutfs_extent ext;
int ret;
trace_scoutfs_remove_segno(sb, segno);
scoutfs_extent_init(&ext, SCOUTFS_FREE_EXTENT_BLKNO_TYPE, 0,
segno << SCOUTFS_SEGMENT_BLOCK_SHIFT,
SCOUTFS_SEGMENT_BLOCKS, 0, 0);
down_write(&server->alloc_rwsem);
ret = scoutfs_extent_remove(sb, server_extent_io, &ext, NULL);
up_write(&server->alloc_rwsem);
return ret;
}
static void shutdown_server(struct server_info *server)
{
complete(&server->shutdown_comp);
}
/*
* Queue compaction work if clients have capacity for processing
* requests and the manifest knows of levels with too many segments.
*/
static void try_queue_compact(struct server_info *server)
{
struct super_block *sb = server->sb;
bool can_request;
spin_lock(&server->lock);
can_request = server->nr_compacts <
(server->nr_clients * server->compacts_per_client);
spin_unlock(&server->lock);
if (can_request && scoutfs_manifest_should_compact(sb))
queue_work(server->wq, &server->compact_work);
}
/*
* This is called while still holding the rwsem that prevents commits so
* that the caller can be sure to be woken by the next commit after they
@@ -787,8 +839,7 @@ retry:
scoutfs_manifest_unlock(sb);
up_read(&server->commit_rwsem);
/* XXX waits indefinitely? io errors? */
wait_event(server->compaction_waitq,
!scoutfs_manifest_level0_full(sb));
wait_event(server->waitq, !scoutfs_manifest_level0_full(sb));
goto retry;
}
@@ -804,7 +855,7 @@ retry:
if (ret == 0) {
ret = wait_for_commit(&cw);
if (ret == 0)
scoutfs_compact_kick(sb);
try_queue_compact(server);
}
out:
@@ -1044,97 +1095,597 @@ out:
return ret;
}
/* requests sent to clients are tracked so we can free resources */
struct compact_request {
struct list_head head;
u64 node_id;
struct scoutfs_net_compact_request req;
};
/*
* Eventually we're going to have messages that control compaction.
* Each client mount would have long-lived work that sends requests
* which are stuck in processing until there's work to do. They'd get
* their entries, perform the compaction, and send a reply. But we're
* not there yet.
*
* This is a short circuit that's called directly by a work function
* that's only queued on the server. It makes compaction work inside
* the commit consistency mechanics inside request processing and
* demonstrates the moving pieces that we'd need to cut up into a series
* of messages and replies.
*
* The compaction work caller cleans up everything on errors.
* Find a node that can process our compaction request. Return a
* node_id if we found a client and added the compaction to the client
* and server counts. Returns 0 if no suitable clients were found.
*/
int scoutfs_client_get_compaction(struct super_block *sb, void *curs)
static u64 compact_request_start(struct super_block *sb,
struct compact_request *cr)
{
struct server_info *server = SCOUTFS_SB(sb)->server_info;
struct commit_waiter cw;
u64 segno;
int ret = 0;
int nr;
int i;
struct server_client_info *last;
struct server_client_info *sci;
u64 node_id = 0;
down_read(&server->commit_rwsem);
spin_lock(&server->lock);
nr = scoutfs_manifest_next_compact(sb, curs);
if (nr <= 0) {
up_read(&server->commit_rwsem);
return nr;
}
/* XXX no last_entry_or_null? :( */
if (!list_empty(&server->clients))
last = list_last_entry(&server->clients,
struct server_client_info, head);
else
last = NULL;
/* allow for expansion slop from sticky and alignment */
for (i = 0; i < nr + SCOUTFS_COMPACTION_SLOP; i++) {
ret = alloc_segno(sb, &segno);
if (ret < 0)
while ((sci = list_first_entry_or_null(&server->clients,
struct server_client_info,
head)) != NULL) {
list_move_tail(&sci->head, &server->clients);
if (sci->nr_compacts < server->compacts_per_client) {
list_add(&cr->head, &server->compacts);
server->nr_compacts++;
sci->nr_compacts++;
node_id = sci->node_id;
cr->node_id = node_id;
break;
}
if (sci == last)
break;
scoutfs_compact_add_segno(sb, curs, segno);
}
if (ret == 0)
queue_commit_work(server, &cw);
up_read(&server->commit_rwsem);
trace_scoutfs_server_compact_start(sb, le64_to_cpu(cr->req.id),
cr->req.ents[0].level, node_id,
node_id ? sci->nr_compacts : 0,
server->nr_compacts,
server->compacts_per_client);
if (ret == 0)
ret = wait_for_commit(&cw);
spin_unlock(&server->lock);
return node_id;
}
/*
* Find a tracked compact request for the compaction id, remove it from
* the server and client counts, and return it to the caller.
*/
static struct compact_request *compact_request_done(struct super_block *sb,
u64 id)
{
struct server_info *server = SCOUTFS_SB(sb)->server_info;
struct compact_request *ret = NULL;
struct server_client_info *sci;
struct compact_request *cr;
spin_lock(&server->lock);
list_for_each_entry(cr, &server->compacts, head) {
if (le64_to_cpu(cr->req.id) != id)
continue;
list_for_each_entry(sci, &server->clients, head) {
if (sci->node_id == cr->node_id) {
sci->nr_compacts--;
break;
}
}
server->nr_compacts--;
list_del_init(&cr->head);
ret = cr;
break;
}
trace_scoutfs_server_compact_done(sb, id, ret ? ret->node_id : 0,
server->nr_compacts);
spin_unlock(&server->lock);
return ret;
}
/*
* This is a stub for recording the results of a compaction. We just
* call back into compaction to have it call the manifest and allocator
* updates.
* When a client disconnects we forget the compactions that they had
* in flight so that we have capacity to send compaction requests to the
* remaining clients.
*
* In the future we'd encode the manifest and segnos in requests sent to
* the server who'd update the manifest and allocator in request
* processing.
*
* As we finish a compaction we wait level0 writers if it opened up
* space in level 0.
* XXX we do not free their allocated segnos because they could still be
* running and writing to those blocks. To do this safely we'd need
* full recovery procedures with fencing to ensure that they're not able
* to write to those blocks anymore.
*/
int scoutfs_client_finish_compaction(struct super_block *sb, void *curs,
void *list)
static void forget_client_compacts(struct super_block *sb,
struct server_client_info *sci)
{
struct server_info *server = SCOUTFS_SB(sb)->server_info;
struct compact_request *cr;
struct compact_request *pos;
LIST_HEAD(forget);
spin_lock(&server->lock);
list_for_each_entry_safe(cr, pos, &server->compacts, head) {
if (cr->node_id == sci->node_id) {
sci->nr_compacts--;
server->nr_compacts--;
list_move(&cr->head, &forget);
}
}
spin_unlock(&server->lock);
list_for_each_entry_safe(cr, pos, &forget, head) {
scoutfs_manifest_compact_done(sb, &cr->req);
list_del_init(&cr->head);
kfree(cr);
}
}
static int segno_in_ents(__le64 segno, struct scoutfs_net_manifest_entry *ents,
unsigned int nr)
{
int i;
for (i = 0; i < nr; i++) {
if (ents[i].segno == 0)
break;
if (segno == ents[i].segno)
return 1;
}
return 0;
}
static int remove_segnos(struct super_block *sb, __le64 * __packed segnos,
unsigned int nr,
struct scoutfs_net_manifest_entry *unless,
unsigned int nr_unless, bool cleanup);
/*
* Free segnos if they're not found in the unless entries. If this
* returns an error then we've cleaned up partial frees on error. This
* panics if it sees an error and can't cleanup on error.
*
* There are variants of this for lots of add/del, alloc/remove data
* structurs.
*/
static int free_segnos(struct super_block *sb, __le64 * __packed segnos,
unsigned int nr,
struct scoutfs_net_manifest_entry *unless,
unsigned int nr_unless, bool cleanup)
{
int ret = 0;
int i;
for (i = 0; i < nr; i++) {
if (segnos[i] == 0)
break;
if (segno_in_ents(segnos[i], unless, nr_unless))
continue;
ret = free_segno(sb, le64_to_cpu(segnos[i]));
BUG_ON(ret < 0 && !cleanup);
if (ret < 0) {
remove_segnos(sb, segnos, i, unless, nr_unless, false);
break;
}
}
return ret;
}
static int alloc_segnos(struct super_block *sb, __le64 * __packed segnos,
unsigned int nr)
{
u64 segno;
int ret = 0;
int i;
for (i = 0; i < nr; i++) {
ret = alloc_segno(sb, &segno);
if (ret < 0) {
free_segnos(sb, segnos, i, NULL, 0, false);
break;
}
segnos[i] = cpu_to_le64(segno);
}
return ret;
}
static int remove_segnos(struct super_block *sb, __le64 * __packed segnos,
unsigned int nr,
struct scoutfs_net_manifest_entry *unless,
unsigned int nr_unless, bool cleanup)
{
int ret = 0;
int i;
for (i = 0; i < nr; i++) {
if (segnos[i] == 0)
break;
if (segno_in_ents(segnos[i], unless, nr_unless))
continue;
ret = remove_segno(sb, le64_to_cpu(segnos[i]));
BUG_ON(ret < 0 && !cleanup);
if (ret < 0) {
free_segnos(sb, segnos, i, unless, nr_unless, false);
break;
}
}
return ret;
}
static int remove_entry_segnos(struct super_block *sb,
struct scoutfs_net_manifest_entry *ents,
unsigned int nr,
struct scoutfs_net_manifest_entry *unless,
unsigned int nr_unless, bool cleanup);
static int free_entry_segnos(struct super_block *sb,
struct scoutfs_net_manifest_entry *ents,
unsigned int nr,
struct scoutfs_net_manifest_entry *unless,
unsigned int nr_unless, bool cleanup)
{
int ret = 0;
int i;
for (i = 0; i < nr; i++) {
if (ents[i].segno == 0)
break;
if (segno_in_ents(ents[i].segno, unless, nr_unless))
continue;
ret = free_segno(sb, le64_to_cpu(ents[i].segno));
BUG_ON(ret < 0 && !cleanup);
if (ret < 0) {
remove_entry_segnos(sb, ents, i, unless, nr_unless,
false);
break;
}
}
return ret;
}
static int remove_entry_segnos(struct super_block *sb,
struct scoutfs_net_manifest_entry *ents,
unsigned int nr,
struct scoutfs_net_manifest_entry *unless,
unsigned int nr_unless, bool cleanup)
{
int ret = 0;
int i;
for (i = 0; i < nr; i++) {
if (ents[i].segno == 0)
break;
if (segno_in_ents(ents[i].segno, unless, nr_unless))
continue;
ret = remove_segno(sb, le64_to_cpu(ents[i].segno));
BUG_ON(ret < 0 && !cleanup);
if (ret < 0) {
free_entry_segnos(sb, ents, i, unless, nr_unless,
false);
break;
}
}
return ret;
}
static int del_manifest_entries(struct super_block *sb,
struct scoutfs_net_manifest_entry *ents,
unsigned int nr, bool cleanup);
static int add_manifest_entries(struct super_block *sb,
struct scoutfs_net_manifest_entry *ents,
unsigned int nr, bool cleanup)
{
struct scoutfs_manifest_entry ment;
int ret = 0;
int i;
for (i = 0; i < nr; i++) {
if (ents[i].segno == 0)
break;
scoutfs_init_ment_from_net(&ment, &ents[i]);
ret = scoutfs_manifest_add(sb, &ment);
BUG_ON(ret < 0 && !cleanup);
if (ret < 0) {
del_manifest_entries(sb, ents, i, false);
break;
}
}
return ret;
}
static int del_manifest_entries(struct super_block *sb,
struct scoutfs_net_manifest_entry *ents,
unsigned int nr, bool cleanup)
{
struct scoutfs_manifest_entry ment;
int ret = 0;
int i;
for (i = 0; i < nr; i++) {
if (ents[i].segno == 0)
break;
scoutfs_init_ment_from_net(&ment, &ents[i]);
ret = scoutfs_manifest_del(sb, &ment);
BUG_ON(ret < 0 && !cleanup);
if (ret < 0) {
add_manifest_entries(sb, ents, i, false);
break;
}
}
return ret;
}
/*
* Process a received compaction response. This is called in concurrent
* processing work context so it's racing with other compaction
* responses and new compaction requests being built and sent.
*
* If the compaction failed then we only have to free the allocated
* output segnos sent in the request.
*
* If the compaction succeeded then we need to delete the input manifest
* entries, add any new output manifest entries, and free allocated
* segnos and input manifest segnos that aren't found in output segnos.
*
* And finally we always remove the compaction from the runtime client
* accounting
*
* As we finish a compaction we wake level0 writers if there's now space
* in level 0 for a new segment.
*
* Errors in processing are taken as an indication that this server is
* no longer able to do its job. We return hard errors which shut down
* the server in the hopes that another healthy server will start up.
* We may want to revisit this.
*/
static int compact_response(struct super_block *sb,
struct scoutfs_net_connection *conn,
void *resp, unsigned int resp_len,
int error, void *data)
{
struct server_info *server = SCOUTFS_SB(sb)->server_info;
struct scoutfs_net_compact_response *cresp = NULL;
struct compact_request *cr = NULL;
bool level0_was_full = false;
bool add_ents = false;
bool del_ents = false;
bool rem_segnos = false;
struct commit_waiter cw;
bool level0_was_full;
__le64 id;
int ret;
if (error) {
/* an error response without an id is fatal */
if (resp_len != sizeof(__le64)) {
ret = -EINVAL;
goto out;
}
memcpy(&id, resp, resp_len);
} else {
if (resp_len != sizeof(struct scoutfs_net_compact_response)) {
ret = -EINVAL;
goto out;
}
cresp = resp;
id = cresp->id;
}
trace_scoutfs_server_compact_response(sb, le64_to_cpu(id), error);
/* XXX we only free tracked requests on responses, must still exist */
cr = compact_request_done(sb, le64_to_cpu(id));
if (WARN_ON_ONCE(cr == NULL)) {
ret = -ENOENT;
goto out;
}
down_read(&server->commit_rwsem);
scoutfs_manifest_lock(sb);
level0_was_full = scoutfs_manifest_level0_full(sb);
ret = scoutfs_compact_commit(sb, curs, list);
if (ret == 0) {
queue_commit_work(server, &cw);
if (level0_was_full && !scoutfs_manifest_level0_full(sb))
wake_up(&server->compaction_waitq);
if (error) {
ret = 0;
goto cleanup;
}
up_read(&server->commit_rwsem);
/* delete old manifest entries */
ret = del_manifest_entries(sb, cr->req.ents, ARRAY_SIZE(cr->req.ents),
true);
if (ret)
goto cleanup;
add_ents = true;
/* add new manifest entries */
ret = add_manifest_entries(sb, cresp->ents, ARRAY_SIZE(cresp->ents),
true);
if (ret)
goto cleanup;
del_ents = true;
/* free allocated segnos not found in new entries */
ret = free_segnos(sb, cr->req.segnos, ARRAY_SIZE(cr->req.segnos),
cresp->ents, ARRAY_SIZE(cresp->ents), true);
if (ret)
goto cleanup;
rem_segnos = true;
/* free input segnos not found in new entries */
ret = free_entry_segnos(sb, cr->req.ents, ARRAY_SIZE(cr->req.ents),
cresp->ents, ARRAY_SIZE(cresp->ents), true);
cleanup:
/* cleanup partial commits on errors */
if (ret < 0 && rem_segnos)
remove_segnos(sb, cr->req.segnos, ARRAY_SIZE(cr->req.segnos),
cresp->ents, ARRAY_SIZE(cresp->ents), false);
if (ret < 0 && del_ents)
del_manifest_entries(sb, cresp->ents, ARRAY_SIZE(cresp->ents),
false);
if (ret < 0 && add_ents)
add_manifest_entries(sb, cr->req.ents,
ARRAY_SIZE(cr->req.ents), false);
/* free all the allocated output segnos if compaction failed */
if ((error || ret < 0) && cr != NULL)
free_segnos(sb, cr->req.segnos, ARRAY_SIZE(cr->req.segnos),
NULL, 0, false);
if (ret == 0 && level0_was_full && !scoutfs_manifest_level0_full(sb))
wake_up(&server->waitq);
if (ret == 0)
queue_commit_work(server, &cw);
scoutfs_manifest_unlock(sb);
up_read(&server->commit_rwsem);
if (cr) {
scoutfs_manifest_compact_done(sb, &cr->req);
kfree(cr);
}
if (ret == 0) {
ret = wait_for_commit(&cw);
if (ret == 0)
try_queue_compact(server);
}
scoutfs_compact_kick(sb);
out:
return ret;
}
/*
* The compaction worker executes as the manifest is updated and we see
* that a level has too many segments and clients aren't processing all
* their max number of compaction requests. Only one compaction worker
* executes.
*
* We have the manifest build us a compaction request, find a client to
* send it too, and record it for later completion processing.
*
* The manifest tracks pending compactions and won't use the same
* segments as inputs to multiple compactions. We track the number of
* compactions in flight to each client to keep them balanced.
*/
static void scoutfs_server_compact_worker(struct work_struct *work)
{
struct server_info *server = container_of(work, struct server_info,
compact_work);
struct super_block *sb = server->sb;
struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
struct scoutfs_net_compact_request *req;
struct compact_request *cr;
struct commit_waiter cw;
int nr_segnos = 0;
u64 node_id;
__le64 id;
int ret;
trace_scoutfs_server_compact_work_enter(sb, 0, 0);
cr = kzalloc(sizeof(struct compact_request), GFP_NOFS);
if (!cr) {
ret = -ENOMEM;
goto out;
}
req = &cr->req;
/* get the input manifest entries */
ret = scoutfs_manifest_next_compact(sb, req);
if (ret <= 0)
goto out;
nr_segnos = ret + SCOUTFS_COMPACTION_SEGNO_OVERHEAD;
/* get the next id and allocate possible output segnos */
down_read(&server->commit_rwsem);
spin_lock(&server->lock);
id = super->next_compact_id;
le64_add_cpu(&super->next_compact_id, 1);
spin_unlock(&server->lock);
ret = alloc_segnos(sb, req->segnos, nr_segnos);
if (ret == 0)
queue_commit_work(server, &cw);
up_read(&server->commit_rwsem);
if (ret == 0)
ret = wait_for_commit(&cw);
if (ret)
goto out;
/* try to send to a node with capacity, they can disconnect */
retry:
req->id = id;
node_id = compact_request_start(sb, cr);
if (node_id == 0) {
ret = 0;
goto out;
}
/* response processing can complete compaction before this returns */
ret = scoutfs_net_submit_request_node(sb, server->conn, node_id,
SCOUTFS_NET_CMD_COMPACT,
req, sizeof(*req),
compact_response, NULL, NULL);
if (ret < 0) {
cr = compact_request_done(sb, le64_to_cpu(id));
BUG_ON(cr == NULL); /* must still be there, no node cleanup */
}
if (ret == -ENOTCONN)
goto retry;
if (ret < 0)
goto out;
/* cr is now owned by response processing */
cr = NULL;
ret = 1;
out:
if (ret <= 0 && cr != NULL) {
scoutfs_manifest_compact_done(sb, req);
/* don't need to wait for commit when freeing in cleanup */
down_read(&server->commit_rwsem);
free_segnos(sb, req->segnos, nr_segnos, NULL, 0, false);
up_read(&server->commit_rwsem);
kfree(cr);
}
if (ret > 0)
try_queue_compact(server);
trace_scoutfs_server_compact_work_exit(sb, 0, ret);
}
/*
* This relies on the caller having read the current super and advanced
* its seq so that it's dirty. This will go away when we communicate
@@ -1172,9 +1723,14 @@ static void server_notify_up(struct super_block *sb,
if (node_id != 0) {
sci->node_id = node_id;
sci->nr_compacts = 0;
spin_lock(&server->lock);
list_add_tail(&sci->head, &server->clients);
server->nr_clients++;
trace_scoutfs_server_client_up(sb, node_id, server->nr_clients);
spin_unlock(&server->lock);
try_queue_compact(server);
}
}
@@ -1187,8 +1743,14 @@ static void server_notify_down(struct super_block *sb,
if (node_id != 0) {
spin_lock(&server->lock);
list_del(&sci->head);
list_del_init(&sci->head);
server->nr_clients--;
trace_scoutfs_server_client_down(sb, node_id,
server->nr_clients);
spin_unlock(&server->lock);
forget_client_compacts(sb, sci);
try_queue_compact(server);
} else {
shutdown_server(server);
}
@@ -1264,8 +1826,7 @@ static void scoutfs_server_worker(struct work_struct *work)
/* start up the server subsystems before accepting */
ret = scoutfs_btree_setup(sb) ?:
scoutfs_manifest_setup(sb) ?:
scoutfs_compact_setup(sb);
scoutfs_manifest_setup(sb);
if (ret)
goto shutdown;
@@ -1275,6 +1836,7 @@ static void scoutfs_server_worker(struct work_struct *work)
scoutfs_info(sb, "server started on "SIN_FMT, SIN_ARG(&sin));
/* start accepting connections and processing work */
server->conn = conn;
scoutfs_net_listen(sb, conn);
/* wait for listening down or umount, conn can still be live */
@@ -1285,13 +1847,12 @@ static void scoutfs_server_worker(struct work_struct *work)
shutdown:
/* wait for request processing */
scoutfs_net_shutdown(sb, conn);
/* drain compact work queued by responses */
cancel_work_sync(&server->compact_work);
/* wait for commit queued by request processing */
flush_work(&server->commit_work);
server->conn = NULL;
/* shut down all the server subsystems */
scoutfs_compact_destroy(sb);
/* (wait for possible double commit work queued by compaction) */
flush_work(&server->commit_work);
destroy_pending_frees(sb);
scoutfs_manifest_destroy(sb);
scoutfs_btree_destroy(sb);
@@ -1325,19 +1886,22 @@ int scoutfs_server_setup(struct super_block *sb)
server->sb = sb;
spin_lock_init(&server->lock);
init_waitqueue_head(&server->waitq);
init_completion(&server->shutdown_comp);
server->bind_warned = false;
INIT_DELAYED_WORK(&server->dwork, scoutfs_server_worker);
init_rwsem(&server->commit_rwsem);
init_llist_head(&server->commit_waiters);
INIT_WORK(&server->commit_work, scoutfs_server_commit_func);
init_waitqueue_head(&server->compaction_waitq);
seqcount_init(&server->stable_seqcount);
spin_lock_init(&server->seq_lock);
INIT_LIST_HEAD(&server->pending_seqs);
init_rwsem(&server->alloc_rwsem);
INIT_LIST_HEAD(&server->pending_frees);
INIT_LIST_HEAD(&server->clients);
server->compacts_per_client = 2;
INIT_LIST_HEAD(&server->compacts);
INIT_WORK(&server->compact_work, scoutfs_server_compact_worker);
server->wq = alloc_workqueue("scoutfs_server",
WQ_UNBOUND | WQ_NON_REENTRANT, 0);

View File

@@ -53,11 +53,6 @@ void scoutfs_init_ment_to_net(struct scoutfs_net_manifest_entry *net_ment,
void scoutfs_init_ment_from_net(struct scoutfs_manifest_entry *ment,
struct scoutfs_net_manifest_entry *net_ment);
int scoutfs_client_get_compaction(struct super_block *sb, void *curs);
int scoutfs_client_finish_compaction(struct super_block *sb, void *curs,
void *list);
int scoutfs_server_free_segno(struct super_block *sb, u64 segno);
int scoutfs_server_setup(struct super_block *sb);
void scoutfs_server_destroy(struct super_block *sb);