From c7472c2aa654f0410065efbf2363033a5ce0efed Mon Sep 17 00:00:00 2001 From: Bart Van Assche Date: Fri, 23 Jul 2010 12:23:39 +0000 Subject: [PATCH] Made sure that ib_srp initiator cannot lock up even when its I/O depth > 1. git-svn-id: http://svn.code.sf.net/p/scst/svn/trunk@1867 d57e44dd-8a1f-0410-8b47-8ef2f437770f --- srpt/src/ib_srpt.c | 125 ++++++++++++++++++++++++++++----------------- srpt/src/ib_srpt.h | 5 +- 2 files changed, 81 insertions(+), 49 deletions(-) diff --git a/srpt/src/ib_srpt.c b/srpt/src/ib_srpt.c index a11906e0c..5d3becdf2 100644 --- a/srpt/src/ib_srpt.c +++ b/srpt/src/ib_srpt.c @@ -1069,11 +1069,27 @@ out: static int srpt_req_lim_delta(struct srpt_rdma_ch *ch) { int req_lim; - int last_rsp_req_lim; + unsigned long flags; + int res; req_lim = atomic_read(&ch->req_lim); - last_rsp_req_lim = atomic_xchg(&ch->last_response_req_lim, req_lim); - return req_lim - last_rsp_req_lim; + spin_lock_irqsave(&ch->cred_lock, flags); + res = req_lim - ch->last_response_req_lim; + ch->last_response_req_lim = req_lim; + spin_unlock_irqrestore(&ch->cred_lock, flags); + return res; +} + +/** + * srpt_undo_req_lim_delta() - Undo the side effect of srpt_req_lim_delta(). + */ +static void srpt_undo_req_lim_delta(struct srpt_rdma_ch *ch, int delta) +{ + unsigned long flags; + + spin_lock_irqsave(&ch->cred_lock, flags); + ch->last_response_req_lim -= delta; + spin_unlock_irqrestore(&ch->cred_lock, flags); } /** @@ -1672,7 +1688,7 @@ err: send_rsp_res = srpt_post_send(ch, ioctx, len); if (send_rsp_res) { PRINT_ERROR("%s", "Sending SRP_RSP response failed."); - atomic_sub(req_lim_delta, &ch->last_response_req_lim); + srpt_undo_req_lim_delta(ch, req_lim_delta - 1); } } if (send_rsp_res) { @@ -1775,12 +1791,6 @@ static void srpt_rcv_completion(struct ib_cq *cq, void *ctx) PRINT_ERROR("req_lim = %d < 0", req_lim); ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_RECV]; srpt_handle_new_iu(ch, ioctx); - -#if defined(CONFIG_SCST_DEBUG) - if (interrupt_processing_delay_in_us - <= MAX_UDELAY_MS * 1000) - udelay(interrupt_processing_delay_in_us); -#endif } } while (ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS) > 0); @@ -2496,7 +2506,8 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id, cpu_to_be16(SRP_BUF_FORMAT_DIRECT | SRP_BUF_FORMAT_INDIRECT); rsp->req_lim_delta = cpu_to_be32(SRPT_RQ_SIZE); atomic_set(&ch->req_lim, SRPT_RQ_SIZE); - atomic_set(&ch->last_response_req_lim, SRPT_RQ_SIZE); + spin_lock_init(&ch->cred_lock); + ch->last_response_req_lim = SRPT_RQ_SIZE; /* create cm reply */ rep_param->qp_num = ch->qp->qp_num; @@ -3074,36 +3085,64 @@ out: /** * srpt_must_wait_for_cred() - Whether or not the target must wait with * sending a response towards the initiator in order to avoid that the - * initiator locks up. The Linux SRP initiator locks up when: - * initiator.req_lim < req_lim_min (req_lim_min equals 2 for SRP_CMD and - * equals 1 for SRP_TSK_MGMT), no new SRP_RSP will be received by the - * initiator, or none of the received SRP_RSP responses increases + * initiator locks up. The Linux SRP initiator locks up when + * initiator.req_lim <= req_lim_min (req_lim_min equals 1 for SRP_CMD and + * equals 0 for SRP_TSK_MGMT) and either no new SRP_RSP will be received by the + * initiator or none of the received SRP_RSP responses increases * initiator.req_lim. One possible strategy to avoid an initiator lockup is - * that the target does not send an SRP_RSP that makes initiator.req_lim < + * that the target does not send an SRP_RSP that makes initiator.req_lim <= * req_lim_min. While the target does not know the value of initiator.req_lim, * one can deduce from the credit mechanism specified in the SRP standard that - * when target.req_lim == req_lim_min - 1, initiator.req_lim must also equal - * req_lim_min - 1. Hence wait with sending a response when target.req_lim < + * when target.req_lim == req_lim_min, initiator.req_lim must also equal + * req_lim_min. Hence wait with sending a response when target.req_lim <= * req_lim_min if that response would not increase initiator.req_lim. The last - * condition is equivalent to srpt_req_lim_delta(ch) + 1 <= 0, which is also - * equivalent to ch->req_lim - ch->last_response_req_lim + 1 <= 0. + * condition is equivalent to srpt_req_lim_delta(ch) + 1 <= 0. * * See also: For more information about how to reproduce the initiator lockup, * see also http://bugzilla.kernel.org/show_bug.cgi?id=14235. + * + * Note: The constant 'compensate_for_incoming_requests' compensates for the + * fact that if the initiator uses an I/O depth that is larger than one, + * req_lim may have decreased towards req_lim_min after this function returned + * and before the initiator has received and processed the SRP_RSP whose + * REQUEST LIMIT DELTA field will have been set to *req_lim_delta. */ -static bool srpt_must_wait_for_cred(struct srpt_rdma_ch *ch, int req_lim_min) +static bool srpt_must_wait_for_cred(struct srpt_rdma_ch *ch, int req_lim_min, + int *req_lim_delta) { int req_lim; - req_lim = atomic_read(&ch->req_lim); + int delta; + unsigned long flags; + bool res; + enum { compensate_for_incoming_requests = 1 }; - return req_lim < req_lim_min - && req_lim - atomic_read(&ch->last_response_req_lim) + 1 <= 0; + EXTRACHECKS_BUG_ON(!req_lim_delta); + + res = true; + req_lim = atomic_read(&ch->req_lim); + if (req_lim > req_lim_min + compensate_for_incoming_requests) { + res = false; + *req_lim_delta = srpt_req_lim_delta(ch); + } else { + spin_lock_irqsave(&ch->cred_lock, flags); + delta = req_lim - ch->last_response_req_lim; + if (delta + 1 > 0) { + res = false; + ch->last_response_req_lim = req_lim; + *req_lim_delta = delta; + } + spin_unlock_irqrestore(&ch->cred_lock, flags); + } + return res; } -static void srpt_wait_for_cred(struct srpt_rdma_ch *ch, int req_lim_min) +static int srpt_wait_for_cred(struct srpt_rdma_ch *ch, int req_lim_min) { - while (unlikely(srpt_must_wait_for_cred(ch, req_lim_min))) + int delta; + + while (unlikely(srpt_must_wait_for_cred(ch, req_lim_min, &delta))) schedule(); + return delta; } /** @@ -3134,12 +3173,7 @@ static int srpt_xmit_response(struct scst_cmd *scmnd) goto out; } - if (scst_cmd_atomic(scmnd) && srpt_must_wait_for_cred(ch, 2)) { - ret = SCST_TGT_RES_NEED_THREAD_CTX; - goto out; - } - - srpt_wait_for_cred(ch, 2); + EXTRACHECKS_BUG_ON(scst_cmd_atomic(scmnd)); new_state = srpt_set_cmd_state(ioctx, SRPT_STATE_CMD_RSP_SENT); WARN_ON(new_state == SRPT_STATE_DONE); @@ -3148,13 +3182,6 @@ static int srpt_xmit_response(struct scst_cmd *scmnd) ret = SCST_TGT_RES_SUCCESS; - req_lim_delta = srpt_req_lim_delta(ch) + 1; - resp_len = srpt_build_cmd_rsp(ch, ioctx, req_lim_delta, - scst_cmd_get_tag(scmnd), - scst_cmd_get_status(scmnd), - scst_cmd_get_sense_buffer(scmnd), - scst_cmd_get_sense_buffer_len(scmnd)); - /* For read commands, transfer the data to the initiator. */ if (dir == SCST_DATA_READ && scst_cmd_get_resp_data_len(scmnd)) { ret = srpt_xfer_data(ch, ioctx, scmnd); @@ -3167,6 +3194,14 @@ static int srpt_xmit_response(struct scst_cmd *scmnd) } } + req_lim_delta = srpt_wait_for_cred(ch, 1) + 1; + + resp_len = srpt_build_cmd_rsp(ch, ioctx, req_lim_delta, + scst_cmd_get_tag(scmnd), + scst_cmd_get_status(scmnd), + scst_cmd_get_sense_buffer(scmnd), + scst_cmd_get_sense_buffer_len(scmnd)); + if (srpt_post_send(ch, ioctx, resp_len)) { srpt_unmap_sg_to_ib_sge(ch, ioctx); srpt_set_cmd_state(ioctx, SRPT_STATE_DONE); @@ -3174,7 +3209,7 @@ static int srpt_xmit_response(struct scst_cmd *scmnd) PRINT_ERROR("%s[%d]: ch->state= %d tag= %lld", __func__, __LINE__, atomic_read(&ch->state), (unsigned long long)scst_cmd_get_tag(scmnd)); - atomic_sub(req_lim_delta, &ch->last_response_req_lim); + srpt_undo_req_lim_delta(ch, req_lim_delta - 1); ret = SCST_TGT_RES_QUEUE_FULL; } @@ -3212,12 +3247,11 @@ static void srpt_tsk_mgmt_done(struct scst_mgmt_cmd *mcmnd) WARN_ON(in_irq()); - srpt_wait_for_cred(ch, 1); - new_state = srpt_set_cmd_state(ioctx, SRPT_STATE_MGMT_RSP_SENT); WARN_ON(new_state == SRPT_STATE_DONE); - req_lim_delta = srpt_req_lim_delta(ch) + 1; + req_lim_delta = srpt_wait_for_cred(ch, 0) + 1; + rsp_len = srpt_build_tskmgmt_rsp(ch, ioctx, req_lim_delta, (scst_mgmt_cmd_get_status(mcmnd) == SCST_MGMT_STATUS_SUCCESS) ? @@ -3232,7 +3266,7 @@ static void srpt_tsk_mgmt_done(struct scst_mgmt_cmd *mcmnd) */ if (srpt_post_send(ch, ioctx, rsp_len)) { PRINT_ERROR("%s", "Sending SRP_RSP response failed."); - atomic_sub(req_lim_delta, &ch->last_response_req_lim); + srpt_undo_req_lim_delta(ch, req_lim_delta - 1); } scst_mgmt_cmd_set_tgt_priv(mcmnd, NULL); @@ -3855,20 +3889,17 @@ static int __init srpt_init_module(void) /* IRQ context */ srpt_context = SCST_CONTEXT_TASKLET; srpt_template.rdy_to_xfer_atomic = true; - srpt_template.xmit_response_atomic = true; break; case MODE_SINGLE_THREADED: /* single kernel thread (created by ib_srpt) */ srpt_context = SCST_CONTEXT_DIRECT; srpt_template.rdy_to_xfer_atomic = false; - srpt_template.xmit_response_atomic = false; break; case MODE_MULTITHREADED: default: /* multiple kernel threads (created by the SCST core) */ srpt_context = SCST_CONTEXT_THREAD; srpt_template.rdy_to_xfer_atomic = false; - srpt_template.xmit_response_atomic = false; break; } diff --git a/srpt/src/ib_srpt.h b/srpt/src/ib_srpt.h index 76ffe1467..0fa1d6c72 100644 --- a/srpt/src/ib_srpt.h +++ b/srpt/src/ib_srpt.h @@ -38,7 +38,6 @@ #include #include #include -#include #include #include @@ -227,6 +226,7 @@ enum rdma_ch_state { * @req_lim: request limit: maximum number of requests that may be sent * by the initiator without having received a response or * SRP_CRED_REQ. + * @cred_lock: protects last_response_req_lim. * @last_response_req_lim: value of req_lim the last time a response or * SRP_CRED_REQ was sent. * @state: channel state. See also enum rdma_ch_state. @@ -249,7 +249,8 @@ struct srpt_rdma_ch { u8 t_port_id[16]; int max_ti_iu_len; atomic_t req_lim; - atomic_t last_response_req_lim; + spinlock_t cred_lock; + int32_t last_response_req_lim; atomic_t state; struct list_head list; struct list_head cmd_wait_list;