diff --git a/scst/src/scst_dlm.c b/scst/src/scst_dlm.c index 0578ae5cf..a3f00acaf 100644 --- a/scst/src/scst_dlm.c +++ b/scst/src/scst_dlm.c @@ -68,6 +68,8 @@ static struct scst_dlm_rem_ua *scst_dlm_pr_reg_recv_rem_ua(struct scst_device *d dlm_lockspace_t *ls, struct scst_dev_registrant *reg, int index); +static void scst_dlm_remove_lock(dlm_lockspace_t *ls, struct scst_lksb *lksb, + const char *name); static inline void compile_time_size_checks(void) { @@ -141,29 +143,58 @@ out: } /* - * scst_dlm_unlock_wait - Discard a DLM lock + * scst_dlm_force_unlock_and_drain - Destroy the lkb backing @lksb and wait + * until the destroying CAST has been observed. + * + * After this returns, DLM no longer holds a reference to @lksb: no further + * AST can fire for it, so the containing storage may be freed. + * + * DLM_LKF_FORCEUNLOCK guarantees that a destroying CAST with sb_status + * -DLM_EUNLOCK will arrive even if an earlier operation is still in flight + * or a peer is departing (recovery synthesizes the reply in that case via + * dlm_recover_waiters_pre()). Other (non-destroying) CASTs that may have + * been pending — e.g. a stale reply for a canceled convert — are logged + * and the wait continues. + * + * The 60s watchdog logs progress but never returns early. The only + * "hang forever" case is a permanently broken lockspace; md-cluster and + * gfs2 accept the same constraint. */ -static int scst_dlm_unlock_wait(dlm_lockspace_t *ls, struct scst_lksb *lksb) +static void scst_dlm_force_unlock_and_drain(dlm_lockspace_t *ls, + struct scst_lksb *lksb, + const char *name) { + unsigned long start = jiffies; + u32 lkid = lksb->lksb.sb_lkid; int res; - sBUG_ON(!ls); - init_completion(&lksb->compl); - res = dlm_unlock(ls, lksb->lksb.sb_lkid, 0, &lksb->lksb, lksb); - if (res < 0) - goto out; - res = wait_for_completion_timeout(&lksb->compl, 60 * HZ); - if (res > 0) { - res = lksb->lksb.sb_status; - if (res == -DLM_EUNLOCK || res == -DLM_ECANCEL) - res = 0; - } else if (res == 0) { - res = -ETIMEDOUT; + res = dlm_unlock(ls, lkid, DLM_LKF_FORCEUNLOCK, &lksb->lksb, lksb); + if (res < 0) { + PRINT_ERROR("dlm_unlock(FORCEUNLOCK, lkid=%08x, name=%s) submission failed: %d", + lkid, name ? : "?", res); + return; } -out: - return res; + while (true) { + long wait = wait_for_completion_timeout(&lksb->compl, 60 * HZ); + + if (wait > 0) { + int status = lksb->lksb.sb_status; + + if (status == -DLM_EUNLOCK) + return; + + PRINT_WARNING("stale CAST (lkid=%08x name=%s status=%d %ums)", + lkid, name ? : "?", status, + jiffies_to_msecs(jiffies - start)); + continue; + } + + PRINT_WARNING("draining FORCEUNLOCK (lkid=%08x name=%s, %ums)", + lkid, name ? : "?", + jiffies_to_msecs(jiffies - start)); + } } /* Number of persistent reservation registrants. */ @@ -197,8 +228,6 @@ static void scst_dlm_pr_rm_reg_ls(dlm_lockspace_t *ls, struct scst_dev_registrant *reg, bool include_rem_ua) { - int res; - if (include_rem_ua) { struct scst_dlm_rem_ua *ua, *tmp; @@ -217,10 +246,7 @@ static void scst_dlm_pr_rm_reg_ls(dlm_lockspace_t *ls, if (!reg->lksb.lksb.sb_lkid) return; - res = scst_dlm_unlock_wait(ls, ®->lksb); - WARN(res < 0, "scst_dlm_unlock_wait(%08x) failed (%d)", - reg->lksb.lksb.sb_lkid, res); - reg->lksb.lksb.sb_lkid = 0; + scst_dlm_remove_lock(ls, ®->lksb, NULL); reg->dlm_idx = -1; } @@ -393,9 +419,12 @@ static int scst_copy_from_dlm(struct scst_device *dev, dlm_lockspace_t *ls, lvb->pr_type); } } else { - PRINT_ERROR("pr_add_registrant %s." PR_REG_LOCK - " failed\n", dev->virt_name, i); - scst_dlm_unlock_wait(ls, ®_lksb[i]); + char reg_name[32]; + + snprintf(reg_name, sizeof(reg_name), PR_REG_LOCK, i); + PRINT_ERROR("pr_add_registrant %s.%s failed\n", + dev->virt_name, reg_name); + scst_dlm_remove_lock(ls, ®_lksb[i], reg_name); continue; } @@ -444,8 +473,7 @@ out: cancel: for (i = 0; i < nr_registrants; i++) - if (reg_lksb[i].lksb.sb_lkid) - scst_dlm_unlock_wait(ls, ®_lksb[i]); + scst_dlm_remove_lock(ls, ®_lksb[i], NULL); goto out; } @@ -811,21 +839,43 @@ static void scst_pr_toggle_lock(struct scst_pr_dlm_data *pr_dlm, pr_dlm->dev->virt_name, lock_name, res); if (!lksb.lksb.sb_lkid) continue; - scst_dlm_lock_wait(ls, DLM_LOCK_NL, &lksb, - DLM_LKF_CONVERT, lock_name, NULL); - scst_dlm_unlock_wait(ls, &lksb); + scst_dlm_remove_lock(ls, &lksb, lock_name); } } -/* Remove a lock from the local DLM lockspace instance. */ +/* + * scst_dlm_remove_lock - Remove a lock from the local DLM lockspace instance. + * + * Contract: when this function returns, the lkb has been destroyed and DLM + * no longer holds a reference to @lksb. No further AST can fire for it, so + * the containing storage may be freed. + * + * The clean-release idiom for a lock that may be held in EX or PW is to + * convert to NL first (signals "I'm done writing; LVB state is settled") + * and then unlock. The convert is best-effort: on failure we proceed + * to a FORCEUNLOCK regardless, so the contract is still met. The final + * scst_dlm_force_unlock_and_drain() waits for the destroying CAST before + * returning. + */ static void scst_dlm_remove_lock(dlm_lockspace_t *ls, struct scst_lksb *lksb, const char *name) { + unsigned long start; + int res; + if (!lksb->lksb.sb_lkid) return; - scst_dlm_lock_wait(ls, DLM_LOCK_NL, lksb, DLM_LKF_CONVERT, name, - NULL); - scst_dlm_unlock_wait(ls, lksb); + + start = jiffies; + + res = scst_dlm_lock_wait(ls, DLM_LOCK_NL, lksb, DLM_LKF_CONVERT, name, + NULL); + if (res < 0) + PRINT_WARNING("convert-to-NL failed (lkid=%08x, name=%s, res=%d, %ums)", + lksb->lksb.sb_lkid, name ? : "?", res, + jiffies_to_msecs(jiffies - start)); + + scst_dlm_force_unlock_and_drain(ls, lksb, name); lksb->lksb.sb_lkid = 0; } @@ -1066,7 +1116,7 @@ static dlm_lockspace_t *get_lockspace(struct scst_device *dev) else if (modified_lvb) scst_trigger_reread_lvb(pr_dlm, ls); - scst_dlm_unlock_wait(ls, &pr_lksb); + scst_dlm_remove_lock(ls, &pr_lksb, PR_LOCK); /* * Only store the lockspace pointer in pr_dlm->ls after the lockspace @@ -1153,7 +1203,7 @@ static void scst_dlm_pr_write_unlock(struct scst_device *dev, DLM_LKF_CONVERT | DLM_LKF_VALBLK, PR_DATA_LOCK, NULL); scst_pr_toggle_lock(pr_dlm, ls, PR_POST_UPDATE_LOCK); - scst_dlm_unlock_wait(ls, pr_lksb); + scst_dlm_remove_lock(ls, pr_lksb, PR_LOCK); } static bool scst_dlm_reserved(struct scst_device *dev) @@ -1214,7 +1264,7 @@ static void scst_dlm_res_unlock(struct scst_device *dev, scst_pr_toggle_lock(pr_dlm, ls, PR_PRE_UPDATE_LOCK); scst_pr_toggle_lock(pr_dlm, ls, PR_POST_UPDATE_LOCK); } - scst_dlm_unlock_wait(ls, pr_lksb); + scst_dlm_remove_lock(ls, pr_lksb, PR_LOCK); } static bool scst_dlm_is_rsv_holder(struct scst_device *dev, @@ -1438,8 +1488,7 @@ static void scst_reread_lvb_work(struct work_struct *work) NULL); if (res >= 0) scst_trigger_reread_lvb(pr_dlm, ls); - if (pr_lksb.lksb.sb_lkid) - scst_dlm_unlock_wait(ls, &pr_lksb); + scst_dlm_remove_lock(ls, &pr_lksb, PR_LOCK); unlock_ls: mutex_unlock(&pr_dlm->ls_mutex); @@ -1462,8 +1511,7 @@ static void scst_lvb_upd_work(struct work_struct *work) res = scst_dlm_lock_wait(ls, DLM_LOCK_EX, &lksb, 0, PR_LOCK, NULL); if (res >= 0) scst_trigger_lvb_update(pr_dlm, ls); - if (lksb.lksb.sb_lkid) - scst_dlm_unlock_wait(ls, &lksb); + scst_dlm_remove_lock(ls, &lksb, PR_LOCK); unlock_ls: mutex_unlock(&pr_dlm->ls_mutex); @@ -1588,7 +1636,7 @@ static void scst_pr_dlm_cleanup(struct scst_device *dev) mutex_lock(&pr_dlm->ls_mutex); scst_dlm_lock_wait(ls, DLM_LOCK_EX, &pr_lksb, 0, PR_LOCK, NULL); scst_dlm_remove_locks(pr_dlm, ls); - scst_dlm_unlock_wait(ls, &pr_lksb); + scst_dlm_remove_lock(ls, &pr_lksb, PR_LOCK); pr_dlm->ls = NULL; mutex_unlock(&pr_dlm->ls_mutex); @@ -1735,18 +1783,15 @@ out: } /* - * scst_dlm_rm_rem_ua_ls - Unlock the lock associated with a remote unit attention + * scst_dlm_rm_rem_ua_ls - Remove the DLM lock associated with a remote unit + * attention. After this returns, no further AST can fire for &ua->lksb, so + * the rem_ua may be freed. */ static void scst_dlm_rm_rem_ua_ls(dlm_lockspace_t *ls, struct scst_dlm_rem_ua *ua) { - int res; - if (!ua->lksb.lksb.sb_lkid) return; - res = scst_dlm_unlock_wait(ls, &ua->lksb); - WARN(res < 0, "scst_dlm_unlock_wait(%08x) failed (%d)", - ua->lksb.lksb.sb_lkid, res); - ua->lksb.lksb.sb_lkid = 0; + scst_dlm_remove_lock(ls, &ua->lksb, NULL); } /* @@ -1805,8 +1850,7 @@ static struct scst_dlm_rem_ua *scst_dlm_pr_reg_recv_rem_ua(struct scst_device *d return ua; unlock_cancel: - if (ua->lksb.lksb.sb_lkid) - scst_dlm_unlock_wait(ls, &ua->lksb); + scst_dlm_rm_rem_ua_ls(ls, ua); cancel: scst_dlm_free_rem_ua(ua);