From 143e9c96844122cce3f4b5a2060e337fb6e939bc Mon Sep 17 00:00:00 2001 From: Bart Van Assche Date: Tue, 3 Nov 2020 04:56:01 +0000 Subject: [PATCH] ib_srpt: Use a workqueue for processing RDMA completions Use a workqueue for processing RDMA completions instead of creating one kernel thread per RDMA channel. This change improves performance if the number of RDMA channels is large by reducing the number of context switches between kernel threads while processing RDMA completions. An additional change is that srpt_set_ch_state() no longer wakes up the RDMA completion context but that srpt_cm_rtu_recv() wakes up the RDMA completion context instead. See also https://github.com/bvanassche/scst/issues/33 . git-svn-id: http://svn.code.sf.net/p/scst/svn/trunk@9178 d57e44dd-8a1f-0410-8b47-8ef2f437770f --- scst/include/backport.h | 18 +++++ srpt/session-management.txt | 28 ++++---- srpt/src/ib_srpt.c | 138 +++++++++++++++--------------------- srpt/src/ib_srpt.h | 4 +- 4 files changed, 91 insertions(+), 97 deletions(-) diff --git a/scst/include/backport.h b/scst/include/backport.h index b0fbdb55a..17b0e5ec5 100644 --- a/scst/include/backport.h +++ b/scst/include/backport.h @@ -1955,6 +1955,24 @@ static inline struct workqueue_struct *alloc_workqueue(const char *fmt, } #endif +/* + * See also commits 18aa9effad4a ("workqueue: implement WQ_NON_REENTRANT"; + * v2.6.36) and commits dbf2576e37da ("workqueue: make all workqueues + * non-reentrant"; v3.7). + */ +#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 36) || \ + LINUX_VERSION_CODE >= KERNEL_VERSION(3, 7, 0) +#define WQ_NON_REENTRANT 0 +#endif + +/* + * See also commit 226223ab3c41 ("workqueue: implement sysfs interface for + * workqueues"; v3.10). + */ +#if LINUX_VERSION_CODE < KERNEL_VERSION(3, 10, 0) +#define WQ_SYSFS 0 +#endif + /* */ /* commit ed082d36 */ diff --git a/srpt/session-management.txt b/srpt/session-management.txt index 53dae0f35..d5a4b88db 100644 --- a/srpt/session-management.txt +++ b/srpt/session-management.txt @@ -7,36 +7,32 @@ The following actions related to SRP sessions can all occur concurrently: * HCA driver invokes the queue pair (QP) completion handler srpt_completion(). * HCA driver invokes the QP async event handler srpt_qp_event(). * HCA transfers data between initiator and target via RDMA. -* srpt_compl_thread() polls the QP. +* srpt_do_compl_work() polls the QP. * SCST core invokes one of the callback functions defined in srpt_template. The actions that occur over the lifetime of a session are as follows: - A REQ message is received from the initiator. -- srpt_cm_req_recv() is invoked, allocates a queue pair and creates a - completion thread. +- srpt_cm_req_recv() is invoked, allocates an I/O context ring and a queue pair + and creates an SCST session. - If the connection request is not accepted, a REJ message is sent and - srpt_close_ch() is invoked. The srpt_close_ch() call causes the completion - thread to stop and the allocated resources to be freed asynchronously. + srpt_close_ch() is invoked. The srpt_close_ch() call causes the allocated + resources to be freed asynchronously. - If the connection request is accepted a REP message is sent to the initiator. - Once an RTU message is received from the initiator, srpt_cm_rtu_recv() is invoked. That function changes the queue pair state into RTS, the channel - state into CH_LIVE and wakes up the completion thread. + state into CH_LIVE and triggers processing of the wait list. - RDMA communication starts and continues until either a DREQ message is received or sent. A DREQ is sent either because a target port is disabled or - from inside the srpt_close_session() function. -- After a DREQ has been sent either a DREP will be received - (srpt_cm_drep_recv()) or the TimeWait state will be reached and will be left - (srpt_cm_timewait_exit()). -- srpt_cm_drep_recv() and srpt_cm_timewait_exit() invoke srpt_close_ch(). -- srpt_close_ch() changes the channel state into CH_DISCONNECTING, the + because the SCST core requested to close the session. +- After a DREQ has been sent either a DREP will be received or the TimeWait + state will be reached. Both trigger a srpt_close_ch() call. +- srpt_close_ch() changes the channel state into CH_DRAINING, the queue pair state into IB_QPS_ERR and queues a zero-length write. - Upon receipt of the zero-length write completion the channel state is changed into CH_DISCONNECTED. This is the last work completion so once the channel state has reached CH_DISCONNECTED it is guaranteed that the queue - pair completion handler won't be invoked again and also that the completion - handler won't call wake_up_process(ch->thread) anymore. -- That channel state change causes the polling loop in the completion thread - to stop and also triggers a call of scst_unregister_session(). + pair completion handler won't be invoked again. +- scst_unregister_session() is called. - Once scst_unregister_session() has finished srpt_unreg_sess() is invoked. - srpt_unreg_sess() destroys the CM ID and decrements the channel refcount. - Once the channel refcount drops to zero srpt_free_ch() is invoked which diff --git a/srpt/src/ib_srpt.c b/srpt/src/ib_srpt.c index 763c4804e..97b4a4fd5 100644 --- a/srpt/src/ib_srpt.c +++ b/srpt/src/ib_srpt.c @@ -179,12 +179,14 @@ static const enum scst_exec_context srpt_send_context = SCST_CONTEXT_DIRECT; static struct ib_client srpt_client; static struct scst_tgt_template srpt_template; +static struct workqueue_struct *srpt_wq; static struct net *srpt_net_ns; static struct rdma_cm_id *rdma_cm_id; static void srpt_unmap_sg_to_ib_sge(struct srpt_rdma_ch *ch, struct srpt_send_ioctx *ioctx); static void srpt_destroy_ch_ib(struct srpt_rdma_ch *ch); +static void srpt_unregister_ch(struct srpt_rdma_ch *ch); /* * The only allowed channel state changes are those that change the channel @@ -200,7 +202,6 @@ static bool srpt_set_ch_state(struct srpt_rdma_ch *ch, enum rdma_ch_state new) prev = ch->state; if (new > prev) { ch->state = new; - wake_up_process(ch->thread); changed = true; } spin_unlock_irqrestore(&ch->spinlock, flags); @@ -2099,9 +2100,10 @@ static void srpt_process_send_completion(struct ib_cq *cq, srpt_handle_rdma_comp(ch, ch->ioctx_ring[index], opcode, srpt_xmt_rsp_context); } else if (opcode == SRPT_RDMA_ZEROLENGTH_WRITE) { - WARN_ONCE(true, "%s-%d: QP not in error state\n", + WARN_ONCE(ch->state != CH_LIVE, + "%s-%d: QP not in 'live' state\n", ch->sess_name, ch->qp->qp_num); - WARN_ON_ONCE(!srpt_set_ch_state(ch, CH_DISCONNECTED)); + srpt_process_wait_list(ch); } else { WARN(true, "unexpected opcode %d\n", opcode); } @@ -2123,7 +2125,7 @@ static void srpt_process_send_completion(struct ib_cq *cq, srpt_handle_rdma_err_comp(ch, ch->ioctx_ring[index], opcode, srpt_xmt_rsp_context); } else if (opcode == SRPT_RDMA_ZEROLENGTH_WRITE) { - WARN_ON_ONCE(!srpt_set_ch_state(ch, CH_DISCONNECTED)); + srpt_unregister_ch(ch); } else if (opcode != SRPT_RDMA_MID) { WARN(true, "unexpected opcode %d\n", opcode); } @@ -2162,18 +2164,13 @@ static int srpt_poll(struct srpt_rdma_ch *ch, int budget) return processed; } -static int srpt_process_completion(struct srpt_rdma_ch *ch, int budget, - bool thread_context) +static int srpt_process_completion(struct srpt_rdma_ch *ch, int budget) { struct ib_cq *const cq = ch->cq; int processed = 0, n = budget; do { - if (thread_context) - set_current_state(TASK_RUNNING); processed += srpt_poll(ch, n); - if (thread_context) - set_current_state(TASK_INTERRUPTIBLE); n = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); } while (n > 0); @@ -2188,7 +2185,7 @@ static void srpt_completion(struct ib_cq *cq, void *ctx) { struct srpt_rdma_ch *ch = ctx; - wake_up_process(ch->thread); + queue_work_on(raw_smp_processor_id(), srpt_wq, &ch->compl); } static void srpt_free_ch(struct kref *kref) @@ -2200,12 +2197,17 @@ static void srpt_free_ch(struct kref *kref) kfree_rcu(ch, rcu); } +/* + * Called indirectly by scst_unregister_session() after the last command + * associated with a session has finished. + */ static void srpt_unreg_ch(struct srpt_rdma_ch *ch) { struct srpt_port *sport = ch->sport; struct srpt_device *sdev = sport->sdev; - kthread_stop(ch->thread); + WARN_ON_ONCE(ch->state != CH_DISCONNECTED); + flush_work(&ch->compl); srpt_free_ioctx_ring((struct srpt_ioctx **)ch->ioctx_recv_ring, sdev, ch->rq_size, @@ -2233,58 +2235,32 @@ static void srpt_unreg_ch(struct srpt_rdma_ch *ch) kref_put(&ch->kref, srpt_free_ch); } +/* + * Called by scst_unregister_session() after the last command associated with + * a session has finished. + */ static void srpt_unreg_sess(struct scst_session *sess) { srpt_unreg_ch(scst_sess_get_tgt_priv(sess)); } -static int srpt_compl_thread(void *arg) +static void srpt_unregister_ch(struct srpt_rdma_ch *ch) { - enum { poll_budget = 65536 }; - struct srpt_rdma_ch *ch; + WARN_ON_ONCE(!srpt_set_ch_state(ch, CH_DISCONNECTED)); + pr_debug("%s-%d: about to unregister this session\n", ch->sess_name, + ch->qp->qp_num); + scst_unregister_session(ch->sess, false, srpt_unreg_sess); +} + +static void srpt_do_compl_work(struct work_struct *work) +{ + struct srpt_rdma_ch *ch = container_of(work, typeof(*ch), compl); + enum { poll_budget = 256 }; int n; - /* Hibernation / freezing of the SRPT kernel thread is not supported. */ - current->flags |= PF_NOFREEZE; - - ch = arg; - BUG_ON(!ch); - - while (ch->state < CH_LIVE) { - n = srpt_process_completion(ch, poll_budget, true); - if (ch->state >= CH_LIVE) - break; - if (n >= poll_budget) - cond_resched(); - else - schedule(); - } - - srpt_process_wait_list(ch); - - while (ch->state < CH_DISCONNECTED) { - n = srpt_process_completion(ch, poll_budget, true); - if (ch->state >= CH_DISCONNECTED) - break; - if (n >= poll_budget) - cond_resched(); - else - schedule(); - } - set_current_state(TASK_RUNNING); - - pr_debug("%s-%d: about to unregister this session\n", - ch->sess_name, ch->qp->qp_num); - scst_unregister_session(ch->sess, false, srpt_unreg_sess); - - set_current_state(TASK_INTERRUPTIBLE); - while (!kthread_should_stop()) { - schedule(); - set_current_state(TASK_INTERRUPTIBLE); - } - __set_current_state(TASK_RUNNING); - - return 0; + n = srpt_process_completion(ch, poll_budget); + if (n >= poll_budget) + schedule_work(work); } /** @@ -2328,6 +2304,8 @@ retry: goto out; } + ib_req_notify_cq(ch->cq, IB_CQ_NEXT_COMP); + qp_init->qp_context = (void *)ch; qp_init->event_handler = (void(*)(struct ib_event *, void*))srpt_qp_event; @@ -2436,7 +2414,7 @@ static bool srpt_close_ch(struct srpt_rdma_ch *ch) if (ret < 0) { pr_err("%s-%d: queuing zero-length write failed: %d\n", ch->sess_name, ch->qp->qp_num, ret); - WARN_ON_ONCE(!srpt_set_ch_state(ch, CH_DISCONNECTED)); + srpt_unregister_ch(ch); } kref_put(&ch->kref, srpt_free_ch); @@ -2621,7 +2599,6 @@ static int srpt_cm_req_recv(struct srpt_device *const sdev, struct ib_cm_rep_param ib_cm; } *rep_param = NULL; struct srpt_rdma_ch *ch = NULL; - struct task_struct *thread; u32 it_iu_len; int i, ret; @@ -2709,6 +2686,7 @@ static int srpt_cm_req_recv(struct srpt_device *const sdev, ch->rq_size = min(MAX_SRPT_RQ_SIZE, scst_get_max_lun_commands(NULL, 0)); spin_lock_init(&ch->spinlock); ch->state = CH_CONNECTING; + INIT_WORK(&ch->compl, srpt_do_compl_work); INIT_LIST_HEAD(&ch->cmd_wait_list); ch->max_rsp_size = max_t(uint32_t, srp_max_rsp_size, MIN_MAX_RSP_SIZE); @@ -2793,16 +2771,6 @@ static int srpt_cm_req_recv(struct srpt_device *const sdev, goto destroy_ib; } - thread = kthread_run(srpt_compl_thread, ch, "srpt_%s-%d", - dev_name(&ch->sport->sdev->device->dev), - ch->sport->port); - if (IS_ERR(thread)) { - rej->reason = cpu_to_be32(SRP_LOGIN_REJ_INSUFFICIENT_RESOURCES); - ret = PTR_ERR(thread); - pr_err("failed to create kernel thread: %d\n", ret); - goto release_channel; - } - mutex_lock(&sport->mutex); if ((req->req_flags & SRP_MTCH_ACTION) == SRP_MULTICHAN_SINGLE) { @@ -2820,7 +2788,6 @@ static int srpt_cm_req_recv(struct srpt_device *const sdev, } list_add_tail_rcu(&ch->list, &nexus->ch_list); - ch->thread = thread; if (!sport->enabled) { rej->reason = cpu_to_be32( @@ -2906,9 +2873,6 @@ static int srpt_cm_req_recv(struct srpt_device *const sdev, goto out; -release_channel: - scst_unregister_session(ch->sess, true, NULL); - destroy_ib: srpt_destroy_ch_ib(ch); @@ -2955,11 +2919,11 @@ reject: ib_send_cm_rej(ib_cm_id, IB_CM_REJ_CONSUMER_DEFINED, NULL, 0, rej, sizeof(*rej)); - if (ch && ch->thread) { + if (ch && ch->qp) { srpt_close_ch(ch); /* - * Tell the caller not to free cm_id since - * srpt_compl_thread() will do that. + * Tell the caller not to free cm_id since srpt_do_compl_work() + * will do that. */ ret = 0; } @@ -3124,6 +3088,10 @@ static void srpt_cm_rtu_recv(struct srpt_rdma_ch *ch) if (!srpt_set_ch_state(ch, CH_LIVE)) pr_err("%s-%d: channel transition to LIVE state failed\n", ch->sess_name, ch->qp->qp_num); + + /* Trigger wait list processing. */ + ret = srpt_zerolength_write(ch); + WARN_ONCE(ret < 0, "%d\n", ret); } /** @@ -3136,7 +3104,7 @@ static void srpt_cm_rtu_recv(struct srpt_rdma_ch *ch) * Note: srpt_cm_handler() must only return a non-zero value when transferring * ownership of the cm_id to a channel by srpt_cm_req_recv() failed. Returning * a non-zero value in any other case will trigger a race with the - * ib_destroy_cm_id() call in srpt_compl_thread(). + * ib_destroy_cm_id() call triggered indirectly by srpt_do_compl_work(). */ static int srpt_cm_handler(struct ib_cm_id *cm_id, CM_HANDLER_EVENT_MODIFIER struct ib_cm_event *event) @@ -4699,10 +4667,17 @@ static int __init srpt_init_module(void) goto out; } + srpt_wq = alloc_workqueue("srpt", WQ_SYSFS | WQ_NON_REENTRANT, 0); + if (!srpt_wq) { + pr_err("Couldn't allocate the ib_srpt workqueue\n"); + ret = -ENOMEM; + goto out_unregister_target; + } + ret = ib_register_client(&srpt_client); if (ret) { pr_err("couldn't register IB client\n"); - goto out_unregister_target; + goto destroy_wq; } srpt_net_ns = kobj_ns_grab_current(KOBJ_NS_TYPE_NET); @@ -4757,6 +4732,9 @@ drop_ns: srpt_net_ns = NULL; ib_unregister_client(&srpt_client); +destroy_wq: + destroy_workqueue(srpt_wq); + out_unregister_target: scst_unregister_target_template(&srpt_template); @@ -4766,8 +4744,6 @@ out: static void __exit srpt_cleanup_module(void) { - rcu_barrier(); - if (rdma_cm_id) rdma_destroy_id(rdma_cm_id); @@ -4776,6 +4752,10 @@ static void __exit srpt_cleanup_module(void) ib_unregister_client(&srpt_client); + destroy_workqueue(srpt_wq); + + rcu_barrier(); + scst_unregister_target_template(&srpt_template); } diff --git a/srpt/src/ib_srpt.h b/srpt/src/ib_srpt.h index 30b07adfb..a6a1849e5 100644 --- a/srpt/src/ib_srpt.h +++ b/srpt/src/ib_srpt.h @@ -373,7 +373,7 @@ enum rdma_ch_state { /** * struct srpt_rdma_ch - RDMA channel - * @thread: Kernel thread that processes the IB queues associated with + * @compl: Work structure used for scheduling completion work. * the channel. * @nexus: I_T nexus this channel is associated with. * @qp: IB queue pair used for communicating over this channel. @@ -411,7 +411,7 @@ enum rdma_ch_state { * @sess_name: Session name. */ struct srpt_rdma_ch { - struct task_struct *thread; + struct work_struct compl; struct srpt_nexus *nexus; struct ib_qp *qp; union {