Search messages in rbtree instead of lists

The net layer was initially built around send queue lists with the
presumption that there wouldn't be many messages in flight and that
responses would be sent roughly in order.

Then many years passed.

In the modern era, we can have tens of thousands of lock request
messages in flight.  This led to O(n^2) processing in quite a few
places as recv processing searched the queue lists for either requests
to complete or responses to free.

This adds messages to two rbtrees, indexing requests by their id and
responses by their send sequence.  Recv processing can now find
messages in O(log n).
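
A rough sketch of the keying scheme, using names from the patch below
(msend_key() is a hypothetical helper added only for illustration; the
patch open-codes this comparison in cmp_sorted_msend()):

    /* in struct scoutfs_net_connection: one tree per message type */
    struct rb_root req_root;        /* requests, keyed by nh.id */
    struct rb_root resp_root;       /* responses, keyed by nh.seq */

    /* hypothetical helper: the key depends on the message type */
    static u64 msend_key(struct message_send *msend)
    {
            return nh_is_request(&msend->nh) ?
                   le64_to_cpu(msend->nh.id) :
                   le64_to_cpu(msend->nh.seq);
    }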

Then we add a dedicated list that the send worker uses to free dead
messages, rather than abusing the send queue.  It doesn't make a huge
functional difference, but it's less messy and only costs a list_head
per message.
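
The linked/unlinked state of that list_head also replaces the old
dead:1 bitfield, so "dead" is simply list membership.  Roughly:

    INIT_LIST_HEAD(&msend->dead_head);                  /* live: unlinked */
    list_add_tail(&msend->dead_head, &conn->dead_list); /* dead: linked */
    /* so msend_is_dead() is just !list_empty(&msend->dead_head) */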

The end result is that, on a single node, with ~40k lock shrink attempts
in flight, we go from processing ~800 request/grant request-response
pairs per second to ~60,000 per second.

Signed-off-by: Zach Brown <zab@versity.com>
commit e82e127435
parent f0c7996612
Author: Zach Brown
Date:   2026-01-07 17:37:05 -08:00

2 changed files with 148 additions and 85 deletions


@@ -21,6 +21,7 @@
#include <net/tcp.h>
#include <linux/log2.h>
#include <linux/jhash.h>
#include <linux/rbtree.h>
#include "format.h"
#include "counters.h"
@@ -122,9 +123,10 @@ enum spin_lock_subtype {
*/
struct message_send {
struct scoutfs_tseq_entry tseq_entry;
unsigned long dead:1;
struct list_head head;
struct rb_node node;
scoutfs_net_response_t resp_func;
struct list_head head;
struct list_head dead_head; /* o/~ playin' in the band o/~ */
void *resp_data;
struct scoutfs_net_header nh;
};
@@ -161,58 +163,133 @@ static bool nh_is_request(struct scoutfs_net_header *nh)
return !nh_is_response(nh);
}
static bool msend_is_dead(struct message_send *msend)
{
return !list_empty(&msend->dead_head);
}
static int cmp_sorted_msend(u64 pos, struct message_send *msend)
{
if (nh_is_request(&msend->nh))
return pos < le64_to_cpu(msend->nh.id) ? -1 :
pos > le64_to_cpu(msend->nh.id) ? 1 : 0;
else
return pos < le64_to_cpu(msend->nh.seq) ? -1 :
pos > le64_to_cpu(msend->nh.seq) ? 1 : 0;
}
static struct message_send *search_sorted_msends(struct rb_root *root, u64 pos, struct rb_node *ins)
{
struct rb_node **node = &root->rb_node;
struct rb_node *parent = NULL;
struct message_send *msend = NULL;
struct message_send *next = NULL;
int cmp = -1;
while (*node) {
parent = *node;
msend = container_of(*node, struct message_send, node);
cmp = cmp_sorted_msend(pos, msend);
if (cmp < 0) {
next = msend;
node = &(*node)->rb_left;
} else if (cmp > 0) {
node = &(*node)->rb_right;
} else {
next = msend;
break;
}
}
BUG_ON(cmp == 0 && ins);
if (ins) {
rb_link_node(ins, parent, node);
rb_insert_color(ins, root);
}
return next;
}
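
Note that search_sorted_msends() returns the first message at or after
pos rather than only exact matches, so point lookups must re-check the
key on the result.  An illustrative caller, matching what
find_request() does further down:

    msend = search_sorted_msends(&conn->req_root, id, NULL);
    if (msend && le64_to_cpu(msend->nh.id) != id)
            msend = NULL;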
static struct message_send *next_sorted_msend(struct message_send *msend)
{
struct rb_node *node = rb_next(&msend->node);
return node ? rb_entry(node, struct message_send, node) : NULL;
}
#define for_each_sorted_msend(MSEND_, TMP_, ROOT_, POS_) \
for (MSEND_ = search_sorted_msends(ROOT_, POS_, NULL); \
MSEND_ != NULL && ({ TMP_ = next_sorted_msend(MSEND_); true; }); \
MSEND_ = TMP_)
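
A hedged usage sketch of the iterator (not from the patch): a walk
starting at pos 0 visits every message in key order, and because TMP_
is sampled before the body runs, the body can safely erase or free
MSEND_.  For example, counting pending requests with an id at or above
some hypothetical first_id:

    struct message_send *msend;
    struct message_send *tmp;
    unsigned long nr = 0;

    for_each_sorted_msend(msend, tmp, &conn->req_root, first_id)
            nr++;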
static void insert_sorted_msend(struct scoutfs_net_connection *conn, struct message_send *msend)
{
BUG_ON(!RB_EMPTY_NODE(&msend->node));
if (nh_is_request(&msend->nh))
search_sorted_msends(&conn->req_root, le64_to_cpu(msend->nh.id), &msend->node);
else
search_sorted_msends(&conn->resp_root, le64_to_cpu(msend->nh.seq), &msend->node);
}
static void erase_sorted_msend(struct scoutfs_net_connection *conn, struct message_send *msend)
{
if (!RB_EMPTY_NODE(&msend->node)) {
if (nh_is_request(&msend->nh))
rb_erase(&msend->node, &conn->req_root);
else
rb_erase(&msend->node, &conn->resp_root);
RB_CLEAR_NODE(&msend->node);
}
}
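
erase_sorted_msend() leans on the stock rbtree idiom of treating a
cleared node as "not in any tree": submit_send() calls RB_CLEAR_NODE()
at init, and clearing again after rb_erase() keeps the erase
idempotent, which lets free_msend() call it unconditionally.  A minimal
sketch of the idiom:

    RB_CLEAR_NODE(&node);           /* at init and after each erase */
    if (!RB_EMPTY_NODE(&node)) {    /* only touch the tree when linked */
            rb_erase(&node, &root);
            RB_CLEAR_NODE(&node);
    }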
static void move_sorted_msends(struct scoutfs_net_connection *dst_conn, struct rb_root *dst_root,
struct scoutfs_net_connection *src_conn, struct rb_root *src_root)
{
struct message_send *msend;
struct message_send *tmp;
for_each_sorted_msend(msend, tmp, src_root, 0) {
erase_sorted_msend(src_conn, msend);
insert_sorted_msend(dst_conn, msend);
}
}
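
move_sorted_msends() re-inserts nodes one at a time, presumably because
the destination trees may already hold entries; it's used by the
reconnect path near the end of this diff to migrate a reconnected
connection's pending messages into the surviving connection's trees.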
}
/*
* We return dead requests so that the caller can stop searching other
* lists for the dead request that we found.
* Pending requests are uniquely identified by the id they were assigned
* as they were first put on the send queue.
*/
static struct message_send *search_list(struct scoutfs_net_connection *conn,
struct list_head *list,
u8 cmd, u64 id)
static struct message_send *find_request(struct scoutfs_net_connection *conn, u64 id)
{
struct message_send *msend;
assert_spin_locked(&conn->lock);
list_for_each_entry(msend, list, head) {
if (nh_is_request(&msend->nh) && msend->nh.cmd == cmd &&
le64_to_cpu(msend->nh.id) == id)
return msend;
}
return NULL;
}
/*
* Find an active send request on the lists. It's almost certainly
* waiting on the resend queue but it could be actively being sent.
*/
static struct message_send *find_request(struct scoutfs_net_connection *conn,
u8 cmd, u64 id)
{
struct message_send *msend;
msend = search_list(conn, &conn->resend_queue, cmd, id) ?:
search_list(conn, &conn->send_queue, cmd, id);
if (msend && msend->dead)
msend = search_sorted_msends(&conn->req_root, id, NULL);
if (msend && le64_to_cpu(msend->nh.id) != id)
msend = NULL;
return msend;
}
/*
* Complete a send message by moving it to the send queue and marking it
* to be freed. It won't be visible to callers trying to find sends.
* Free a message by moving it to the dead list and queueing the send
* work to free it. Once considered dead the message won't be sent or
* visible as a request for response processing.
*/
static void complete_send(struct scoutfs_net_connection *conn,
struct message_send *msend)
static void queue_dead_free(struct scoutfs_net_connection *conn, struct message_send *msend)
{
assert_spin_locked(&conn->lock);
if (WARN_ON_ONCE(msend->dead) ||
/* callers look up the message in the sorted roots, which dead messages are removed from */
if (WARN_ON_ONCE(msend_is_dead(msend)) ||
WARN_ON_ONCE(list_empty(&msend->head)))
return;
msend->dead = 1;
list_move(&msend->head, &conn->send_queue);
erase_sorted_msend(conn, msend);
list_add_tail(&msend->dead_head, &conn->dead_list);
queue_work(conn->workq, &conn->send_work);
}
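
The freeing half of this handoff lives in the send worker, which drains
dead_list under conn->lock before servicing the send queue.  Simplified
from scoutfs_net_send_worker() later in this diff:

    spin_lock(&conn->lock);
    list_for_each_entry_safe(msend, tmp, &conn->dead_list, dead_head)
            free_msend(ninf, conn, msend);  /* unlinks list and rbtree */
    spin_unlock(&conn->lock);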
@@ -369,7 +446,8 @@ static int submit_send(struct super_block *sb,
msend->resp_func = resp_func;
msend->resp_data = resp_data;
msend->dead = 0;
INIT_LIST_HEAD(&msend->dead_head);
RB_CLEAR_NODE(&msend->node);
msend->nh.seq = cpu_to_le64(seq);
msend->nh.recv_seq = 0; /* set when sent, not when queued */
@@ -390,6 +468,7 @@ static int submit_send(struct super_block *sb,
} else {
list_add_tail(&msend->head, &conn->resend_queue);
}
insert_sorted_msend(conn, msend);
if (id_ret)
*id_ret = le64_to_cpu(msend->nh.id);
@@ -455,11 +534,11 @@ static int process_response(struct scoutfs_net_connection *conn,
spin_lock(&conn->lock);
msend = find_request(conn, mrecv->nh.cmd, le64_to_cpu(mrecv->nh.id));
msend = find_request(conn, le64_to_cpu(mrecv->nh.id));
if (msend) {
resp_func = msend->resp_func;
resp_data = msend->resp_data;
complete_send(conn, msend);
queue_dead_free(conn, msend);
} else {
scoutfs_inc_counter(sb, net_dropped_response);
}
@@ -546,47 +625,22 @@ static void queue_ordered_proc(struct scoutfs_net_connection *conn, struct messa
queue_work(conn->workq, &wlist->work);
}
/*
* Free live responses up to and including the seq by marking them dead
* and moving them to the send queue to be freed.
*/
static bool move_acked_responses(struct scoutfs_net_connection *conn,
struct list_head *list, u64 seq)
{
struct message_send *msend;
struct message_send *tmp;
bool moved = false;
assert_spin_locked(&conn->lock);
list_for_each_entry_safe(msend, tmp, list, head) {
if (le64_to_cpu(msend->nh.seq) > seq)
break;
if (!nh_is_response(&msend->nh) || msend->dead)
continue;
msend->dead = 1;
list_move(&msend->head, &conn->send_queue);
moved = true;
}
return moved;
}
/* acks are processed inline in the recv worker */
static void free_acked_responses(struct scoutfs_net_connection *conn, u64 seq)
{
bool moved;
struct message_send *msend;
struct message_send *tmp;
spin_lock(&conn->lock);
moved = move_acked_responses(conn, &conn->send_queue, seq) |
move_acked_responses(conn, &conn->resend_queue, seq);
for_each_sorted_msend(msend, tmp, &conn->resp_root, 0) {
if (le64_to_cpu(msend->nh.seq) > seq)
break;
queue_dead_free(conn, msend);
}
spin_unlock(&conn->lock);
if (moved)
queue_work(conn->workq, &conn->send_work);
}
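
Because responses are keyed by their send sequence and acks are
cumulative, this walk only visits the acked prefix of the tree and
stops at the first unacked seq: one O(log n) descent to find the start,
then O(1) per response freed, instead of scanning both send queues.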
static int k_recvmsg(struct socket *sock, void *buf, unsigned len)
@@ -824,9 +878,13 @@ static int k_sendmsg_full(struct socket *sock, struct kvec *kv, unsigned long nr
return ret;
}
static void free_msend(struct net_info *ninf, struct message_send *msend)
static void free_msend(struct net_info *ninf, struct scoutfs_net_connection *conn,
struct message_send *msend)
{
list_del_init(&msend->head);
if (!list_empty(&msend->dead_head))
list_del_init(&msend->dead_head);
erase_sorted_msend(conn, msend);
scoutfs_tseq_del(&ninf->msg_tseq_tree, &msend->tseq_entry);
kfree(msend);
}
@@ -866,12 +924,11 @@ static void scoutfs_net_send_worker(struct work_struct *work)
count = 0;
spin_lock(&conn->lock);
list_for_each_entry_safe(msend, _msend_, &conn->send_queue, head) {
if (msend->dead) {
free_msend(ninf, msend);
continue;
}
list_for_each_entry_safe(msend, _msend_, &conn->dead_list, dead_head)
free_msend(ninf, conn, msend);
list_for_each_entry_safe(msend, _msend_, &conn->send_queue, head) {
len = nh_bytes(le16_to_cpu(msend->nh.data_len));
if ((msend->nh.cmd == SCOUTFS_NET_CMD_FAREWELL) &&
@@ -909,7 +966,7 @@ static void scoutfs_net_send_worker(struct work_struct *work)
msend->nh.recv_seq = 0;
/* resend if it wasn't freed while we sent */
if (!msend->dead)
if (!msend_is_dead(msend))
list_move_tail(&msend->head, &conn->resend_queue);
if (--nr_segs == 0)
@@ -957,7 +1014,7 @@ static void scoutfs_net_destroy_worker(struct work_struct *work)
list_splice_init(&conn->resend_queue, &conn->send_queue);
list_for_each_entry_safe(msend, tmp, &conn->send_queue, head)
free_msend(ninf, msend);
free_msend(ninf, conn, msend);
/* accepted sockets are removed from their listener's list */
if (conn->listening_conn) {
@@ -1303,7 +1360,7 @@ static void scoutfs_net_shutdown_worker(struct work_struct *work)
struct message_send, head))) {
resp_func = msend->resp_func;
resp_data = msend->resp_data;
free_msend(ninf, msend);
free_msend(ninf, conn, msend);
spin_unlock(&conn->lock);
call_resp_func(sb, conn, resp_func, resp_data, NULL, 0, -ECONNABORTED);
@@ -1319,7 +1376,7 @@ static void scoutfs_net_shutdown_worker(struct work_struct *work)
list_splice_tail_init(&conn->send_queue, &conn->resend_queue);
list_for_each_entry_safe(msend, tmp, &conn->resend_queue, head) {
if (msend->nh.cmd == SCOUTFS_NET_CMD_GREETING)
free_msend(ninf, msend);
free_msend(ninf, conn, msend);
}
clear_conn_fl(conn, saw_greeting);
@@ -1493,6 +1550,9 @@ scoutfs_net_alloc_conn(struct super_block *sb,
atomic64_set(&conn->recv_seq, 0);
INIT_LIST_HEAD(&conn->send_queue);
INIT_LIST_HEAD(&conn->resend_queue);
INIT_LIST_HEAD(&conn->dead_list);
conn->req_root = RB_ROOT;
conn->resp_root = RB_ROOT;
INIT_WORK(&conn->listen_work, scoutfs_net_listen_worker);
INIT_WORK(&conn->connect_work, scoutfs_net_connect_worker);
INIT_WORK(&conn->send_work, scoutfs_net_send_worker);
@@ -1703,10 +1763,8 @@ void scoutfs_net_client_greeting(struct super_block *sb,
if (new_server) {
atomic64_set(&conn->recv_seq, 0);
list_for_each_entry_safe(msend, tmp, &conn->resend_queue, head){
if (nh_is_response(&msend->nh))
free_msend(ninf, msend);
}
for_each_sorted_msend(msend, tmp, &conn->resp_root, 0)
free_msend(ninf, conn, msend);
}
set_valid_greeting(conn);
@@ -1808,6 +1866,8 @@ restart:
BUG_ON(!list_empty(&reconn->send_queue));
/* queued greeting response is racing, can be in send or resend queue */
list_splice_tail_init(&reconn->resend_queue, &conn->resend_queue);
move_sorted_msends(conn, &conn->req_root, reconn, &reconn->req_root);
move_sorted_msends(conn, &conn->resp_root, reconn, &reconn->resp_root);
/* new conn info is unused, swap, old won't call down */
swap(conn->info, reconn->info);


@@ -67,6 +67,9 @@ struct scoutfs_net_connection {
u64 next_send_id;
struct list_head send_queue;
struct list_head resend_queue;
struct list_head dead_list;
struct rb_root req_root;
struct rb_root resp_root;
atomic64_t recv_seq;
unsigned int ordered_proc_nr;