diff --git a/srpt/src/ib_srpt.c b/srpt/src/ib_srpt.c index 3fbdb244d..c88ea73e7 100644 --- a/srpt/src/ib_srpt.c +++ b/srpt/src/ib_srpt.c @@ -74,18 +74,9 @@ MODULE_LICENSE("Dual BSD/GPL"); */ enum threading_mode { - MODE_SIRQ = 0, - MODE_SINGLE_THREADED = 1, - MODE_MULTITHREADED = 2, -}; - -struct srpt_thread { - /* Protects thread_ioctx_list. */ - spinlock_t thread_lock; - /* I/O contexts to be processed by the kernel thread. */ - struct list_head thread_ioctx_list; - /* SRPT kernel thread. */ - struct task_struct *thread; + MODE_ALL_IN_SIRQ = 0, + MODE_IB_COMPLETION_IN_THREAD = 1, + MODE_IB_COMPLETION_IN_SIRQ = 2, }; @@ -96,7 +87,6 @@ struct srpt_thread { static u64 srpt_service_guid; /* List of srpt_device structures. */ static atomic_t srpt_device_count; -static DECLARE_WAIT_QUEUE_HEAD(ioctx_list_waitQ); #if defined(CONFIG_SCST_DEBUG) || defined(CONFIG_SCST_TRACING) static unsigned long trace_flag = DEFAULT_SRPT_TRACE_FLAGS; module_param(trace_flag, long, 0644); @@ -110,9 +100,7 @@ MODULE_PARM_DESC(processing_delay_in_us, "SRP_CMD processing delay in microseconds."); #endif -static enum scst_exec_context srpt_context; static int thread; -static struct srpt_thread srpt_thread; module_param(thread, int, 0444); MODULE_PARM_DESC(thread, "Execute SCSI commands in thread context. Defaults to zero," @@ -1356,7 +1344,7 @@ static void srpt_handle_send_err_comp(struct srpt_rdma_ch *ch, u64 wr_id, enum srpt_command_state state; struct scst_cmd *scmnd; - WARN_ON(wr_id & SRPT_OP_RECV); + EXTRACHECKS_WARN_ON(wr_id & SRPT_OP_RECV); ioctx = sdev->ioctx_ring[wr_id & ~SRPT_OP_FLAGS]; @@ -1719,8 +1707,8 @@ static void srpt_handle_cred_rsp(struct srpt_rdma_ch *ch, max_lun_commands = scst_get_max_lun_commands(NULL, 0); if (4 <= max_lun_commands && max_lun_commands < ch->rq_size) { req_lim_delta = ch->rq_size - max_lun_commands; - PRINT_INFO("Decreasing request limit by %d", - req_lim_delta); + PRINT_INFO("Decreasing initiator request limit from %d" + " to %d", ch->rq_size, max_lun_commands); /* * Note: at least in theory this may make the req_lim * variable managed by the initiator temporarily @@ -1738,7 +1726,8 @@ static void srpt_handle_cred_rsp(struct srpt_rdma_ch *ch, * @ioctx: SRPT I/O context associated with the information unit. */ static void srpt_handle_new_iu(struct srpt_rdma_ch *ch, - struct srpt_ioctx *ioctx) + struct srpt_ioctx *ioctx, + enum scst_exec_context context) { struct srp_cmd *srp_cmd; struct scst_cmd *scmnd; @@ -1747,7 +1736,6 @@ static void srpt_handle_new_iu(struct srpt_rdma_ch *ch, u8 srp_tsk_mgmt_status; int len; int send_rsp_res; - enum scst_exec_context context = srpt_context; ch_state = atomic_read(&ch->state); if (ch_state == RDMA_CHANNEL_CONNECTING) { @@ -1882,27 +1870,14 @@ err: } } -/** - * srpt_rcv_completion() - IB receive completion queue callback function. - * - * Notes: - * - It is guaranteed that a completion handler will never be invoked - * concurrently on two different CPUs for the same completion queue. See also - * Documentation/infiniband/core_locking.txt and the implementation of - * handle_edge_irq() in kernel/irq/chip.c. - * - When threaded IRQs are enabled, completion handlers are invoked in thread - * context instead of interrupt context. - */ -static void srpt_rcv_completion(struct ib_cq *cq, void *ctx) +static void srpt_process_rcv_completion(struct ib_cq *cq, + struct srpt_rdma_ch *ch, + enum scst_exec_context context) { - struct srpt_rdma_ch *ch = ctx; struct srpt_device *sdev = ch->sport->sdev; struct ib_wc wc; struct srpt_ioctx *ioctx; - EXTRACHECKS_WARN_ON(thread == MODE_SINGLE_THREADED); - - atomic_inc(&ch->processing_recv_compl); do { while (ib_poll_cq(ch->rcq, 1, &wc) > 0) { WARN_ON(wc.wr_id & SRPT_OP_TTI); @@ -1915,26 +1890,20 @@ static void srpt_rcv_completion(struct ib_cq *cq, void *ctx) } else { ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_RECV]; - srpt_handle_new_iu(ch, ioctx); + srpt_handle_new_iu(ch, ioctx, context); } } } while (ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS) > 0); - atomic_dec(&ch->processing_recv_compl); } -/** - * srpt_send_completion() - IB send completion queue callback function. - */ -static void srpt_send_completion(struct ib_cq *cq, void *ctx) +static void srpt_process_send_completion(struct ib_cq *cq, + struct srpt_rdma_ch *ch, + enum scst_exec_context context) { - struct srpt_rdma_ch *ch = ctx; struct srpt_device *sdev = ch->sport->sdev; struct ib_wc wc; struct srpt_ioctx *ioctx; - enum scst_exec_context context = srpt_context; - - EXTRACHECKS_WARN_ON(thread == MODE_SINGLE_THREADED); do { while (ib_poll_cq(ch->scq, 1, &wc) > 0) { @@ -1967,197 +1936,74 @@ static void srpt_send_completion(struct ib_cq *cq, void *ctx) } /** - * srpt_schedule_thread() - Add 'ioctx' to the tail of the ioctx list. + * srpt_rcv_completion() - IB receive completion queue callback function. * - * @pre thread == MODE_SINGLE_THREADED + * Notes: + * - It is guaranteed that a completion handler will never be invoked + * concurrently on two different CPUs for the same completion queue. See also + * Documentation/infiniband/core_locking.txt and the implementation of + * handle_edge_irq() in kernel/irq/chip.c. + * - When threaded IRQs are enabled, completion handlers are invoked in thread + * context instead of interrupt context. */ -static inline void srpt_schedule_thread(struct srpt_ioctx *ioctx) -{ - unsigned long flags; -#ifdef CONFIG_SCST_EXTRACHECKS - struct srpt_ioctx *e; -#endif - - EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED); - - spin_lock_irqsave(&srpt_thread.thread_lock, flags); -#ifdef CONFIG_SCST_EXTRACHECKS - list_for_each_entry(e, &srpt_thread.thread_ioctx_list, comp_list) - EXTRACHECKS_WARN_ON(e == ioctx); -#endif - list_add_tail(&ioctx->comp_list, &srpt_thread.thread_ioctx_list); - spin_unlock_irqrestore(&srpt_thread.thread_lock, flags); - wake_up(&ioctx_list_waitQ); -} - -/** - * srpt_rcv_completion_st() - Single-threaded mode receive completion handler. - */ -static void srpt_rcv_completion_st(struct ib_cq *cq, void *ctx) +static void srpt_rcv_completion(struct ib_cq *cq, void *ctx) { struct srpt_rdma_ch *ch = ctx; - struct srpt_device *sdev = ch->sport->sdev; - struct ib_wc wc; - struct srpt_ioctx *ioctx; - - EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED); atomic_inc(&ch->processing_recv_compl); - ib_req_notify_cq(ch->rcq, IB_CQ_NEXT_COMP); - while (ib_poll_cq(ch->rcq, 1, &wc) > 0) { - WARN_ON(wc.wr_id & SRPT_OP_TTI); - - ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_FLAGS]; - ioctx->ch = ch; - ioctx->wr_id = wc.wr_id; - ioctx->status = wc.status; - ioctx->opcode = wc.opcode; - srpt_schedule_thread(ioctx); + switch (thread) { + case MODE_IB_COMPLETION_IN_THREAD: + wake_up_interruptible(&ch->wait_queue); + break; + case MODE_IB_COMPLETION_IN_SIRQ: + srpt_process_rcv_completion(cq, ch, SCST_CONTEXT_THREAD); + break; + case MODE_ALL_IN_SIRQ: + srpt_process_rcv_completion(cq, ch, SCST_CONTEXT_TASKLET); + break; } atomic_dec(&ch->processing_recv_compl); } /** - * srpt_rcv_completion_st() - Single-threaded mode send completion handler. + * srpt_send_completion() - IB send completion queue callback function. */ -static void srpt_send_completion_st(struct ib_cq *cq, void *ctx) +static void srpt_send_completion(struct ib_cq *cq, void *ctx) { struct srpt_rdma_ch *ch = ctx; - struct srpt_device *sdev = ch->sport->sdev; - struct ib_wc wc; - struct srpt_ioctx *ioctx; - EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED); - - ib_req_notify_cq(ch->scq, IB_CQ_NEXT_COMP); - while (ib_poll_cq(ch->scq, 1, &wc) > 0) { - WARN_ON(wc.wr_id & SRPT_OP_RECV); - - ioctx = sdev->ioctx_ring[wc.wr_id & ~SRPT_OP_FLAGS]; - ioctx->ch = ch; - ioctx->wr_id = wc.wr_id; - ioctx->status = wc.status; - ioctx->opcode = wc.opcode; - srpt_schedule_thread(ioctx); +#if 0 + switch (thread) { + case MODE_IB_COMPLETION_IN_THREAD: + case MODE_IB_COMPLETION_IN_SIRQ: + srpt_process_send_completion(cq, ch, SCST_CONTEXT_THREAD); + break; + case MODE_ALL_IN_SIRQ: + srpt_process_send_completion(cq, ch, SCST_CONTEXT_TASKLET); + break; } +#else + srpt_process_send_completion(cq, ch, SCST_CONTEXT_TASKLET); +#endif } -/** - * srpt_test_ioctx_list() - Returns false if the kernel thread should sleep. - * - * @pre thread == MODE_SINGLE_THREADED - * @pre the caller holds a lock on srpt_thread.thread_lock - */ -static inline int srpt_test_ioctx_list(void) +static int srpt_compl_thread(void *arg) { - EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED); - - return !list_empty(&srpt_thread.thread_ioctx_list) - || unlikely(kthread_should_stop()); -} - -/** - * srpt_ioctx_thread() - ib_srpt kernel thread entry point. - * - * This kernel thread is only created when the module parameter 'thread' equals - * one. This thread processes the ioctx list srpt_thread.thread_ioctx_list. - * - * @pre thread == MODE_SINGLE_THREADED - */ -static int srpt_ioctx_thread(void *arg) -{ - EXTRACHECKS_WARN_ON(thread != MODE_SINGLE_THREADED); + struct srpt_rdma_ch* ch; /* Hibernation / freezing of the SRPT kernel thread is not supported. */ current->flags |= PF_NOFREEZE; - spin_lock_irq(&srpt_thread.thread_lock); + ch = arg; + BUG_ON(!ch); + PRINT_INFO("Kernel thread %s started", ch->thread->comm); while (!kthread_should_stop()) { - wait_queue_t wait; - init_waitqueue_entry(&wait, current); - - if (!srpt_test_ioctx_list()) { - add_wait_queue_exclusive(&ioctx_list_waitQ, &wait); - - for (;;) { - set_current_state(TASK_INTERRUPTIBLE); - if (srpt_test_ioctx_list()) - break; - spin_unlock_irq(&srpt_thread.thread_lock); - schedule(); - spin_lock_irq(&srpt_thread.thread_lock); - } - set_current_state(TASK_RUNNING); - remove_wait_queue(&ioctx_list_waitQ, &wait); - } - - while (!list_empty(&srpt_thread.thread_ioctx_list)) { - struct srpt_ioctx *ioctx; - struct srpt_rdma_ch *ch; - u64 wr_id; - enum ib_wc_status status; - enum ib_wc_opcode opcode; - - ioctx = list_first_entry(&srpt_thread.thread_ioctx_list, - typeof(*ioctx), comp_list); - BUG_ON(!ioctx); - list_del(&ioctx->comp_list); - spin_unlock_irq(&srpt_thread.thread_lock); - - ch = ioctx->ch; - wr_id = ioctx->wr_id; - status = ioctx->status; - opcode = ioctx->opcode; - BUG_ON(!ch); - - if (wr_id & SRPT_OP_RECV) { - if (unlikely(status)) { - PRINT_INFO("receiving wr_id %u failed" - " with status %d", - (unsigned)(wr_id & ~SRPT_OP_FLAGS), - status); - goto continue_processing; - } - } else { - if (unlikely(status)) { - PRINT_INFO("sending %s for wr_id" - " %u failed with status %d", - wr_id & SRPT_OP_TTI - ? "request" : "response", - (unsigned) - (wr_id & ~SRPT_OP_FLAGS), - status); - srpt_handle_send_err_comp(ch, - wr_id, srpt_context); - goto continue_processing; - } - } - - switch (opcode) { - case IB_WC_SEND: - if (wr_id & SRPT_OP_TTI) - srpt_put_tti_ioctx(ch); - else - srpt_handle_send_comp(ch, ioctx, - srpt_context); - break; - case IB_WC_RDMA_READ: - srpt_handle_rdma_comp(ch, ioctx, srpt_context); - break; - case IB_WC_RECV: - srpt_handle_new_iu(ch, ioctx); - break; - default: - PRINT_ERROR("received unrecognized WC opcode" - " %d", opcode); - break; - } -continue_processing: - spin_lock_irq(&srpt_thread.thread_lock); - } + wait_event_interruptible(ch->wait_queue, + (srpt_process_rcv_completion(ch->scq, ch, + SCST_CONTEXT_THREAD), + kthread_should_stop())); } - spin_unlock_irq(&srpt_thread.thread_lock); - + PRINT_INFO("Kernel thread %s stopped", ch->thread->comm); return 0; } @@ -2179,15 +2025,11 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch) #if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) \ && ! defined(RHEL_RELEASE_CODE) - ch->rcq = ib_create_cq(sdev->device, - thread == MODE_SINGLE_THREADED - ? srpt_rcv_completion_st : srpt_rcv_completion, - NULL, ch, ch->rq_size); + ch->rcq = ib_create_cq(sdev->device, srpt_rcv_completion, NULL, ch, + ch->rq_size); #else - ch->rcq = ib_create_cq(sdev->device, - thread == MODE_SINGLE_THREADED - ? srpt_rcv_completion_st : srpt_rcv_completion, - NULL, ch, ch->rq_size, 0); + ch->rcq = ib_create_cq(sdev->device, srpt_rcv_completion, NULL, ch, + ch->rq_size, 0); #endif if (IS_ERR(ch->rcq)) { ret = PTR_ERR(ch->rcq); @@ -2197,15 +2039,11 @@ static int srpt_create_ch_ib(struct srpt_rdma_ch *ch) } #if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 20) \ && ! defined(RHEL_RELEASE_CODE) - ch->scq = ib_create_cq(sdev->device, - thread == MODE_SINGLE_THREADED - ? srpt_send_completion_st : srpt_send_completion, - NULL, ch, srpt_sq_size); + ch->scq = ib_create_cq(sdev->device, srpt_send_completion, NULL, ch, + srpt_sq_size); #else - ch->scq = ib_create_cq(sdev->device, - thread == MODE_SINGLE_THREADED - ? srpt_send_completion_st : srpt_send_completion, - NULL, ch, srpt_sq_size, 0); + ch->scq = ib_create_cq(sdev->device, srpt_send_completion, NULL, ch, + srpt_sq_size, 0); #endif if (IS_ERR(ch->scq)) { ret = PTR_ERR(ch->scq); @@ -2281,9 +2119,6 @@ static void srpt_unregister_channel(struct srpt_rdma_ch *ch) atomic_set(&ch->state, RDMA_CHANNEL_DISCONNECTING); spin_unlock_irq(&sdev->spinlock); - if (thread == MODE_SINGLE_THREADED) - flush_scheduled_work(); - /* * At this point it is guaranteed that no new commands will be sent to * the SCST core for channel ch, which is a requirement for @@ -2395,6 +2230,11 @@ static void srpt_release_channel(struct scst_session *scst_sess) while (atomic_read(&ch->processing_recv_compl)) ; + if (ch->thread) { + kthread_stop(ch->thread); + ch->thread = NULL; + } + /* * At this point it is guaranteed that no new completions will be * received and also that no completion handler is still accessing ch. @@ -2669,6 +2509,22 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id, be64_to_cpu(*(__be64 *)(ch->i_port_id + 8))); } + if (thread == MODE_IB_COMPLETION_IN_THREAD) { + init_waitqueue_head(&ch->wait_queue); + + TRACE_DBG("creating IB completion thread for session %s", + ch->sess_name); + + ch->thread = kthread_run(srpt_compl_thread, ch, + "ib_srpt_compl"); + if (IS_ERR(ch->thread)) { + PRINT_ERROR("failed to create kernel thread %ld", + PTR_ERR(ch->thread)); + ch->thread = NULL; + goto destroy_ib; + } + } + TRACE_DBG("registering session %s", ch->sess_name); BUG_ON(!sdev->scst_tgt); @@ -2677,7 +2533,7 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id, if (!ch->scst_sess) { rej->reason = cpu_to_be32(SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES); TRACE_DBG("%s", "Failed to create scst sess"); - goto destroy_ib; + goto destroy_thread; } TRACE_DBG("Establish connection sess=%p name=%s cm_id=%p", @@ -2727,6 +2583,10 @@ release_channel: scst_unregister_session(ch->scst_sess, 0, NULL); ch->scst_sess = NULL; +destroy_thread: + kthread_stop(ch->thread); + ch->thread = NULL; + destroy_ib: ib_destroy_qp(ch->qp); ib_destroy_cq(ch->scq); @@ -2789,7 +2649,7 @@ static void srpt_cm_rtu_recv(struct ib_cm_id *cm_id) list_for_each_entry_safe(ioctx, ioctx_tmp, &ch->cmd_wait_list, wait_list) { list_del(&ioctx->wait_list); - srpt_handle_new_iu(ch, ioctx); + srpt_handle_new_iu(ch, ioctx, SCST_CONTEXT_THREAD); } if (ret && srpt_test_and_set_channel_state(ch, RDMA_CHANNEL_LIVE, @@ -3424,7 +3284,7 @@ static int srpt_xmit_response(struct scst_cmd *scmnd) new_state = srpt_set_cmd_state(ioctx, SRPT_STATE_CMD_RSP_SENT); if (unlikely(scst_cmd_aborted(scmnd))) { - srpt_abort_scst_cmd(ioctx, srpt_context); + srpt_abort_scst_cmd(ioctx, SCST_CONTEXT_SAME); goto out; } @@ -4142,20 +4002,28 @@ static int __init srpt_init_module(void) } switch (thread) { - case MODE_SIRQ: - /* IRQ context */ - srpt_context = SCST_CONTEXT_TASKLET; + case MODE_ALL_IN_SIRQ: + /* + * Process both IB completions and SCST commands in SIRQ + * context. May lead to soft lockups and other scary behavior + * under sufficient load. + */ srpt_template.rdy_to_xfer_atomic = true; break; - case MODE_SINGLE_THREADED: - /* single kernel thread (created by ib_srpt) */ - srpt_context = SCST_CONTEXT_DIRECT; + case MODE_IB_COMPLETION_IN_THREAD: + /* + * Process IB completions in the kernel thread associated with + * the RDMA channel, and process SCST commands in the kernel + * threads created by the SCST core. + */ srpt_template.rdy_to_xfer_atomic = false; break; - case MODE_MULTITHREADED: + case MODE_IB_COMPLETION_IN_SIRQ: default: - /* multiple kernel threads (created by the SCST core) */ - srpt_context = SCST_CONTEXT_THREAD; + /* + * Process IB completions in SIRQ context and SCST commands in + * the kernel threads created by the SCST core. + */ srpt_template.rdy_to_xfer_atomic = false; break; } @@ -4181,23 +4049,8 @@ static int __init srpt_init_module(void) goto out_unregister_target; } - if (thread == 1) { - spin_lock_init(&srpt_thread.thread_lock); - INIT_LIST_HEAD(&srpt_thread.thread_ioctx_list); - srpt_thread.thread = kthread_run(srpt_ioctx_thread, - NULL, "srpt_thread"); - if (IS_ERR(srpt_thread.thread)) { - srpt_thread.thread = NULL; - thread = 0; - PRINT_ERROR("%s", "kernel thread creation failed"); - goto out_unregister_client; - } - } - return 0; -out_unregister_client: - ib_unregister_client(&srpt_client); out_unregister_target: #ifdef CONFIG_SCST_PROC /* diff --git a/srpt/src/ib_srpt.h b/srpt/src/ib_srpt.h index b8fd34cb0..bccd4eef4 100644 --- a/srpt/src/ib_srpt.h +++ b/srpt/src/ib_srpt.h @@ -192,7 +192,6 @@ enum srpt_command_state { * @buf: Pointer to the message transferred via this I/O context. * @dma: DMA address of buf. * @wait_list: Node for insertion in srpt_rdma_ch::cmd_wait_list. - * @comp_list: Node for insertion in the srpt_thread::thread_ioctx_list. * @state: I/O context state. See also enum srpt_command_state. */ struct srpt_ioctx { @@ -211,7 +210,6 @@ struct srpt_ioctx { u64 wr_id; enum ib_wc_status status; enum ib_wc_opcode opcode; - struct list_head comp_list; struct srpt_rdma_ch *ch; struct scst_cmd *scmnd; scst_data_direction dir; @@ -241,6 +239,9 @@ enum rdma_ch_state { /** * struct srpt_rdma_ch - RDMA channel. + * @thread: Kernel thread that processes the IB queues associated with + * the channel. + * @wait_queue: Allows the kernel thread to wait for more work. * @cm_id: IB CM ID associated with the channel. * @rq_size: IB receive queue size. * @processing_recv_compl: whether or not a receive completion is being @@ -275,6 +276,8 @@ enum rdma_ch_state { * @sess_name: SCST session name. */ struct srpt_rdma_ch { + struct task_struct *thread; + wait_queue_head_t wait_queue; struct ib_cm_id *cm_id; struct ib_qp *qp; int rq_size;