diff --git a/srpt/src/ib_srpt.c b/srpt/src/ib_srpt.c index 73d786ba8..01cdd9680 100644 --- a/srpt/src/ib_srpt.c +++ b/srpt/src/ib_srpt.c @@ -1717,28 +1717,13 @@ err: /** * srpt_rcv_completion() - IB receive completion queue callback function. * - * Note: - * The purpose of the recv_lock spinlock is to ensure that on a multiprocessor - * SRP commands are processed in the same order as they were received over the - * RDMA channel. Without that spinlock the following race can occur, with the - * result that the SCST core processes two received SRP_CMD information units - * in the wrong order: - * CPU 1 CPU 2 - * ----- ----- - * - SRP_CMD reception triggers an - * IB interrupt - * - srpt_rcv_completion() is invoked - * - ib_req_notify_cq() reenables the - * IB interrupt - * - SRP_CMD reception triggers an - * IB interrupt - * - srpt_rcv_completion() is invoked - * - ib_req_notify_cq() reenables the - * IB interrupt - * - srpt_handle_cmd() invokes - * scst_rx_cmd() and scst_cmd_init_done() - * - srpt_handle_cmd() invokes - * scst_rx_cmd() and scst_cmd_init_done() + * Notes: + * - It is guaranteed that a completion handler will never be invoked + * concurrently on two different CPUs for the same completion queue. See also + * Documentation/infiniband/core_locking.txt and the implementation of + * handle_edge_irq() in kernel/irq/chip.c. + * - When threaded IRQs are enabled, completion handlers are invoked in thread + * context instead of interrupt context. */ static void srpt_rcv_completion(struct ib_cq *cq, void *ctx) { @@ -1746,25 +1731,19 @@ static void srpt_rcv_completion(struct ib_cq *cq, void *ctx) struct srpt_device *sdev = ch->sport->sdev; struct ib_wc wc; struct srpt_ioctx *ioctx; - unsigned long flags; EXTRACHECKS_WARN_ON(thread == MODE_SINGLE_THREADED); - if (atomic_read(&ch->state) == RDMA_CHANNEL_DISCONNECTING) - return; - - spin_lock_irqsave(&ch->recv_lock, flags); + atomic_inc(&ch->processing_recv_compl); do { while (ib_poll_cq(ch->rcq, 1, &wc) > 0) { int req_lim; if (unlikely(wc.status)) { - spin_unlock_irqrestore(&ch->recv_lock, flags); PRINT_INFO("receiving wr_id %u failed with" " status %d", (unsigned)(wc.wr_id & ~SRPT_OP_RECV), wc.status); - spin_lock_irqsave(&ch->recv_lock, flags); continue; } @@ -1776,7 +1755,7 @@ static void srpt_rcv_completion(struct ib_cq *cq, void *ctx) } } while (ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS) > 0); - spin_unlock_irqrestore(&ch->recv_lock, flags); + atomic_dec(&ch->processing_recv_compl); } /** @@ -1854,14 +1833,10 @@ static void srpt_rcv_completion_st(struct ib_cq *cq, void *ctx) struct srpt_device *sdev = ch->sport->sdev; struct ib_wc wc; struct srpt_ioctx *ioctx; - unsigned long flags; EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED); - if (atomic_read(&ch->state) == RDMA_CHANNEL_DISCONNECTING) - return; - - spin_lock_irqsave(&ch->recv_lock, flags); + atomic_inc(&ch->processing_recv_compl); ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP); while (ib_poll_cq(ch->rcq, 1, &wc) > 0) { ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_RECV]; @@ -1871,7 +1846,7 @@ static void srpt_rcv_completion_st(struct ib_cq *cq, void *ctx) ioctx->opcode = wc.opcode; srpt_schedule_thread(ioctx); } - spin_unlock_irqrestore(&ch->recv_lock, flags); + atomic_dec(&ch->processing_recv_compl); } /** @@ -2133,18 +2108,12 @@ static void srpt_unregister_channel(struct srpt_rdma_ch *ch) __releases(&ch->sport->sdev->spinlock) { struct srpt_device *sdev; - unsigned long flags; sdev = ch->sport->sdev; list_del(&ch->list); atomic_set(&ch->state, RDMA_CHANNEL_DISCONNECTING); spin_unlock_irq(&sdev->spinlock); - TRACE_DBG("%s", "disabling receive notifications"); - spin_lock_irqsave(&ch->recv_lock, flags); - ib_req_notify_cq(ch->rcq, 0); - spin_unlock_irqrestore(&ch->recv_lock, flags); - if (thread == MODE_SINGLE_THREADED) flush_scheduled_work(); @@ -2237,8 +2206,9 @@ static struct srpt_rdma_ch *srpt_find_channel(struct srpt_device *sdev, */ static void srpt_release_channel(struct scst_session *scst_sess) { + struct ib_qp_attr qp_attr; struct srpt_rdma_ch *ch; - struct ib_wc wc; + int ret; TRACE_ENTRY(); @@ -2250,19 +2220,21 @@ static void srpt_release_channel(struct scst_session *scst_sess) BUG_ON(!ch->cm_id); ib_destroy_cm_id(ch->cm_id); + qp_attr.qp_state = IB_QPS_RESET; + ret = ib_modify_qp(ch->qp, &qp_attr, IB_QP_STATE); + if (ret < 0) + PRINT_ERROR("Resetting queue pair state failed: %d", ret); + + while (atomic_read(&ch->processing_recv_compl)) + ; + /* * At this point it is guaranteed that no new completions will be * received and also that no completion handler is still accessing ch. */ ib_destroy_qp(ch->qp); - - while (ib_poll_cq(ch->scq, 1, &wc) > 0) - ; ib_destroy_cq(ch->scq); - - while (ib_poll_cq(ch->rcq, 1, &wc) > 0) - ; ib_destroy_cq(ch->rcq); kfree(ch); @@ -2474,11 +2446,10 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id, ch->sport = &sdev->port[param->port - 1]; ch->cm_id = cm_id; ch->rq_size = SRPT_RQ_SIZE; + atomic_set(&ch->processing_recv_compl, 0); atomic_set(&ch->state, RDMA_CHANNEL_CONNECTING); INIT_LIST_HEAD(&ch->cmd_wait_list); - spin_lock_init(&ch->recv_lock); - ret = srpt_create_ch_ib(ch); if (ret) { rej->reason = cpu_to_be32(SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES); diff --git a/srpt/src/ib_srpt.h b/srpt/src/ib_srpt.h index 0f22f1d60..b86d80d5b 100644 --- a/srpt/src/ib_srpt.h +++ b/srpt/src/ib_srpt.h @@ -216,6 +216,8 @@ enum rdma_ch_state { * struct srpt_rdma_ch - RDMA channel. * @cm_id: IB CM ID associated with the channel. * @rq_size: IB receive queue size. + * @processing_recv_compl: whether or not a receive completion is being + * processed. * @qp: IB queue pair used for communicating over this channel. * @sq_wr_avail: number of work requests available in the send queue. * @rcq: completion queue for receive operations over this channel. @@ -245,9 +247,9 @@ struct srpt_rdma_ch { struct ib_cm_id *cm_id; struct ib_qp *qp; int rq_size; - atomic_t sq_wr_avail; - spinlock_t recv_lock; + atomic_t processing_recv_compl; struct ib_cq *rcq; + atomic_t sq_wr_avail; struct ib_cq *scq; struct srpt_port *sport; u8 i_port_id[16];