scst_dlm: Fix use-after-free by waiting for lkb destruction

When scst_dlm_unlock_wait() timed out, the caller could proceed to
free the containing storage of @lksb while DLM still held a reference
to it in lkb->lkb_lksb. A subsequently delivered AST -- e.g. a
recovery-synthesized UNLOCK_REPLY for a departed peer -- would then
write sb_status into freed memory.

Make scst_dlm_remove_lock() honor its contract on every path: after
the convert-to-NL clean-release step, issue
dlm_unlock(DLM_LKF_FORCEUNLOCK) and loop on the completion until a
destroying CAST (sb_status == -DLM_EUNLOCK) is observed. FORCEUNLOCK
ensures that such a CAST arrives even if an earlier operation is
still in flight or a peer is departing, since
dlm_recover_waiters_pre() synthesizes the reply in the latter case.
Non-destroying CASTs from previously canceled converts are logged
and the wait continues.

Route the per-registrant teardown (scst_dlm_pr_rm_reg_ls), the
remote-UA teardown (scst_dlm_rm_rem_ua_ls), and the remaining
stack-allocated lksb teardown sites through scst_dlm_remove_lock()
so they all observe the destroying CAST before their storage may
be reused. As a side benefit, those sites gain the convert-to-NL
step that preserves LVB validity on peers per the DLM EX/PW
release rule.
This commit is contained in:
Brian M
2026-05-20 12:38:32 -07:00
committed by Gleb Chesnokov
parent e2c57de2d5
commit 67d2469c9b

View File

@@ -68,6 +68,8 @@ static struct scst_dlm_rem_ua *scst_dlm_pr_reg_recv_rem_ua(struct scst_device *d
dlm_lockspace_t *ls,
struct scst_dev_registrant *reg,
int index);
static void scst_dlm_remove_lock(dlm_lockspace_t *ls, struct scst_lksb *lksb,
const char *name);
static inline void compile_time_size_checks(void)
{
@@ -141,29 +143,58 @@ out:
}
/*
* scst_dlm_unlock_wait - Discard a DLM lock
* scst_dlm_force_unlock_and_drain - Destroy the lkb backing @lksb and wait
* until the destroying CAST has been observed.
*
* After this returns, DLM no longer holds a reference to @lksb: no further
* AST can fire for it, so the containing storage may be freed.
*
* DLM_LKF_FORCEUNLOCK guarantees that a destroying CAST with sb_status
* -DLM_EUNLOCK will arrive even if an earlier operation is still in flight
* or a peer is departing (recovery synthesizes the reply in that case via
* dlm_recover_waiters_pre()). Other (non-destroying) CASTs that may have
* been pending — e.g. a stale reply for a canceled convert — are logged
* and the wait continues.
*
* The 60s watchdog logs progress but never returns early. The only
* "hang forever" case is a permanently broken lockspace; md-cluster and
* gfs2 accept the same constraint.
*/
static int scst_dlm_unlock_wait(dlm_lockspace_t *ls, struct scst_lksb *lksb)
static void scst_dlm_force_unlock_and_drain(dlm_lockspace_t *ls,
struct scst_lksb *lksb,
const char *name)
{
unsigned long start = jiffies;
u32 lkid = lksb->lksb.sb_lkid;
int res;
sBUG_ON(!ls);
init_completion(&lksb->compl);
res = dlm_unlock(ls, lksb->lksb.sb_lkid, 0, &lksb->lksb, lksb);
if (res < 0)
goto out;
res = wait_for_completion_timeout(&lksb->compl, 60 * HZ);
if (res > 0) {
res = lksb->lksb.sb_status;
if (res == -DLM_EUNLOCK || res == -DLM_ECANCEL)
res = 0;
} else if (res == 0) {
res = -ETIMEDOUT;
res = dlm_unlock(ls, lkid, DLM_LKF_FORCEUNLOCK, &lksb->lksb, lksb);
if (res < 0) {
PRINT_ERROR("dlm_unlock(FORCEUNLOCK, lkid=%08x, name=%s) submission failed: %d",
lkid, name ? : "?", res);
return;
}
out:
return res;
while (true) {
long wait = wait_for_completion_timeout(&lksb->compl, 60 * HZ);
if (wait > 0) {
int status = lksb->lksb.sb_status;
if (status == -DLM_EUNLOCK)
return;
PRINT_WARNING("stale CAST (lkid=%08x name=%s status=%d %ums)",
lkid, name ? : "?", status,
jiffies_to_msecs(jiffies - start));
continue;
}
PRINT_WARNING("draining FORCEUNLOCK (lkid=%08x name=%s, %ums)",
lkid, name ? : "?",
jiffies_to_msecs(jiffies - start));
}
}
/* Number of persistent reservation registrants. */
@@ -197,8 +228,6 @@ static void scst_dlm_pr_rm_reg_ls(dlm_lockspace_t *ls,
struct scst_dev_registrant *reg,
bool include_rem_ua)
{
int res;
if (include_rem_ua) {
struct scst_dlm_rem_ua *ua, *tmp;
@@ -217,10 +246,7 @@ static void scst_dlm_pr_rm_reg_ls(dlm_lockspace_t *ls,
if (!reg->lksb.lksb.sb_lkid)
return;
res = scst_dlm_unlock_wait(ls, &reg->lksb);
WARN(res < 0, "scst_dlm_unlock_wait(%08x) failed (%d)",
reg->lksb.lksb.sb_lkid, res);
reg->lksb.lksb.sb_lkid = 0;
scst_dlm_remove_lock(ls, &reg->lksb, NULL);
reg->dlm_idx = -1;
}
@@ -393,9 +419,12 @@ static int scst_copy_from_dlm(struct scst_device *dev, dlm_lockspace_t *ls,
lvb->pr_type);
}
} else {
PRINT_ERROR("pr_add_registrant %s." PR_REG_LOCK
" failed\n", dev->virt_name, i);
scst_dlm_unlock_wait(ls, &reg_lksb[i]);
char reg_name[32];
snprintf(reg_name, sizeof(reg_name), PR_REG_LOCK, i);
PRINT_ERROR("pr_add_registrant %s.%s failed\n",
dev->virt_name, reg_name);
scst_dlm_remove_lock(ls, &reg_lksb[i], reg_name);
continue;
}
@@ -444,8 +473,7 @@ out:
cancel:
for (i = 0; i < nr_registrants; i++)
if (reg_lksb[i].lksb.sb_lkid)
scst_dlm_unlock_wait(ls, &reg_lksb[i]);
scst_dlm_remove_lock(ls, &reg_lksb[i], NULL);
goto out;
}
@@ -811,21 +839,43 @@ static void scst_pr_toggle_lock(struct scst_pr_dlm_data *pr_dlm,
pr_dlm->dev->virt_name, lock_name, res);
if (!lksb.lksb.sb_lkid)
continue;
scst_dlm_lock_wait(ls, DLM_LOCK_NL, &lksb,
DLM_LKF_CONVERT, lock_name, NULL);
scst_dlm_unlock_wait(ls, &lksb);
scst_dlm_remove_lock(ls, &lksb, lock_name);
}
}
/* Remove a lock from the local DLM lockspace instance. */
/*
* scst_dlm_remove_lock - Remove a lock from the local DLM lockspace instance.
*
* Contract: when this function returns, the lkb has been destroyed and DLM
* no longer holds a reference to @lksb. No further AST can fire for it, so
* the containing storage may be freed.
*
* The clean-release idiom for a lock that may be held in EX or PW is to
* convert to NL first (signals "I'm done writing; LVB state is settled")
* and then unlock. The convert is best-effort: on failure we proceed
* to a FORCEUNLOCK regardless, so the contract is still met. The final
* scst_dlm_force_unlock_and_drain() waits for the destroying CAST before
* returning.
*/
static void scst_dlm_remove_lock(dlm_lockspace_t *ls, struct scst_lksb *lksb,
const char *name)
{
unsigned long start;
int res;
if (!lksb->lksb.sb_lkid)
return;
scst_dlm_lock_wait(ls, DLM_LOCK_NL, lksb, DLM_LKF_CONVERT, name,
NULL);
scst_dlm_unlock_wait(ls, lksb);
start = jiffies;
res = scst_dlm_lock_wait(ls, DLM_LOCK_NL, lksb, DLM_LKF_CONVERT, name,
NULL);
if (res < 0)
PRINT_WARNING("convert-to-NL failed (lkid=%08x, name=%s, res=%d, %ums)",
lksb->lksb.sb_lkid, name ? : "?", res,
jiffies_to_msecs(jiffies - start));
scst_dlm_force_unlock_and_drain(ls, lksb, name);
lksb->lksb.sb_lkid = 0;
}
@@ -1066,7 +1116,7 @@ static dlm_lockspace_t *get_lockspace(struct scst_device *dev)
else if (modified_lvb)
scst_trigger_reread_lvb(pr_dlm, ls);
scst_dlm_unlock_wait(ls, &pr_lksb);
scst_dlm_remove_lock(ls, &pr_lksb, PR_LOCK);
/*
* Only store the lockspace pointer in pr_dlm->ls after the lockspace
@@ -1153,7 +1203,7 @@ static void scst_dlm_pr_write_unlock(struct scst_device *dev,
DLM_LKF_CONVERT | DLM_LKF_VALBLK, PR_DATA_LOCK,
NULL);
scst_pr_toggle_lock(pr_dlm, ls, PR_POST_UPDATE_LOCK);
scst_dlm_unlock_wait(ls, pr_lksb);
scst_dlm_remove_lock(ls, pr_lksb, PR_LOCK);
}
static bool scst_dlm_reserved(struct scst_device *dev)
@@ -1214,7 +1264,7 @@ static void scst_dlm_res_unlock(struct scst_device *dev,
scst_pr_toggle_lock(pr_dlm, ls, PR_PRE_UPDATE_LOCK);
scst_pr_toggle_lock(pr_dlm, ls, PR_POST_UPDATE_LOCK);
}
scst_dlm_unlock_wait(ls, pr_lksb);
scst_dlm_remove_lock(ls, pr_lksb, PR_LOCK);
}
static bool scst_dlm_is_rsv_holder(struct scst_device *dev,
@@ -1438,8 +1488,7 @@ static void scst_reread_lvb_work(struct work_struct *work)
NULL);
if (res >= 0)
scst_trigger_reread_lvb(pr_dlm, ls);
if (pr_lksb.lksb.sb_lkid)
scst_dlm_unlock_wait(ls, &pr_lksb);
scst_dlm_remove_lock(ls, &pr_lksb, PR_LOCK);
unlock_ls:
mutex_unlock(&pr_dlm->ls_mutex);
@@ -1462,8 +1511,7 @@ static void scst_lvb_upd_work(struct work_struct *work)
res = scst_dlm_lock_wait(ls, DLM_LOCK_EX, &lksb, 0, PR_LOCK, NULL);
if (res >= 0)
scst_trigger_lvb_update(pr_dlm, ls);
if (lksb.lksb.sb_lkid)
scst_dlm_unlock_wait(ls, &lksb);
scst_dlm_remove_lock(ls, &lksb, PR_LOCK);
unlock_ls:
mutex_unlock(&pr_dlm->ls_mutex);
@@ -1588,7 +1636,7 @@ static void scst_pr_dlm_cleanup(struct scst_device *dev)
mutex_lock(&pr_dlm->ls_mutex);
scst_dlm_lock_wait(ls, DLM_LOCK_EX, &pr_lksb, 0, PR_LOCK, NULL);
scst_dlm_remove_locks(pr_dlm, ls);
scst_dlm_unlock_wait(ls, &pr_lksb);
scst_dlm_remove_lock(ls, &pr_lksb, PR_LOCK);
pr_dlm->ls = NULL;
mutex_unlock(&pr_dlm->ls_mutex);
@@ -1735,18 +1783,15 @@ out:
}
/*
* scst_dlm_rm_rem_ua_ls - Unlock the lock associated with a remote unit attention
* scst_dlm_rm_rem_ua_ls - Remove the DLM lock associated with a remote unit
* attention. After this returns, no further AST can fire for &ua->lksb, so
* the rem_ua may be freed.
*/
static void scst_dlm_rm_rem_ua_ls(dlm_lockspace_t *ls, struct scst_dlm_rem_ua *ua)
{
int res;
if (!ua->lksb.lksb.sb_lkid)
return;
res = scst_dlm_unlock_wait(ls, &ua->lksb);
WARN(res < 0, "scst_dlm_unlock_wait(%08x) failed (%d)",
ua->lksb.lksb.sb_lkid, res);
ua->lksb.lksb.sb_lkid = 0;
scst_dlm_remove_lock(ls, &ua->lksb, NULL);
}
/*
@@ -1805,8 +1850,7 @@ static struct scst_dlm_rem_ua *scst_dlm_pr_reg_recv_rem_ua(struct scst_device *d
return ua;
unlock_cancel:
if (ua->lksb.lksb.sb_lkid)
scst_dlm_unlock_wait(ls, &ua->lksb);
scst_dlm_rm_rem_ua_ls(ls, ua);
cancel:
scst_dlm_free_rem_ua(ua);