- Implemented a workaround for a race between ib_destroy_cq() and the IB
  completion handlers. This race could trigger a kernel crash or kernel lockup
  when unloading the ib_srpt module while I/O was ongoing or by performing
  a relogin from an SRP initiator while I/O was ongoing.
- Moved the code for handling IB completions in single-threaded mode to
  separate functions.


git-svn-id: http://svn.code.sf.net/p/scst/svn/trunk@1848 d57e44dd-8a1f-0410-8b47-8ef2f437770f
This commit is contained in:
Bart Van Assche
2010-07-21 10:52:36 +00:00
parent 2c663262b1
commit 5ded0b7660
2 changed files with 162 additions and 63 deletions

View File

@@ -1209,35 +1209,38 @@ out:
TRACE_EXIT();
}
static void srpt_handle_err_comp(struct srpt_rdma_ch *ch, struct ib_wc *wc,
enum scst_exec_context context)
/**
* srpt_handle_send_err_comp() - Process an IB_WC_SEND or RDMA error completion.
*/
static void srpt_handle_send_err_comp(struct srpt_rdma_ch *ch, u64 wr_id,
enum scst_exec_context context)
{
struct srpt_ioctx *ioctx;
struct srpt_device *sdev = ch->sport->sdev;
enum srpt_command_state state;
struct scst_cmd *scmnd;
if (!(wc->wr_id & SRPT_OP_RECV)) {
ioctx = sdev->ioctx_ring[wc->wr_id];
state = srpt_get_cmd_state(ioctx);
scmnd = ioctx->scmnd;
WARN_ON(wr_id & SRPT_OP_RECV);
EXTRACHECKS_WARN_ON(state != SRPT_STATE_CMD_RSP_SENT
&& state != SRPT_STATE_MGMT_RSP_SENT
&& state != SRPT_STATE_NEED_DATA
&& state != SRPT_STATE_DONE);
EXTRACHECKS_WARN_ON((state == SRPT_STATE_MGMT_RSP_SENT)
!= (scmnd == NULL));
ioctx = sdev->ioctx_ring[wr_id];
state = srpt_get_cmd_state(ioctx);
scmnd = ioctx->scmnd;
if (state == SRPT_STATE_DONE)
PRINT_ERROR("Received more than one IB error completion"
" for wr_id = %u.", (unsigned)(wc->wr_id));
else if (scmnd)
srpt_abort_scst_cmd(ioctx, context);
else {
srpt_set_cmd_state(ioctx, SRPT_STATE_DONE);
srpt_reset_ioctx(ch, ioctx);
}
EXTRACHECKS_WARN_ON(state != SRPT_STATE_CMD_RSP_SENT
&& state != SRPT_STATE_MGMT_RSP_SENT
&& state != SRPT_STATE_NEED_DATA
&& state != SRPT_STATE_DONE);
EXTRACHECKS_WARN_ON((state == SRPT_STATE_MGMT_RSP_SENT)
!= (scmnd == NULL));
if (state == SRPT_STATE_DONE)
PRINT_ERROR("Received more than one IB error completion"
" for wr_id = %u.", (unsigned)wr_id);
else if (scmnd)
srpt_abort_scst_cmd(ioctx, context);
else {
srpt_set_cmd_state(ioctx, SRPT_STATE_DONE);
srpt_reset_ioctx(ch, ioctx);
}
}
@@ -1745,9 +1748,10 @@ static void srpt_rcv_completion(struct ib_cq *cq, void *ctx)
struct srpt_device *sdev = ch->sport->sdev;
struct ib_wc wc;
struct srpt_ioctx *ioctx;
enum scst_exec_context context = srpt_context;
unsigned long flags;
EXTRACHECKS_WARN_ON(thread == MODE_SINGLE_THREADED);
spin_lock_irqsave(&ch->recv_lock, flags);
ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP);
while (ib_poll_cq(ch->rcq, 1, &wc) > 0) {
@@ -1758,7 +1762,6 @@ static void srpt_rcv_completion(struct ib_cq *cq, void *ctx)
PRINT_INFO("receiving wr_id %u failed with status %d",
(unsigned)(wc.wr_id & ~SRPT_OP_RECV),
wc.status);
srpt_handle_err_comp(ch, &wc, context);
spin_lock_irqsave(&ch->recv_lock, flags);
continue;
}
@@ -1767,12 +1770,7 @@ static void srpt_rcv_completion(struct ib_cq *cq, void *ctx)
if (unlikely(req_lim < 0))
PRINT_ERROR("req_lim = %d < 0", req_lim);
ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_RECV];
if (thread == MODE_SINGLE_THREADED) {
ioctx->ch = ch;
ioctx->op = wc.opcode;
srpt_schedule_thread(ioctx);
} else
srpt_handle_new_iu(ch, ioctx);
srpt_handle_new_iu(ch, ioctx);
#if defined(CONFIG_SCST_DEBUG)
if (interrupt_processing_delay_in_us <= MAX_UDELAY_MS * 1000)
@@ -1793,6 +1791,8 @@ static void srpt_send_completion(struct ib_cq *cq, void *ctx)
struct srpt_ioctx *ioctx;
enum scst_exec_context context = srpt_context;
EXTRACHECKS_WARN_ON(thread == MODE_SINGLE_THREADED);
ib_req_notify_cq(ch->scq, IB_CQ_NEXT_COMP);
while (ib_poll_cq(ch->scq, 1, &wc) > 0) {
if (unlikely(wc.status)) {
@@ -1800,27 +1800,21 @@ static void srpt_send_completion(struct ib_cq *cq, void *ctx)
" status %d",
(unsigned)(wc.wr_id & ~SRPT_OP_RECV),
wc.status);
srpt_handle_err_comp(ch, &wc, context);
srpt_handle_send_err_comp(ch, wc.wr_id, context);
continue;
}
ioctx = sdev->ioctx_ring[wc.wr_id];
atomic_add(wc.opcode == IB_WC_SEND ? 1 : ioctx->n_rdma,
&ch->sq_wr_avail);
if (thread == MODE_SINGLE_THREADED) {
ioctx->ch = ch;
ioctx->op = wc.opcode;
srpt_schedule_thread(ioctx);
} else {
if (wc.opcode == IB_WC_SEND)
srpt_handle_send_comp(ch, ioctx, context);
else {
if (wc.opcode == IB_WC_SEND)
srpt_handle_send_comp(ch, ioctx, context);
else {
#if defined(CONFIG_SCST_DEBUG)
WARN_ON(wc.opcode != IB_WC_RDMA_READ);
WARN_ON(ioctx->n_rdma <= 0);
WARN_ON(wc.opcode != IB_WC_RDMA_READ);
WARN_ON(ioctx->n_rdma <= 0);
#endif
srpt_handle_rdma_comp(ch, ioctx, context);
}
srpt_handle_rdma_comp(ch, ioctx, context);
}
#if defined(CONFIG_SCST_DEBUG)
@@ -1830,6 +1824,55 @@ static void srpt_send_completion(struct ib_cq *cq, void *ctx)
}
}
/**
* srpt_rcv_completion_st() - Single-threaded mode receive completion handler.
*/
static void srpt_rcv_completion_st(struct ib_cq *cq, void *ctx)
{
struct srpt_rdma_ch *ch = ctx;
struct srpt_device *sdev = ch->sport->sdev;
struct ib_wc wc;
struct srpt_ioctx *ioctx;
unsigned long flags;
EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED);
spin_lock_irqsave(&ch->recv_lock, flags);
ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP);
while (ib_poll_cq(ch->rcq, 1, &wc) > 0) {
ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_RECV];
ioctx->ch = ch;
ioctx->wr_id = wc.wr_id;
ioctx->status = wc.status;
ioctx->opcode = wc.opcode;
srpt_schedule_thread(ioctx);
}
spin_unlock_irqrestore(&ch->recv_lock, flags);
}
/**
* srpt_rcv_completion_st() - Single-threaded mode send completion handler.
*/
static void srpt_send_completion_st(struct ib_cq *cq, void *ctx)
{
struct srpt_rdma_ch *ch = ctx;
struct srpt_device *sdev = ch->sport->sdev;
struct ib_wc wc;
struct srpt_ioctx *ioctx;
EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED);
ib_req_notify_cq(ch->scq, IB_CQ_NEXT_COMP);
while (ib_poll_cq(ch->scq, 1, &wc) > 0) {
ioctx = sdev->ioctx_ring[wc.wr_id];
ioctx->ch = ch;
ioctx->wr_id = wc.wr_id;
ioctx->status = wc.status;
ioctx->opcode = wc.opcode;
srpt_schedule_thread(ioctx);
}
}
/**
* srpt_ioctx_thread() - ib_srpt kernel thread entry point.
*
@@ -1840,8 +1883,6 @@ static void srpt_send_completion(struct ib_cq *cq, void *ctx)
*/
static int srpt_ioctx_thread(void *arg)
{
struct srpt_ioctx *ioctx;
EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED);
/* Hibernation / freezing of the SRPT kernel thread is not supported. */
@@ -1868,29 +1909,68 @@ static int srpt_ioctx_thread(void *arg)
}
while (!list_empty(&srpt_thread.thread_ioctx_list)) {
ioctx = list_entry(srpt_thread.thread_ioctx_list.next,
struct srpt_ioctx, comp_list);
struct srpt_ioctx *ioctx;
struct srpt_rdma_ch *ch;
u64 wr_id;
enum ib_wc_status status;
enum ib_wc_opcode opcode;
ioctx = list_first_entry(&srpt_thread.thread_ioctx_list,
typeof(*ioctx), comp_list);
list_del(&ioctx->comp_list);
spin_unlock_irq(&srpt_thread.thread_lock);
switch (ioctx->op) {
ch = ioctx->ch;
wr_id = ioctx->wr_id;
status = ioctx->status;
opcode = ioctx->opcode;
if (wr_id & SRPT_OP_RECV) {
int req_lim;
if (unlikely(status)) {
PRINT_INFO("receiving wr_id %u failed"
" with status %d",
(unsigned)(wr_id & ~SRPT_OP_RECV),
status);
goto continue_processing;
}
req_lim = atomic_dec_return(&ch->req_lim);
if (unlikely(req_lim < 0))
PRINT_ERROR("req_lim = %d < 0",
req_lim);
} else {
if (unlikely(status)) {
PRINT_INFO("sending response for wr_id"
" %u failed with status %d",
(unsigned)wr_id,
status);
srpt_handle_send_err_comp(ch, wr_id,
srpt_context);
goto continue_processing;
}
atomic_add(opcode == IB_WC_SEND
? 1 : ioctx->n_rdma,
&ch->sq_wr_avail);
}
switch (opcode) {
case IB_WC_SEND:
srpt_handle_send_comp(ioctx->ch, ioctx,
srpt_context);
srpt_handle_send_comp(ch, ioctx, srpt_context);
break;
case IB_WC_RDMA_READ:
srpt_handle_rdma_comp(ioctx->ch, ioctx,
srpt_context);
srpt_handle_rdma_comp(ch, ioctx, srpt_context);
break;
case IB_WC_RECV:
srpt_handle_new_iu(ioctx->ch, ioctx);
srpt_handle_new_iu(ch, ioctx);
break;
default:
PRINT_ERROR("received unrecognized WC opcode"
" %d", ioctx->op);
" %d", opcode);
break;
}
continue_processing:
spin_lock_irq(&srpt_thread.thread_lock);
}
}
@@ -1915,11 +1995,15 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) \
&& ! defined(RHEL_RELEASE_CODE)
ch->rcq = ib_create_cq(sdev->device, srpt_rcv_completion, NULL, ch,
SRPT_RQ_SIZE);
ch->rcq = ib_create_cq(sdev->device,
thread == MODE_SINGLE_THREADED
? srpt_rcv_completion_st : srpt_rcv_completion,
NULL, ch, SRPT_RQ_SIZE);
#else
ch->rcq = ib_create_cq(sdev->device, srpt_rcv_completion, NULL, ch,
SRPT_RQ_SIZE, 0);
ch->rcq = ib_create_cq(sdev->device,
thread == MODE_SINGLE_THREADED
? srpt_rcv_completion_st : srpt_rcv_completion,
NULL, ch, SRPT_RQ_SIZE, 0);
#endif
if (IS_ERR(ch->rcq)) {
ret = PTR_ERR(ch->rcq);
@@ -1929,11 +2013,15 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
}
#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) \
&& ! defined(RHEL_RELEASE_CODE)
ch->scq = ib_create_cq(sdev->device, srpt_send_completion, NULL, ch,
SRPT_SQ_SIZE);
ch->scq = ib_create_cq(sdev->device,
thread == MODE_SINGLE_THREADED
? srpt_send_completion_st : srpt_send_completion,
NULL, ch, SRPT_SQ_SIZE);
#else
ch->scq = ib_create_cq(sdev->device, srpt_send_completion, NULL, ch,
SRPT_SQ_SIZE, 0);
ch->scq = ib_create_cq(sdev->device,
thread == MODE_SINGLE_THREADED
? srpt_send_completion_st : srpt_send_completion,
NULL, ch, SRPT_SQ_SIZE, 0);
#endif
if (IS_ERR(ch->scq)) {
ret = PTR_ERR(ch->scq);
@@ -2102,6 +2190,15 @@ static void srpt_release_channel(struct scst_session *scst_sess)
BUG_ON(!ch->cm_id);
ib_destroy_cm_id(ch->cm_id);
/*
* After having destroyed the CM ID, wait with destroying the
* completion queues and freeing ch until all completion handlers
* have finished. Concurrent execution of ib_destroy_cq() and a
* completion handler will trigger a kernel lockup or kernel crash.
*/
msleep(250);
PRINT_INFO("%s", "destroying QP and CQ's");
ib_destroy_qp(ch->qp);
ib_destroy_cq(ch->scq);
ib_destroy_cq(ch->rcq);

View File

@@ -181,7 +181,9 @@ struct srpt_ioctx {
u8 n_rdma;
u8 n_rbuf;
enum ib_wc_opcode op;
u64 wr_id;
enum ib_wc_status status;
enum ib_wc_opcode opcode;
struct list_head comp_list;
struct srpt_rdma_ch *ch;
struct scst_cmd *scmnd;