Fixed a race condition that could cause SRP commands to be processed out of

order on a multicore system. Also, split send and receive completion queues
such that the amount of code that is executed under spinlock and with IRQs
disabled is minimal.


git-svn-id: http://svn.code.sf.net/p/scst/svn/trunk@1801 d57e44dd-8a1f-0410-8b47-8ef2f437770f
This commit is contained in:
Bart Van Assche
2010-07-04 19:44:33 +00:00
parent 4438a65592
commit 936f080310
2 changed files with 139 additions and 59 deletions

View File

@@ -1643,9 +1643,75 @@ err:
}
/**
* srpt_completion() - InfiniBand completion queue callback function.
* srpt_rcv_completion() - IB receive completion queue callback function.
*
* Note:
* The purpose of the recv_lock spinlock is to ensure that on a multiprocessor
* SRP commands are processed in the same order as they were received over the
* RDMA channel. Without that spinlock the following race can occur, with the
* result that the SCST core processes two received SRP_CMD information units
* in the wrong order:
* CPU 1 CPU 2
* ----- -----
* - SRP_CMD reception triggers an
* IB interrupt
* - srpt_rcv_completion() is invoked
* - ib_req_notify_cq() reenables the
* IB interrupt
* - SRP_CMD reception triggers an
* IB interrupt
* - srpt_rcv_completion() is invoked
* - ib_req_notify_cq() reenables the
* IB interrupt
* - srpt_handle_cmd() invokes
* scst_rx_cmd() and scst_cmd_init_done()
* - srpt_handle_cmd() invokes
* scst_rx_cmd() and scst_cmd_init_done()
*/
static void srpt_completion(struct ib_cq *cq, void *ctx)
static void srpt_rcv_completion(struct ib_cq *cq, void *ctx)
{
struct srpt_rdma_ch *ch = ctx;
struct srpt_device *sdev = ch->sport->sdev;
struct ib_wc wc;
struct srpt_ioctx *ioctx;
enum scst_exec_context context;
unsigned long flags;
context = thread ? SCST_CONTEXT_THREAD : SCST_CONTEXT_TASKLET;
spin_lock_irqsave(&ch->recv_lock, flags);
ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP);
while (ib_poll_cq(ch->rcq, 1, &wc) > 0) {
int req_lim;
if (unlikely(wc.status)) {
spin_unlock_irqrestore(&ch->recv_lock, flags);
PRINT_INFO("receiving wr_id %u failed with status %d",
(unsigned)(wc.wr_id & ~SRPT_OP_RECV),
wc.status);
srpt_handle_err_comp(ch, &wc, context);
spin_lock_irqsave(&ch->recv_lock, flags);
continue;
}
req_lim = atomic_dec_return(&ch->req_lim);
if (unlikely(req_lim < 0))
PRINT_ERROR("req_lim = %d < 0", req_lim);
ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_RECV];
srpt_handle_new_iu(ch, ioctx);
#if defined(CONFIG_SCST_DEBUG)
if (interrupt_processing_delay_in_us <= MAX_UDELAY_MS * 1000)
udelay(interrupt_processing_delay_in_us);
#endif
}
spin_unlock_irqrestore(&ch->recv_lock, flags);
}
/**
* srpt_send_completion() - IB send completion queue callback function.
*/
static void srpt_send_completion(struct ib_cq *cq, void *ctx)
{
struct srpt_rdma_ch *ch = ctx;
struct srpt_device *sdev = ch->sport->sdev;
@@ -1655,41 +1721,29 @@ static void srpt_completion(struct ib_cq *cq, void *ctx)
context = thread ? SCST_CONTEXT_THREAD : SCST_CONTEXT_TASKLET;
ib_req_notify_cq(ch->cq, IB_CQ_NEXT_COMP);
while (ib_poll_cq(ch->cq, 1, &wc) > 0) {
ib_req_notify_cq(ch->scq, IB_CQ_NEXT_COMP);
while (ib_poll_cq(ch->scq, 1, &wc) > 0) {
if (unlikely(wc.status)) {
PRINT_INFO("%s for wr_id %u failed with status %d",
wc.wr_id & SRPT_OP_RECV
? "receiving"
: "sending response",
PRINT_INFO("sending response for wr_id %u failed with"
" status %d",
(unsigned)(wc.wr_id & ~SRPT_OP_RECV),
wc.status);
srpt_handle_err_comp(ch, &wc, context);
continue;
}
if (wc.wr_id & SRPT_OP_RECV) {
int req_lim;
req_lim = atomic_dec_return(&ch->req_lim);
if (unlikely(req_lim < 0))
PRINT_ERROR("req_lim = %d < 0", req_lim);
ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_RECV];
srpt_handle_new_iu(ch, ioctx);
ioctx = sdev->ioctx_ring[wc.wr_id];
if (wc.opcode == IB_WC_SEND) {
atomic_inc(&ch->qp_wr_avail);
srpt_handle_send_comp(ch, ioctx, context);
} else {
ioctx = sdev->ioctx_ring[wc.wr_id];
if (wc.opcode == IB_WC_SEND) {
atomic_inc(&ch->qp_wr_avail);
srpt_handle_send_comp(ch, ioctx, context);
} else {
#if defined(CONFIG_SCST_DEBUG)
WARN_ON(wc.opcode != IB_WC_RDMA_READ);
WARN_ON(ioctx->n_rdma <= 0);
WARN_ON(wc.opcode != IB_WC_RDMA_READ);
WARN_ON(ioctx->n_rdma <= 0);
#endif
atomic_add(ioctx->n_rdma,
&ch->qp_wr_avail);
srpt_handle_rdma_comp(ch, ioctx, context);
}
atomic_add(ioctx->n_rdma,
&ch->qp_wr_avail);
srpt_handle_rdma_comp(ch, ioctx, context);
}
#if defined(CONFIG_SCST_DEBUG)
@@ -1700,44 +1754,60 @@ static void srpt_completion(struct ib_cq *cq, void *ctx)
}
/**
* srpt_create_ch_ib() - Create a completion queue on the specified device.
* srpt_create_ch_ib() - Create receive and send completion queues.
*/
static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
{
struct ib_qp_init_attr *qp_init;
struct srpt_device *sdev = ch->sport->sdev;
int cqe;
int ret;
ret = -ENOMEM;
qp_init = kzalloc(sizeof *qp_init, GFP_KERNEL);
if (!qp_init)
return -ENOMEM;
/* Create a completion queue (CQ). */
cqe = SRPT_RQ_SIZE + SRPT_SQ_SIZE - 1;
#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) && ! defined(RHEL_RELEASE_CODE)
ch->cq = ib_create_cq(sdev->device, srpt_completion, NULL, ch, cqe);
#else
ch->cq = ib_create_cq(sdev->device, srpt_completion, NULL, ch, cqe, 0);
#endif
if (IS_ERR(ch->cq)) {
ret = PTR_ERR(ch->cq);
PRINT_ERROR("failed to create_cq cqe= %d ret= %d", cqe, ret);
goto out;
#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) \
&& ! defined(RHEL_RELEASE_CODE)
ch->rcq = ib_create_cq(sdev->device, srpt_rcv_completion, NULL, ch,
SRPT_RQ_SIZE);
#else
ch->rcq = ib_create_cq(sdev->device, srpt_rcv_completion, NULL, ch,
SRPT_RQ_SIZE, 0);
#endif
if (IS_ERR(ch->rcq)) {
ret = PTR_ERR(ch->rcq);
PRINT_ERROR("failed to create CQ cqe= %d ret= %d",
SRPT_RQ_SIZE, ret);
goto out;
}
#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) \
&& ! defined(RHEL_RELEASE_CODE)
ch->scq = ib_create_cq(sdev->device, srpt_send_completion, NULL, ch,
SRPT_SQ_SIZE);
#else
ch->scq = ib_create_cq(sdev->device, srpt_send_completion, NULL, ch,
SRPT_SQ_SIZE, 0);
#endif
if (IS_ERR(ch->scq)) {
ret = PTR_ERR(ch->scq);
PRINT_ERROR("failed to create CQ cqe= %d ret= %d",
SRPT_SQ_SIZE, ret);
goto out_destroy_rcq;
}
/* Request completion notification. */
ib_req_notify_cq(ch->cq, IB_CQ_NEXT_COMP);
ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP);
ib_req_notify_cq(ch->scq, IB_CQ_NEXT_COMP);
/* Create a queue pair (QP). */
qp_init->qp_context = (void *)ch;
qp_init->event_handler
= (void(*)(struct ib_event *, void*))srpt_qp_event;
qp_init->send_cq = ch->cq;
qp_init->recv_cq = ch->cq;
qp_init->send_cq = ch->scq;
qp_init->recv_cq = ch->rcq;
qp_init->srq = sdev->srq;
qp_init->sq_sig_type = IB_SIGNAL_REQ_WR;
qp_init->qp_type = IB_QPT_RC;
@@ -1747,29 +1817,33 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
ch->qp = ib_create_qp(sdev->pd, qp_init);
if (IS_ERR(ch->qp)) {
ret = PTR_ERR(ch->qp);
ib_destroy_cq(ch->cq);
PRINT_ERROR("failed to create_qp ret= %d", ret);
goto out;
goto out_destroy_scq;
}
atomic_set(&ch->qp_wr_avail, qp_init->cap.max_send_wr);
TRACE_DBG("%s: max_cqe= %d max_sge= %d cm_id= %p",
__func__, ch->cq->cqe, qp_init->cap.max_send_sge,
TRACE_DBG("%s: max_cqe= %d r max_sge= %d s max_sge= %d cm_id= %p",
__func__, ch->rcq->cqe, ch->scq->cqe, qp_init->cap.max_send_sge,
ch->cm_id);
/* Modify the attributes and the state of queue pair ch->qp. */
ret = srpt_init_ch_qp(ch, ch->qp);
if (ret) {
ib_destroy_qp(ch->qp);
ib_destroy_cq(ch->cq);
goto out;
}
if (ret)
goto out_destroy_qp;
out:
kfree(qp_init);
return ret;
out_destroy_qp:
ib_destroy_qp(ch->qp);
out_destroy_scq:
ib_destroy_cq(ch->scq);
out_destroy_rcq:
ib_destroy_cq(ch->rcq);
goto out;
}
/**
@@ -1882,7 +1956,8 @@ static void srpt_release_channel(struct scst_session *scst_sess)
ib_destroy_cm_id(ch->cm_id);
ib_destroy_qp(ch->qp);
ib_destroy_cq(ch->cq);
ib_destroy_cq(ch->scq);
ib_destroy_cq(ch->rcq);
kfree(ch);
TRACE_EXIT();
@@ -2088,6 +2163,8 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id,
atomic_set(&ch->state, RDMA_CHANNEL_CONNECTING);
INIT_LIST_HEAD(&ch->cmd_wait_list);
spin_lock_init(&ch->recv_lock);
ret = srpt_create_ch_ib(ch);
if (ret) {
rej->reason = cpu_to_be32(SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES);
@@ -2187,7 +2264,8 @@ release_channel:
destroy_ib:
ib_destroy_qp(ch->qp);
ib_destroy_cq(ch->cq);
ib_destroy_cq(ch->scq);
ib_destroy_cq(ch->rcq);
free_ch:
kfree(ch);

View File

@@ -212,8 +212,8 @@ enum rdma_ch_state {
* @cm_id: IB CM ID associated with the channel.
* @qp: IB queue pair used for communicating over this channel.
* @qp_wr_avail: number of WRs in qp that are not being processed by the HCA.
* @cq: completion queue for send and receive operations over this
* channel.
* @rcq: completion queue for receive operations over this channel.
* @scq: completion queue for send operations over this channel.
* @sport: pointer to the information of the HCA port used by this
* channel.
* @i_port_id: 128-bit initiator port identifier copied from SRP_LOGIN_REQ.
@@ -236,7 +236,9 @@ struct srpt_rdma_ch {
struct ib_cm_id *cm_id;
struct ib_qp *qp;
atomic_t qp_wr_avail;
struct ib_cq *cq;
spinlock_t recv_lock;
struct ib_cq *rcq;
struct ib_cq *scq;
struct srpt_port *sport;
u8 i_port_id[16];
u8 t_port_id[16];