Micro-optimization: replaced a spinlock by an atomic variable.

git-svn-id: http://svn.code.sf.net/p/scst/svn/trunk@1882 d57e44dd-8a1f-0410-8b47-8ef2f437770f
This commit is contained in:
Bart Van Assche
2010-07-28 10:15:33 +00:00
parent ecaabfc08c
commit f299b1d2b3
2 changed files with 26 additions and 53 deletions

View File

@@ -1717,28 +1717,13 @@ err:
/**
* srpt_rcv_completion() - IB receive completion queue callback function.
*
* Note:
* The purpose of the recv_lock spinlock is to ensure that on a multiprocessor
* SRP commands are processed in the same order as they were received over the
* RDMA channel. Without that spinlock the following race can occur, with the
* result that the SCST core processes two received SRP_CMD information units
* in the wrong order:
* CPU 1 CPU 2
* ----- -----
* - SRP_CMD reception triggers an
* IB interrupt
* - srpt_rcv_completion() is invoked
* - ib_req_notify_cq() reenables the
* IB interrupt
* - SRP_CMD reception triggers an
* IB interrupt
* - srpt_rcv_completion() is invoked
* - ib_req_notify_cq() reenables the
* IB interrupt
* - srpt_handle_cmd() invokes
* scst_rx_cmd() and scst_cmd_init_done()
* - srpt_handle_cmd() invokes
* scst_rx_cmd() and scst_cmd_init_done()
* Notes:
* - It is guaranteed that a completion handler will never be invoked
* concurrently on two different CPUs for the same completion queue. See also
* Documentation/infiniband/core_locking.txt and the implementation of
* handle_edge_irq() in kernel/irq/chip.c.
* - When threaded IRQs are enabled, completion handlers are invoked in thread
* context instead of interrupt context.
*/
static void srpt_rcv_completion(struct ib_cq *cq, void *ctx)
{
@@ -1746,25 +1731,19 @@ static void srpt_rcv_completion(struct ib_cq *cq, void *ctx)
struct srpt_device *sdev = ch->sport->sdev;
struct ib_wc wc;
struct srpt_ioctx *ioctx;
unsigned long flags;
EXTRACHECKS_WARN_ON(thread == MODE_SINGLE_THREADED);
if (atomic_read(&ch->state) == RDMA_CHANNEL_DISCONNECTING)
return;
spin_lock_irqsave(&ch->recv_lock, flags);
atomic_inc(&ch->processing_recv_compl);
do {
while (ib_poll_cq(ch->rcq, 1, &wc) > 0) {
int req_lim;
if (unlikely(wc.status)) {
spin_unlock_irqrestore(&ch->recv_lock, flags);
PRINT_INFO("receiving wr_id %u failed with"
" status %d",
(unsigned)(wc.wr_id & ~SRPT_OP_RECV),
wc.status);
spin_lock_irqsave(&ch->recv_lock, flags);
continue;
}
@@ -1776,7 +1755,7 @@ static void srpt_rcv_completion(struct ib_cq *cq, void *ctx)
}
} while (ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP
| IB_CQ_REPORT_MISSED_EVENTS) > 0);
spin_unlock_irqrestore(&ch->recv_lock, flags);
atomic_dec(&ch->processing_recv_compl);
}
/**
@@ -1854,14 +1833,10 @@ static void srpt_rcv_completion_st(struct ib_cq *cq, void *ctx)
struct srpt_device *sdev = ch->sport->sdev;
struct ib_wc wc;
struct srpt_ioctx *ioctx;
unsigned long flags;
EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED);
if (atomic_read(&ch->state) == RDMA_CHANNEL_DISCONNECTING)
return;
spin_lock_irqsave(&ch->recv_lock, flags);
atomic_inc(&ch->processing_recv_compl);
ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP);
while (ib_poll_cq(ch->rcq, 1, &wc) > 0) {
ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_RECV];
@@ -1871,7 +1846,7 @@ static void srpt_rcv_completion_st(struct ib_cq *cq, void *ctx)
ioctx->opcode = wc.opcode;
srpt_schedule_thread(ioctx);
}
spin_unlock_irqrestore(&ch->recv_lock, flags);
atomic_dec(&ch->processing_recv_compl);
}
/**
@@ -2133,18 +2108,12 @@ static void srpt_unregister_channel(struct srpt_rdma_ch *ch)
__releases(&ch->sport->sdev->spinlock)
{
struct srpt_device *sdev;
unsigned long flags;
sdev = ch->sport->sdev;
list_del(&ch->list);
atomic_set(&ch->state, RDMA_CHANNEL_DISCONNECTING);
spin_unlock_irq(&sdev->spinlock);
TRACE_DBG("%s", "disabling receive notifications");
spin_lock_irqsave(&ch->recv_lock, flags);
ib_req_notify_cq(ch->rcq, 0);
spin_unlock_irqrestore(&ch->recv_lock, flags);
if (thread == MODE_SINGLE_THREADED)
flush_scheduled_work();
@@ -2237,8 +2206,9 @@ static struct srpt_rdma_ch *srpt_find_channel(struct srpt_device *sdev,
*/
static void srpt_release_channel(struct scst_session *scst_sess)
{
struct ib_qp_attr qp_attr;
struct srpt_rdma_ch *ch;
struct ib_wc wc;
int ret;
TRACE_ENTRY();
@@ -2250,19 +2220,21 @@ static void srpt_release_channel(struct scst_session *scst_sess)
BUG_ON(!ch->cm_id);
ib_destroy_cm_id(ch->cm_id);
qp_attr.qp_state = IB_QPS_RESET;
ret = ib_modify_qp(ch->qp, &qp_attr, IB_QP_STATE);
if (ret < 0)
PRINT_ERROR("Resetting queue pair state failed: %d", ret);
while (atomic_read(&ch->processing_recv_compl))
;
/*
* At this point it is guaranteed that no new completions will be
* received and also that no completion handler is still accessing ch.
*/
ib_destroy_qp(ch->qp);
while (ib_poll_cq(ch->scq, 1, &wc) > 0)
;
ib_destroy_cq(ch->scq);
while (ib_poll_cq(ch->rcq, 1, &wc) > 0)
;
ib_destroy_cq(ch->rcq);
kfree(ch);
@@ -2474,11 +2446,10 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id,
ch->sport = &sdev->port[param->port - 1];
ch->cm_id = cm_id;
ch->rq_size = SRPT_RQ_SIZE;
atomic_set(&ch->processing_recv_compl, 0);
atomic_set(&ch->state, RDMA_CHANNEL_CONNECTING);
INIT_LIST_HEAD(&ch->cmd_wait_list);
spin_lock_init(&ch->recv_lock);
ret = srpt_create_ch_ib(ch);
if (ret) {
rej->reason = cpu_to_be32(SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES);

View File

@@ -216,6 +216,8 @@ enum rdma_ch_state {
* struct srpt_rdma_ch - RDMA channel.
* @cm_id: IB CM ID associated with the channel.
* @rq_size: IB receive queue size.
* @processing_recv_compl: whether or not a receive completion is being
* processed.
* @qp: IB queue pair used for communicating over this channel.
* @sq_wr_avail: number of work requests available in the send queue.
* @rcq: completion queue for receive operations over this channel.
@@ -245,9 +247,9 @@ struct srpt_rdma_ch {
struct ib_cm_id *cm_id;
struct ib_qp *qp;
int rq_size;
atomic_t sq_wr_avail;
spinlock_t recv_lock;
atomic_t processing_recv_compl;
struct ib_cq *rcq;
atomic_t sq_wr_avail;
struct ib_cq *scq;
struct srpt_port *sport;
u8 i_port_id[16];