mirror of
https://github.com/versity/scoutfs.git
synced 2026-01-04 03:14:02 +00:00
Receive incoming messages in bulk
Our messaging layer is used for small control messages, not large data payloads. By calling recvmsg twice for every incoming message we're hitting the socket lock reasonably hard. With senders doing the same, and a lot of messages flowing in each direction, the contention is non-trivial. This changes the receiver to copy as much of the incoming stream into a page that is then framed and copied again into individual allocated messages that can be processed concurrently. We're avoiding contention with the sender on the socket at the cost of additional copies of our small messages. Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
217
kmod/src/net.c
217
kmod/src/net.c
@@ -588,33 +588,23 @@ static void free_acked_responses(struct scoutfs_net_connection *conn, u64 seq)
|
||||
queue_work(conn->workq, &conn->send_work);
|
||||
}
|
||||
|
||||
static int recvmsg_full(struct socket *sock, void *buf, unsigned len)
|
||||
static int k_recvmsg(struct socket *sock, void *buf, unsigned len)
|
||||
{
|
||||
struct msghdr msg;
|
||||
struct kvec kv;
|
||||
int ret;
|
||||
|
||||
while (len) {
|
||||
memset(&msg, 0, sizeof(msg));
|
||||
msg.msg_flags = MSG_NOSIGNAL;
|
||||
kv.iov_base = buf;
|
||||
kv.iov_len = len;
|
||||
struct kvec kv = {
|
||||
.iov_base = buf,
|
||||
.iov_len = len,
|
||||
};
|
||||
struct msghdr msg = {
|
||||
.msg_flags = MSG_NOSIGNAL,
|
||||
};
|
||||
|
||||
#ifndef KC_MSGHDR_STRUCT_IOV_ITER
|
||||
msg.msg_iov = (struct iovec *)&kv;
|
||||
msg.msg_iovlen = 1;
|
||||
msg.msg_iov = (struct iovec *)&kv;
|
||||
msg.msg_iovlen = 1;
|
||||
#else
|
||||
iov_iter_init(&msg.msg_iter, READ, (struct iovec *)&kv, len, 1);
|
||||
iov_iter_init(&msg.msg_iter, READ, (struct iovec *)&kv, len, 1);
|
||||
#endif
|
||||
ret = kernel_recvmsg(sock, &msg, &kv, 1, len, msg.msg_flags);
|
||||
if (ret <= 0)
|
||||
return -ECONNABORTED;
|
||||
|
||||
len -= ret;
|
||||
buf += ret;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return kernel_recvmsg(sock, &msg, &kv, 1, len, msg.msg_flags);
|
||||
}
|
||||
|
||||
static bool invalid_message(struct scoutfs_net_connection *conn,
|
||||
@@ -651,6 +641,72 @@ static bool invalid_message(struct scoutfs_net_connection *conn,
|
||||
return false;
|
||||
}
|
||||
|
||||
static int recv_one_message(struct super_block *sb, struct net_info *ninf,
|
||||
struct scoutfs_net_connection *conn, struct scoutfs_net_header *nh,
|
||||
unsigned int data_len)
|
||||
{
|
||||
struct message_recv *mrecv;
|
||||
int ret;
|
||||
|
||||
scoutfs_inc_counter(sb, net_recv_messages);
|
||||
scoutfs_add_counter(sb, net_recv_bytes, nh_bytes(data_len));
|
||||
trace_scoutfs_net_recv_message(sb, &conn->sockname, &conn->peername, nh);
|
||||
|
||||
/* caller's invalid message checked data len */
|
||||
mrecv = kmalloc(offsetof(struct message_recv, nh.data[data_len]), GFP_NOFS);
|
||||
if (!mrecv) {
|
||||
ret = -ENOMEM;
|
||||
goto out;
|
||||
}
|
||||
|
||||
mrecv->conn = conn;
|
||||
INIT_WORK(&mrecv->proc_work, scoutfs_net_proc_worker);
|
||||
INIT_LIST_HEAD(&mrecv->ordered_head);
|
||||
mrecv->nh = *nh;
|
||||
if (data_len)
|
||||
memcpy(mrecv->nh.data, (nh + 1), data_len);
|
||||
|
||||
if (nh->cmd == SCOUTFS_NET_CMD_GREETING) {
|
||||
/* greetings are out of band, no seq mechanics */
|
||||
set_conn_fl(conn, saw_greeting);
|
||||
|
||||
} else if (le64_to_cpu(nh->seq) <=
|
||||
atomic64_read(&conn->recv_seq)) {
|
||||
/* drop any resent duplicated messages */
|
||||
scoutfs_inc_counter(sb, net_recv_dropped_duplicate);
|
||||
kfree(mrecv);
|
||||
ret = 0;
|
||||
goto out;
|
||||
|
||||
} else {
|
||||
/* record that we've received sender's seq */
|
||||
atomic64_set(&conn->recv_seq, le64_to_cpu(nh->seq));
|
||||
/* and free our responses that sender has received */
|
||||
free_acked_responses(conn, le64_to_cpu(nh->recv_seq));
|
||||
}
|
||||
|
||||
scoutfs_tseq_add(&ninf->msg_tseq_tree, &mrecv->tseq_entry);
|
||||
|
||||
/*
|
||||
* Initial received greetings are processed inline
|
||||
* before any other incoming messages.
|
||||
*
|
||||
* Incoming requests or responses to the lock client
|
||||
* can't handle re-ordering, so they're queued to
|
||||
* ordered receive processing work.
|
||||
*/
|
||||
if (nh->cmd == SCOUTFS_NET_CMD_GREETING)
|
||||
scoutfs_net_proc_worker(&mrecv->proc_work);
|
||||
else if (nh->cmd == SCOUTFS_NET_CMD_LOCK && !conn->listening_conn)
|
||||
queue_ordered_proc(conn, mrecv);
|
||||
else
|
||||
queue_work(conn->workq, &mrecv->proc_work);
|
||||
ret = 0;
|
||||
|
||||
out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* Always block receiving from the socket. Errors trigger shutting down
|
||||
* the connection.
|
||||
@@ -661,89 +717,72 @@ static void scoutfs_net_recv_worker(struct work_struct *work)
|
||||
struct super_block *sb = conn->sb;
|
||||
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
|
||||
struct socket *sock = conn->sock;
|
||||
struct scoutfs_net_header nh;
|
||||
struct message_recv *mrecv;
|
||||
struct scoutfs_net_header *nh;
|
||||
struct page *page = NULL;
|
||||
unsigned int data_len;
|
||||
int hdr_off;
|
||||
int rx_off;
|
||||
int size;
|
||||
int ret;
|
||||
|
||||
trace_scoutfs_net_recv_work_enter(sb, 0, 0);
|
||||
|
||||
page = alloc_page(GFP_NOFS);
|
||||
if (!page) {
|
||||
ret = -ENOMEM;
|
||||
goto out;
|
||||
}
|
||||
|
||||
hdr_off = 0;
|
||||
rx_off = 0;
|
||||
|
||||
for (;;) {
|
||||
/* receive the header */
|
||||
ret = recvmsg_full(sock, &nh, sizeof(nh));
|
||||
if (ret)
|
||||
break;
|
||||
|
||||
/* receiving an invalid message breaks the connection */
|
||||
if (invalid_message(conn, &nh)) {
|
||||
scoutfs_inc_counter(sb, net_recv_invalid_message);
|
||||
ret = -EBADMSG;
|
||||
break;
|
||||
ret = k_recvmsg(sock, page_address(page) + rx_off, PAGE_SIZE - rx_off);
|
||||
if (ret <= 0) {
|
||||
ret = -ECONNABORTED;
|
||||
goto out;
|
||||
}
|
||||
|
||||
data_len = le16_to_cpu(nh.data_len);
|
||||
rx_off += ret;
|
||||
|
||||
scoutfs_inc_counter(sb, net_recv_messages);
|
||||
scoutfs_add_counter(sb, net_recv_bytes, nh_bytes(data_len));
|
||||
trace_scoutfs_net_recv_message(sb, &conn->sockname,
|
||||
&conn->peername, &nh);
|
||||
for (;;) {
|
||||
size = rx_off - hdr_off;
|
||||
if (size < sizeof(struct scoutfs_net_header))
|
||||
break;
|
||||
|
||||
/* invalid message checked data len */
|
||||
mrecv = kmalloc(offsetof(struct message_recv,
|
||||
nh.data[data_len]), GFP_NOFS);
|
||||
if (!mrecv) {
|
||||
ret = -ENOMEM;
|
||||
break;
|
||||
nh = page_address(page) + hdr_off;
|
||||
|
||||
/* receiving an invalid message breaks the connection */
|
||||
if (invalid_message(conn, nh)) {
|
||||
scoutfs_inc_counter(sb, net_recv_invalid_message);
|
||||
ret = -EBADMSG;
|
||||
break;
|
||||
}
|
||||
|
||||
data_len = le16_to_cpu(nh->data_len);
|
||||
if (sizeof(struct scoutfs_net_header) + data_len > size)
|
||||
break;
|
||||
|
||||
ret = recv_one_message(sb, ninf, conn, nh, data_len);
|
||||
if (ret < 0)
|
||||
goto out;
|
||||
|
||||
hdr_off += sizeof(struct scoutfs_net_header) + data_len;
|
||||
}
|
||||
|
||||
mrecv->conn = conn;
|
||||
INIT_WORK(&mrecv->proc_work, scoutfs_net_proc_worker);
|
||||
INIT_LIST_HEAD(&mrecv->ordered_head);
|
||||
mrecv->nh = nh;
|
||||
|
||||
/* receive the data payload */
|
||||
ret = recvmsg_full(sock, mrecv->nh.data, data_len);
|
||||
if (ret) {
|
||||
kfree(mrecv);
|
||||
break;
|
||||
if ((PAGE_SIZE - rx_off) <
|
||||
(sizeof(struct scoutfs_net_header) + SCOUTFS_NET_MAX_DATA_LEN)) {
|
||||
if (size)
|
||||
memmove(page_address(page), page_address(page) + hdr_off, size);
|
||||
hdr_off = 0;
|
||||
rx_off = size;
|
||||
}
|
||||
|
||||
if (nh.cmd == SCOUTFS_NET_CMD_GREETING) {
|
||||
/* greetings are out of band, no seq mechanics */
|
||||
set_conn_fl(conn, saw_greeting);
|
||||
|
||||
} else if (le64_to_cpu(nh.seq) <=
|
||||
atomic64_read(&conn->recv_seq)) {
|
||||
/* drop any resent duplicated messages */
|
||||
scoutfs_inc_counter(sb, net_recv_dropped_duplicate);
|
||||
kfree(mrecv);
|
||||
continue;
|
||||
|
||||
} else {
|
||||
/* record that we've received sender's seq */
|
||||
atomic64_set(&conn->recv_seq, le64_to_cpu(nh.seq));
|
||||
/* and free our responses that sender has received */
|
||||
free_acked_responses(conn, le64_to_cpu(nh.recv_seq));
|
||||
}
|
||||
|
||||
scoutfs_tseq_add(&ninf->msg_tseq_tree, &mrecv->tseq_entry);
|
||||
|
||||
/*
|
||||
* Initial received greetings are processed inline
|
||||
* before any other incoming messages.
|
||||
*
|
||||
* Incoming requests or responses to the lock client
|
||||
* can't handle re-ordering, so they're queued to
|
||||
* ordered receive processing work.
|
||||
*/
|
||||
if (nh.cmd == SCOUTFS_NET_CMD_GREETING)
|
||||
scoutfs_net_proc_worker(&mrecv->proc_work);
|
||||
else if (nh.cmd == SCOUTFS_NET_CMD_LOCK && !conn->listening_conn)
|
||||
queue_ordered_proc(conn, mrecv);
|
||||
else
|
||||
queue_work(conn->workq, &mrecv->proc_work);
|
||||
}
|
||||
|
||||
out:
|
||||
__free_page(page);
|
||||
|
||||
if (ret)
|
||||
scoutfs_inc_counter(sb, net_recv_error);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user