diff --git a/kmod/src/net.c b/kmod/src/net.c index 51e4c468..f02c2e80 100644 --- a/kmod/src/net.c +++ b/kmod/src/net.c @@ -588,33 +588,23 @@ static void free_acked_responses(struct scoutfs_net_connection *conn, u64 seq) queue_work(conn->workq, &conn->send_work); } -static int recvmsg_full(struct socket *sock, void *buf, unsigned len) +static int k_recvmsg(struct socket *sock, void *buf, unsigned len) { - struct msghdr msg; - struct kvec kv; - int ret; - - while (len) { - memset(&msg, 0, sizeof(msg)); - msg.msg_flags = MSG_NOSIGNAL; - kv.iov_base = buf; - kv.iov_len = len; + struct kvec kv = { + .iov_base = buf, + .iov_len = len, + }; + struct msghdr msg = { + .msg_flags = MSG_NOSIGNAL, + }; #ifndef KC_MSGHDR_STRUCT_IOV_ITER - msg.msg_iov = (struct iovec *)&kv; - msg.msg_iovlen = 1; + msg.msg_iov = (struct iovec *)&kv; + msg.msg_iovlen = 1; #else - iov_iter_init(&msg.msg_iter, READ, (struct iovec *)&kv, len, 1); + iov_iter_init(&msg.msg_iter, READ, (struct iovec *)&kv, len, 1); #endif - ret = kernel_recvmsg(sock, &msg, &kv, 1, len, msg.msg_flags); - if (ret <= 0) - return -ECONNABORTED; - - len -= ret; - buf += ret; - } - - return 0; + return kernel_recvmsg(sock, &msg, &kv, 1, len, msg.msg_flags); } static bool invalid_message(struct scoutfs_net_connection *conn, @@ -651,6 +641,72 @@ static bool invalid_message(struct scoutfs_net_connection *conn, return false; } +static int recv_one_message(struct super_block *sb, struct net_info *ninf, + struct scoutfs_net_connection *conn, struct scoutfs_net_header *nh, + unsigned int data_len) +{ + struct message_recv *mrecv; + int ret; + + scoutfs_inc_counter(sb, net_recv_messages); + scoutfs_add_counter(sb, net_recv_bytes, nh_bytes(data_len)); + trace_scoutfs_net_recv_message(sb, &conn->sockname, &conn->peername, nh); + + /* caller's invalid message checked data len */ + mrecv = kmalloc(offsetof(struct message_recv, nh.data[data_len]), GFP_NOFS); + if (!mrecv) { + ret = -ENOMEM; + goto out; + } + + mrecv->conn = conn; + 
INIT_WORK(&mrecv->proc_work, scoutfs_net_proc_worker); + INIT_LIST_HEAD(&mrecv->ordered_head); + mrecv->nh = *nh; + if (data_len) + memcpy(mrecv->nh.data, (nh + 1), data_len); + + if (nh->cmd == SCOUTFS_NET_CMD_GREETING) { + /* greetings are out of band, no seq mechanics */ + set_conn_fl(conn, saw_greeting); + + } else if (le64_to_cpu(nh->seq) <= + atomic64_read(&conn->recv_seq)) { + /* drop any resent duplicated messages */ + scoutfs_inc_counter(sb, net_recv_dropped_duplicate); + kfree(mrecv); + ret = 0; + goto out; + + } else { + /* record that we've received sender's seq */ + atomic64_set(&conn->recv_seq, le64_to_cpu(nh->seq)); + /* and free our responses that sender has received */ + free_acked_responses(conn, le64_to_cpu(nh->recv_seq)); + } + + scoutfs_tseq_add(&ninf->msg_tseq_tree, &mrecv->tseq_entry); + + /* + * Initial received greetings are processed inline + * before any other incoming messages. + * + * Incoming requests or responses to the lock client + * can't handle re-ordering, so they're queued to + * ordered receive processing work. + */ + if (nh->cmd == SCOUTFS_NET_CMD_GREETING) + scoutfs_net_proc_worker(&mrecv->proc_work); + else if (nh->cmd == SCOUTFS_NET_CMD_LOCK && !conn->listening_conn) + queue_ordered_proc(conn, mrecv); + else + queue_work(conn->workq, &mrecv->proc_work); + ret = 0; + +out: + return ret; +} + /* * Always block receiving from the socket. Errors trigger shutting down * the connection. 
@@ -661,89 +717,72 @@ static void scoutfs_net_recv_worker(struct work_struct *work) struct super_block *sb = conn->sb; struct net_info *ninf = SCOUTFS_SB(sb)->net_info; struct socket *sock = conn->sock; - struct scoutfs_net_header nh; - struct message_recv *mrecv; + struct scoutfs_net_header *nh; + struct page *page = NULL; unsigned int data_len; + int hdr_off; + int rx_off; + int size; int ret; trace_scoutfs_net_recv_work_enter(sb, 0, 0); + page = alloc_page(GFP_NOFS); + if (!page) { + ret = -ENOMEM; + goto out; + } + + hdr_off = 0; + rx_off = 0; + for (;;) { /* receive the header */ - ret = recvmsg_full(sock, &nh, sizeof(nh)); - if (ret) - break; - - /* receiving an invalid message breaks the connection */ - if (invalid_message(conn, &nh)) { - scoutfs_inc_counter(sb, net_recv_invalid_message); - ret = -EBADMSG; - break; + ret = k_recvmsg(sock, page_address(page) + rx_off, PAGE_SIZE - rx_off); + if (ret <= 0) { + ret = -ECONNABORTED; + goto out; }
 - data_len = le16_to_cpu(nh.data_len); + rx_off += ret; - scoutfs_inc_counter(sb, net_recv_messages); - scoutfs_add_counter(sb, net_recv_bytes, nh_bytes(data_len)); - trace_scoutfs_net_recv_message(sb, &conn->sockname, - &conn->peername, &nh); + for (;;) { + size = rx_off - hdr_off; + if (size < sizeof(struct scoutfs_net_header)) + break; - /* invalid message checked data len */ - mrecv = kmalloc(offsetof(struct message_recv, - nh.data[data_len]), GFP_NOFS); - if (!mrecv) { - ret = -ENOMEM; - break; + nh = page_address(page) + hdr_off; + + /* receiving an invalid message breaks the connection */ + if (invalid_message(conn, nh)) { + scoutfs_inc_counter(sb, net_recv_invalid_message); + ret = -EBADMSG; + goto out; + } + + data_len = le16_to_cpu(nh->data_len); + if (sizeof(struct scoutfs_net_header) + data_len > size) + break; + + ret = recv_one_message(sb, ninf, conn, nh, data_len); + if (ret < 0) + goto out; + + hdr_off += sizeof(struct scoutfs_net_header) + data_len; } - mrecv->conn = conn; - INIT_WORK(&mrecv->proc_work, 
scoutfs_net_proc_worker); - INIT_LIST_HEAD(&mrecv->ordered_head); - mrecv->nh = nh; - - /* receive the data payload */ - ret = recvmsg_full(sock, mrecv->nh.data, data_len); - if (ret) { - kfree(mrecv); - break; + if ((PAGE_SIZE - rx_off) < + (sizeof(struct scoutfs_net_header) + SCOUTFS_NET_MAX_DATA_LEN)) { + if (size) + memmove(page_address(page), page_address(page) + hdr_off, size); + hdr_off = 0; + rx_off = size; } - - if (nh.cmd == SCOUTFS_NET_CMD_GREETING) { - /* greetings are out of band, no seq mechanics */ - set_conn_fl(conn, saw_greeting); - - } else if (le64_to_cpu(nh.seq) <= - atomic64_read(&conn->recv_seq)) { - /* drop any resent duplicated messages */ - scoutfs_inc_counter(sb, net_recv_dropped_duplicate); - kfree(mrecv); - continue; - - } else { - /* record that we've received sender's seq */ - atomic64_set(&conn->recv_seq, le64_to_cpu(nh.seq)); - /* and free our responses that sender has received */ - free_acked_responses(conn, le64_to_cpu(nh.recv_seq)); - } - - scoutfs_tseq_add(&ninf->msg_tseq_tree, &mrecv->tseq_entry); - - /* - * Initial received greetings are processed inline - * before any other incoming messages. - * - * Incoming requests or responses to the lock client - * can't handle re-ordering, so they're queued to - * ordered receive processing work. - */ - if (nh.cmd == SCOUTFS_NET_CMD_GREETING) - scoutfs_net_proc_worker(&mrecv->proc_work); - else if (nh.cmd == SCOUTFS_NET_CMD_LOCK && !conn->listening_conn) - queue_ordered_proc(conn, mrecv); - else - queue_work(conn->workq, &mrecv->proc_work); } +out: + if (page) __free_page(page); + if (ret) scoutfs_inc_counter(sb, net_recv_error);