scoutfs: dlmglue support for concurrent writer locks

This is a bit trickier than just dropping in a cw holders count.

dlmglue was comparing levels by a simple greater than or less than check.
Since CW locks are not compatible with PR or EX, this check breaks down.
Instead we provide a function which can tell us whether a conversion to a
given lock levels is is compatible (cache-wise) with the level we have.

We also have some slightly more complicated logic in downconvert. As a
result we update the helper that dlmglue uses to choose a downconvert level.

Signed-off-by: Mark Fasheh <mfasheh@versity.com>
This commit is contained in:
Mark Fasheh
2017-10-26 19:01:07 -05:00
committed by Zach Brown
parent 9fc67bcf13
commit e70dbedb7b
2 changed files with 158 additions and 34 deletions

View File

@@ -139,7 +139,6 @@ static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
@@ -192,6 +191,8 @@ static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
stats = &res->l_lock_prmode;
else if (level == DLM_LOCK_EX)
stats = &res->l_lock_exmode;
else if (level == DLM_LOCK_CW)
stats = &res->l_lock_cwmode;
else
return;
@@ -363,6 +364,9 @@ static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
case DLM_LOCK_PR:
lockres->l_ro_holders++;
break;
case DLM_LOCK_CW:
lockres->l_cw_holders++;
break;
default:
BUG();
}
@@ -382,34 +386,81 @@ static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
BUG_ON(!lockres->l_ro_holders);
lockres->l_ro_holders--;
break;
case DLM_LOCK_CW:
BUG_ON(!lockres->l_cw_holders);
lockres->l_cw_holders--;
break;
default:
BUG();
}
}
/* WARNING: This function lives in a world where the only three lock
* levels are EX, PR, and NL. It *will* have to be adjusted when more
* lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
/*
* Compatibility matrix indexed by lock level - idea borrowed from
* fs/dlm/lock.c. Going across is the level our lock holds, going down
* is the level we're asked to convert to. The UN column and PD
* columns are unused and act as padding.
*/
static const int level_compat_matrix[8][8] = {
/* Lockres granted level */
/* UN NL CR CW PR PW EX PD */
{0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
{0, 1, 1, 1, 1, 1, 1, 0}, /* NL */
{0, 0, 1, 1, 1, 1, 1, 0}, /* CR */
{0, 0, 0, 1, 0, 1, 1, 0}, /* CW */ /* <-- Wanted levels */
{0, 0, 0, 0, 1, 1, 1, 0}, /* PR */
{0, 0, 0, 0, 0, 1, 1, 0}, /* PW */
{0, 0, 0, 0, 0, 0, 1, 0}, /* EX */
{0, 0, 0, 0, 0, 0, 0, 0} /* PD */
};
static inline int __levels_compat(int lockres_level, int wanted)
{
return level_compat_matrix[wanted + 1][lockres_level + 1];
}
static inline int levels_compat(struct ocfs2_lock_res *lockres, int wanted)
{
return __levels_compat(lockres->l_level, wanted);
}
/*
* WARNING: We have to adjust this function when adding lock levels to
* dlmglue
*
* Given a lock blocking 'lockres' at 'level', what new level should
* we downconvert to. This function will never return a level which
* would result in an upconvert.
*/
static inline int ocfs2_downconvert_level(struct ocfs2_lock_res *lockres,
int level)
{
int new_level = DLM_LOCK_EX;
if (level == DLM_LOCK_EX)
new_level = DLM_LOCK_NL;
else if (level == DLM_LOCK_PR)
new_level = DLM_LOCK_PR;
else if (level == DLM_LOCK_PR) {
if (lockres->l_level == DLM_LOCK_EX)
new_level = DLM_LOCK_PR;
else
new_level = DLM_LOCK_NL;
} else if (level == DLM_LOCK_CW)
new_level = DLM_LOCK_CW;
return new_level;
}
#define H_EX 0x1
#define H_PR 0x2
#define H_ANY (H_EX|H_PR)
#define H_CW 0x4
#define H_ANY (H_EX|H_PR|H_CW)
static int lockres_has_holders(struct ocfs2_lock_res *lockres, int which)
{
if (which & H_EX && lockres->l_ex_holders)
return 1;
if (which & H_PR && lockres->l_ro_holders)
return 1;
if (which & H_CW && lockres->l_cw_holders)
return 1;
return 0;
}
@@ -460,14 +511,15 @@ static void lockres_inc_refresh_gen(struct ocfs2_lock_res *lockres)
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
int dc_level = ocfs2_downconvert_level(lockres, lockres->l_blocking);
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
lockres->l_level = lockres->l_requested;
if (lockres->l_level <=
ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
if (levels_compat(lockres, dc_level)) {
lockres->l_blocking = DLM_LOCK_NL;
lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
}
@@ -476,20 +528,24 @@ static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
int old_level = lockres->l_level;
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
/* Convert from RO to EX doesn't really need anything as our
* information is already up to data. Convert from NL to
* *anything* however should mark ourselves as needing an
* update */
if (lockres->l_level == DLM_LOCK_NL &&
lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) {
lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
lockres_inc_refresh_gen(lockres);
}
/*
* Converting from NL to any mode, or upconverting between
* incompatible modes will require a refresh.
*/
lockres->l_level = lockres->l_requested;
if (lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) {
if (old_level == DLM_LOCK_NL ||
(old_level == DLM_LOCK_CW &&
lockres->l_level != DLM_LOCK_NL)) {
lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
lockres_inc_refresh_gen(lockres);
}
}
/*
* We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
@@ -535,8 +591,8 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
* one that goes low enough to satisfy the level we're
* blocking. this also catches the case where we get
* duplicate BASTs */
if (ocfs2_highest_compat_lock_level(level) <
ocfs2_highest_compat_lock_level(lockres->l_blocking))
if (ocfs2_downconvert_level(lockres, level) <
ocfs2_downconvert_level(lockres, lockres->l_blocking))
needs_downconvert = 1;
lockres->l_blocking = level;
@@ -552,6 +608,16 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
return needs_downconvert;
}
static void set_lock_blocking(struct ocfs2_lock_res *lockres, int level)
{
struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
int needs_downconvert;
needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
if (needs_downconvert)
ocfs2_schedule_blocked_lock(osb, lockres);
}
/*
* OCFS2_LOCK_PENDING and l_pending_gen.
*
@@ -936,7 +1002,7 @@ static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lock
{
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
return wanted <= ocfs2_downconvert_level(lockres, lockres->l_blocking);
}
/* the caller doesn't have to wait on a blocked lock if their wanted level
@@ -945,7 +1011,7 @@ static inline int lockres_allow_recursion(struct ocfs2_lock_res *lockres,
int wanted)
{
return (lockres->l_ops->flags & LOCK_TYPE_RECURSIVE) &&
wanted <= lockres->l_level &&
levels_compat(lockres, wanted) &&
lockres_has_holders(lockres, H_ANY);
}
@@ -1028,6 +1094,20 @@ static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
}
#endif
static inline int cw_incompat_convert(struct ocfs2_lock_res *lockres,
int level)
{
/* Have CW, want PR/EX */
if (lockres->l_level == DLM_LOCK_CW &&
(level == DLM_LOCK_PR || level == DLM_LOCK_EX))
return 1;
/* Have EX/PR, want CW */
if (level == DLM_LOCK_CW &&
(lockres->l_level == DLM_LOCK_PR || lockres->l_level == DLM_LOCK_EX))
return 1;
return 0;
}
static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int level,
@@ -1073,7 +1153,7 @@ again:
* here. If the lock is blocked waiting on a downconvert,
* we'll get caught below. */
if (lockres->l_flags & OCFS2_LOCK_BUSY &&
level > lockres->l_level) {
!levels_compat(lockres, level)) {
/* is someone sitting in dlm_lock? If so, wait on
* them. */
lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
@@ -1096,7 +1176,7 @@ again:
* OCFS2_LOCK_BLOCKED check to ensure that there is no pending
* downconvert request.
*/
if (level <= lockres->l_level)
if (levels_compat(lockres, level))
goto update_holders;
}
@@ -1110,6 +1190,23 @@ again:
goto unlock;
}
/*
* Convert from PR/EX to CW and vice-versa. Those levels are
* not compatible with each other. As a result, we have to
* wait for holders on the lock to drain. The easiest way to
* do this is by forcing a downconvert. We can then allow the
* process to come back and reacquire the lock at the correct
* level.
*/
if (cw_incompat_convert(lockres, level)) {
/* ocfs2_unblock_lock will drop to NL, then we can upconvert. */
set_lock_blocking(lockres, DLM_LOCK_EX);
lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
wait = 1;
goto unlock;
}
/* NL->Anything, PR->EX conditions are handled here */
if (level > lockres->l_level) {
if (noqueue_attempted > 0) {
ret = -EAGAIN;
@@ -1504,7 +1601,11 @@ static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
kick = 1;
break;
case DLM_LOCK_PR:
if (!lockres_has_holders(lockres, H_EX))
if (!lockres_has_holders(lockres, H_EX|H_CW))
kick = 1;
break;
case DLM_LOCK_CW:
if (!lockres_has_holders(lockres, H_EX|H_PR))
kick = 1;
break;
default:
@@ -1740,22 +1841,30 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
seq_printf(m, "0x%x\t", lvb[i]);
#ifdef CONFIG_OCFS2_FS_STATS
# define lock_num_cwmode(_l) ((_l)->l_lock_cwmode.ls_gets)
# define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets)
# define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets)
# define lock_num_cwmode_failed(_l) ((_l)->l_lock_cwmode.ls_fail)
# define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail)
# define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail)
# define lock_total_cwmode(_l) ((_l)->l_lock_cwmode.ls_total)
# define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total)
# define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total)
# define lock_max_cwmode(_l) ((_l)->l_lock_cwmode.ls_max)
# define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max)
# define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max)
# define lock_refresh(_l) ((_l)->l_lock_refresh)
#else
# define lock_num_cwmode(_l) (0)
# define lock_num_prmode(_l) (0)
# define lock_num_exmode(_l) (0)
# define lock_num_cwmode_failed(_l) (0)
# define lock_num_prmode_failed(_l) (0)
# define lock_num_exmode_failed(_l) (0)
# define lock_total_cwmode(_l) (0ULL)
# define lock_total_prmode(_l) (0ULL)
# define lock_total_exmode(_l) (0ULL)
# define lock_max_cwmode(_l) (0)
# define lock_max_prmode(_l) (0)
# define lock_max_exmode(_l) (0)
# define lock_refresh(_l) (0)
@@ -2344,18 +2453,31 @@ recheck:
* then requeue. */
if (lockres->l_blocking == DLM_LOCK_EX &&
lockres_has_holders(lockres, H_ANY)) {
mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
mlog(ML_BASTS, "lockres %s, ReQ: EX/PR/CW Holders %u,%u\n",
lockres->l_name, lockres->l_ex_holders,
lockres->l_ro_holders);
lockres->l_ro_holders, lockres->l_cw_holders);
goto leave_requeue;
}
/* If it's a PR we're blocking, then only
* requeue if we've got any EX holders */
* requeue if we've got any EX or CW holders */
if (lockres->l_blocking == DLM_LOCK_PR &&
lockres_has_holders(lockres, H_EX)) {
mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
lockres->l_name, lockres->l_ex_holders);
lockres_has_holders(lockres, H_CW|H_EX)) {
mlog(ML_BASTS, "lockres %s, ReQ: EX/CW Holders %u,%u\n",
lockres->l_name, lockres->l_ex_holders,
lockres->l_cw_holders);
goto leave_requeue;
}
/*
* Same logic as above, we're checking for any holders that
* are incompatible with CW.
*/
if (lockres->l_blocking == DLM_LOCK_CW
&& lockres_has_holders(lockres, H_EX|H_PR)) {
mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
lockres->l_name, lockres->l_ex_holders,
lockres->l_ro_holders);
goto leave_requeue;
}
@@ -2370,7 +2492,7 @@ recheck:
goto leave_requeue;
}
new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
new_level = ocfs2_downconvert_level(lockres, lockres->l_blocking);
if (lockres->l_ops->check_downconvert
&& !lockres->l_ops->check_downconvert(lockres, new_level)) {

View File

@@ -107,6 +107,7 @@ struct ocfs2_lock_res {
unsigned long l_flags;
char l_name[OCFS2_LOCK_ID_MAX_LEN];
unsigned int l_ro_holders;
unsigned int l_cw_holders;
unsigned int l_ex_holders;
signed char l_level;
signed char l_requested;
@@ -131,6 +132,7 @@ struct ocfs2_lock_res {
struct ocfs2_lock_stats l_lock_prmode; /* PR mode stats */
u32 l_lock_refresh; /* Disk refreshes */
struct ocfs2_lock_stats l_lock_exmode; /* EX mode stats */
struct ocfs2_lock_stats l_lock_cwmode; /* CW mode stats */
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map l_lockdep_map;