Made sure that ib_srp initiator cannot lock up even when its I/O depth > 1.

git-svn-id: http://svn.code.sf.net/p/scst/svn/trunk@1867 d57e44dd-8a1f-0410-8b47-8ef2f437770f
This commit is contained in:
Bart Van Assche
2010-07-23 12:23:39 +00:00
parent baa4b4ee29
commit c7472c2aa6
2 changed files with 81 additions and 49 deletions

View File

@@ -1069,11 +1069,27 @@ out:
static int srpt_req_lim_delta(struct srpt_rdma_ch *ch)
{
int req_lim;
int last_rsp_req_lim;
unsigned long flags;
int res;
req_lim = atomic_read(&ch->req_lim);
last_rsp_req_lim = atomic_xchg(&ch->last_response_req_lim, req_lim);
return req_lim - last_rsp_req_lim;
spin_lock_irqsave(&ch->cred_lock, flags);
res = req_lim - ch->last_response_req_lim;
ch->last_response_req_lim = req_lim;
spin_unlock_irqrestore(&ch->cred_lock, flags);
return res;
}
/**
* srpt_undo_req_lim_delta() - Undo the side effect of srpt_req_lim_delta().
*/
static void srpt_undo_req_lim_delta(struct srpt_rdma_ch *ch, int delta)
{
unsigned long flags;
spin_lock_irqsave(&ch->cred_lock, flags);
ch->last_response_req_lim -= delta;
spin_unlock_irqrestore(&ch->cred_lock, flags);
}
/**
@@ -1672,7 +1688,7 @@ err:
send_rsp_res = srpt_post_send(ch, ioctx, len);
if (send_rsp_res) {
PRINT_ERROR("%s", "Sending SRP_RSP response failed.");
atomic_sub(req_lim_delta, &ch->last_response_req_lim);
srpt_undo_req_lim_delta(ch, req_lim_delta - 1);
}
}
if (send_rsp_res) {
@@ -1775,12 +1791,6 @@ static void srpt_rcv_completion(struct ib_cq *cq, void *ctx)
PRINT_ERROR("req_lim = %d < 0", req_lim);
ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_RECV];
srpt_handle_new_iu(ch, ioctx);
#if defined(CONFIG_SCST_DEBUG)
if (interrupt_processing_delay_in_us
<= MAX_UDELAY_MS * 1000)
udelay(interrupt_processing_delay_in_us);
#endif
}
} while (ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP
| IB_CQ_REPORT_MISSED_EVENTS) > 0);
@@ -2496,7 +2506,8 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id,
cpu_to_be16(SRP_BUF_FORMAT_DIRECT | SRP_BUF_FORMAT_INDIRECT);
rsp->req_lim_delta = cpu_to_be32(SRPT_RQ_SIZE);
atomic_set(&ch->req_lim, SRPT_RQ_SIZE);
atomic_set(&ch->last_response_req_lim, SRPT_RQ_SIZE);
spin_lock_init(&ch->cred_lock);
ch->last_response_req_lim = SRPT_RQ_SIZE;
/* create cm reply */
rep_param->qp_num = ch->qp->qp_num;
@@ -3074,36 +3085,64 @@ out:
/**
* srpt_must_wait_for_cred() - Whether or not the target must wait with
* sending a response towards the initiator in order to avoid that the
* initiator locks up. The Linux SRP initiator locks up when:
* initiator.req_lim < req_lim_min (req_lim_min equals 2 for SRP_CMD and
* equals 1 for SRP_TSK_MGMT), no new SRP_RSP will be received by the
* initiator, or none of the received SRP_RSP responses increases
* initiator locks up. The Linux SRP initiator locks up when
* initiator.req_lim <= req_lim_min (req_lim_min equals 1 for SRP_CMD and
* equals 0 for SRP_TSK_MGMT) and either no new SRP_RSP will be received by the
* initiator or none of the received SRP_RSP responses increases
* initiator.req_lim. One possible strategy to avoid an initiator lockup is
* that the target does not send an SRP_RSP that makes initiator.req_lim <
* that the target does not send an SRP_RSP that makes initiator.req_lim <=
* req_lim_min. While the target does not know the value of initiator.req_lim,
* one can deduce from the credit mechanism specified in the SRP standard that
* when target.req_lim == req_lim_min - 1, initiator.req_lim must also equal
* req_lim_min - 1. Hence wait with sending a response when target.req_lim <
* when target.req_lim == req_lim_min, initiator.req_lim must also equal
* req_lim_min. Hence wait with sending a response when target.req_lim <=
* req_lim_min if that response would not increase initiator.req_lim. The last
* condition is equivalent to srpt_req_lim_delta(ch) + 1 <= 0, which is also
* equivalent to ch->req_lim - ch->last_response_req_lim + 1 <= 0.
* condition is equivalent to srpt_req_lim_delta(ch) + 1 <= 0.
*
* See also: For more information about how to reproduce the initiator lockup,
* see also http://bugzilla.kernel.org/show_bug.cgi?id=14235.
*
* Note: The constant 'compensate_for_incoming_requests' compensates for the
* fact that if the initiator uses an I/O depth that is larger than one,
* req_lim may have decreased towards req_lim_min after this function returned
* and before the initiator has received and processed the SRP_RSP whose
* REQUEST LIMIT DELTA field will have been set to *req_lim_delta.
*/
static bool srpt_must_wait_for_cred(struct srpt_rdma_ch *ch, int req_lim_min)
static bool srpt_must_wait_for_cred(struct srpt_rdma_ch *ch, int req_lim_min,
int *req_lim_delta)
{
int req_lim;
req_lim = atomic_read(&ch->req_lim);
int delta;
unsigned long flags;
bool res;
enum { compensate_for_incoming_requests = 1 };
return req_lim < req_lim_min
&& req_lim - atomic_read(&ch->last_response_req_lim) + 1 <= 0;
EXTRACHECKS_BUG_ON(!req_lim_delta);
res = true;
req_lim = atomic_read(&ch->req_lim);
if (req_lim > req_lim_min + compensate_for_incoming_requests) {
res = false;
*req_lim_delta = srpt_req_lim_delta(ch);
} else {
spin_lock_irqsave(&ch->cred_lock, flags);
delta = req_lim - ch->last_response_req_lim;
if (delta + 1 > 0) {
res = false;
ch->last_response_req_lim = req_lim;
*req_lim_delta = delta;
}
spin_unlock_irqrestore(&ch->cred_lock, flags);
}
return res;
}
static void srpt_wait_for_cred(struct srpt_rdma_ch *ch, int req_lim_min)
static int srpt_wait_for_cred(struct srpt_rdma_ch *ch, int req_lim_min)
{
while (unlikely(srpt_must_wait_for_cred(ch, req_lim_min)))
int delta;
while (unlikely(srpt_must_wait_for_cred(ch, req_lim_min, &delta)))
schedule();
return delta;
}
/**
@@ -3134,12 +3173,7 @@ static int srpt_xmit_response(struct scst_cmd *scmnd)
goto out;
}
if (scst_cmd_atomic(scmnd) && srpt_must_wait_for_cred(ch, 2)) {
ret = SCST_TGT_RES_NEED_THREAD_CTX;
goto out;
}
srpt_wait_for_cred(ch, 2);
EXTRACHECKS_BUG_ON(scst_cmd_atomic(scmnd));
new_state = srpt_set_cmd_state(ioctx, SRPT_STATE_CMD_RSP_SENT);
WARN_ON(new_state == SRPT_STATE_DONE);
@@ -3148,13 +3182,6 @@ static int srpt_xmit_response(struct scst_cmd *scmnd)
ret = SCST_TGT_RES_SUCCESS;
req_lim_delta = srpt_req_lim_delta(ch) + 1;
resp_len = srpt_build_cmd_rsp(ch, ioctx, req_lim_delta,
scst_cmd_get_tag(scmnd),
scst_cmd_get_status(scmnd),
scst_cmd_get_sense_buffer(scmnd),
scst_cmd_get_sense_buffer_len(scmnd));
/* For read commands, transfer the data to the initiator. */
if (dir == SCST_DATA_READ && scst_cmd_get_resp_data_len(scmnd)) {
ret = srpt_xfer_data(ch, ioctx, scmnd);
@@ -3167,6 +3194,14 @@ static int srpt_xmit_response(struct scst_cmd *scmnd)
}
}
req_lim_delta = srpt_wait_for_cred(ch, 1) + 1;
resp_len = srpt_build_cmd_rsp(ch, ioctx, req_lim_delta,
scst_cmd_get_tag(scmnd),
scst_cmd_get_status(scmnd),
scst_cmd_get_sense_buffer(scmnd),
scst_cmd_get_sense_buffer_len(scmnd));
if (srpt_post_send(ch, ioctx, resp_len)) {
srpt_unmap_sg_to_ib_sge(ch, ioctx);
srpt_set_cmd_state(ioctx, SRPT_STATE_DONE);
@@ -3174,7 +3209,7 @@ static int srpt_xmit_response(struct scst_cmd *scmnd)
PRINT_ERROR("%s[%d]: ch->state= %d tag= %lld",
__func__, __LINE__, atomic_read(&ch->state),
(unsigned long long)scst_cmd_get_tag(scmnd));
atomic_sub(req_lim_delta, &ch->last_response_req_lim);
srpt_undo_req_lim_delta(ch, req_lim_delta - 1);
ret = SCST_TGT_RES_QUEUE_FULL;
}
@@ -3212,12 +3247,11 @@ static void srpt_tsk_mgmt_done(struct scst_mgmt_cmd *mcmnd)
WARN_ON(in_irq());
srpt_wait_for_cred(ch, 1);
new_state = srpt_set_cmd_state(ioctx, SRPT_STATE_MGMT_RSP_SENT);
WARN_ON(new_state == SRPT_STATE_DONE);
req_lim_delta = srpt_req_lim_delta(ch) + 1;
req_lim_delta = srpt_wait_for_cred(ch, 0) + 1;
rsp_len = srpt_build_tskmgmt_rsp(ch, ioctx, req_lim_delta,
(scst_mgmt_cmd_get_status(mcmnd) ==
SCST_MGMT_STATUS_SUCCESS) ?
@@ -3232,7 +3266,7 @@ static void srpt_tsk_mgmt_done(struct scst_mgmt_cmd *mcmnd)
*/
if (srpt_post_send(ch, ioctx, rsp_len)) {
PRINT_ERROR("%s", "Sending SRP_RSP response failed.");
atomic_sub(req_lim_delta, &ch->last_response_req_lim);
srpt_undo_req_lim_delta(ch, req_lim_delta - 1);
}
scst_mgmt_cmd_set_tgt_priv(mcmnd, NULL);
@@ -3855,20 +3889,17 @@ static int __init srpt_init_module(void)
/* IRQ context */
srpt_context = SCST_CONTEXT_TASKLET;
srpt_template.rdy_to_xfer_atomic = true;
srpt_template.xmit_response_atomic = true;
break;
case MODE_SINGLE_THREADED:
/* single kernel thread (created by ib_srpt) */
srpt_context = SCST_CONTEXT_DIRECT;
srpt_template.rdy_to_xfer_atomic = false;
srpt_template.xmit_response_atomic = false;
break;
case MODE_MULTITHREADED:
default:
/* multiple kernel threads (created by the SCST core) */
srpt_context = SCST_CONTEXT_THREAD;
srpt_template.rdy_to_xfer_atomic = false;
srpt_template.xmit_response_atomic = false;
break;
}

View File

@@ -38,7 +38,6 @@
#include <linux/version.h>
#include <linux/types.h>
#include <linux/list.h>
#include <linux/mutex.h>
#include <rdma/ib_verbs.h>
#include <rdma/ib_sa.h>
@@ -227,6 +226,7 @@ enum rdma_ch_state {
* @req_lim: request limit: maximum number of requests that may be sent
* by the initiator without having received a response or
* SRP_CRED_REQ.
* @cred_lock: protects last_response_req_lim.
* @last_response_req_lim: value of req_lim the last time a response or
* SRP_CRED_REQ was sent.
* @state: channel state. See also enum rdma_ch_state.
@@ -249,7 +249,8 @@ struct srpt_rdma_ch {
u8 t_port_id[16];
int max_ti_iu_len;
atomic_t req_lim;
atomic_t last_response_req_lim;
spinlock_t cred_lock;
int32_t last_response_req_lim;
atomic_t state;
struct list_head list;
struct list_head cmd_wait_list;