Send messages in batches

Previous work had the receiver try to receive multiple messages in bulk.
This does the same for the sender.

We walk the send queue and initialize a vector that we then send with
one call.  This is intentionally similar to the single message sending
pattern to avoid unintended changes.

Along with the changes to recieve in bulk this ended up increasing the
message processing rate by about 6x when both send and receive were
going full throttle.

Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
Zach Brown
2025-09-29 11:44:07 -07:00
parent e79086f381
commit feae5757c4

View File

@@ -792,33 +792,41 @@ out:
trace_scoutfs_net_recv_work_exit(sb, 0, ret);
}
static int sendmsg_full(struct socket *sock, void *buf, unsigned len)
/*
* This consumes the kvec.
*/
static int k_sendmsg_full(struct socket *sock, struct kvec *kv, unsigned long nr_segs, size_t count)
{
struct msghdr msg;
struct kvec kv;
int ret;
int ret = 0;
while (len) {
memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_NOSIGNAL;
kv.iov_base = buf;
kv.iov_len = len;
while (count > 0) {
struct msghdr msg = {
.msg_flags = MSG_NOSIGNAL,
};
#ifndef KC_MSGHDR_STRUCT_IOV_ITER
msg.msg_iov = (struct iovec *)&kv;
msg.msg_iovlen = 1;
#else
iov_iter_init(&msg.msg_iter, WRITE, (struct iovec *)&kv, 1, len);
#endif
ret = kernel_sendmsg(sock, &msg, &kv, 1, len);
if (ret <= 0)
return -ECONNABORTED;
ret = kernel_sendmsg(sock, &msg, kv, nr_segs, count);
if (ret <= 0) {
ret = -ECONNABORTED;
break;
}
len -= ret;
buf += ret;
count -= ret;
if (count) {
while (nr_segs > 0 && ret >= kv->iov_len) {
ret -= kv->iov_len;
kv++;
nr_segs--;
}
if (nr_segs > 0 && ret > 0) {
kv->iov_base += ret;
kv->iov_len -= ret;
}
BUG_ON(nr_segs == 0);
}
ret = 0;
}
return 0;
return ret;
}
static void free_msend(struct net_info *ninf, struct message_send *msend)
@@ -849,54 +857,73 @@ static void scoutfs_net_send_worker(struct work_struct *work)
struct super_block *sb = conn->sb;
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
struct message_send *msend;
int ret = 0;
struct message_send *_msend_;
struct kvec kv[16];
unsigned long nr_segs;
size_t count;
int len;
int ret;
trace_scoutfs_net_send_work_enter(sb, 0, 0);
spin_lock(&conn->lock);
while ((msend = list_first_entry_or_null(&conn->send_queue,
struct message_send, head))) {
if (msend->dead) {
free_msend(ninf, msend);
continue;
}
if ((msend->nh.cmd == SCOUTFS_NET_CMD_FAREWELL) &&
nh_is_response(&msend->nh)) {
set_conn_fl(conn, saw_farewell);
}
msend->nh.recv_seq =
cpu_to_le64(atomic64_read(&conn->recv_seq));
spin_unlock(&conn->lock);
len = nh_bytes(le16_to_cpu(msend->nh.data_len));
scoutfs_inc_counter(sb, net_send_messages);
scoutfs_add_counter(sb, net_send_bytes, len);
trace_scoutfs_net_send_message(sb, &conn->sockname,
&conn->peername, &msend->nh);
ret = sendmsg_full(conn->sock, &msend->nh, len);
for (;;) {
nr_segs = 0;
count = 0;
spin_lock(&conn->lock);
list_for_each_entry_safe(msend, _msend_, &conn->send_queue, head) {
if (msend->dead) {
free_msend(ninf, msend);
continue;
}
msend->nh.recv_seq = 0;
len = nh_bytes(le16_to_cpu(msend->nh.data_len));
if (ret)
break;
if ((msend->nh.cmd == SCOUTFS_NET_CMD_FAREWELL) &&
nh_is_response(&msend->nh)) {
set_conn_fl(conn, saw_farewell);
}
/* resend if it wasn't freed while we sent */
if (!msend->dead)
list_move_tail(&msend->head, &conn->resend_queue);
msend->nh.recv_seq = cpu_to_le64(atomic64_read(&conn->recv_seq));
scoutfs_inc_counter(sb, net_send_messages);
scoutfs_add_counter(sb, net_send_bytes, len);
trace_scoutfs_net_send_message(sb, &conn->sockname,
&conn->peername, &msend->nh);
count += len;
kv[nr_segs].iov_base = &msend->nh;
kv[nr_segs].iov_len = len;
if (++nr_segs == ARRAY_SIZE(kv))
break;
}
spin_unlock(&conn->lock);
if (nr_segs == 0) {
ret = 0;
goto out;
}
ret = k_sendmsg_full(conn->sock, kv, nr_segs, count);
if (ret < 0)
goto out;
spin_lock(&conn->lock);
list_for_each_entry_safe(msend, _msend_, &conn->send_queue, head) {
msend->nh.recv_seq = 0;
/* resend if it wasn't freed while we sent */
if (!msend->dead)
list_move_tail(&msend->head, &conn->resend_queue);
if (--nr_segs == 0)
break;
}
spin_unlock(&conn->lock);
}
spin_unlock(&conn->lock);
out:
if (ret) {
scoutfs_inc_counter(sb, net_send_error);
shutdown_conn(conn);