Changed behavior of mode thread=1: IB completions are now entirely processed on thread context and there is now one thread per session instead of one thread for all sessions.

git-svn-id: http://svn.code.sf.net/p/scst/svn/trunk@1895 d57e44dd-8a1f-0410-8b47-8ef2f437770f
This commit is contained in:
Bart Van Assche
2010-07-30 14:32:44 +00:00
parent 49cbea848a
commit ffe7409533
2 changed files with 116 additions and 260 deletions

View File

@@ -74,18 +74,9 @@ MODULE_LICENSE("Dual BSD/GPL");
*/
enum threading_mode {
MODE_SIRQ = 0,
MODE_SINGLE_THREADED = 1,
MODE_MULTITHREADED = 2,
};
struct srpt_thread {
/* Protects thread_ioctx_list. */
spinlock_t thread_lock;
/* I/O contexts to be processed by the kernel thread. */
struct list_head thread_ioctx_list;
/* SRPT kernel thread. */
struct task_struct *thread;
MODE_ALL_IN_SIRQ = 0,
MODE_IB_COMPLETION_IN_THREAD = 1,
MODE_IB_COMPLETION_IN_SIRQ = 2,
};
@@ -96,7 +87,6 @@ struct srpt_thread {
static u64 srpt_service_guid;
/* List of srpt_device structures. */
static atomic_t srpt_device_count;
static DECLARE_WAIT_QUEUE_HEAD(ioctx_list_waitQ);
#if defined(CONFIG_SCST_DEBUG) || defined(CONFIG_SCST_TRACING)
static unsigned long trace_flag = DEFAULT_SRPT_TRACE_FLAGS;
module_param(trace_flag, long, 0644);
@@ -110,9 +100,7 @@ MODULE_PARM_DESC(processing_delay_in_us,
"SRP_CMD processing delay in microseconds.");
#endif
static enum scst_exec_context srpt_context;
static int thread;
static struct srpt_thread srpt_thread;
module_param(thread, int, 0444);
MODULE_PARM_DESC(thread,
"Execute SCSI commands in thread context. Defaults to zero,"
@@ -1356,7 +1344,7 @@ static void srpt_handle_send_err_comp(struct srpt_rdma_ch *ch, u64 wr_id,
enum srpt_command_state state;
struct scst_cmd *scmnd;
WARN_ON(wr_id & SRPT_OP_RECV);
EXTRACHECKS_WARN_ON(wr_id & SRPT_OP_RECV);
ioctx = sdev->ioctx_ring[wr_id & ~SRPT_OP_FLAGS];
@@ -1719,8 +1707,8 @@ static void srpt_handle_cred_rsp(struct srpt_rdma_ch *ch,
max_lun_commands = scst_get_max_lun_commands(NULL, 0);
if (4 <= max_lun_commands && max_lun_commands < ch->rq_size) {
req_lim_delta = ch->rq_size - max_lun_commands;
PRINT_INFO("Decreasing request limit by %d",
req_lim_delta);
PRINT_INFO("Decreasing initiator request limit from %d"
" to %d", ch->rq_size, max_lun_commands);
/*
* Note: at least in theory this may make the req_lim
* variable managed by the initiator temporarily
@@ -1738,7 +1726,8 @@ static void srpt_handle_cred_rsp(struct srpt_rdma_ch *ch,
* @ioctx: SRPT I/O context associated with the information unit.
*/
static void srpt_handle_new_iu(struct srpt_rdma_ch *ch,
struct srpt_ioctx *ioctx)
struct srpt_ioctx *ioctx,
enum scst_exec_context context)
{
struct srp_cmd *srp_cmd;
struct scst_cmd *scmnd;
@@ -1747,7 +1736,6 @@ static void srpt_handle_new_iu(struct srpt_rdma_ch *ch,
u8 srp_tsk_mgmt_status;
int len;
int send_rsp_res;
enum scst_exec_context context = srpt_context;
ch_state = atomic_read(&ch->state);
if (ch_state == RDMA_CHANNEL_CONNECTING) {
@@ -1882,27 +1870,14 @@ err:
}
}
/**
* srpt_rcv_completion() - IB receive completion queue callback function.
*
* Notes:
* - It is guaranteed that a completion handler will never be invoked
* concurrently on two different CPUs for the same completion queue. See also
* Documentation/infiniband/core_locking.txt and the implementation of
* handle_edge_irq() in kernel/irq/chip.c.
* - When threaded IRQs are enabled, completion handlers are invoked in thread
* context instead of interrupt context.
*/
static void srpt_rcv_completion(struct ib_cq *cq, void *ctx)
static void srpt_process_rcv_completion(struct ib_cq *cq,
struct srpt_rdma_ch *ch,
enum scst_exec_context context)
{
struct srpt_rdma_ch *ch = ctx;
struct srpt_device *sdev = ch->sport->sdev;
struct ib_wc wc;
struct srpt_ioctx *ioctx;
EXTRACHECKS_WARN_ON(thread == MODE_SINGLE_THREADED);
atomic_inc(&ch->processing_recv_compl);
do {
while (ib_poll_cq(ch->rcq, 1, &wc) > 0) {
WARN_ON(wc.wr_id & SRPT_OP_TTI);
@@ -1915,26 +1890,20 @@ static void srpt_rcv_completion(struct ib_cq *cq, void *ctx)
} else {
ioctx = sdev->ioctx_ring[wc.wr_id
& ~SRPT_OP_RECV];
srpt_handle_new_iu(ch, ioctx);
srpt_handle_new_iu(ch, ioctx, context);
}
}
} while (ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP
| IB_CQ_REPORT_MISSED_EVENTS) > 0);
atomic_dec(&ch->processing_recv_compl);
}
/**
* srpt_send_completion() - IB send completion queue callback function.
*/
static void srpt_send_completion(struct ib_cq *cq, void *ctx)
static void srpt_process_send_completion(struct ib_cq *cq,
struct srpt_rdma_ch *ch,
enum scst_exec_context context)
{
struct srpt_rdma_ch *ch = ctx;
struct srpt_device *sdev = ch->sport->sdev;
struct ib_wc wc;
struct srpt_ioctx *ioctx;
enum scst_exec_context context = srpt_context;
EXTRACHECKS_WARN_ON(thread == MODE_SINGLE_THREADED);
do {
while (ib_poll_cq(ch->scq, 1, &wc) > 0) {
@@ -1967,197 +1936,74 @@ static void srpt_send_completion(struct ib_cq *cq, void *ctx)
}
/**
* srpt_schedule_thread() - Add 'ioctx' to the tail of the ioctx list.
* srpt_rcv_completion() - IB receive completion queue callback function.
*
* @pre thread == MODE_SINGLE_THREADED
* Notes:
* - It is guaranteed that a completion handler will never be invoked
* concurrently on two different CPUs for the same completion queue. See also
* Documentation/infiniband/core_locking.txt and the implementation of
* handle_edge_irq() in kernel/irq/chip.c.
* - When threaded IRQs are enabled, completion handlers are invoked in thread
* context instead of interrupt context.
*/
static inline void srpt_schedule_thread(struct srpt_ioctx *ioctx)
{
unsigned long flags;
#ifdef CONFIG_SCST_EXTRACHECKS
struct srpt_ioctx *e;
#endif
EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED);
spin_lock_irqsave(&srpt_thread.thread_lock, flags);
#ifdef CONFIG_SCST_EXTRACHECKS
list_for_each_entry(e, &srpt_thread.thread_ioctx_list, comp_list)
EXTRACHECKS_WARN_ON(e == ioctx);
#endif
list_add_tail(&ioctx->comp_list, &srpt_thread.thread_ioctx_list);
spin_unlock_irqrestore(&srpt_thread.thread_lock, flags);
wake_up(&ioctx_list_waitQ);
}
/**
* srpt_rcv_completion_st() - Single-threaded mode receive completion handler.
*/
static void srpt_rcv_completion_st(struct ib_cq *cq, void *ctx)
static void srpt_rcv_completion(struct ib_cq *cq, void *ctx)
{
struct srpt_rdma_ch *ch = ctx;
struct srpt_device *sdev = ch->sport->sdev;
struct ib_wc wc;
struct srpt_ioctx *ioctx;
EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED);
atomic_inc(&ch->processing_recv_compl);
ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP);
while (ib_poll_cq(ch->rcq, 1, &wc) > 0) {
WARN_ON(wc.wr_id & SRPT_OP_TTI);
ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_FLAGS];
ioctx->ch = ch;
ioctx->wr_id = wc.wr_id;
ioctx->status = wc.status;
ioctx->opcode = wc.opcode;
srpt_schedule_thread(ioctx);
switch (thread) {
case MODE_IB_COMPLETION_IN_THREAD:
wake_up_interruptible(&ch->wait_queue);
break;
case MODE_IB_COMPLETION_IN_SIRQ:
srpt_process_rcv_completion(cq, ch, SCST_CONTEXT_THREAD);
break;
case MODE_ALL_IN_SIRQ:
srpt_process_rcv_completion(cq, ch, SCST_CONTEXT_TASKLET);
break;
}
atomic_dec(&ch->processing_recv_compl);
}
/**
* srpt_rcv_completion_st() - Single-threaded mode send completion handler.
* srpt_send_completion() - IB send completion queue callback function.
*/
static void srpt_send_completion_st(struct ib_cq *cq, void *ctx)
static void srpt_send_completion(struct ib_cq *cq, void *ctx)
{
struct srpt_rdma_ch *ch = ctx;
struct srpt_device *sdev = ch->sport->sdev;
struct ib_wc wc;
struct srpt_ioctx *ioctx;
EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED);
ib_req_notify_cq(ch->scq, IB_CQ_NEXT_COMP);
while (ib_poll_cq(ch->scq, 1, &wc) > 0) {
WARN_ON(wc.wr_id & SRPT_OP_RECV);
ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_FLAGS];
ioctx->ch = ch;
ioctx->wr_id = wc.wr_id;
ioctx->status = wc.status;
ioctx->opcode = wc.opcode;
srpt_schedule_thread(ioctx);
#if 0
switch (thread) {
case MODE_IB_COMPLETION_IN_THREAD:
case MODE_IB_COMPLETION_IN_SIRQ:
srpt_process_send_completion(cq, ch, SCST_CONTEXT_THREAD);
break;
case MODE_ALL_IN_SIRQ:
srpt_process_send_completion(cq, ch, SCST_CONTEXT_TASKLET);
break;
}
#else
srpt_process_send_completion(cq, ch, SCST_CONTEXT_TASKLET);
#endif
}
/**
* srpt_test_ioctx_list() - Returns false if the kernel thread should sleep.
*
* @pre thread == MODE_SINGLE_THREADED
* @pre the caller holds a lock on srpt_thread.thread_lock
*/
static inline int srpt_test_ioctx_list(void)
static int srpt_compl_thread(void *arg)
{
EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED);
return !list_empty(&srpt_thread.thread_ioctx_list)
|| unlikely(kthread_should_stop());
}
/**
* srpt_ioctx_thread() - ib_srpt kernel thread entry point.
*
* This kernel thread is only created when the module parameter 'thread' equals
* one. This thread processes the ioctx list srpt_thread.thread_ioctx_list.
*
* @pre thread == MODE_SINGLE_THREADED
*/
static int srpt_ioctx_thread(void *arg)
{
EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED);
struct srpt_rdma_ch* ch;
/* Hibernation / freezing of the SRPT kernel thread is not supported. */
current->flags |= PF_NOFREEZE;
spin_lock_irq(&srpt_thread.thread_lock);
ch = arg;
BUG_ON(!ch);
PRINT_INFO("Kernel thread %s started", ch->thread->comm);
while (!kthread_should_stop()) {
wait_queue_t wait;
init_waitqueue_entry(&wait, current);
if (!srpt_test_ioctx_list()) {
add_wait_queue_exclusive(&ioctx_list_waitQ, &wait);
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (srpt_test_ioctx_list())
break;
spin_unlock_irq(&srpt_thread.thread_lock);
schedule();
spin_lock_irq(&srpt_thread.thread_lock);
}
set_current_state(TASK_RUNNING);
remove_wait_queue(&ioctx_list_waitQ, &wait);
}
while (!list_empty(&srpt_thread.thread_ioctx_list)) {
struct srpt_ioctx *ioctx;
struct srpt_rdma_ch *ch;
u64 wr_id;
enum ib_wc_status status;
enum ib_wc_opcode opcode;
ioctx = list_first_entry(&srpt_thread.thread_ioctx_list,
typeof(*ioctx), comp_list);
BUG_ON(!ioctx);
list_del(&ioctx->comp_list);
spin_unlock_irq(&srpt_thread.thread_lock);
ch = ioctx->ch;
wr_id = ioctx->wr_id;
status = ioctx->status;
opcode = ioctx->opcode;
BUG_ON(!ch);
if (wr_id & SRPT_OP_RECV) {
if (unlikely(status)) {
PRINT_INFO("receiving wr_id %u failed"
" with status %d",
(unsigned)(wr_id & ~SRPT_OP_FLAGS),
status);
goto continue_processing;
}
} else {
if (unlikely(status)) {
PRINT_INFO("sending %s for wr_id"
" %u failed with status %d",
wr_id & SRPT_OP_TTI
? "request" : "response",
(unsigned)
(wr_id & ~SRPT_OP_FLAGS),
status);
srpt_handle_send_err_comp(ch,
wr_id, srpt_context);
goto continue_processing;
}
}
switch (opcode) {
case IB_WC_SEND:
if (wr_id & SRPT_OP_TTI)
srpt_put_tti_ioctx(ch);
else
srpt_handle_send_comp(ch, ioctx,
srpt_context);
break;
case IB_WC_RDMA_READ:
srpt_handle_rdma_comp(ch, ioctx, srpt_context);
break;
case IB_WC_RECV:
srpt_handle_new_iu(ch, ioctx);
break;
default:
PRINT_ERROR("received unrecognized WC opcode"
" %d", opcode);
break;
}
continue_processing:
spin_lock_irq(&srpt_thread.thread_lock);
}
wait_event_interruptible(ch->wait_queue,
(srpt_process_rcv_completion(ch->scq, ch,
SCST_CONTEXT_THREAD),
kthread_should_stop()));
}
spin_unlock_irq(&srpt_thread.thread_lock);
PRINT_INFO("Kernel thread %s stopped", ch->thread->comm);
return 0;
}
@@ -2179,15 +2025,11 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) \
&& ! defined(RHEL_RELEASE_CODE)
ch->rcq = ib_create_cq(sdev->device,
thread == MODE_SINGLE_THREADED
? srpt_rcv_completion_st : srpt_rcv_completion,
NULL, ch, ch->rq_size);
ch->rcq = ib_create_cq(sdev->device, srpt_rcv_completion, NULL, ch,
ch->rq_size);
#else
ch->rcq = ib_create_cq(sdev->device,
thread == MODE_SINGLE_THREADED
? srpt_rcv_completion_st : srpt_rcv_completion,
NULL, ch, ch->rq_size, 0);
ch->rcq = ib_create_cq(sdev->device, srpt_rcv_completion, NULL, ch,
ch->rq_size, 0);
#endif
if (IS_ERR(ch->rcq)) {
ret = PTR_ERR(ch->rcq);
@@ -2197,15 +2039,11 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch)
}
#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) \
&& ! defined(RHEL_RELEASE_CODE)
ch->scq = ib_create_cq(sdev->device,
thread == MODE_SINGLE_THREADED
? srpt_send_completion_st : srpt_send_completion,
NULL, ch, srpt_sq_size);
ch->scq = ib_create_cq(sdev->device, srpt_send_completion, NULL, ch,
srpt_sq_size);
#else
ch->scq = ib_create_cq(sdev->device,
thread == MODE_SINGLE_THREADED
? srpt_send_completion_st : srpt_send_completion,
NULL, ch, srpt_sq_size, 0);
ch->scq = ib_create_cq(sdev->device, srpt_send_completion, NULL, ch,
srpt_sq_size, 0);
#endif
if (IS_ERR(ch->scq)) {
ret = PTR_ERR(ch->scq);
@@ -2281,9 +2119,6 @@ static void srpt_unregister_channel(struct srpt_rdma_ch *ch)
atomic_set(&ch->state, RDMA_CHANNEL_DISCONNECTING);
spin_unlock_irq(&sdev->spinlock);
if (thread == MODE_SINGLE_THREADED)
flush_scheduled_work();
/*
* At this point it is guaranteed that no new commands will be sent to
* the SCST core for channel ch, which is a requirement for
@@ -2395,6 +2230,11 @@ static void srpt_release_channel(struct scst_session *scst_sess)
while (atomic_read(&ch->processing_recv_compl))
;
if (ch->thread) {
kthread_stop(ch->thread);
ch->thread = NULL;
}
/*
* At this point it is guaranteed that no new completions will be
* received and also that no completion handler is still accessing ch.
@@ -2669,6 +2509,22 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id,
be64_to_cpu(*(__be64 *)(ch->i_port_id + 8)));
}
if (thread == MODE_IB_COMPLETION_IN_THREAD) {
init_waitqueue_head(&ch->wait_queue);
TRACE_DBG("creating IB completion thread for session %s",
ch->sess_name);
ch->thread = kthread_run(srpt_compl_thread, ch,
"ib_srpt_compl");
if (IS_ERR(ch->thread)) {
PRINT_ERROR("failed to create kernel thread %ld",
PTR_ERR(ch->thread));
ch->thread = NULL;
goto destroy_ib;
}
}
TRACE_DBG("registering session %s", ch->sess_name);
BUG_ON(!sdev->scst_tgt);
@@ -2677,7 +2533,7 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id,
if (!ch->scst_sess) {
rej->reason = cpu_to_be32(SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES);
TRACE_DBG("%s", "Failed to create scst sess");
goto destroy_ib;
goto destroy_thread;
}
TRACE_DBG("Establish connection sess=%p name=%s cm_id=%p",
@@ -2727,6 +2583,10 @@ release_channel:
scst_unregister_session(ch->scst_sess, 0, NULL);
ch->scst_sess = NULL;
destroy_thread:
kthread_stop(ch->thread);
ch->thread = NULL;
destroy_ib:
ib_destroy_qp(ch->qp);
ib_destroy_cq(ch->scq);
@@ -2789,7 +2649,7 @@ static void srpt_cm_rtu_recv(struct ib_cm_id *cm_id)
list_for_each_entry_safe(ioctx, ioctx_tmp, &ch->cmd_wait_list,
wait_list) {
list_del(&ioctx->wait_list);
srpt_handle_new_iu(ch, ioctx);
srpt_handle_new_iu(ch, ioctx, SCST_CONTEXT_THREAD);
}
if (ret && srpt_test_and_set_channel_state(ch,
RDMA_CHANNEL_LIVE,
@@ -3424,7 +3284,7 @@ static int srpt_xmit_response(struct scst_cmd *scmnd)
new_state = srpt_set_cmd_state(ioctx, SRPT_STATE_CMD_RSP_SENT);
if (unlikely(scst_cmd_aborted(scmnd))) {
srpt_abort_scst_cmd(ioctx, srpt_context);
srpt_abort_scst_cmd(ioctx, SCST_CONTEXT_SAME);
goto out;
}
@@ -4142,20 +4002,28 @@ static int __init srpt_init_module(void)
}
switch (thread) {
case MODE_SIRQ:
/* IRQ context */
srpt_context = SCST_CONTEXT_TASKLET;
case MODE_ALL_IN_SIRQ:
/*
* Process both IB completions and SCST commands in SIRQ
* context. May lead to soft lockups and other scary behavior
* under sufficient load.
*/
srpt_template.rdy_to_xfer_atomic = true;
break;
case MODE_SINGLE_THREADED:
/* single kernel thread (created by ib_srpt) */
srpt_context = SCST_CONTEXT_DIRECT;
case MODE_IB_COMPLETION_IN_THREAD:
/*
* Process IB completions in the kernel thread associated with
* the RDMA channel, and process SCST commands in the kernel
* threads created by the SCST core.
*/
srpt_template.rdy_to_xfer_atomic = false;
break;
case MODE_MULTITHREADED:
case MODE_IB_COMPLETION_IN_SIRQ:
default:
/* multiple kernel threads (created by the SCST core) */
srpt_context = SCST_CONTEXT_THREAD;
/*
* Process IB completions in SIRQ context and SCST commands in
* the kernel threads created by the SCST core.
*/
srpt_template.rdy_to_xfer_atomic = false;
break;
}
@@ -4181,23 +4049,8 @@ static int __init srpt_init_module(void)
goto out_unregister_target;
}
if (thread == 1) {
spin_lock_init(&srpt_thread.thread_lock);
INIT_LIST_HEAD(&srpt_thread.thread_ioctx_list);
srpt_thread.thread = kthread_run(srpt_ioctx_thread,
NULL, "srpt_thread");
if (IS_ERR(srpt_thread.thread)) {
srpt_thread.thread = NULL;
thread = 0;
PRINT_ERROR("%s", "kernel thread creation failed");
goto out_unregister_client;
}
}
return 0;
out_unregister_client:
ib_unregister_client(&srpt_client);
out_unregister_target:
#ifdef CONFIG_SCST_PROC
/*

View File

@@ -192,7 +192,6 @@ enum srpt_command_state {
* @buf: Pointer to the message transferred via this I/O context.
* @dma: DMA address of buf.
* @wait_list: Node for insertion in srpt_rdma_ch::cmd_wait_list.
* @comp_list: Node for insertion in the srpt_thread::thread_ioctx_list.
* @state: I/O context state. See also enum srpt_command_state.
*/
struct srpt_ioctx {
@@ -211,7 +210,6 @@ struct srpt_ioctx {
u64 wr_id;
enum ib_wc_status status;
enum ib_wc_opcode opcode;
struct list_head comp_list;
struct srpt_rdma_ch *ch;
struct scst_cmd *scmnd;
scst_data_direction dir;
@@ -241,6 +239,9 @@ enum rdma_ch_state {
/**
* struct srpt_rdma_ch - RDMA channel.
* @thread: Kernel thread that processes the IB queues associated with
* the channel.
* @wait_queue: Allows the kernel thread to wait for more work.
* @cm_id: IB CM ID associated with the channel.
* @rq_size: IB receive queue size.
* @processing_recv_compl: whether or not a receive completion is being
@@ -275,6 +276,8 @@ enum rdma_ch_state {
* @sess_name: SCST session name.
*/
struct srpt_rdma_ch {
struct task_struct *thread;
wait_queue_head_t wait_queue;
struct ib_cm_id *cm_id;
struct ib_qp *qp;
int rq_size;