From 936f0803107bc1199862e6d859c83d1bdfb369c3 Mon Sep 17 00:00:00 2001 From: Bart Van Assche Date: Sun, 4 Jul 2010 19:44:33 +0000 Subject: [PATCH] Fixed a race condition that could cause SRP commands to be processed out of order on a multicore system. Also, split send and receive completion queues such that the amount of code that is executed under spinlock and with IRQs disabled is minimal. git-svn-id: http://svn.code.sf.net/p/scst/svn/trunk@1801 d57e44dd-8a1f-0410-8b47-8ef2f437770f --- srpt/src/ib_srpt.c | 190 ++++++++++++++++++++++++++++++++------------- srpt/src/ib_srpt.h | 8 +- 2 files changed, 139 insertions(+), 59 deletions(-) diff --git a/srpt/src/ib_srpt.c b/srpt/src/ib_srpt.c index fda01383e..b01fb166f 100644 --- a/srpt/src/ib_srpt.c +++ b/srpt/src/ib_srpt.c @@ -1643,9 +1643,75 @@ err: } /** - * srpt_completion() - InfiniBand completion queue callback function. + * srpt_rcv_completion() - IB receive completion queue callback function. + * + * Note: + * The purpose of the recv_lock spinlock is to ensure that on a multiprocessor + * SRP commands are processed in the same order as they were received over the + * RDMA channel. Without that spinlock the following race can occur, with the + * result that the SCST core processes two received SRP_CMD information units + * in the wrong order: + * CPU 1 CPU 2 + * ----- ----- + * - SRP_CMD reception triggers an + * IB interrupt + * - srpt_rcv_completion() is invoked + * - ib_req_notify_cq() reenables the + * IB interrupt + * - SRP_CMD reception triggers an + * IB interrupt + * - srpt_rcv_completion() is invoked + * - ib_req_notify_cq() reenables the + * IB interrupt + * - srpt_handle_cmd() invokes + * scst_rx_cmd() and scst_cmd_init_done() + * - srpt_handle_cmd() invokes + * scst_rx_cmd() and scst_cmd_init_done() */ -static void srpt_completion(struct ib_cq *cq, void *ctx) +static void srpt_rcv_completion(struct ib_cq *cq, void *ctx) +{ + struct srpt_rdma_ch *ch = ctx; + struct srpt_device *sdev = ch->sport->sdev; + struct ib_wc wc; + struct srpt_ioctx *ioctx; + enum scst_exec_context context; + unsigned long flags; + + context = thread ? SCST_CONTEXT_THREAD : SCST_CONTEXT_TASKLET; + + spin_lock_irqsave(&ch->recv_lock, flags); + ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP); + while (ib_poll_cq(ch->rcq, 1, &wc) > 0) { + int req_lim; + + if (unlikely(wc.status)) { + spin_unlock_irqrestore(&ch->recv_lock, flags); + PRINT_INFO("receiving wr_id %u failed with status %d", + (unsigned)(wc.wr_id & ~SRPT_OP_RECV), + wc.status); + srpt_handle_err_comp(ch, &wc, context); + spin_lock_irqsave(&ch->recv_lock, flags); + continue; + } + + req_lim = atomic_dec_return(&ch->req_lim); + if (unlikely(req_lim < 0)) + PRINT_ERROR("req_lim = %d < 0", req_lim); + ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_RECV]; + srpt_handle_new_iu(ch, ioctx); + +#if defined(CONFIG_SCST_DEBUG) + if (interrupt_processing_delay_in_us <= MAX_UDELAY_MS * 1000) + udelay(interrupt_processing_delay_in_us); +#endif + } + spin_unlock_irqrestore(&ch->recv_lock, flags); +} + +/** + * srpt_send_completion() - IB send completion queue callback function. + */ +static void srpt_send_completion(struct ib_cq *cq, void *ctx) { struct srpt_rdma_ch *ch = ctx; struct srpt_device *sdev = ch->sport->sdev; @@ -1655,41 +1721,29 @@ static void srpt_completion(struct ib_cq *cq, void *ctx) context = thread ? SCST_CONTEXT_THREAD : SCST_CONTEXT_TASKLET; - ib_req_notify_cq(ch->cq, IB_CQ_NEXT_COMP); - while (ib_poll_cq(ch->cq, 1, &wc) > 0) { + ib_req_notify_cq(ch->scq, IB_CQ_NEXT_COMP); + while (ib_poll_cq(ch->scq, 1, &wc) > 0) { if (unlikely(wc.status)) { - PRINT_INFO("%s for wr_id %u failed with status %d", - wc.wr_id & SRPT_OP_RECV - ? "receiving" - : "sending response", + PRINT_INFO("sending response for wr_id %u failed with" + " status %d", (unsigned)(wc.wr_id & ~SRPT_OP_RECV), wc.status); srpt_handle_err_comp(ch, &wc, context); continue; } - if (wc.wr_id & SRPT_OP_RECV) { - int req_lim; - - req_lim = atomic_dec_return(&ch->req_lim); - if (unlikely(req_lim < 0)) - PRINT_ERROR("req_lim = %d < 0", req_lim); - ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_RECV]; - srpt_handle_new_iu(ch, ioctx); + ioctx = sdev->ioctx_ring[wc.wr_id]; + if (wc.opcode == IB_WC_SEND) { + atomic_inc(&ch->qp_wr_avail); + srpt_handle_send_comp(ch, ioctx, context); } else { - ioctx = sdev->ioctx_ring[wc.wr_id]; - if (wc.opcode == IB_WC_SEND) { - atomic_inc(&ch->qp_wr_avail); - srpt_handle_send_comp(ch, ioctx, context); - } else { #if defined(CONFIG_SCST_DEBUG) - WARN_ON(wc.opcode != IB_WC_RDMA_READ); - WARN_ON(ioctx->n_rdma <= 0); + WARN_ON(wc.opcode != IB_WC_RDMA_READ); + WARN_ON(ioctx->n_rdma <= 0); #endif - atomic_add(ioctx->n_rdma, - &ch->qp_wr_avail); - srpt_handle_rdma_comp(ch, ioctx, context); - } + atomic_add(ioctx->n_rdma, + &ch->qp_wr_avail); + srpt_handle_rdma_comp(ch, ioctx, context); } #if defined(CONFIG_SCST_DEBUG) @@ -1700,44 +1754,60 @@ static void srpt_completion(struct ib_cq *cq, void *ctx) } /** - * srpt_create_ch_ib() - Create a completion queue on the specified device. + * srpt_create_ch_ib() - Create receive and send completion queues. */ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch) { struct ib_qp_init_attr *qp_init; struct srpt_device *sdev = ch->sport->sdev; - int cqe; int ret; + ret = -ENOMEM; qp_init = kzalloc(sizeof *qp_init, GFP_KERNEL); if (!qp_init) - return -ENOMEM; - - /* Create a completion queue (CQ). */ - - cqe = SRPT_RQ_SIZE + SRPT_SQ_SIZE - 1; -#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) && ! defined(RHEL_RELEASE_CODE) - ch->cq = ib_create_cq(sdev->device, srpt_completion, NULL, ch, cqe); -#else - ch->cq = ib_create_cq(sdev->device, srpt_completion, NULL, ch, cqe, 0); -#endif - if (IS_ERR(ch->cq)) { - ret = PTR_ERR(ch->cq); - PRINT_ERROR("failed to create_cq cqe= %d ret= %d", cqe, ret); goto out; + +#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) \ + && ! defined(RHEL_RELEASE_CODE) + ch->rcq = ib_create_cq(sdev->device, srpt_rcv_completion, NULL, ch, + SRPT_RQ_SIZE); +#else + ch->rcq = ib_create_cq(sdev->device, srpt_rcv_completion, NULL, ch, + SRPT_RQ_SIZE, 0); +#endif + if (IS_ERR(ch->rcq)) { + ret = PTR_ERR(ch->rcq); + PRINT_ERROR("failed to create CQ cqe= %d ret= %d", + SRPT_RQ_SIZE, ret); + goto out; + } +#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) \ + && ! defined(RHEL_RELEASE_CODE) + ch->scq = ib_create_cq(sdev->device, srpt_send_completion, NULL, ch, + SRPT_SQ_SIZE); +#else + ch->scq = ib_create_cq(sdev->device, srpt_send_completion, NULL, ch, + SRPT_SQ_SIZE, 0); +#endif + if (IS_ERR(ch->scq)) { + ret = PTR_ERR(ch->scq); + PRINT_ERROR("failed to create CQ cqe= %d ret= %d", + SRPT_SQ_SIZE, ret); + goto out_destroy_rcq; } /* Request completion notification. */ - ib_req_notify_cq(ch->cq, IB_CQ_NEXT_COMP); + ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP); + ib_req_notify_cq(ch->scq, IB_CQ_NEXT_COMP); /* Create a queue pair (QP). */ qp_init->qp_context = (void *)ch; qp_init->event_handler = (void(*)(struct ib_event *, void*))srpt_qp_event; - qp_init->send_cq = ch->cq; - qp_init->recv_cq = ch->cq; + qp_init->send_cq = ch->scq; + qp_init->recv_cq = ch->rcq; qp_init->srq = sdev->srq; qp_init->sq_sig_type = IB_SIGNAL_REQ_WR; qp_init->qp_type = IB_QPT_RC; @@ -1747,29 +1817,33 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch) ch->qp = ib_create_qp(sdev->pd, qp_init); if (IS_ERR(ch->qp)) { ret = PTR_ERR(ch->qp); - ib_destroy_cq(ch->cq); PRINT_ERROR("failed to create_qp ret= %d", ret); - goto out; + goto out_destroy_scq; } atomic_set(&ch->qp_wr_avail, qp_init->cap.max_send_wr); - TRACE_DBG("%s: max_cqe= %d max_sge= %d cm_id= %p", - __func__, ch->cq->cqe, qp_init->cap.max_send_sge, + TRACE_DBG("%s: max_cqe= %d r max_sge= %d s max_sge= %d cm_id= %p", + __func__, ch->rcq->cqe, ch->scq->cqe, qp_init->cap.max_send_sge, ch->cm_id); /* Modify the attributes and the state of queue pair ch->qp. */ ret = srpt_init_ch_qp(ch, ch->qp); - if (ret) { - ib_destroy_qp(ch->qp); - ib_destroy_cq(ch->cq); - goto out; - } + if (ret) + goto out_destroy_qp; out: kfree(qp_init); return ret; + +out_destroy_qp: + ib_destroy_qp(ch->qp); +out_destroy_scq: + ib_destroy_cq(ch->scq); +out_destroy_rcq: + ib_destroy_cq(ch->rcq); + goto out; } /** @@ -1882,7 +1956,8 @@ static void srpt_release_channel(struct scst_session *scst_sess) ib_destroy_cm_id(ch->cm_id); ib_destroy_qp(ch->qp); - ib_destroy_cq(ch->cq); + ib_destroy_cq(ch->scq); + ib_destroy_cq(ch->rcq); kfree(ch); TRACE_EXIT(); @@ -2088,6 +2163,8 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id, atomic_set(&ch->state, RDMA_CHANNEL_CONNECTING); INIT_LIST_HEAD(&ch->cmd_wait_list); + spin_lock_init(&ch->recv_lock); + ret = srpt_create_ch_ib(ch); if (ret) { rej->reason = cpu_to_be32(SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES); @@ -2187,7 +2264,8 @@ release_channel: destroy_ib: ib_destroy_qp(ch->qp); - ib_destroy_cq(ch->cq); + ib_destroy_cq(ch->scq); + ib_destroy_cq(ch->rcq); free_ch: kfree(ch); diff --git a/srpt/src/ib_srpt.h b/srpt/src/ib_srpt.h index 302944d08..262250a47 100644 --- a/srpt/src/ib_srpt.h +++ b/srpt/src/ib_srpt.h @@ -212,8 +212,8 @@ enum rdma_ch_state { * @cm_id: IB CM ID associated with the channel. * @qp: IB queue pair used for communicating over this channel. * @qp_wr_avail: number of WRs in qp that are not being processed by the HCA. - * @cq: completion queue for send and receive operations over this - * channel. + * @rcq: completion queue for receive operations over this channel. + * @scq: completion queue for send operations over this channel. * @sport: pointer to the information of the HCA port used by this * channel. * @i_port_id: 128-bit initiator port identifier copied from SRP_LOGIN_REQ. @@ -236,7 +236,9 @@ struct srpt_rdma_ch { struct ib_cm_id *cm_id; struct ib_qp *qp; atomic_t qp_wr_avail; - struct ib_cq *cq; + spinlock_t recv_lock; + struct ib_cq *rcq; + struct ib_cq *scq; struct srpt_port *sport; u8 i_port_id[16]; u8 t_port_id[16];