diff --git a/kmod/src/Makefile.kernelcompat b/kmod/src/Makefile.kernelcompat
index d21b51f5..18818269 100644
--- a/kmod/src/Makefile.kernelcompat
+++ b/kmod/src/Makefile.kernelcompat
@@ -158,15 +158,6 @@ ifneq (,$(shell grep 'sock_create_kern.*struct net' include/linux/net.h))
 ccflags-y += -DKC_SOCK_CREATE_KERN_NET=1
 endif
 
-#
-# v3.18-rc6-1619-gc0371da6047a
-#
-# iov_iter is now part of struct msghdr
-#
-ifneq (,$(shell grep 'struct iov_iter.*msg_iter' include/linux/socket.h))
-ccflags-y += -DKC_MSGHDR_STRUCT_IOV_ITER=1
-endif
-
 #
 # v4.17-rc6-7-g95582b008388
 #
diff --git a/kmod/src/net.c b/kmod/src/net.c
index 8636452e..380ee637 100644
--- a/kmod/src/net.c
+++ b/kmod/src/net.c
@@ -20,6 +20,7 @@
 #include
 #include
 #include
+#include <linux/jhash.h>
 
 #include "format.h"
 #include "counters.h"
@@ -134,6 +135,7 @@ struct message_send {
 struct message_recv {
 	struct scoutfs_tseq_entry tseq_entry;
 	struct work_struct proc_work;
+	struct list_head ordered_head;
 	struct scoutfs_net_connection *conn;
 	struct scoutfs_net_header nh;
 };
@@ -498,6 +500,51 @@ static void scoutfs_net_proc_worker(struct work_struct *work)
 	trace_scoutfs_net_proc_work_exit(sb, 0, ret);
 }
 
+static void scoutfs_net_ordered_proc_worker(struct work_struct *work)
+{
+	struct scoutfs_work_list *wlist = container_of(work, struct scoutfs_work_list, work);
+	struct message_recv *mrecv;
+	struct message_recv *mrecv__;
+	LIST_HEAD(list);
+
+	spin_lock(&wlist->lock);
+	list_splice_init(&wlist->list, &list);
+	spin_unlock(&wlist->lock);
+
+	list_for_each_entry_safe(mrecv, mrecv__, &list, ordered_head) {
+		list_del_init(&mrecv->ordered_head);
+		scoutfs_net_proc_worker(&mrecv->proc_work);
+	}
+}
+
+/*
+ * Some messages require in-order processing, but the scope of the
+ * ordering isn't global.  In the case of lock messages it's per lock.
+ * So for these messages we hash on the key to one of a set of ordered
+ * workers, each of which walks its list and calls the usual work
+ * function in order.  This replaces two earlier schemes: having proc
+ * work detect out-of-order arrival and re-order it, and then calling
+ * proc from only the single recv work context.
+ */
+static void queue_ordered_proc(struct scoutfs_net_connection *conn, struct message_recv *mrecv)
+{
+	struct scoutfs_work_list *wlist;
+	struct scoutfs_net_lock *nl;
+	u32 h;
+
+	if (WARN_ON_ONCE(mrecv->nh.cmd != SCOUTFS_NET_CMD_LOCK ||
+			 le16_to_cpu(mrecv->nh.data_len) != sizeof(struct scoutfs_net_lock)))
+		return scoutfs_net_proc_worker(&mrecv->proc_work);
+
+	nl = (void *)mrecv->nh.data;
+	h = jhash(&nl->key, sizeof(struct scoutfs_key), 0x6fdd3cd5);
+	wlist = &conn->ordered_proc_wlists[h % conn->ordered_proc_nr];
+
+	spin_lock(&wlist->lock);
+	list_add_tail(&mrecv->ordered_head, &wlist->list);
+	spin_unlock(&wlist->lock);
+	queue_work(conn->workq, &wlist->work);
+}
+
 /*
  * Free live responses up to and including the seq by marking them dead
  * and moving them to the send queue to be freed.
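
The queue_ordered_proc() hunk above adds per-key ordered dispatch: each lock message is hashed by its key onto one of a fixed set of serialized work lists, so messages for the same lock are processed in arrival order while unrelated locks proceed in parallel. The following userspace sketch shows the same pattern; the worker count, the multiplicative hash standing in for jhash(), and all names are illustrative assumptions, not code from this patch.

/*
 * Userspace sketch of per-key ordered dispatch.  NR_WORKERS, the
 * multiplicative hash (standing in for jhash()), and all names are
 * illustrative.
 */
#include <pthread.h>
#include <stdio.h>

#define NR_WORKERS 4

struct msg {
	struct msg *next;
	unsigned int key;	/* stands in for the scoutfs lock key */
};

static struct wlist {
	pthread_mutex_t lock;
	struct msg *head, **tail;
} wlists[NR_WORKERS];

/* same-key messages always land on the same list, preserving order */
static void queue_ordered(struct msg *m)
{
	struct wlist *wl = &wlists[(m->key * 2654435761u) % NR_WORKERS];

	pthread_mutex_lock(&wl->lock);
	m->next = NULL;
	*wl->tail = m;
	wl->tail = &m->next;
	pthread_mutex_unlock(&wl->lock);
	/* the kernel code would queue_work() this list's worker here */
}

/* splice the list out under the lock, process it outside the lock */
static void drain(unsigned int i)
{
	struct msg *m;

	pthread_mutex_lock(&wlists[i].lock);
	m = wlists[i].head;
	wlists[i].head = NULL;
	wlists[i].tail = &wlists[i].head;
	pthread_mutex_unlock(&wlists[i].lock);

	for (; m; m = m->next)
		printf("worker %u: key %u\n", i, m->key);
}

int main(void)
{
	struct msg msgs[8];
	unsigned int i;

	for (i = 0; i < NR_WORKERS; i++) {
		pthread_mutex_init(&wlists[i].lock, NULL);
		wlists[i].tail = &wlists[i].head;
	}

	for (i = 0; i < 8; i++) {
		msgs[i].key = i % 3;	/* three distinct "locks" */
		queue_ordered(&msgs[i]);
	}

	for (i = 0; i < NR_WORKERS; i++)
		drain(i);
	return 0;
}

As in scoutfs_net_ordered_proc_worker(), drain() holds the lock only long enough to splice the list out, so producers are never blocked behind message processing.
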
@@ -541,33 +588,17 @@ static void free_acked_responses(struct scoutfs_net_connection *conn, u64 seq)
 	queue_work(conn->workq, &conn->send_work);
 }
 
-static int recvmsg_full(struct socket *sock, void *buf, unsigned len)
+static int k_recvmsg(struct socket *sock, void *buf, unsigned len)
 {
-	struct msghdr msg;
-	struct kvec kv;
-	int ret;
+	struct kvec kv = {
+		.iov_base = buf,
+		.iov_len = len,
+	};
+	struct msghdr msg = {
+		.msg_flags = MSG_NOSIGNAL,
+	};
 
-	while (len) {
-		memset(&msg, 0, sizeof(msg));
-		msg.msg_flags = MSG_NOSIGNAL;
-		kv.iov_base = buf;
-		kv.iov_len = len;
-
-#ifndef KC_MSGHDR_STRUCT_IOV_ITER
-		msg.msg_iov = (struct iovec *)&kv;
-		msg.msg_iovlen = 1;
-#else
-		iov_iter_init(&msg.msg_iter, READ, (struct iovec *)&kv, len, 1);
-#endif
-		ret = kernel_recvmsg(sock, &msg, &kv, 1, len, msg.msg_flags);
-		if (ret <= 0)
-			return -ECONNABORTED;
-
-		len -= ret;
-		buf += ret;
-	}
-
-	return 0;
+	return kernel_recvmsg(sock, &msg, &kv, 1, len, msg.msg_flags);
 }
 
 static bool invalid_message(struct scoutfs_net_connection *conn,
@@ -604,6 +635,72 @@ static bool invalid_message(struct scoutfs_net_connection *conn,
 	return false;
 }
 
+static int recv_one_message(struct super_block *sb, struct net_info *ninf,
+			    struct scoutfs_net_connection *conn, struct scoutfs_net_header *nh,
+			    unsigned int data_len)
+{
+	struct message_recv *mrecv;
+	int ret;
+
+	scoutfs_inc_counter(sb, net_recv_messages);
+	scoutfs_add_counter(sb, net_recv_bytes, nh_bytes(data_len));
+	trace_scoutfs_net_recv_message(sb, &conn->sockname, &conn->peername, nh);
+
+	/* the caller's invalid_message() check already validated data_len */
+	mrecv = kmalloc(offsetof(struct message_recv, nh.data[data_len]), GFP_NOFS);
+	if (!mrecv) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	mrecv->conn = conn;
+	INIT_WORK(&mrecv->proc_work, scoutfs_net_proc_worker);
+	INIT_LIST_HEAD(&mrecv->ordered_head);
+	mrecv->nh = *nh;
+	if (data_len)
+		memcpy(mrecv->nh.data, (nh + 1), data_len);
+
+	if (nh->cmd == SCOUTFS_NET_CMD_GREETING) {
+		/* greetings are out of band, no seq mechanics */
+		set_conn_fl(conn, saw_greeting);
+
+	} else if (le64_to_cpu(nh->seq) <=
+		   atomic64_read(&conn->recv_seq)) {
+		/* drop any resent duplicated messages */
+		scoutfs_inc_counter(sb, net_recv_dropped_duplicate);
+		kfree(mrecv);
+		ret = 0;
+		goto out;
+
+	} else {
+		/* record that we've received the sender's seq */
+		atomic64_set(&conn->recv_seq, le64_to_cpu(nh->seq));
+		/* and free our responses that the sender has received */
+		free_acked_responses(conn, le64_to_cpu(nh->recv_seq));
+	}
+
+	scoutfs_tseq_add(&ninf->msg_tseq_tree, &mrecv->tseq_entry);
+
+	/*
+	 * Initial received greetings are processed inline
+	 * before any other incoming messages.
+	 *
+	 * Incoming requests or responses to the lock client
+	 * can't handle re-ordering, so they're queued to
+	 * ordered receive processing work.
+	 */
+	if (nh->cmd == SCOUTFS_NET_CMD_GREETING)
+		scoutfs_net_proc_worker(&mrecv->proc_work);
+	else if (nh->cmd == SCOUTFS_NET_CMD_LOCK && !conn->listening_conn)
+		queue_ordered_proc(conn, mrecv);
+	else
+		queue_work(conn->workq, &mrecv->proc_work);
+	ret = 0;
+
+out:
+	return ret;
+}
+
 /*
  * Always block receiving from the socket.  Errors trigger shutting down
  * the connection.
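
recv_one_message() above preserves the existing sequence rules: greetings bypass seq handling entirely, a message whose seq is at or below conn->recv_seq is a resent duplicate and is dropped, and a newer seq both advances recv_seq and frees our completed responses that the peer's piggybacked recv_seq acknowledges. A compressed sketch of just that decision, assuming the same monotonic 64-bit sequence numbers (names are illustrative):

#include <stdbool.h>
#include <stdint.h>

struct conn_seqs {
	uint64_t recv_seq;	/* highest non-greeting seq processed */
};

/*
 * Mirrors the non-greeting branches above: duplicates at or below
 * recv_seq are dropped, newer seqs advance it, and the peer's
 * piggybacked ack would let us free our own finished responses.
 */
static bool accept_seq(struct conn_seqs *c, uint64_t seq, uint64_t peer_recv_seq)
{
	if (seq <= c->recv_seq)
		return false;		/* resent duplicate, drop */

	c->recv_seq = seq;
	(void)peer_recv_seq;		/* free_acked_responses() would run here */
	return true;
}
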
@@ -614,86 +711,72 @@ static void scoutfs_net_recv_worker(struct work_struct *work)
 	struct super_block *sb = conn->sb;
 	struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
 	struct socket *sock = conn->sock;
-	struct scoutfs_net_header nh;
-	struct message_recv *mrecv;
+	struct scoutfs_net_header *nh;
+	struct page *page = NULL;
 	unsigned int data_len;
+	int hdr_off;
+	int rx_off;
+	int size;
 	int ret;
 
 	trace_scoutfs_net_recv_work_enter(sb, 0, 0);
 
+	page = alloc_page(GFP_NOFS);
+	if (!page) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	hdr_off = 0;
+	rx_off = 0;
+
 	for (;;) {
 		/* receive the header */
-		ret = recvmsg_full(sock, &nh, sizeof(nh));
-		if (ret)
-			break;
-
-		/* receiving an invalid message breaks the connection */
-		if (invalid_message(conn, &nh)) {
-			scoutfs_inc_counter(sb, net_recv_invalid_message);
-			ret = -EBADMSG;
-			break;
+		ret = k_recvmsg(sock, page_address(page) + rx_off, PAGE_SIZE - rx_off);
+		if (ret <= 0) {
+			ret = -ECONNABORTED;
+			goto out;
 		}
 
-		data_len = le16_to_cpu(nh.data_len);
+		rx_off += ret;
 
-		scoutfs_inc_counter(sb, net_recv_messages);
-		scoutfs_add_counter(sb, net_recv_bytes, nh_bytes(data_len));
-		trace_scoutfs_net_recv_message(sb, &conn->sockname,
-					       &conn->peername, &nh);
+		for (;;) {
+			size = rx_off - hdr_off;
+			if (size < sizeof(struct scoutfs_net_header))
+				break;
 
-		/* invalid message checked data len */
-		mrecv = kmalloc(offsetof(struct message_recv,
-					 nh.data[data_len]), GFP_NOFS);
-		if (!mrecv) {
-			ret = -ENOMEM;
-			break;
+			nh = page_address(page) + hdr_off;
+
+			/* receiving an invalid message breaks the connection */
+			if (invalid_message(conn, nh)) {
+				scoutfs_inc_counter(sb, net_recv_invalid_message);
+				ret = -EBADMSG;
+				goto out;
+			}
+
+			data_len = le16_to_cpu(nh->data_len);
+			if (sizeof(struct scoutfs_net_header) + data_len > size)
+				break;
+
+			ret = recv_one_message(sb, ninf, conn, nh, data_len);
+			if (ret < 0)
+				goto out;
+
+			hdr_off += sizeof(struct scoutfs_net_header) + data_len;
 		}
 
-		mrecv->conn = conn;
-		INIT_WORK(&mrecv->proc_work, scoutfs_net_proc_worker);
-		mrecv->nh = nh;
-
-		/* receive the data payload */
-		ret = recvmsg_full(sock, mrecv->nh.data, data_len);
-		if (ret) {
-			kfree(mrecv);
-			break;
+		if ((PAGE_SIZE - rx_off) <
+		    (sizeof(struct scoutfs_net_header) + SCOUTFS_NET_MAX_DATA_LEN)) {
+			if (size)
+				memmove(page_address(page), page_address(page) + hdr_off, size);
+			hdr_off = 0;
+			rx_off = size;
 		}
-
-		if (nh.cmd == SCOUTFS_NET_CMD_GREETING) {
-			/* greetings are out of band, no seq mechanics */
-			set_conn_fl(conn, saw_greeting);
-
-		} else if (le64_to_cpu(nh.seq) <=
-			   atomic64_read(&conn->recv_seq)) {
-			/* drop any resent duplicated messages */
-			scoutfs_inc_counter(sb, net_recv_dropped_duplicate);
-			kfree(mrecv);
-			continue;
-
-		} else {
-			/* record that we've received sender's seq */
-			atomic64_set(&conn->recv_seq, le64_to_cpu(nh.seq));
-			/* and free our responses that sender has received */
-			free_acked_responses(conn, le64_to_cpu(nh.recv_seq));
-		}
-
-		scoutfs_tseq_add(&ninf->msg_tseq_tree, &mrecv->tseq_entry);
-
-		/*
-		 * Initial received greetings are processed
-		 * synchronously before any other incoming messages.
-		 *
-		 * Incoming requests or responses to the lock client are
-		 * called synchronously to avoid reordering.
-		 */
-		if (nh.cmd == SCOUTFS_NET_CMD_GREETING ||
-		    (nh.cmd == SCOUTFS_NET_CMD_LOCK && !conn->listening_conn))
-			scoutfs_net_proc_worker(&mrecv->proc_work);
-		else
-			queue_work(conn->workq, &mrecv->proc_work);
 	}
 
+out:
+	__free_page(page);
+
 	if (ret)
 		scoutfs_inc_counter(sb, net_recv_error);
@@ -703,33 +786,41 @@ static void scoutfs_net_recv_worker(struct work_struct *work)
 	trace_scoutfs_net_recv_work_exit(sb, 0, ret);
 }
 
-static int sendmsg_full(struct socket *sock, void *buf, unsigned len)
+/*
+ * This consumes the kvec.
+ */
+static int k_sendmsg_full(struct socket *sock, struct kvec *kv, unsigned long nr_segs, size_t count)
 {
-	struct msghdr msg;
-	struct kvec kv;
-	int ret;
+	int ret = 0;
 
-	while (len) {
-		memset(&msg, 0, sizeof(msg));
-		msg.msg_flags = MSG_NOSIGNAL;
-		kv.iov_base = buf;
-		kv.iov_len = len;
+	while (count > 0) {
+		struct msghdr msg = {
+			.msg_flags = MSG_NOSIGNAL,
+		};
 
-#ifndef KC_MSGHDR_STRUCT_IOV_ITER
-		msg.msg_iov = (struct iovec *)&kv;
-		msg.msg_iovlen = 1;
-#else
-		iov_iter_init(&msg.msg_iter, WRITE, (struct iovec *)&kv, len, 1);
-#endif
-		ret = kernel_sendmsg(sock, &msg, &kv, 1, len);
-		if (ret <= 0)
-			return -ECONNABORTED;
+		ret = kernel_sendmsg(sock, &msg, kv, nr_segs, count);
+		if (ret <= 0) {
+			ret = -ECONNABORTED;
+			break;
+		}
 
-		len -= ret;
-		buf += ret;
+		count -= ret;
+		if (count) {
+			while (nr_segs > 0 && ret >= kv->iov_len) {
+				ret -= kv->iov_len;
+				kv++;
+				nr_segs--;
+			}
+			if (nr_segs > 0 && ret > 0) {
+				kv->iov_base += ret;
+				kv->iov_len -= ret;
+			}
+			BUG_ON(nr_segs == 0);
+		}
+		ret = 0;
 	}
-
-	return 0;
+
+	return ret;
 }
 
 static void free_msend(struct net_info *ninf, struct message_send *msend)
@@ -760,54 +851,73 @@ static void scoutfs_net_send_worker(struct work_struct *work)
 	struct super_block *sb = conn->sb;
 	struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
 	struct message_send *msend;
-	int ret = 0;
+	struct message_send *_msend_;
+	struct kvec kv[16];
+	unsigned long nr_segs;
+	size_t count;
 	int len;
+	int ret;
 
 	trace_scoutfs_net_send_work_enter(sb, 0, 0);
 
-	spin_lock(&conn->lock);
-
-	while ((msend = list_first_entry_or_null(&conn->send_queue,
-						 struct message_send, head))) {
-
-		if (msend->dead) {
-			free_msend(ninf, msend);
-			continue;
-		}
-
-		if ((msend->nh.cmd == SCOUTFS_NET_CMD_FAREWELL) &&
-		    nh_is_response(&msend->nh)) {
-			set_conn_fl(conn, saw_farewell);
-		}
-
-		msend->nh.recv_seq =
-			cpu_to_le64(atomic64_read(&conn->recv_seq));
-
-		spin_unlock(&conn->lock);
-
-		len = nh_bytes(le16_to_cpu(msend->nh.data_len));
-
-		scoutfs_inc_counter(sb, net_send_messages);
-		scoutfs_add_counter(sb, net_send_bytes, len);
-		trace_scoutfs_net_send_message(sb, &conn->sockname,
-					       &conn->peername, &msend->nh);
-
-		ret = sendmsg_full(conn->sock, &msend->nh, len);
+	for (;;) {
+		nr_segs = 0;
+		count = 0;
 
 		spin_lock(&conn->lock);
+		list_for_each_entry_safe(msend, _msend_, &conn->send_queue, head) {
+			if (msend->dead) {
+				free_msend(ninf, msend);
+				continue;
+			}
 
-		msend->nh.recv_seq = 0;
+			len = nh_bytes(le16_to_cpu(msend->nh.data_len));
 
-		if (ret)
-			break;
+			if ((msend->nh.cmd == SCOUTFS_NET_CMD_FAREWELL) &&
+			    nh_is_response(&msend->nh)) {
+				set_conn_fl(conn, saw_farewell);
+			}
 
-		/* resend if it wasn't freed while we sent */
-		if (!msend->dead)
-			list_move_tail(&msend->head, &conn->resend_queue);
+			msend->nh.recv_seq = cpu_to_le64(atomic64_read(&conn->recv_seq));
+
+			scoutfs_inc_counter(sb, net_send_messages);
+			scoutfs_add_counter(sb, net_send_bytes, len);
+			trace_scoutfs_net_send_message(sb, &conn->sockname,
+						       &conn->peername, &msend->nh);
+
+			count += len;
+			kv[nr_segs].iov_base = &msend->nh;
+			kv[nr_segs].iov_len = len;
+			if (++nr_segs == ARRAY_SIZE(kv))
+				break;
+
+		}
+		spin_unlock(&conn->lock);
+
+		if (nr_segs == 0) {
+			ret = 0;
+			goto out;
+		}
+
+		ret = k_sendmsg_full(conn->sock, kv, nr_segs, count);
+		if (ret < 0)
+			goto out;
+
+		spin_lock(&conn->lock);
+		list_for_each_entry_safe(msend, _msend_, &conn->send_queue, head) {
+			msend->nh.recv_seq = 0;
+
+			/* resend if it wasn't freed while we sent */
+			if (!msend->dead)
+				list_move_tail(&msend->head, &conn->resend_queue);
+
+			if (--nr_segs == 0)
+				break;
+		}
+		spin_unlock(&conn->lock);
 	}
 
-	spin_unlock(&conn->lock);
-
+out:
 	if (ret) {
 		scoutfs_inc_counter(sb, net_send_error);
 		shutdown_conn(conn);
@@ -862,6 +972,7 @@ static void scoutfs_net_destroy_worker(struct work_struct *work)
 	destroy_workqueue(conn->workq);
 	scoutfs_tseq_del(&ninf->conn_tseq_tree, &conn->tseq_entry);
 	kfree(conn->info);
+	kfree(conn->ordered_proc_wlists);
 
 	trace_scoutfs_conn_destroy_free(conn);
 	kfree(conn);
@@ -1330,25 +1441,30 @@ scoutfs_net_alloc_conn(struct super_block *sb,
 {
 	struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
 	struct scoutfs_net_connection *conn;
+	unsigned int nr;
+	unsigned int i;
+
+	nr = min_t(unsigned int, num_possible_cpus(),
+		   PAGE_SIZE / sizeof(struct scoutfs_work_list));
 
 	conn = kzalloc(sizeof(struct scoutfs_net_connection), GFP_NOFS);
-	if (!conn)
-		return NULL;
-
-	if (info_size) {
-		conn->info = kzalloc(info_size, GFP_NOFS);
-		if (!conn->info) {
-			kfree(conn);
-			return NULL;
-		}
+	if (conn) {
+		if (info_size)
+			conn->info = kzalloc(info_size, GFP_NOFS);
+		conn->ordered_proc_wlists = kmalloc_array(nr, sizeof(struct scoutfs_work_list),
+							  GFP_NOFS);
+		conn->workq = alloc_workqueue("scoutfs_net_%s",
+					      WQ_UNBOUND | WQ_NON_REENTRANT, 0,
+					      name_suffix);
 	}
-
-	conn->workq = alloc_workqueue("scoutfs_net_%s",
-				      WQ_UNBOUND | WQ_NON_REENTRANT, 0,
-				      name_suffix);
-	if (!conn->workq) {
-		kfree(conn->info);
-		kfree(conn);
+	if (!conn || (info_size && !conn->info) || !conn->workq || !conn->ordered_proc_wlists) {
+		if (conn) {
+			kfree(conn->info);
+			kfree(conn->ordered_proc_wlists);
+			if (conn->workq)
+				destroy_workqueue(conn->workq);
+			kfree(conn);
+		}
 		return NULL;
 	}
@@ -1378,6 +1494,13 @@ scoutfs_net_alloc_conn(struct super_block *sb,
 	INIT_DELAYED_WORK(&conn->reconn_free_dwork,
 			  scoutfs_net_reconn_free_worker);
 
+	conn->ordered_proc_nr = nr;
+	for (i = 0; i < nr; i++) {
+		INIT_WORK(&conn->ordered_proc_wlists[i].work, scoutfs_net_ordered_proc_worker);
+		spin_lock_init(&conn->ordered_proc_wlists[i].lock);
+		INIT_LIST_HEAD(&conn->ordered_proc_wlists[i].list);
+	}
+
 	scoutfs_tseq_add(&ninf->conn_tseq_tree, &conn->tseq_entry);
 	trace_scoutfs_conn_alloc(conn);
 
diff --git a/kmod/src/net.h b/kmod/src/net.h
index fe2c29e4..096f12d8 100644
--- a/kmod/src/net.h
+++ b/kmod/src/net.h
@@ -1,10 +1,18 @@
 #ifndef _SCOUTFS_NET_H_
 #define _SCOUTFS_NET_H_
 
+#include <linux/spinlock.h>
+#include <linux/workqueue.h>
 #include
 #include "endian_swap.h"
 #include "tseq.h"
 
+struct scoutfs_work_list {
+	struct work_struct work;
+	spinlock_t lock;
+	struct list_head list;
+};
+
 struct scoutfs_net_connection;
 
 /* These are called in their own blocking context */
@@ -61,6 +69,8 @@ struct scoutfs_net_connection {
 	struct list_head resend_queue;
 	atomic64_t recv_seq;
 
+	unsigned int ordered_proc_nr;
+	struct scoutfs_work_list *ordered_proc_wlists;
 	struct workqueue_struct *workq;
 	struct work_struct listen_work;
 
diff --git a/kmod/src/quorum.c b/kmod/src/quorum.c
index 1c42188e..7c385448 100644
--- a/kmod/src/quorum.c
+++ b/kmod/src/quorum.c
@@ -243,10 +243,6 @@ static int send_msg_members(struct super_block *sb, int type, u64 term, int only
 	};
 	struct sockaddr_in sin;
 	struct msghdr mh = {
-#ifndef KC_MSGHDR_STRUCT_IOV_ITER
-		.msg_iov = (struct iovec *)&kv,
-		.msg_iovlen = 1,
-#endif
 		.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL,
 		.msg_name = &sin,
 		.msg_namelen = sizeof(sin),
@@ -268,9 +264,7 @@ static int send_msg_members(struct super_block *sb, int type, u64 term, int only
 		scoutfs_quorum_slot_sin(&qinf->qconf, i, &sin);
 
 		now = ktime_get();
-#ifdef KC_MSGHDR_STRUCT_IOV_ITER
-		iov_iter_init(&mh.msg_iter, WRITE, (struct iovec *)&kv, sizeof(qmes), 1);
-#endif
+
 		ret = kernel_sendmsg(qinf->sock, &mh, &kv, 1, kv.iov_len);
 		if (ret != kv.iov_len)
 			failed++;
@@ -312,10 +306,6 @@ static int recv_msg(struct super_block *sb, struct quorum_host_msg *msg,
 		.iov_len = sizeof(struct scoutfs_quorum_message),
 	};
 	struct msghdr mh = {
-#ifndef KC_MSGHDR_STRUCT_IOV_ITER
-		.msg_iov = (struct iovec *)&kv,
-		.msg_iovlen = 1,
-#endif
 		.msg_flags = MSG_NOSIGNAL,
 	};
 
@@ -333,9 +323,6 @@ static int recv_msg(struct super_block *sb, struct quorum_host_msg *msg,
 		ret = kc_tcp_sock_set_rcvtimeo(qinf->sock, rel_to);
 	}
 
-#ifdef KC_MSGHDR_STRUCT_IOV_ITER
-	iov_iter_init(&mh.msg_iter, READ, (struct iovec *)&kv, sizeof(struct scoutfs_quorum_message), 1);
-#endif
 	ret = kernel_recvmsg(qinf->sock, &mh, &kv, 1, kv.iov_len, mh.msg_flags);
 	if (ret < 0)
 		return ret;
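
Stepping back, the reworked scoutfs_net_recv_worker() replaces two blocking reads per message (header, then payload) with a single page-sized buffer that is parsed for as many complete messages as it holds, sliding any partial tail back to the front once the remaining space can no longer fit a maximum-sized message. A userspace sketch of that loop follows; the 4096-byte buffer, the one-field header, and the host-endian length are simplifying assumptions (the real scoutfs_net_header carries more fields and little-endian lengths):

#include <stdint.h>
#include <string.h>
#include <sys/socket.h>

#define BUF_SIZE 4096
#define MAX_DATA 1024	/* stands in for SCOUTFS_NET_MAX_DATA_LEN */

struct hdr {
	uint16_t data_len;	/* the real header has more fields */
};

static void process(struct hdr *h) { (void)h; }

static int recv_stream(int fd)
{
	char buf[BUF_SIZE];
	size_t hdr_off = 0, rx_off = 0, size = 0;
	struct hdr *h;
	ssize_t ret;

	for (;;) {
		ret = recv(fd, buf + rx_off, BUF_SIZE - rx_off, 0);
		if (ret <= 0)
			return -1;	/* the kernel code aborts the connection */
		rx_off += ret;

		/* consume every complete header+payload in the buffer */
		for (;;) {
			size = rx_off - hdr_off;
			if (size < sizeof(*h))
				break;
			h = (struct hdr *)(buf + hdr_off);
			if (sizeof(*h) + h->data_len > size)
				break;	/* payload not fully received yet */
			process(h);
			hdr_off += sizeof(*h) + h->data_len;
		}

		/* compact once the tail can't hold a max-sized message */
		if (BUF_SIZE - rx_off < sizeof(*h) + MAX_DATA) {
			if (size)
				memmove(buf, buf + hdr_off, size);
			hdr_off = 0;
			rx_off = size;
		}
	}
}
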
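On the send side, k_sendmsg_full() batches up to 16 queued messages into one kvec and must cope with short sends by skipping fully-sent segments and advancing into a partially-sent one before retrying. Here is the same bookkeeping in userspace terms, using writev() as a stand-in for kernel_sendmsg(); the function name and the minimal error handling are illustrative:

#include <errno.h>
#include <sys/uio.h>

static int writev_full(int fd, struct iovec *iov, int nr_segs, size_t count)
{
	ssize_t ret;

	while (count > 0) {
		ret = writev(fd, iov, nr_segs);
		if (ret <= 0)
			return -ECONNABORTED;	/* mirrors the kernel code */

		count -= ret;

		/* skip segments that were sent in their entirety */
		while (nr_segs > 0 && (size_t)ret >= iov->iov_len) {
			ret -= iov->iov_len;
			iov++;
			nr_segs--;
		}
		/* advance into a partially sent segment */
		if (nr_segs > 0 && ret > 0) {
			iov->iov_base = (char *)iov->iov_base + ret;
			iov->iov_len -= ret;
		}
	}

	return 0;
}

This batching is also why the KC_MSGHDR_STRUCT_IOV_ITER probe and its #ifdef blocks can go away: kernel_sendmsg() and kernel_recvmsg() build the msghdr's iterator from the kvec arguments themselves, so callers never need to touch msg_iov or msg_iter directly.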