From c313b71b2e9555d3e5e890958ecbe4ed94ef2ccd Mon Sep 17 00:00:00 2001
From: Zach Brown
Date: Fri, 26 Sep 2025 09:52:31 -0700
Subject: [PATCH] Process client lock messages in ordered work

The lock client can't handle some messages being processed out of
order.  Previously it detected re-ordering itself, but missed some
cases.  Receive processing was then changed to always call lock
message processing from the recv work, globally ordering all lock
messages.

This inline processing contributed to excessive latencies in making
our way through the incoming receive queue, delaying work that would
otherwise run in parallel once it was off the recv queue.  We saw this
in practice when a giant flood of lock shrink messages arrived at the
client.  The recv work processed each in turn, starving a statfs
response long enough to trigger the hung task warning.

This fix does two things.  First, it moves ordered recv processing out
of the recv work.  The recv work can drain the socket quickly, turning
incoming messages into a list that the ordered work consumes.  Other
messages get a chance to be received and queued to their processing
work without waiting for the ordered work to finish.

Second, it adds parallelism to the ordered processing.  Incoming lock
messages don't need global ordering, they need ordering within each
lock.  We add an arbitrary but reasonable number of ordered workers
and hash lock messages to a worker based on the lock's key.

Signed-off-by: Zach Brown
---
 kmod/src/net.c | 107 +++++++++++++++++++++++++++++++++++++++----------
 kmod/src/net.h |  10 +++++
 2 files changed, 95 insertions(+), 22 deletions(-)

diff --git a/kmod/src/net.c b/kmod/src/net.c
index 8636452e..51e4c468 100644
--- a/kmod/src/net.c
+++ b/kmod/src/net.c
@@ -20,6 +20,7 @@
 #include 
 #include 
 #include 
+#include <linux/jhash.h>
 
 #include "format.h"
 #include "counters.h"
@@ -134,6 +135,7 @@ struct message_send {
 struct message_recv {
 	struct scoutfs_tseq_entry tseq_entry;
 	struct work_struct proc_work;
+	struct list_head ordered_head;
 	struct scoutfs_net_connection *conn;
 	struct scoutfs_net_header nh;
 };
@@ -498,6 +500,51 @@ static void scoutfs_net_proc_worker(struct work_struct *work)
 	trace_scoutfs_net_proc_work_exit(sb, 0, ret);
 }
 
+static void scoutfs_net_ordered_proc_worker(struct work_struct *work)
+{
+	struct scoutfs_work_list *wlist = container_of(work, struct scoutfs_work_list, work);
+	struct message_recv *mrecv;
+	struct message_recv *mrecv__;
+	LIST_HEAD(list);
+
+	spin_lock(&wlist->lock);
+	list_splice_init(&wlist->list, &list);
+	spin_unlock(&wlist->lock);
+
+	list_for_each_entry_safe(mrecv, mrecv__, &list, ordered_head) {
+		list_del_init(&mrecv->ordered_head);
+		scoutfs_net_proc_worker(&mrecv->proc_work);
+	}
+}
+
+/*
+ * Some messages require in-order processing, but the scope of the
+ * ordering isn't global.  For lock messages it's per lock.  So we
+ * hash these messages to a number of ordered workers which walk a
+ * list and call the usual work function in order.  This replaces the
+ * earlier approaches of the proc work detecting and re-ordering OOO
+ * messages, and of calling proc only from the one recv work context.
+ */
+static void queue_ordered_proc(struct scoutfs_net_connection *conn, struct message_recv *mrecv)
+{
+	struct scoutfs_work_list *wlist;
+	struct scoutfs_net_lock *nl;
+	u32 h;
+
+	if (WARN_ON_ONCE(mrecv->nh.cmd != SCOUTFS_NET_CMD_LOCK ||
+			 le16_to_cpu(mrecv->nh.data_len) != sizeof(struct scoutfs_net_lock)))
+		return scoutfs_net_proc_worker(&mrecv->proc_work);
+
+	nl = (void *)mrecv->nh.data;
+	h = jhash(&nl->key, sizeof(struct scoutfs_key), 0x6fdd3cd5);
+	wlist = &conn->ordered_proc_wlists[h % conn->ordered_proc_nr];
+
+	spin_lock(&wlist->lock);
+	list_add_tail(&mrecv->ordered_head, &wlist->list);
+	spin_unlock(&wlist->lock);
+	queue_work(conn->workq, &wlist->work);
+}
+
 /*
  * Free live responses up to and including the seq by marking them dead
  * and moving them to the send queue to be freed.
@@ -651,6 +698,7 @@ static void scoutfs_net_recv_worker(struct work_struct *work)
 
 		mrecv->conn = conn;
 		INIT_WORK(&mrecv->proc_work, scoutfs_net_proc_worker);
+		INIT_LIST_HEAD(&mrecv->ordered_head);
 		mrecv->nh = nh;
 
 		/* receive the data payload */
@@ -681,15 +729,17 @@ static void scoutfs_net_recv_worker(struct work_struct *work)
 		scoutfs_tseq_add(&ninf->msg_tseq_tree, &mrecv->tseq_entry);
 
 		/*
-		 * Initial received greetings are processed
-		 * synchronously before any other incoming messages.
+		 * Initial received greetings are processed inline
+		 * before any other incoming messages.
 		 *
-		 * Incoming requests or responses to the lock client are
-		 * called synchronously to avoid reordering.
+		 * Incoming requests or responses to the lock client
+		 * can't handle re-ordering, so they're queued to
+		 * ordered receive processing work.
 		 */
-		if (nh.cmd == SCOUTFS_NET_CMD_GREETING ||
-		    (nh.cmd == SCOUTFS_NET_CMD_LOCK && !conn->listening_conn))
+		if (nh.cmd == SCOUTFS_NET_CMD_GREETING)
 			scoutfs_net_proc_worker(&mrecv->proc_work);
+		else if (nh.cmd == SCOUTFS_NET_CMD_LOCK && !conn->listening_conn)
+			queue_ordered_proc(conn, mrecv);
 		else
 			queue_work(conn->workq, &mrecv->proc_work);
 	}
@@ -862,6 +912,7 @@ static void scoutfs_net_destroy_worker(struct work_struct *work)
 	destroy_workqueue(conn->workq);
 	scoutfs_tseq_del(&ninf->conn_tseq_tree, &conn->tseq_entry);
 	kfree(conn->info);
+	kfree(conn->ordered_proc_wlists);
 
 	trace_scoutfs_conn_destroy_free(conn);
 	kfree(conn);
@@ -1330,25 +1381,30 @@ scoutfs_net_alloc_conn(struct super_block *sb,
 {
 	struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
 	struct scoutfs_net_connection *conn;
+	unsigned int nr;
+	unsigned int i;
+
+	nr = min_t(unsigned int, num_possible_cpus(),
+		   PAGE_SIZE / sizeof(struct scoutfs_work_list));
 
 	conn = kzalloc(sizeof(struct scoutfs_net_connection), GFP_NOFS);
-	if (!conn)
-		return NULL;
-
-	if (info_size) {
-		conn->info = kzalloc(info_size, GFP_NOFS);
-		if (!conn->info) {
-			kfree(conn);
-			return NULL;
-		}
+	if (conn) {
+		if (info_size)
+			conn->info = kzalloc(info_size, GFP_NOFS);
+		conn->ordered_proc_wlists = kmalloc_array(nr, sizeof(struct scoutfs_work_list),
+							  GFP_NOFS);
+		conn->workq = alloc_workqueue("scoutfs_net_%s",
+					      WQ_UNBOUND | WQ_NON_REENTRANT, 0,
+					      name_suffix);
 	}
-
-	conn->workq = alloc_workqueue("scoutfs_net_%s",
-				      WQ_UNBOUND | WQ_NON_REENTRANT, 0,
-				      name_suffix);
-	if (!conn->workq) {
-		kfree(conn->info);
-		kfree(conn);
+	if (!conn || (info_size && !conn->info) || !conn->workq || !conn->ordered_proc_wlists) {
+		if (conn) {
+			kfree(conn->info);
+			kfree(conn->ordered_proc_wlists);
+			if (conn->workq)
+				destroy_workqueue(conn->workq);
+			kfree(conn);
+		}
 		return NULL;
 	}
 
@@ -1378,6 +1434,13 @@ scoutfs_net_alloc_conn(struct super_block *sb,
 	INIT_DELAYED_WORK(&conn->reconn_free_dwork,
 			  scoutfs_net_reconn_free_worker);
 
+	conn->ordered_proc_nr = nr;
+	for (i = 0; i < nr; i++) {
+		INIT_WORK(&conn->ordered_proc_wlists[i].work, scoutfs_net_ordered_proc_worker);
+		spin_lock_init(&conn->ordered_proc_wlists[i].lock);
+		INIT_LIST_HEAD(&conn->ordered_proc_wlists[i].list);
+	}
+
 	scoutfs_tseq_add(&ninf->conn_tseq_tree, &conn->tseq_entry);
 	trace_scoutfs_conn_alloc(conn);
 
diff --git a/kmod/src/net.h b/kmod/src/net.h
index fe2c29e4..096f12d8 100644
--- a/kmod/src/net.h
+++ b/kmod/src/net.h
@@ -1,10 +1,18 @@
 #ifndef _SCOUTFS_NET_H_
 #define _SCOUTFS_NET_H_
 
+#include <linux/workqueue.h>
+#include <linux/spinlock.h>
 #include 
 #include "endian_swap.h"
 #include "tseq.h"
 
+struct scoutfs_work_list {
+	struct work_struct work;
+	spinlock_t lock;
+	struct list_head list;
+};
+
 struct scoutfs_net_connection;
 
 /* These are called in their own blocking context */
@@ -61,6 +69,8 @@ struct scoutfs_net_connection {
 	struct list_head resend_queue;
 	atomic64_t recv_seq;
 
+	unsigned int ordered_proc_nr;
+	struct scoutfs_work_list *ordered_proc_wlists;
 	struct workqueue_struct *workq;
 	struct work_struct listen_work;
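
The first half of the fix is easier to reason about with the consume
pattern spelled out: the ordered worker splices the whole pending list
off under the lock and then processes the spliced entries, in arrival
order, without the lock held.  Below is a minimal user-space sketch of
that splice-and-drain pattern.  The item/produce/drain names, the
pthread mutex, and the hand-rolled singly linked list are stand-ins
for struct scoutfs_work_list, queue_ordered_proc()'s append, and
list_splice_init(); none of this is scoutfs code.

/* splice_drain_sketch.c: illustrative only, not part of the patch */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct item {
	int val;
	struct item *next;
};

/* one "ordered work list": producers append under the lock, a worker drains */
static struct item *pending_head;
static struct item *pending_tail;
static pthread_mutex_t pending_lock = PTHREAD_MUTEX_INITIALIZER;

/* producer side: analogous to queue_ordered_proc() appending a message */
static void produce(int val)
{
	struct item *it = calloc(1, sizeof(*it));

	if (!it)
		return;
	it->val = val;
	pthread_mutex_lock(&pending_lock);
	if (pending_tail)
		pending_tail->next = it;
	else
		pending_head = it;
	pending_tail = it;
	pthread_mutex_unlock(&pending_lock);
}

/* consumer side: analogous to the ordered worker's list_splice_init() */
static void drain(void)
{
	struct item *it;

	/* take the entire pending list in one short critical section */
	pthread_mutex_lock(&pending_lock);
	it = pending_head;
	pending_head = NULL;
	pending_tail = NULL;
	pthread_mutex_unlock(&pending_lock);

	/* process spliced entries in arrival order, lock not held */
	while (it) {
		struct item *next = it->next;

		printf("processing %d\n", it->val);
		free(it);
		it = next;
	}
}

int main(void)
{
	produce(1);
	produce(2);
	produce(3);
	drain();
	return 0;
}

Building with something like cc -Wall -pthread splice_drain_sketch.c
and running it prints the three items in the order they were queued,
which is the ordering the recv work relies on when it hands messages
off to the ordered work.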
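
The second half of the fix relies on hashing by lock key: every
message for a given lock lands on the same ordered worker, so per-lock
order is preserved while unrelated locks can be processed in parallel.
Here's a user-space sketch of just that dispatch decision.  The bucket
count, hash function, and message layout are arbitrary stand-ins for
the num_possible_cpus()-bounded worker array, jhash(), and struct
scoutfs_net_lock in the patch.

/* ordered_hash_sketch.c: illustrative only, not part of the patch */
#include <stdio.h>
#include <stdint.h>

#define NR_ORDERED 4	/* stand-in for the num_possible_cpus() bound */

struct msg {
	uint64_t key;	/* stand-in for the scoutfs lock key */
	int seq;	/* arrival order within that key */
};

/* stand-in for jhash(): any reasonable hash of the key works */
static unsigned int hash_key(uint64_t key)
{
	key *= 0x9e3779b97f4a7c15ULL;
	return (unsigned int)(key >> 32);
}

int main(void)
{
	struct msg msgs[] = {
		{ .key = 1, .seq = 0 }, { .key = 2, .seq = 0 },
		{ .key = 1, .seq = 1 }, { .key = 3, .seq = 0 },
		{ .key = 2, .seq = 1 }, { .key = 1, .seq = 2 },
	};
	unsigned int i;

	/*
	 * Messages with the same key always map to the same bucket, so
	 * a per-bucket FIFO preserves per-key order; different keys can
	 * land in different buckets and be drained concurrently.
	 */
	for (i = 0; i < sizeof(msgs) / sizeof(msgs[0]); i++) {
		unsigned int b = hash_key(msgs[i].key) % NR_ORDERED;

		printf("key %llu seq %d -> ordered worker %u\n",
		       (unsigned long long)msgs[i].key, msgs[i].seq, b);
	}
	return 0;
}

Every key 1 message prints the same worker index in seq order; keys 2
and 3 may or may not share a worker with it, which is the bounded
parallelism the commit message describes.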