diff --git a/kmod/src/net.c b/kmod/src/net.c index 7e633e44..345f5c68 100644 --- a/kmod/src/net.c +++ b/kmod/src/net.c @@ -21,6 +21,7 @@ #include #include #include +#include #include "format.h" #include "counters.h" @@ -122,9 +123,10 @@ enum spin_lock_subtype { */ struct message_send { struct scoutfs_tseq_entry tseq_entry; - unsigned long dead:1; - struct list_head head; + struct rb_node node; scoutfs_net_response_t resp_func; + struct list_head head; + struct list_head dead_head; /* o/~ playin' in the band o/~ */ void *resp_data; struct scoutfs_net_header nh; }; @@ -161,58 +163,133 @@ static bool nh_is_request(struct scoutfs_net_header *nh) return !nh_is_response(nh); } +static bool msend_is_dead(struct message_send *msend) +{ + return !list_empty(&msend->dead_head); +} + +static int cmp_sorted_msend(u64 pos, struct message_send *msend) +{ + if (nh_is_request(&msend->nh)) + return pos < le64_to_cpu(msend->nh.id) ? -1 : + pos > le64_to_cpu(msend->nh.id) ? 1 : 0; + else + return pos < le64_to_cpu(msend->nh.seq) ? -1 : + pos > le64_to_cpu(msend->nh.seq) ? 1 : 0; +} + +static struct message_send *search_sorted_msends(struct rb_root *root, u64 pos, struct rb_node *ins) +{ + struct rb_node **node = &root->rb_node; + struct rb_node *parent = NULL; + struct message_send *msend = NULL; + struct message_send *next = NULL; + int cmp = -1; + + while (*node) { + parent = *node; + msend = container_of(*node, struct message_send, node); + + cmp = cmp_sorted_msend(pos, msend); + if (cmp < 0) { + next = msend; + node = &(*node)->rb_left; + } else if (cmp > 0) { + node = &(*node)->rb_right; + } else { + next = msend; + break; + } + } + + BUG_ON(cmp == 0 && ins); + + if (ins) { + rb_link_node(ins, parent, node); + rb_insert_color(ins, root); + } + + return next; +} + +static struct message_send *next_sorted_msend(struct message_send *msend) +{ + struct rb_node *node = rb_next(&msend->node); + + return node ? rb_entry(node, struct message_send, node) : NULL; +} + +#define for_each_sorted_msend(MSEND_, TMP_, ROOT_, POS_) \ + for (MSEND_ = search_sorted_msends(ROOT_, POS_, NULL); \ + MSEND_ != NULL && ({ TMP_ = next_sorted_msend(MSEND_); true; }); \ + MSEND_ = TMP_) + +static void insert_sorted_msend(struct scoutfs_net_connection *conn, struct message_send *msend) +{ + BUG_ON(!RB_EMPTY_NODE(&msend->node)); + + if (nh_is_request(&msend->nh)) + search_sorted_msends(&conn->req_root, le64_to_cpu(msend->nh.id), &msend->node); + else + search_sorted_msends(&conn->resp_root, le64_to_cpu(msend->nh.seq), &msend->node); +} + +static void erase_sorted_msend(struct scoutfs_net_connection *conn, struct message_send *msend) +{ + if (!RB_EMPTY_NODE(&msend->node)) { + if (nh_is_request(&msend->nh)) + rb_erase(&msend->node, &conn->req_root); + else + rb_erase(&msend->node, &conn->resp_root); + RB_CLEAR_NODE(&msend->node); + } +} + +static void move_sorted_msends(struct scoutfs_net_connection *dst_conn, struct rb_root *dst_root, + struct scoutfs_net_connection *src_conn, struct rb_root *src_root) +{ + struct message_send *msend; + struct message_send *tmp; + + for_each_sorted_msend(msend, tmp, src_root, 0) { + erase_sorted_msend(src_conn, msend); + insert_sorted_msend(dst_conn, msend); + } +} + /* - * We return dead requests so that the caller can stop searching other - * lists for the dead request that we found. + * Pending requests are uniquely identified by the id they were assigned + * as they were first put on the send queue. */ -static struct message_send *search_list(struct scoutfs_net_connection *conn, - struct list_head *list, - u8 cmd, u64 id) +static struct message_send *find_request(struct scoutfs_net_connection *conn, u64 id) { struct message_send *msend; assert_spin_locked(&conn->lock); - list_for_each_entry(msend, list, head) { - if (nh_is_request(&msend->nh) && msend->nh.cmd == cmd && - le64_to_cpu(msend->nh.id) == id) - return msend; - } - - return NULL; -} - -/* - * Find an active send request on the lists. It's almost certainly - * waiting on the resend queue but it could be actively being sent. - */ -static struct message_send *find_request(struct scoutfs_net_connection *conn, - u8 cmd, u64 id) -{ - struct message_send *msend; - - msend = search_list(conn, &conn->resend_queue, cmd, id) ?: - search_list(conn, &conn->send_queue, cmd, id); - if (msend && msend->dead) + msend = search_sorted_msends(&conn->req_root, id, NULL); + if (msend && le64_to_cpu(msend->nh.id) != id) msend = NULL; + return msend; } /* - * Complete a send message by moving it to the send queue and marking it - * to be freed. It won't be visible to callers trying to find sends. + * Free a message by moving it to the dead list and queueing the send + * work to free it. Once considered dead the message won't be sent or + * visible as a request for response processing. */ -static void complete_send(struct scoutfs_net_connection *conn, - struct message_send *msend) +static void queue_dead_free(struct scoutfs_net_connection *conn, struct message_send *msend) { assert_spin_locked(&conn->lock); - if (WARN_ON_ONCE(msend->dead) || + /* callers look up the message in the sorted roots, which dead messages are removed from */ + if (WARN_ON_ONCE(msend_is_dead(msend)) || WARN_ON_ONCE(list_empty(&msend->head))) return; - msend->dead = 1; - list_move(&msend->head, &conn->send_queue); + erase_sorted_msend(conn, msend); + list_add_tail(&msend->dead_head, &conn->dead_list); queue_work(conn->workq, &conn->send_work); } @@ -369,7 +446,8 @@ static int submit_send(struct super_block *sb, msend->resp_func = resp_func; msend->resp_data = resp_data; - msend->dead = 0; + INIT_LIST_HEAD(&msend->dead_head); + RB_CLEAR_NODE(&msend->node); msend->nh.seq = cpu_to_le64(seq); msend->nh.recv_seq = 0; /* set when sent, not when queued */ @@ -390,6 +468,7 @@ static int submit_send(struct super_block *sb, } else { list_add_tail(&msend->head, &conn->resend_queue); } + insert_sorted_msend(conn, msend); if (id_ret) *id_ret = le64_to_cpu(msend->nh.id); @@ -455,11 +534,11 @@ static int process_response(struct scoutfs_net_connection *conn, spin_lock(&conn->lock); - msend = find_request(conn, mrecv->nh.cmd, le64_to_cpu(mrecv->nh.id)); + msend = find_request(conn, le64_to_cpu(mrecv->nh.id)); if (msend) { resp_func = msend->resp_func; resp_data = msend->resp_data; - complete_send(conn, msend); + queue_dead_free(conn, msend); } else { scoutfs_inc_counter(sb, net_dropped_response); } @@ -546,47 +625,22 @@ static void queue_ordered_proc(struct scoutfs_net_connection *conn, struct messa queue_work(conn->workq, &wlist->work); } -/* - * Free live responses up to and including the seq by marking them dead - * and moving them to the send queue to be freed. - */ -static bool move_acked_responses(struct scoutfs_net_connection *conn, - struct list_head *list, u64 seq) -{ - struct message_send *msend; - struct message_send *tmp; - bool moved = false; - - assert_spin_locked(&conn->lock); - - list_for_each_entry_safe(msend, tmp, list, head) { - if (le64_to_cpu(msend->nh.seq) > seq) - break; - if (!nh_is_response(&msend->nh) || msend->dead) - continue; - - msend->dead = 1; - list_move(&msend->head, &conn->send_queue); - moved = true; - } - - return moved; -} - /* acks are processed inline in the recv worker */ static void free_acked_responses(struct scoutfs_net_connection *conn, u64 seq) { - bool moved; + struct message_send *msend; + struct message_send *tmp; spin_lock(&conn->lock); - moved = move_acked_responses(conn, &conn->send_queue, seq) | - move_acked_responses(conn, &conn->resend_queue, seq); + for_each_sorted_msend(msend, tmp, &conn->resp_root, 0) { + if (le64_to_cpu(msend->nh.seq) > seq) + break; + + queue_dead_free(conn, msend); + } spin_unlock(&conn->lock); - - if (moved) - queue_work(conn->workq, &conn->send_work); } static int k_recvmsg(struct socket *sock, void *buf, unsigned len) @@ -824,9 +878,13 @@ static int k_sendmsg_full(struct socket *sock, struct kvec *kv, unsigned long nr return ret; } -static void free_msend(struct net_info *ninf, struct message_send *msend) +static void free_msend(struct net_info *ninf, struct scoutfs_net_connection *conn, + struct message_send *msend) { list_del_init(&msend->head); + if (!list_empty(&msend->dead_head)) + list_del_init(&msend->dead_head); + erase_sorted_msend(conn, msend); scoutfs_tseq_del(&ninf->msg_tseq_tree, &msend->tseq_entry); kfree(msend); } @@ -866,12 +924,11 @@ static void scoutfs_net_send_worker(struct work_struct *work) count = 0; spin_lock(&conn->lock); - list_for_each_entry_safe(msend, _msend_, &conn->send_queue, head) { - if (msend->dead) { - free_msend(ninf, msend); - continue; - } + list_for_each_entry_safe(msend, _msend_, &conn->dead_list, dead_head) + free_msend(ninf, conn, msend); + + list_for_each_entry_safe(msend, _msend_, &conn->send_queue, head) { len = nh_bytes(le16_to_cpu(msend->nh.data_len)); if ((msend->nh.cmd == SCOUTFS_NET_CMD_FAREWELL) && @@ -909,7 +966,7 @@ static void scoutfs_net_send_worker(struct work_struct *work) msend->nh.recv_seq = 0; /* resend if it wasn't freed while we sent */ - if (!msend->dead) + if (!msend_is_dead(msend)) list_move_tail(&msend->head, &conn->resend_queue); if (--nr_segs == 0) @@ -957,7 +1014,7 @@ static void scoutfs_net_destroy_worker(struct work_struct *work) list_splice_init(&conn->resend_queue, &conn->send_queue); list_for_each_entry_safe(msend, tmp, &conn->send_queue, head) - free_msend(ninf, msend); + free_msend(ninf, conn, msend); /* accepted sockets are removed from their listener's list */ if (conn->listening_conn) { @@ -1303,7 +1360,7 @@ static void scoutfs_net_shutdown_worker(struct work_struct *work) struct message_send, head))) { resp_func = msend->resp_func; resp_data = msend->resp_data; - free_msend(ninf, msend); + free_msend(ninf, conn, msend); spin_unlock(&conn->lock); call_resp_func(sb, conn, resp_func, resp_data, NULL, 0, -ECONNABORTED); @@ -1319,7 +1376,7 @@ static void scoutfs_net_shutdown_worker(struct work_struct *work) list_splice_tail_init(&conn->send_queue, &conn->resend_queue); list_for_each_entry_safe(msend, tmp, &conn->resend_queue, head) { if (msend->nh.cmd == SCOUTFS_NET_CMD_GREETING) - free_msend(ninf, msend); + free_msend(ninf, conn, msend); } clear_conn_fl(conn, saw_greeting); @@ -1493,6 +1550,9 @@ scoutfs_net_alloc_conn(struct super_block *sb, atomic64_set(&conn->recv_seq, 0); INIT_LIST_HEAD(&conn->send_queue); INIT_LIST_HEAD(&conn->resend_queue); + INIT_LIST_HEAD(&conn->dead_list); + conn->req_root = RB_ROOT; + conn->resp_root = RB_ROOT; INIT_WORK(&conn->listen_work, scoutfs_net_listen_worker); INIT_WORK(&conn->connect_work, scoutfs_net_connect_worker); INIT_WORK(&conn->send_work, scoutfs_net_send_worker); @@ -1703,10 +1763,8 @@ void scoutfs_net_client_greeting(struct super_block *sb, if (new_server) { atomic64_set(&conn->recv_seq, 0); - list_for_each_entry_safe(msend, tmp, &conn->resend_queue, head){ - if (nh_is_response(&msend->nh)) - free_msend(ninf, msend); - } + for_each_sorted_msend(msend, tmp, &conn->resp_root, 0) + free_msend(ninf, conn, msend); } set_valid_greeting(conn); @@ -1808,6 +1866,8 @@ restart: BUG_ON(!list_empty(&reconn->send_queue)); /* queued greeting response is racing, can be in send or resend queue */ list_splice_tail_init(&reconn->resend_queue, &conn->resend_queue); + move_sorted_msends(conn, &conn->req_root, reconn, &reconn->req_root); + move_sorted_msends(conn, &conn->resp_root, reconn, &reconn->resp_root); /* new conn info is unused, swap, old won't call down */ swap(conn->info, reconn->info); diff --git a/kmod/src/net.h b/kmod/src/net.h index 096f12d8..1ed1d53f 100644 --- a/kmod/src/net.h +++ b/kmod/src/net.h @@ -67,6 +67,9 @@ struct scoutfs_net_connection { u64 next_send_id; struct list_head send_queue; struct list_head resend_queue; + struct list_head dead_list; + struct rb_root req_root; + struct rb_root resp_root; atomic64_t recv_seq; unsigned int ordered_proc_nr;