Add simple debugging range locking layer

We can work on shared mechanics without requiring a full locking server.
We can stand up a simple layer that uses shared data structures in the
kernel image to lock between mounts in the same kernel.

On mount we add supers to a global list.  Held locks are tracked in a
list shared by all the supers with the same fsid.  A lock attempt blocks
until it doesn't conflict with anything already held.

As locks are acquired we walk all the other supers and write out or
invalidate any items they have that intersect with the acquired range.
This is easier to implement, though less efficient, than caching locks
after they're unlocked and implementing downconvert/blocking/revoke.
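
The conflict rule the layer enforces is small enough to sketch on its own.
The standalone fragment below mirrors the check that compatible_locks() in
the new lock.c performs, using plain integer ranges in place of scoutfs key
ranges; the names here are illustrative only, not part of the patch.

#include <stdbool.h>
#include <stdio.h>

enum { MODE_READ, MODE_WRITE };

/* illustrative stand-in for a held range lock; not scoutfs code */
struct demo_lock {
	int owner;		/* which mount holds it */
	int mode;
	long start, end;	/* inclusive key range */
};

/*
 * Two locks can be held at the same time if they belong to the same
 * mount, are both reads, or cover ranges that don't overlap.
 */
static bool demo_compatible(const struct demo_lock *a, const struct demo_lock *b)
{
	bool overlap = a->start <= b->end && b->start <= a->end;

	return a->owner == b->owner ||
	       (a->mode == MODE_READ && b->mode == MODE_READ) ||
	       !overlap;
}

int main(void)
{
	struct demo_lock reader = { 1, MODE_READ,  100, 199 };
	struct demo_lock writer = { 2, MODE_WRITE, 150, 250 };

	/* an overlapping write from another mount conflicts: prints 0 */
	printf("compatible: %d\n", demo_compatible(&reader, &writer));
	return 0;
}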

Signed-off-by: Zach Brown <zab@versity.com>
Author: Zach Brown
Date:   2017-02-15 08:29:45 -08:00
Commit: b3b2693939 (parent f373f05fb7)
7 changed files with 397 additions and 3 deletions

kmod/src/Makefile

@@ -3,5 +3,5 @@ obj-$(CONFIG_SCOUTFS_FS) := scoutfs.o
 CFLAGS_scoutfs_trace.o = -I$(src) # define_trace.h double include
 scoutfs-y += alloc.o bio.o compact.o counters.o data.o dir.o kvec.o inode.o \
-	ioctl.o item.o key.o manifest.o msg.o seg.o scoutfs_trace.o \
-	super.o trans.o treap.o xattr.o
+	ioctl.o item.o key.o lock.o manifest.o msg.o seg.o \
+	scoutfs_trace.o super.o trans.o treap.o xattr.o

kmod/src/item.c

@@ -24,6 +24,7 @@
#include "seg.h"
#include "counters.h"
#include "scoutfs_trace.h"
#include "trans.h"
/*
* A simple rbtree of cached items isolates the item API callers from
@@ -1536,6 +1537,79 @@ int scoutfs_item_dirty_seg(struct super_block *sb, struct scoutfs_segment *seg)
return 0;
}
/*
* The caller wants us to write out any dirty items within the given
* range. We look for any dirty items within the range and if we find
* any we issue a sync which writes out all the dirty items.
*/
int scoutfs_item_writeback(struct super_block *sb,
struct scoutfs_key_buf *start,
struct scoutfs_key_buf *end)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct item_cache *cac = sbi->item_cache;
struct cached_item *item;
unsigned long flags;
bool sync = false;
int ret = 0;
/* XXX think about racing with trans write */
spin_lock_irqsave(&cac->lock, flags);
if (cac->nr_dirty_items) {
item = next_item(&cac->items, start);
if (item && !(item->dirty & ITEM_DIRTY))
item = next_dirty(item);
if (item && scoutfs_key_compare(item->key, end) <= 0)
sync = true;
}
spin_unlock_irqrestore(&cac->lock, flags);
if (sync)
ret = scoutfs_sync_fs(sb, 1);
return ret;
}
/*
* The caller wants us to drop any items within the range on the floor.
* They should have ensured that items in this range won't be dirty.
*/
void scoutfs_item_invalidate(struct super_block *sb,
struct scoutfs_key_buf *start,
struct scoutfs_key_buf *end)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct item_cache *cac = sbi->item_cache;
struct cached_item *next;
struct cached_item *item;
struct rb_node *node;
unsigned long flags;
/* XXX think about racing with trans write */
spin_lock_irqsave(&cac->lock, flags);
for (item = next_item(&cac->items, start);
item && scoutfs_key_compare(item->key, end) <= 0;
item = next) {
/* XXX seems like this should be a helper? */
node = rb_next(&item->node);
if (node)
next = container_of(node, struct cached_item, node);
else
next = NULL;
WARN_ON_ONCE(item->dirty & ITEM_DIRTY);
erase_item(sb, cac, item);
}
spin_unlock_irqrestore(&cac->lock, flags);
}
int scoutfs_item_setup(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);

kmod/src/item.h

@@ -57,6 +57,12 @@ bool scoutfs_item_has_dirty(struct super_block *sb);
bool scoutfs_item_dirty_fits_single(struct super_block *sb, u32 nr_items,
u32 key_bytes, u32 val_bytes);
int scoutfs_item_dirty_seg(struct super_block *sb, struct scoutfs_segment *seg);
int scoutfs_item_writeback(struct super_block *sb,
struct scoutfs_key_buf *start,
struct scoutfs_key_buf *end);
void scoutfs_item_invalidate(struct super_block *sb,
struct scoutfs_key_buf *start,
struct scoutfs_key_buf *end);
int scoutfs_item_setup(struct super_block *sb);
void scoutfs_item_destroy(struct super_block *sb);

kmod/src/lock.c (new file, 282 lines)

@@ -0,0 +1,282 @@
/*
* Copyright (C) 2017 Versity Software, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License v2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include "super.h"
#include "lock.h"
#include "item.h"
#include "scoutfs_trace.h"
/*
* This is meant to be simple and correct, not performant.
*/
static DECLARE_RWSEM(global_rwsem);
static LIST_HEAD(global_super_list);
/*
* Allocated once and pointed to by the lock info of all the supers with
* the same fsid. Freed as the last super unmounts.
*/
struct held_locks {
spinlock_t lock;
struct list_head list;
wait_queue_head_t waitq;
};
/*
* Allocated per-super.  Stored in the global list for finding supers
* with matching fsids, and in a per-fsid list with the other supers
* that share its fsid for invalidation.  Freed on unmount.
*/
struct lock_info {
struct super_block *sb;
struct held_locks *held;
struct list_head id_head;
struct list_head global_head;
};
#define DECLARE_LOCK_INFO(sb, name) \
struct lock_info *name = SCOUTFS_SB(sb)->lock_info
/*
* Locks are compatible if they're from the same super, are both reads,
* or their ranges don't overlap.
*/
static bool compatible_locks(struct scoutfs_lock *a, struct scoutfs_lock *b)
{
return a->sb == b->sb ||
(a->mode == SCOUTFS_LOCK_MODE_READ &&
b->mode == SCOUTFS_LOCK_MODE_READ) ||
scoutfs_key_compare_ranges(a->start, a->end, b->start, b->end);
}
static bool lock_added(struct held_locks *held, struct scoutfs_lock *add)
{
struct scoutfs_lock *lck;
bool added = true;
spin_lock(&held->lock);
list_for_each_entry(lck, &held->list, head) {
if (!compatible_locks(lck, add)) {
added = false;
break;
}
}
if (added)
list_add(&add->head, &held->list);
spin_unlock(&held->lock);
return added;
}
/*
* Invalidate caches on this super because another super has acquired
* a lock with the given mode and range. We always have to write out
* dirty overlapping items. If they're writing then we need to also
* invalidate all cached overlapping structures.
*/
static int invalidate_caches(struct super_block *sb, int mode,
struct scoutfs_key_buf *start,
struct scoutfs_key_buf *end)
{
int ret;
ret = scoutfs_item_writeback(sb, start, end);
if (ret)
return ret;
if (mode == SCOUTFS_LOCK_MODE_WRITE) {
scoutfs_item_invalidate(sb, start, end);
#if 0
scoutfs_dir_invalidate(sb, start, end) ?:
scoutfs_inode_invalidate(sb, start, end) ?:
scoutfs_data_invalidate(sb, start, end);
#endif
}
return 0;
}
#define for_each_other_linf(linf, from_linf) \
for (linf = list_entry(from_linf->id_head.next, struct lock_info, \
id_head); \
linf != from_linf; \
linf = list_entry(linf->id_head.next, struct lock_info, \
id_head))
static int invalidate_others(struct super_block *from, int mode,
struct scoutfs_key_buf *start,
struct scoutfs_key_buf *end)
{
DECLARE_LOCK_INFO(from, from_linf);
struct lock_info *linf;
int ret = 0;	/* there may be no other supers to walk */
down_read(&global_rwsem);
for_each_other_linf(linf, from_linf) {
ret = invalidate_caches(linf->sb, mode, start, end);
if (ret)
break;
}
up_read(&global_rwsem);
return ret;
}
static void unlock(struct held_locks *held, struct scoutfs_lock *lck)
{
spin_lock(&held->lock);
list_del_init(&lck->head);
spin_unlock(&held->lock);
wake_up(&held->waitq);
}
/*
* Acquire a coherent lock on the given range of keys. While the lock
* is held other lockers are serialized. Cache coherency is maintained
* by the locking infrastructure. Lock acquisition causes writeout from
* or invalidation of other caches.
*
* The caller provides the opaque lock structure used for storage and
* their start and end pointers will be accessed while the lock is held.
*/
int scoutfs_lock_range(struct super_block *sb, int mode,
struct scoutfs_key_buf *start,
struct scoutfs_key_buf *end,
struct scoutfs_lock *lck)
{
DECLARE_LOCK_INFO(sb, linf);
struct held_locks *held = linf->held;
int ret;
INIT_LIST_HEAD(&lck->head);
lck->sb = sb;
lck->start = start;
lck->end = end;
lck->mode = mode;
ret = wait_event_interruptible(held->waitq, lock_added(held, lck));
if (ret == 0) {
ret = invalidate_others(sb, mode, start, end);
if (ret)
unlock(held, lck);
}
return ret;
}
void scoutfs_unlock_range(struct super_block *sb, struct scoutfs_lock *lck)
{
DECLARE_LOCK_INFO(sb, linf);
struct held_locks *held = linf->held;
unlock(held, lck);
}
/*
* The moment this is done we can have other mounts start asking
* us to write back and invalidate, so do this very very late.
*/
int scoutfs_lock_setup(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
struct scoutfs_sb_info *other_sbi;
struct lock_info *other_linf;
struct held_locks *held;
struct lock_info *linf;
linf = kmalloc(sizeof(struct lock_info), GFP_KERNEL);
if (!linf)
return -ENOMEM;
held = kmalloc(sizeof(struct held_locks), GFP_KERNEL);
if (!held) {
kfree(linf);
return -ENOMEM;
}
spin_lock_init(&held->lock);
INIT_LIST_HEAD(&held->list);
init_waitqueue_head(&held->waitq);
linf->sb = sb;
linf->held = held;
INIT_LIST_HEAD(&linf->id_head);
INIT_LIST_HEAD(&linf->global_head);
sbi->lock_info = linf;
trace_printk("sb %p id %016llx allocated linf %p held %p\n",
sb, le64_to_cpu(sbi->super.id), linf, held);
down_write(&global_rwsem);
list_for_each_entry(other_linf, &global_super_list, global_head) {
other_sbi = SCOUTFS_SB(other_linf->sb);
if (other_sbi->super.id == sbi->super.id) {
list_add(&linf->id_head, &other_linf->id_head);
linf->held = other_linf->held;
trace_printk("sharing held %p\n", linf->held);
break;
}
}
/* add to global list after walking so we don't see ourselves */
list_add(&linf->global_head, &global_super_list);
up_write(&global_rwsem);
if (linf->held != held)
kfree(held);
return 0;
}
void scoutfs_lock_destroy(struct super_block *sb)
{
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
DECLARE_LOCK_INFO(sb, linf);
struct held_locks *held;
if (linf) {
down_write(&global_rwsem);
list_del_init(&linf->global_head);
if (!list_empty(&linf->id_head)) {
list_del_init(&linf->id_head);
held = NULL;
} else {
held = linf->held;
}
up_write(&global_rwsem);
trace_printk("sb %p id %016llx freeing linf %p held %p\n",
sb, le64_to_cpu(sbi->super.id), linf, held);
kfree(held);
kfree(linf);
}
}

kmod/src/lock.h (new file, 26 lines)

@@ -0,0 +1,26 @@
#ifndef _SCOUTFS_LOCK_H_
#define _SCOUTFS_LOCK_H_
struct scoutfs_lock {
struct list_head head;
struct super_block *sb;
struct scoutfs_key_buf *start;
struct scoutfs_key_buf *end;
int mode;
};
enum {
SCOUTFS_LOCK_MODE_READ,
SCOUTFS_LOCK_MODE_WRITE,
};
int scoutfs_lock_range(struct super_block *sb, int mode,
struct scoutfs_key_buf *start,
struct scoutfs_key_buf *end,
struct scoutfs_lock *lck);
void scoutfs_unlock_range(struct super_block *sb, struct scoutfs_lock *lck);
int scoutfs_lock_setup(struct super_block *sb);
void scoutfs_lock_destroy(struct super_block *sb);
#endif
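
No callers are converted in this commit, so for illustration here is a
sketch of how a caller might take a read lock over a key range with the
new interface; the helper name and the elided key setup are hypothetical,
not part of the patch.

/* Hypothetical caller sketch; not added by this commit. */
static int example_read_range(struct super_block *sb,
			      struct scoutfs_key_buf *start,
			      struct scoutfs_key_buf *end)
{
	struct scoutfs_lock lck;
	int ret;

	/* blocks until no other mount holds a conflicting lock */
	ret = scoutfs_lock_range(sb, SCOUTFS_LOCK_MODE_READ, start, end, &lck);
	if (ret)
		return ret;

	/* ... read items in [start, end] through the item cache ... */

	/* start and end must stay valid until the lock is dropped */
	scoutfs_unlock_range(sb, &lck);
	return 0;
}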

kmod/src/super.c

@@ -35,6 +35,7 @@
#include "treap.h"
#include "compact.h"
#include "data.h"
#include "lock.h"
#include "scoutfs_trace.h"
static struct kset *scoutfs_kset;
@@ -219,7 +220,8 @@ static int scoutfs_fill_super(struct super_block *sb, void *data, int silent)
 	scoutfs_treap_setup(sb) ?:
 //	scoutfs_buddy_setup(sb) ?:
 	scoutfs_compact_setup(sb) ?:
-	scoutfs_setup_trans(sb);
+	scoutfs_setup_trans(sb) ?:
+	scoutfs_lock_setup(sb);
 	if (ret)
 		return ret;
@@ -250,6 +252,7 @@ static void scoutfs_kill_sb(struct super_block *sb)
kill_block_super(sb);
if (sbi) {
scoutfs_lock_destroy(sb);
scoutfs_compact_destroy(sb);
scoutfs_shutdown_trans(sb);
scoutfs_data_destroy(sb);

kmod/src/super.h

@@ -13,6 +13,7 @@ struct segment_cache;
struct treap_info;
struct compact_info;
struct data_info;
struct lock_info;
struct scoutfs_sb_info {
struct super_block *sb;
@@ -40,6 +41,8 @@ struct scoutfs_sb_info {
wait_queue_head_t trans_write_wq;
struct workqueue_struct *trans_write_workq;
struct lock_info *lock_info;
/* $sysfs/fs/scoutfs/$id/ */
struct kset *kset;