Alloc inodes from pool from server

Inode allocation was always modifying the in-memory super block.  This
doesn't work when the server is solely responsible for modifying the
super blocks.  We add network messages to have mounts send a message to
the server to request inodes that they can use to satisfy allocation.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2017-04-12 16:58:07 -07:00
parent 453715a78d
commit b50de90196
7 changed files with 221 additions and 14 deletions

View File

@@ -362,11 +362,21 @@ struct scoutfs_net_header {
__u8 type;
__u8 status;
__u8 data[0];
};
} __packed;
/*
* When there's no more free inodes this will be sent with ino = ~0 and
* nr = 0.
*/
struct scoutfs_net_inode_alloc {
__le64 ino;
__le64 nr;
} __packed;
enum {
/* sends and receives a struct scoutfs_timeval */
SCOUTFS_NET_TRADE_TIME = 0,
SCOUTFS_NET_ALLOC_INODES,
SCOUTFS_NET_UNKNOWN,
};

View File

@@ -17,6 +17,7 @@
#include <linux/xattr.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/sched.h>
#include "format.h"
#include "super.h"
@@ -30,6 +31,7 @@
#include "msg.h"
#include "kvec.h"
#include "item.h"
#include "net.h"
/*
* XXX
@@ -37,6 +39,14 @@
* - use inode item value lengths for forward/back compat
*/
struct free_ino_pool {
wait_queue_head_t waitq;
spinlock_t lock;
u64 ino;
u64 nr;
bool in_flight;
};
static struct kmem_cache *scoutfs_inode_cachep;
static void scoutfs_inode_ctor(void *obj)
@@ -359,24 +369,98 @@ u64 scoutfs_last_ino(struct super_block *sb)
return last;
}
/*
* Network replies refill the pool, providing ino = ~0ULL nr = 0 when
* there's no more inodes (which should never happen in practice.)
*/
void scoutfs_inode_fill_pool(struct super_block *sb, u64 ino, u64 nr)
{
struct free_ino_pool *pool = SCOUTFS_SB(sb)->free_ino_pool;
trace_printk("filling ino %llu nr %llu\n", ino, nr);
spin_lock(&pool->lock);
pool->ino = ino;
pool->nr = nr;
pool->in_flight = false;
spin_unlock(&pool->lock);
wake_up(&pool->waitq);
}
static bool pool_in_flight(struct free_ino_pool *pool)
{
bool in_flight;
spin_lock(&pool->lock);
in_flight = pool->in_flight;
spin_unlock(&pool->lock);
return in_flight;
}
/*
* We have a pool of free inodes given to us by the server. If it
* empties we only ever have one request for new inodes in flight. The
* net layer calls us when it gets a reply. If there's no more inodes
* we'll get ino == ~0 and nr == 0.
*/
static int alloc_ino(struct super_block *sb, u64 *ino)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct free_ino_pool *pool = SCOUTFS_SB(sb)->free_ino_pool;
bool request;
int ret;
spin_lock(&sbi->next_ino_lock);
*ino = 0;
if (super->next_ino == 0) {
ret = -ENOSPC;
} else {
*ino = le64_to_cpu(super->next_ino);
le64_add_cpu(&super->next_ino, 1);
ret = 0;
spin_lock(&pool->lock);
while (pool->nr == 0 && pool->ino != ~0ULL) {
if (pool->in_flight) {
request = false;
} else {
pool->in_flight = true;
request = true;
}
spin_unlock(&pool->lock);
if (request) {
ret = scoutfs_net_alloc_inodes(sb);
if (ret) {
spin_lock(&pool->lock);
pool->in_flight = false;
spin_unlock(&pool->lock);
wake_up(&pool->waitq);
goto out;
}
}
ret = wait_event_interruptible(pool->waitq,
!pool_in_flight(pool));
if (ret)
goto out;
spin_lock(&pool->lock);
}
spin_unlock(&sbi->next_ino_lock);
if (pool->nr == 0) {
*ino = 0;
ret = -ENOSPC;
} else {
*ino = pool->ino++;
pool->nr--;
ret = 0;
}
spin_unlock(&pool->lock);
out:
trace_printk("ret %d ino %llu pool ino %llu nr %llu req %u (racey)\n",
ret, *ino, pool->ino, pool->nr, pool->in_flight);
return ret;
}
@@ -633,6 +717,30 @@ int scoutfs_orphan_inode(struct inode *inode)
return ret;
}
int scoutfs_inode_setup(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct free_ino_pool *pool;
pool = kzalloc(sizeof(struct free_ino_pool), GFP_KERNEL);
if (!pool)
return -ENOMEM;
init_waitqueue_head(&pool->waitq);
spin_lock_init(&pool->lock);
sbi->free_ino_pool = pool;
return 0;
}
void scoutfs_inode_destroy(struct super_block *sb)
{
struct free_ino_pool *pool = SCOUTFS_SB(sb)->free_ino_pool;
kfree(pool);
}
void scoutfs_inode_exit(void)
{
if (scoutfs_inode_cachep) {

View File

@@ -41,6 +41,7 @@ struct inode *scoutfs_iget(struct super_block *sb, u64 ino);
int scoutfs_dirty_inode_item(struct inode *inode);
void scoutfs_dirty_inode(struct inode *inode, int flags);
void scoutfs_update_inode_item(struct inode *inode);
void scoutfs_inode_fill_pool(struct super_block *sb, u64 ino, u64 nr);
struct inode *scoutfs_new_inode(struct super_block *sb, struct inode *dir,
umode_t mode, dev_t rdev);
void scoutfs_inode_inc_data_version(struct inode *inode);
@@ -53,7 +54,7 @@ u64 scoutfs_last_ino(struct super_block *sb);
void scoutfs_inode_exit(void);
int scoutfs_inode_init(void);
int scoutfs_item_setup(struct super_block *sb);
void scoutfs_item_destroy(struct super_block *sb);
int scoutfs_inode_setup(struct super_block *sb);
void scoutfs_inode_destroy(struct super_block *sb);
#endif

View File

@@ -22,6 +22,7 @@
#include "format.h"
#include "net.h"
#include "counters.h"
#include "inode.h"
#include "scoutfs_trace.h"
/*
@@ -253,6 +254,47 @@ static struct send_buf *alloc_sbuf(unsigned data_len)
return sbuf;
}
/*
* XXX should this call into inodes? not sure about the layering here.
*/
static struct send_buf *process_alloc_inodes(struct super_block *sb,
void *req, int req_len)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_super_block *super = &sbi->super;
struct scoutfs_net_inode_alloc *ial;
struct send_buf *sbuf;
int ret;
u64 ino;
u64 nr;
if (req_len != 0)
return ERR_PTR(-EINVAL);
sbuf = alloc_sbuf(sizeof(struct scoutfs_net_inode_alloc));
if (!sbuf)
return ERR_PTR(-ENOMEM);
spin_lock(&sbi->next_ino_lock);
ino = le64_to_cpu(super->next_ino);
nr = min(100000ULL, ~0ULL - ino);
le64_add_cpu(&super->next_ino, nr);
spin_unlock(&sbi->next_ino_lock);
/* XXX think about server ring commits */
ret = 0; //sync_or_something();
ial = (void *)sbuf->nh->data;
ial->ino = cpu_to_le64(ino);
ial->nr = cpu_to_le64(nr);
sbuf->nh->status = SCOUTFS_NET_STATUS_SUCCESS;
return sbuf;
}
/*
* Log the time in the request and reply with our current time.
*/
@@ -300,6 +342,9 @@ static int process_request(struct net_info *nti, struct recv_buf *rbuf)
if (rbuf->nh->type == SCOUTFS_NET_TRADE_TIME)
sbuf = process_trade_time(sb, (void *)rbuf->nh->data,
data_len);
else if (rbuf->nh->type == SCOUTFS_NET_ALLOC_INODES)
sbuf = process_alloc_inodes(sb, (void *)rbuf->nh->data,
data_len);
else
sbuf = ERR_PTR(-EINVAL);
@@ -702,7 +747,8 @@ static int add_send_buf(struct super_block *sb, int type, void *data,
nh = sbuf->nh;
nh->type = type;
memcpy(nh->data, data, data_len);
if (data_len)
memcpy(nh->data, data, data_len);
mutex_lock(&nti->mutex);
@@ -721,6 +767,43 @@ static int add_send_buf(struct super_block *sb, int type, void *data,
return 0;
}
static int alloc_inodes_reply(struct super_block *sb, void *reply, int ret)
{
struct scoutfs_net_inode_alloc *ial = reply;
u64 ino;
u64 nr;
if (ret != sizeof(*ial)) {
ret = -EINVAL;
goto out;
}
ino = le64_to_cpu(ial->ino);
nr = le64_to_cpu(ial->nr);
/* catch wrapping */
if (ino + nr < ino) {
ret = -EINVAL;
goto out;
}
/* XXX compare to greatest inode we've seen? */
ret = 0;
out:
if (ret < 0)
scoutfs_inode_fill_pool(sb, 0, 0);
else
scoutfs_inode_fill_pool(sb, ino, nr);
return ret;
}
int scoutfs_net_alloc_inodes(struct super_block *sb)
{
return add_send_buf(sb, SCOUTFS_NET_ALLOC_INODES, NULL, 0,
alloc_inodes_reply);
}
static int trade_time_reply(struct super_block *sb, void *reply, int ret)
{
struct scoutfs_timespec *ts = reply;

View File

@@ -2,6 +2,7 @@
#define _SCOUTFS_NET_H_
int scoutfs_net_trade_time(struct super_block *sb);
int scoutfs_net_alloc_inodes(struct super_block *sb);
int scoutfs_net_setup(struct super_block *sb);
void scoutfs_net_destroy(struct super_block *sb);

View File

@@ -215,6 +215,7 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent)
scoutfs_seg_setup(sb) ?:
scoutfs_manifest_setup(sb) ?:
scoutfs_item_setup(sb) ?:
scoutfs_inode_setup(sb) ?:
scoutfs_data_setup(sb) ?:
scoutfs_alloc_setup(sb) ?:
scoutfs_compact_setup(sb) ?:
@@ -259,6 +260,7 @@ static void scoutfs_kill_sb(struct super_block *sb)
scoutfs_compact_destroy(sb);
scoutfs_shutdown_trans(sb);
scoutfs_data_destroy(sb);
scoutfs_inode_destroy(sb);
scoutfs_item_destroy(sb);
scoutfs_alloc_destroy(sb);
scoutfs_manifest_destroy(sb);

View File

@@ -14,6 +14,7 @@ struct compact_info;
struct data_info;
struct lock_info;
struct net_info;
struct free_ino_pool;
struct scoutfs_sb_info {
struct super_block *sb;
@@ -28,6 +29,7 @@ struct scoutfs_sb_info {
struct seg_alloc *seg_alloc;
struct compact_info *compact_info;
struct data_info *data_info;
struct free_ino_pool *free_ino_pool;
atomic_t trans_holds;
wait_queue_head_t trans_hold_wq;