- Made sure that in mode thread=1 not only send completions but also receive completions are processed in thread context.
- Merged IB send and receive completion queues again such that both types of completions are processed in a fairly.

git-svn-id: http://svn.code.sf.net/p/scst/svn/trunk@1904 d57e44dd-8a1f-0410-8b47-8ef2f437770f
This commit is contained in:
Bart Van Assche
2010-08-02 16:50:14 +00:00
parent 0cb5524700
commit e30376945f
2 changed files with 78 additions and 129 deletions

View File

@@ -1878,96 +1878,77 @@ err:
static void srpt_process_rcv_completion(struct ib_cq *cq,
struct srpt_rdma_ch *ch,
enum scst_exec_context context)
enum scst_exec_context context,
struct ib_wc *wc)
{
struct srpt_device *sdev = ch->sport->sdev;
struct ib_wc wc;
struct srpt_ioctx *ioctx;
EXTRACHECKS_WARN_ON(cq != ch->rcq);
EXTRACHECKS_WARN_ON((wc->wr_id & SRPT_OP_RECV) == 0);
EXTRACHECKS_WARN_ON((wc->wr_id & SRPT_OP_TTI) != 0);
do {
while (ib_poll_cq(cq, 1, &wc) > 0) {
EXTRACHECKS_WARN_ON(wc.wr_id & SRPT_OP_TTI);
if (wc.status == IB_WC_SUCCESS) {
ioctx = sdev->ioctx_ring[wc.wr_id
& ~SRPT_OP_RECV];
#if 0
PRINT_INFO("recv completion, opcode 0x%02x\n",
*(u8 *)ioctx->buf);
print_hex_dump(KERN_INFO, "",
DUMP_PREFIX_OFFSET, 16, 1,
ioctx->buf, wc.byte_len, true);
#endif
srpt_handle_new_iu(ch, ioctx, context);
} else {
PRINT_INFO("receiving wr_id %u failed with"
" status %d",
(unsigned)(wc.wr_id & ~SRPT_OP_RECV),
wc.status);
}
}
} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP
| IB_CQ_REPORT_MISSED_EVENTS) > 0);
if (wc->status == IB_WC_SUCCESS) {
ioctx = sdev->ioctx_ring[wc->wr_id & ~SRPT_OP_RECV];
srpt_handle_new_iu(ch, ioctx, context);
} else {
PRINT_INFO("receiving wr_id %u failed with status %d",
(unsigned)(wc->wr_id & ~SRPT_OP_RECV), wc->status);
}
}
static void srpt_process_send_completion(struct ib_cq *cq,
struct srpt_rdma_ch *ch,
enum scst_exec_context context)
enum scst_exec_context context,
struct ib_wc *wc)
{
struct srpt_device *sdev = ch->sport->sdev;
struct ib_wc wc;
struct srpt_ioctx *ioctx;
int processed;
EXTRACHECKS_WARN_ON(cq != ch->scq);
EXTRACHECKS_WARN_ON((wc->wr_id & SRPT_OP_RECV) != 0);
if (wc->status == IB_WC_SUCCESS) {
if ((wc->wr_id & SRPT_OP_TTI) == 0) {
ioctx = sdev->ioctx_ring[wc->wr_id];
if (wc->opcode == IB_WC_SEND)
srpt_handle_send_comp(ch, ioctx, context);
else {
EXTRACHECKS_WARN_ON(wc->opcode
!= IB_WC_RDMA_READ);
srpt_handle_rdma_comp(ch, ioctx, context);
}
} else
srpt_put_tti_ioctx(ch);
} else {
PRINT_INFO("sending %s for wr_id %u failed with status %d",
wc->wr_id & SRPT_OP_TTI ? "request" : "response",
(unsigned)(wc->wr_id & ~SRPT_OP_FLAGS), wc->status);
srpt_handle_send_err_comp(ch, wc->wr_id, context);
}
}
static void srpt_process_completion(struct ib_cq *cq,
struct srpt_rdma_ch *ch,
enum scst_exec_context context)
{
struct ib_wc wc;
EXTRACHECKS_WARN_ON(cq != ch->cq);
processed = 0;
do {
while (ib_poll_cq(cq, 1, &wc) > 0) {
if (wc.status == IB_WC_SUCCESS) {
if ((wc.wr_id & SRPT_OP_TTI) == 0) {
ioctx = sdev->ioctx_ring[wc.wr_id];
if (wc.opcode == IB_WC_SEND)
srpt_handle_send_comp(ch, ioctx,
context);
else {
EXTRACHECKS_WARN_ON(wc.opcode
!= IB_WC_RDMA_READ);
srpt_handle_rdma_comp(ch, ioctx,
context);
}
/*
* Modes thread == 0 and thread == 2
* may cause unfair treatment of the
* receive queue. The code below makes
* sure that that queue is processed
* in time.
*/
if (thread != MODE_IB_COMPLETION_IN_SIRQ
&& (++processed % ch->rq_size) == 0)
srpt_process_rcv_completion
(ch->rcq, ch, context);
} else
srpt_put_tti_ioctx(ch);
} else {
PRINT_INFO("sending %s for wr_id %u"
" failed with status %d",
wc.wr_id & SRPT_OP_TTI
? "request" : "response",
(unsigned)(wc.wr_id & ~SRPT_OP_FLAGS),
wc.status);
srpt_handle_send_err_comp(ch, wc.wr_id,
context);
}
if (wc.wr_id & SRPT_OP_RECV)
srpt_process_rcv_completion(cq, ch, context,
&wc);
else
srpt_process_send_completion(cq, ch, context,
&wc);
}
} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP
| IB_CQ_REPORT_MISSED_EVENTS) > 0);
}
/**
* srpt_rcv_completion() - IB receive completion queue callback function.
* srpt_completion() - IB completion queue callback function.
*
* Notes:
* - It is guaranteed that a completion handler will never be invoked
@@ -1977,33 +1958,23 @@ static void srpt_process_send_completion(struct ib_cq *cq,
* - When threaded IRQs are enabled, completion handlers are invoked in thread
* context instead of interrupt context.
*/
static void srpt_rcv_completion(struct ib_cq *cq, void *ctx)
static void srpt_completion(struct ib_cq *cq, void *ctx)
{
struct srpt_rdma_ch *ch = ctx;
atomic_inc(&ch->processing_recv_compl);
atomic_inc(&ch->processing_compl);
switch (thread) {
case MODE_IB_COMPLETION_IN_THREAD:
wake_up_interruptible(&ch->wait_queue);
break;
case MODE_IB_COMPLETION_IN_SIRQ:
srpt_process_rcv_completion(cq, ch, SCST_CONTEXT_THREAD);
srpt_process_completion(cq, ch, SCST_CONTEXT_THREAD);
break;
case MODE_ALL_IN_SIRQ:
srpt_process_rcv_completion(cq, ch, SCST_CONTEXT_TASKLET);
srpt_process_completion(cq, ch, SCST_CONTEXT_TASKLET);
break;
}
atomic_dec(&ch->processing_recv_compl);
}
/**
* srpt_send_completion() - IB send completion queue callback function.
*/
static void srpt_send_completion(struct ib_cq *cq, void *ctx)
{
struct srpt_rdma_ch *ch = ctx;
srpt_process_send_completion(cq, ch, SCST_CONTEXT_TASKLET);
atomic_dec(&ch->processing_compl);
}
static int srpt_compl_thread(void *arg)
@@ -2019,8 +1990,8 @@ static int srpt_compl_thread(void *arg)
ch->sess_name, ch->thread->comm, current->pid);
while (!kthread_should_stop()) {
wait_event_interruptible(ch->wait_queue,
(srpt_process_rcv_completion(ch->rcq, ch,
SCST_CONTEXT_THREAD),
(srpt_process_completion(ch->cq, ch,
SCST_CONTEXT_THREAD),
kthread_should_stop()));
}
PRINT_INFO("Session %s: kernel thread %s (PID %d) stopped",
@@ -2046,38 +2017,24 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) \
&& !defined(RHEL_RELEASE_CODE)
ch->rcq = ib_create_cq(sdev->device, srpt_rcv_completion, NULL, ch,
ch->rq_size);
ch->cq = ib_create_cq(sdev->device, srpt_completion, NULL, ch,
ch->rq_size + srpt_sq_size);
#else
ch->rcq = ib_create_cq(sdev->device, srpt_rcv_completion, NULL, ch,
ch->rq_size, 0);
ch->cq = ib_create_cq(sdev->device, srpt_completion, NULL, ch,
ch->rq_size + srpt_sq_size, 0);
#endif
if (IS_ERR(ch->rcq)) {
ret = PTR_ERR(ch->rcq);
if (IS_ERR(ch->cq)) {
ret = PTR_ERR(ch->cq);
PRINT_ERROR("failed to create CQ cqe= %d ret= %d",
ch->rq_size, ret);
ch->rq_size + srpt_sq_size, ret);
goto out;
}
#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) \
&& !defined(RHEL_RELEASE_CODE)
ch->scq = ib_create_cq(sdev->device, srpt_send_completion, NULL, ch,
srpt_sq_size);
#else
ch->scq = ib_create_cq(sdev->device, srpt_send_completion, NULL, ch,
srpt_sq_size, 0);
#endif
if (IS_ERR(ch->scq)) {
ret = PTR_ERR(ch->scq);
PRINT_ERROR("failed to create CQ cqe= %d ret= %d",
srpt_sq_size, ret);
goto err_destroy_rcq;
}
qp_init->qp_context = (void *)ch;
qp_init->event_handler
= (void(*)(struct ib_event *, void*))srpt_qp_event;
qp_init->send_cq = ch->scq;
qp_init->recv_cq = ch->rcq;
qp_init->send_cq = ch->cq;
qp_init->recv_cq = ch->cq;
qp_init->srq = sdev->srq;
qp_init->sq_sig_type = IB_SIGNAL_REQ_WR;
qp_init->qp_type = IB_QPT_RC;
@@ -2088,13 +2045,13 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
if (IS_ERR(ch->qp)) {
ret = PTR_ERR(ch->qp);
PRINT_ERROR("failed to create_qp ret= %d", ret);
goto err_destroy_scq;
goto err_destroy_cq;
}
atomic_set(&ch->sq_wr_avail, qp_init->cap.max_send_wr);
TRACE_DBG("%s: max_cqe= %d r max_sge= %d s max_sge= %d sq_size = %d"
" cm_id= %p", __func__, ch->rcq->cqe, ch->scq->cqe,
TRACE_DBG("%s: max_cqe= %d max_sge= %d sq_size = %d"
" cm_id= %p", __func__, ch->cq->cqe,
qp_init->cap.max_send_sge, qp_init->cap.max_send_wr,
ch->cm_id);
@@ -2117,9 +2074,7 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
goto err_destroy_qp;
}
} else
ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP);
ib_req_notify_cq(ch->scq, IB_CQ_NEXT_COMP);
ib_req_notify_cq(ch->cq, IB_CQ_NEXT_COMP);
out:
kfree(qp_init);
@@ -2127,10 +2082,8 @@ out:
err_destroy_qp:
ib_destroy_qp(ch->qp);
err_destroy_scq:
ib_destroy_cq(ch->scq);
err_destroy_rcq:
ib_destroy_cq(ch->rcq);
err_destroy_cq:
ib_destroy_cq(ch->cq);
goto out;
}
@@ -2147,12 +2100,11 @@ static void srpt_destroy_ch_ib(struct srpt_rdma_ch *ch)
if (ret < 0)
PRINT_ERROR("Resetting queue pair state failed: %d", ret);
while (atomic_read(&ch->processing_recv_compl))
while (atomic_read(&ch->processing_compl))
;
ib_destroy_qp(ch->qp);
ib_destroy_cq(ch->scq);
ib_destroy_cq(ch->rcq);
ib_destroy_cq(ch->cq);
}
/**
@@ -2478,7 +2430,7 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id,
ch->sport = &sdev->port[param->port - 1];
ch->cm_id = cm_id;
ch->rq_size = max(SRPT_RQ_SIZE, scst_get_max_lun_commands(NULL, 0));
atomic_set(&ch->processing_recv_compl, 0);
atomic_set(&ch->processing_compl, 0);
atomic_set(&ch->state, RDMA_CHANNEL_CONNECTING);
INIT_LIST_HEAD(&ch->cmd_wait_list);

View File

@@ -244,12 +244,10 @@ enum rdma_ch_state {
* the channel.
* @cm_id: IB CM ID associated with the channel.
* @rq_size: IB receive queue size.
* @processing_recv_compl: whether or not a receive completion is being
* processed.
* @processing_compl: whether or not an IB completion is being processed.
* @qp: IB queue pair used for communicating over this channel.
* @sq_wr_avail: number of work requests available in the send queue.
* @rcq: completion queue for receive operations over this channel.
* @scq: completion queue for send operations over this channel.
* @cq: IB completion queue for this channel.
* @sport: pointer to the information of the HCA port used by this
* channel.
* @i_port_id: 128-bit initiator port identifier copied from SRP_LOGIN_REQ.
@@ -281,10 +279,9 @@ struct srpt_rdma_ch {
struct ib_cm_id *cm_id;
struct ib_qp *qp;
int rq_size;
atomic_t processing_recv_compl;
struct ib_cq *rcq;
atomic_t processing_compl;
struct ib_cq *cq;
atomic_t sq_wr_avail;
struct ib_cq *scq;
struct srpt_port *sport;
u8 i_port_id[16];
u8 t_port_id[16];