diff --git a/kmod/src/net.c b/kmod/src/net.c
index 8636452e..51e4c468 100644
--- a/kmod/src/net.c
+++ b/kmod/src/net.c
@@ -20,6 +20,7 @@
 #include
 #include
 #include
+#include <linux/jhash.h>
 
 #include "format.h"
 #include "counters.h"
@@ -134,6 +135,7 @@ struct message_send {
 struct message_recv {
 	struct scoutfs_tseq_entry tseq_entry;
 	struct work_struct proc_work;
+	struct list_head ordered_head;
 	struct scoutfs_net_connection *conn;
 	struct scoutfs_net_header nh;
 };
@@ -498,6 +500,51 @@ static void scoutfs_net_proc_worker(struct work_struct *work)
 	trace_scoutfs_net_proc_work_exit(sb, 0, ret);
 }
 
+static void scoutfs_net_ordered_proc_worker(struct work_struct *work)
+{
+	struct scoutfs_work_list *wlist = container_of(work, struct scoutfs_work_list, work);
+	struct message_recv *mrecv;
+	struct message_recv *mrecv__;
+	LIST_HEAD(list);
+
+	spin_lock(&wlist->lock);
+	list_splice_init(&wlist->list, &list);
+	spin_unlock(&wlist->lock);
+
+	list_for_each_entry_safe(mrecv, mrecv__, &list, ordered_head) {
+		list_del_init(&mrecv->ordered_head);
+		scoutfs_net_proc_worker(&mrecv->proc_work);
+	}
+}
+
+/*
+ * Some messages require in-order processing, but the scope of the
+ * ordering isn't global: for lock messages it's per lock.  So we hash
+ * these messages to a number of ordered workers which each walk a
+ * list and call the usual proc work function in order.  This replaces
+ * earlier schemes where proc work detected and re-ordered OOO
+ * messages, and where proc was only called from the one recv context.
+ */
+static void queue_ordered_proc(struct scoutfs_net_connection *conn, struct message_recv *mrecv)
+{
+	struct scoutfs_work_list *wlist;
+	struct scoutfs_net_lock *nl;
+	u32 h;
+
+	if (WARN_ON_ONCE(mrecv->nh.cmd != SCOUTFS_NET_CMD_LOCK ||
+			 le16_to_cpu(mrecv->nh.data_len) != sizeof(struct scoutfs_net_lock)))
+		return scoutfs_net_proc_worker(&mrecv->proc_work);
+
+	nl = (void *)mrecv->nh.data;
+	h = jhash(&nl->key, sizeof(struct scoutfs_key), 0x6fdd3cd5);
+	wlist = &conn->ordered_proc_wlists[h % conn->ordered_proc_nr];
+
+	spin_lock(&wlist->lock);
+	list_add_tail(&mrecv->ordered_head, &wlist->list);
+	spin_unlock(&wlist->lock);
+	queue_work(conn->workq, &wlist->work);
+}
+
 /*
  * Free live responses up to and including the seq by marking them dead
  * and moving them to the send queue to be freed.
@@ -651,6 +698,7 @@ static void scoutfs_net_recv_worker(struct work_struct *work)
 
 		mrecv->conn = conn;
 		INIT_WORK(&mrecv->proc_work, scoutfs_net_proc_worker);
+		INIT_LIST_HEAD(&mrecv->ordered_head);
 		mrecv->nh = nh;
 
 		/* receive the data payload */
@@ -681,15 +729,17 @@
 		scoutfs_tseq_add(&ninf->msg_tseq_tree, &mrecv->tseq_entry);
 
 		/*
-		 * Initial received greetings are processed
-		 * synchronously before any other incoming messages.
+		 * Initial received greetings are processed inline
+		 * before any other incoming messages.
 		 *
-		 * Incoming requests or responses to the lock client are
-		 * called synchronously to avoid reordering.
+		 * Incoming requests or responses to the lock client
+		 * can't handle re-ordering, so they're queued to
+		 * ordered receive processing work.
 		 */
-		if (nh.cmd == SCOUTFS_NET_CMD_GREETING ||
-		    (nh.cmd == SCOUTFS_NET_CMD_LOCK && !conn->listening_conn))
+		if (nh.cmd == SCOUTFS_NET_CMD_GREETING)
 			scoutfs_net_proc_worker(&mrecv->proc_work);
+		else if (nh.cmd == SCOUTFS_NET_CMD_LOCK && !conn->listening_conn)
+			queue_ordered_proc(conn, mrecv);
 		else
 			queue_work(conn->workq, &mrecv->proc_work);
 	}
@@ -862,6 +912,7 @@ static void scoutfs_net_destroy_worker(struct work_struct *work)
 	destroy_workqueue(conn->workq);
 	scoutfs_tseq_del(&ninf->conn_tseq_tree, &conn->tseq_entry);
 	kfree(conn->info);
+	kfree(conn->ordered_proc_wlists);
 
 	trace_scoutfs_conn_destroy_free(conn);
 	kfree(conn);
@@ -1330,25 +1381,30 @@ scoutfs_net_alloc_conn(struct super_block *sb,
 {
 	struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
 	struct scoutfs_net_connection *conn;
+	unsigned int nr;
+	unsigned int i;
+
+	nr = min_t(unsigned int, num_possible_cpus(),
+		   PAGE_SIZE / sizeof(struct scoutfs_work_list));
 
 	conn = kzalloc(sizeof(struct scoutfs_net_connection), GFP_NOFS);
-	if (!conn)
-		return NULL;
-
-	if (info_size) {
-		conn->info = kzalloc(info_size, GFP_NOFS);
-		if (!conn->info) {
-			kfree(conn);
-			return NULL;
-		}
+	if (conn) {
+		if (info_size)
+			conn->info = kzalloc(info_size, GFP_NOFS);
+		conn->ordered_proc_wlists = kmalloc_array(nr, sizeof(struct scoutfs_work_list),
+							  GFP_NOFS);
+		conn->workq = alloc_workqueue("scoutfs_net_%s",
+					      WQ_UNBOUND | WQ_NON_REENTRANT, 0,
+					      name_suffix);
 	}
-
-	conn->workq = alloc_workqueue("scoutfs_net_%s",
-				      WQ_UNBOUND | WQ_NON_REENTRANT, 0,
-				      name_suffix);
-	if (!conn->workq) {
-		kfree(conn->info);
-		kfree(conn);
+	if (!conn || (info_size && !conn->info) || !conn->workq || !conn->ordered_proc_wlists) {
+		if (conn) {
+			kfree(conn->info);
+			kfree(conn->ordered_proc_wlists);
+			if (conn->workq)
+				destroy_workqueue(conn->workq);
+			kfree(conn);
+		}
 		return NULL;
 	}
 
@@ -1378,6 +1434,13 @@ scoutfs_net_alloc_conn(struct super_block *sb,
 	INIT_DELAYED_WORK(&conn->reconn_free_dwork,
 			  scoutfs_net_reconn_free_worker);
 
+	conn->ordered_proc_nr = nr;
+	for (i = 0; i < nr; i++) {
+		INIT_WORK(&conn->ordered_proc_wlists[i].work, scoutfs_net_ordered_proc_worker);
+		spin_lock_init(&conn->ordered_proc_wlists[i].lock);
+		INIT_LIST_HEAD(&conn->ordered_proc_wlists[i].list);
+	}
+
 	scoutfs_tseq_add(&ninf->conn_tseq_tree, &conn->tseq_entry);
 	trace_scoutfs_conn_alloc(conn);
 
diff --git a/kmod/src/net.h b/kmod/src/net.h
index fe2c29e4..096f12d8 100644
--- a/kmod/src/net.h
+++ b/kmod/src/net.h
@@ -1,10 +1,18 @@
 #ifndef _SCOUTFS_NET_H_
 #define _SCOUTFS_NET_H_
 
+#include <linux/workqueue.h>
+#include <linux/spinlock.h>
 #include
 #include "endian_swap.h"
 #include "tseq.h"
 
+struct scoutfs_work_list {
+	struct work_struct work;
+	spinlock_t lock;
+	struct list_head list;
+};
+
 struct scoutfs_net_connection;
 
 /* These are called in their own blocking context */
@@ -61,6 +69,8 @@ struct scoutfs_net_connection {
 	struct list_head resend_queue;
 	atomic64_t recv_seq;
+	unsigned int ordered_proc_nr;
+	struct scoutfs_work_list *ordered_proc_wlists;
 	struct workqueue_struct *workq;
 	struct work_struct listen_work;
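
Note for reviewers: the dispatch pattern above can be hard to see through the diff noise, so here is a minimal standalone userspace C sketch of the same idea. Items carrying a key are hashed into one of a fixed number of FIFO buckets, and each bucket is drained by a single worker, so items that share a key are processed in submission order while unrelated keys proceed in parallel, much as queue_ordered_proc() hashes lock keys across ordered_proc_wlists. Every name below (struct item, queue_ordered, hash_key, NR_BUCKETS) is hypothetical and exists only for illustration; this is not scoutfs code.

/*
 * Illustrative sketch of per-key ordered dispatch.  Build with
 * "cc -pthread".  All names here are hypothetical.
 */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define NR_BUCKETS 4

struct item {
	uint64_t key;
	int seq;			/* per-key sequence, to show ordering */
	struct item *next;
};

struct bucket {
	pthread_mutex_t lock;
	pthread_cond_t cond;
	struct item *head, *tail;	/* FIFO list drained by one worker */
};

static struct bucket buckets[NR_BUCKETS];

/* stand-in for jhash(): any stable hash of the key works */
static uint32_t hash_key(uint64_t key)
{
	return (uint32_t)(key * 0x9e3779b97f4a7c15ull >> 32);
}

/* hash the key to a bucket and append, preserving per-key FIFO order */
static void queue_ordered(struct item *it)
{
	struct bucket *b = &buckets[hash_key(it->key) % NR_BUCKETS];

	pthread_mutex_lock(&b->lock);
	it->next = NULL;
	if (b->tail)
		b->tail->next = it;
	else
		b->head = it;
	b->tail = it;
	pthread_cond_signal(&b->cond);
	pthread_mutex_unlock(&b->lock);
}

/* each worker drains only its own bucket, in list order */
static void *worker(void *arg)
{
	struct bucket *b = arg;

	for (;;) {
		pthread_mutex_lock(&b->lock);
		while (!b->head)
			pthread_cond_wait(&b->cond, &b->lock);
		struct item *it = b->head;
		b->head = it->next;
		if (!b->head)
			b->tail = NULL;
		pthread_mutex_unlock(&b->lock);

		printf("key %llu seq %d\n", (unsigned long long)it->key, it->seq);
		free(it);
	}
	return NULL;
}

int main(void)
{
	pthread_t tid;

	for (int i = 0; i < NR_BUCKETS; i++) {
		pthread_mutex_init(&buckets[i].lock, NULL);
		pthread_cond_init(&buckets[i].cond, NULL);
		pthread_create(&tid, NULL, worker, &buckets[i]);
	}

	/* two keys interleaved: each key's seqs still print in order */
	for (int seq = 0; seq < 3; seq++) {
		for (uint64_t key = 1; key <= 2; key++) {
			struct item *it = malloc(sizeof(*it));
			it->key = key;
			it->seq = seq;
			queue_ordered(it);
		}
	}
	sleep(1);
	return 0;
}

One design note on the patch itself: capping nr at PAGE_SIZE / sizeof(struct scoutfs_work_list) keeps the ordered_proc_wlists array within a single page allocation regardless of CPU count; the sketch simply hardcodes a small bucket count instead.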