diff --git a/kmod/src/client.c b/kmod/src/client.c index 11bd1fe7..85a5a729 100644 --- a/kmod/src/client.c +++ b/kmod/src/client.c @@ -57,6 +57,12 @@ struct client_info { struct scoutfs_quorum_elected_info qei; u64 old_elected_nr; + + u64 server_term; + + bool sending_farewell; + int farewell_error; + struct completion farewell_comp; }; /* @@ -281,7 +287,8 @@ static int client_greeting(struct super_block *sb, struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct scoutfs_net_greeting *gr = resp; - int ret = 0; + bool new_server; + int ret; if (error) { ret = error; @@ -328,6 +335,11 @@ static int client_greeting(struct super_block *sb, complete(&client->node_id_comp); } + new_server = le64_to_cpu(gr->server_term) != client->server_term; + scoutfs_net_client_greeting(sb, conn, new_server); + + client->server_term = le64_to_cpu(gr->server_term); + ret = 0; out: return ret; } @@ -398,7 +410,7 @@ static void scoutfs_client_connect_worker(struct work_struct *work) goto out; if (qei->run_server) { - ret = scoutfs_server_start(sb, &qei->sin); + ret = scoutfs_server_start(sb, &qei->sin, qei->elected_nr); if (ret) { /* forget that we tried to start the server */ memset(qei, 0, sizeof(*qei)); @@ -423,7 +435,11 @@ static void scoutfs_client_connect_worker(struct work_struct *work) /* send a greeting to verify endpoints of each connection */ greet.fsid = super->hdr.fsid; greet.format_hash = super->format_hash; + greet.server_term = cpu_to_le64(client->server_term); greet.node_id = cpu_to_le64(sbi->node_id); + greet.flags = 0; + if (client->sending_farewell) + greet.flags |= cpu_to_le64(SCOUTFS_NET_GREETING_FLAG_FAREWELL); ret = scoutfs_net_submit_request(sb, client->conn, SCOUTFS_NET_CMD_GREETING, @@ -537,6 +553,7 @@ int scoutfs_client_setup(struct super_block *sb) init_completion(&client->node_id_comp); atomic_set(&client->shutting_down, 0); INIT_WORK(&client->connect_work, scoutfs_client_connect_worker); + init_completion(&client->farewell_comp); client->conn = scoutfs_net_alloc_conn(sb, NULL, client_notify_down, 0, client_req_funcs, "client"); @@ -560,34 +577,89 @@ out: return ret; } +/* Once we get a response from the server we can shut down */ +static int client_farewell_response(struct super_block *sb, + struct scoutfs_net_connection *conn, + void *resp, unsigned int resp_len, + int error, void *data) +{ + struct client_info *client = SCOUTFS_SB(sb)->client_info; + + if (resp_len != 0) + return -EINVAL; + + client->farewell_error = error; + complete(&client->farewell_comp); + + return 0; +} + /* * There must be no more callers to the client request functions by the * time we get here. + * + * If we've connected to a server then we send them a farewell request + * so that they don't wait for us to reconnect and trigger a timeout. + * + * This decision is a little racy. The server considers us connected + * when it assigns us a node_id as it processes the greeting. We can + * disconnect before receiving the response and leave without sending a + * farewell. So given that awkward initial race, we also have a bit of + * a race where we just test the server_term to see if we've ever gotten + * a greeting reply from any server. We don't try to synchronize with + * pending connection attempts. + * + * The consequences of aborting a mount at just the wrong time and + * disconnecting without the farewell handshake depend on what the + * server does to timed out clients. 
At best it'll spit out a warning + * message that a client disconnected but it won't fence us if we didn't + * have any persistent state. */ void scoutfs_client_destroy(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct client_info *client = SCOUTFS_SB(sb)->client_info; struct scoutfs_net_connection *conn; + int ret; - if (client) { - /* stop notify_down from queueing connect work */ - atomic_set(&client->shutting_down, 1); + if (client == NULL) + return; - /* make sure worker isn't using the conn */ - cancel_work_sync(&client->connect_work); - - /* make racing conn use explode */ - conn = client->conn; - client->conn = NULL; - scoutfs_net_free_conn(sb, conn); - - /* stop running the server if we were, harmless otherwise */ - stop_our_server(sb, &client->qei); - - if (client->workq) - destroy_workqueue(client->workq); - kfree(client); - sbi->client_info = NULL; + if (client->server_term != 0) { + client->sending_farewell = true; + ret = scoutfs_net_submit_request(sb, client->conn, + SCOUTFS_NET_CMD_FAREWELL, + NULL, 0, + client_farewell_response, + NULL, NULL); + if (ret == 0) { + ret = wait_for_completion_interruptible( + &client->farewell_comp); + if (ret == 0) + ret = client->farewell_error; + } + if (ret) { + scoutfs_inc_counter(sb, client_farewell_error); + scoutfs_warn(sb, "client saw farewell error %d, server might see client connection time out\n", ret); + } } + + /* stop notify_down from queueing connect work */ + atomic_set(&client->shutting_down, 1); + + /* make sure worker isn't using the conn */ + cancel_work_sync(&client->connect_work); + + /* make racing conn use explode */ + conn = client->conn; + client->conn = NULL; + scoutfs_net_free_conn(sb, conn); + + /* stop running the server if we were, harmless otherwise */ + stop_our_server(sb, &client->qei); + + if (client->workq) + destroy_workqueue(client->workq); + kfree(client); + sbi->client_info = NULL; } diff --git a/kmod/src/counters.h b/kmod/src/counters.h index 55a7ea14..e997fe31 100644 --- a/kmod/src/counters.h +++ b/kmod/src/counters.h @@ -15,6 +15,7 @@ EXPAND_COUNTER(btree_read_error) \ EXPAND_COUNTER(btree_stale_read) \ EXPAND_COUNTER(btree_write_error) \ + EXPAND_COUNTER(client_farewell_error) \ EXPAND_COUNTER(compact_invalid_request) \ EXPAND_COUNTER(compact_operations) \ EXPAND_COUNTER(compact_segment_busy) \ @@ -110,6 +111,7 @@ EXPAND_COUNTER(net_send_error) \ EXPAND_COUNTER(net_send_messages) \ EXPAND_COUNTER(net_recv_bytes) \ + EXPAND_COUNTER(net_recv_dropped_duplicate) \ EXPAND_COUNTER(net_recv_error) \ EXPAND_COUNTER(net_recv_invalid_message) \ EXPAND_COUNTER(net_recv_messages) \ diff --git a/kmod/src/format.h b/kmod/src/format.h index e2bfecfa..d46b2195 100644 --- a/kmod/src/format.h +++ b/kmod/src/format.h @@ -571,23 +571,53 @@ enum { */ /* - * Greetings verify identity of communicating nodes. The sender - * sends their credentials and the receiver verifies them. + * Greetings verify identity of communicating nodes. The sender sends + * their credentials and the receiver verifies them. + * + * @server_term: The raft term that elected the server. Initially 0 + * from the client, sent by the server, then sent by the client as it + * tries to reconnect. Used to identify a client reconnecting to a + * server that has timed out its connection. + * + * @node_id: The id of the client. Initially 0 from the client, + * assigned by the server, and sent by the client as it reconnects. + * Used by the server to identify reconnecting clients whose existing + * state must be dealt with. 
*/ struct scoutfs_net_greeting { __le64 fsid; __le64 format_hash; + __le64 server_term; __le64 node_id; + __le64 flags; } __packed; +#define SCOUTFS_NET_GREETING_FLAG_FAREWELL (1 << 0) +#define SCOUTFS_NET_GREETING_FLAG_INVALID (~(__u64)0 << 1) + /* * This header precedes and describes all network messages sent over - * sockets. The id is set by the request and sent in the response. + * sockets. + * + * @seq: A sequence number that is increased for each message queued for + * send on the sender. The sender will never reorder messages in the + * send queue so this will always increase in recv on the receiver. The + * receiver can use this to drop messages that arrived twice after being + * resent across a newly connected socket for a given connection. + * + * @recv_seq: The sequence number of the last received message. The + * receiver is sending this to the sender in every message. The sender + * uses them to drop responses which have been delivered. + * + * @id: An increasing identifier that is set in each request. Responses + * specify the request that they're responding to. * * Error is only set to a translated errno and will only be found in * response messages. */ struct scoutfs_net_header { + __le64 seq; + __le64 recv_seq; __le64 id; __le16 data_len; __u8 cmd; @@ -612,6 +642,7 @@ enum { SCOUTFS_NET_CMD_STATFS, SCOUTFS_NET_CMD_COMPACT, SCOUTFS_NET_CMD_LOCK, + SCOUTFS_NET_CMD_FAREWELL, SCOUTFS_NET_CMD_UNKNOWN, }; diff --git a/kmod/src/net.c b/kmod/src/net.c index 14c763a9..41aff95e 100644 --- a/kmod/src/net.c +++ b/kmod/src/net.c @@ -41,38 +41,39 @@ * Both set up a connection and specify the set of request commands they * can process. * - * Requests are tracked on a connection and sent to its peer. They're + * Request and response messages are queued on a connection. They're * resent down newly established sockets on a long lived connection. * Queued requests are removed as a response is processed or if the - * request is canceled by the sender. + * request is canceled by the sender. Queued responses are removed as + * the receiver acknowledges their delivery. * - * Request processing sends a response down the socket that received a - * connection. Processing is stopped as a socket is shutdown so - * responses are only send down sockets that received a request. + * Request and response resending is asymmetrical because of the + * client/server relationship. If a client connects to a new server it + * drops responses because the new server doesn't have any requests + * pending. If a server times out a client it drops everything because + * that client is never coming back. * - * Thus requests can be received multiple times as sockets are shutdown - * and reconnected. Responses are only processed once for a given - * request. It is up to request and response implementations to ensure - * that duplicate requests are safely handled. + * Requests and responses are only processed once for a given client and + * server pair. Callers have to deal with the possibility that two + * servers might both process the same client request, even though the + * client may only see the most recent response. * - * It turns out that we have to deal with duplicate request processing - * at the layer above networking anyway. Request processing can make - * persistent changes that are committed on the server before it - * crashes. 
The client then reconnects to a server who
- * must detect that the persistent work on behalf of the resent request
- * has already been committed.  If we have to deal with that duplicate
- * processing we may as well simplify networking by allowing it between
- * reconnecting peers as well.
+ * The functional core of this implementation is solid, but boy are the
+ * interface boundaries getting fuzzy.  The core knows too much about
+ * clients and servers and the communications across the net interface
+ * boundary are questionable.  We probably want to pull more client and
+ * server specific behaviour up into the client and server and turn the
+ * "net" code into more passive shared framing helpers.
 *
 * XXX:
- *  - defer accepted conn destruction until reconnect timeout
 *  - trace command and response data payloads
 *  - checksum message contents?
- *  - explicit shutdown message to free accepted, timeout and fence otherwise
 *  - shutdown server if accept can't alloc resources for new conn?
 */

+/* reasonable multiple of max client reconnect attempt interval */
+#define CLIENT_RECONNECT_TIMEOUT_MS (20 * MSEC_PER_SEC)
+
 /*
  * A connection's shutdown work executes in its own workqueue so that the
  * work can free the connection's workq.
  */
@@ -97,13 +98,19 @@ struct scoutfs_net_connection {

 	unsigned long valid_greeting:1,	/* other commands can proceed */
 		      established:1,	/* added sends queue send work */
-		      shutting_down:1;	/* shutdown work has been queued */
+		      shutting_down:1,	/* shutdown work has been queued */
+		      saw_greeting:1,	/* saw greeting on this sock */
+		      saw_farewell:1,	/* saw farewell request from client */
+		      reconn_wait:1,	/* shutdown, waiting for reconnect */
+		      reconn_freeing:1;	/* waiting done, setter frees */
+	unsigned long reconn_deadline;
 	struct sockaddr_in connect_sin;
 	unsigned long connect_timeout_ms;

 	struct socket *sock;
 	u64 node_id;			/* assigned during greeting */
+	u64 greeting_id;
 	struct sockaddr_in sockname;
 	struct sockaddr_in peername;
@@ -111,21 +118,25 @@ struct scoutfs_net_connection {

 	struct scoutfs_net_connection *listening_conn;
 	struct list_head accepted_list;

+	u64 next_send_seq;
 	u64 next_send_id;
 	struct list_head send_queue;
 	struct list_head resend_queue;

+	atomic64_t recv_seq;
+
 	struct workqueue_struct *workq;
 	struct work_struct listen_work;
 	struct work_struct connect_work;
 	struct work_struct send_work;
 	struct work_struct recv_work;
 	struct work_struct shutdown_work;
+	struct delayed_work reconn_free_dwork;
 	/* message_recv proc_work also executes in the conn workq */

 	struct scoutfs_tseq_entry tseq_entry;

-	u8 info[0] __aligned(sizeof(u64));
+	void *info;
 };

 /* listening and their accepting sockets have a fixed locking order */
@@ -186,6 +197,10 @@ static bool nh_is_request(struct scoutfs_net_header *nh)
 	return !nh_is_response(nh);
 }

+/*
+ * We return dead requests so that the caller can stop searching other
+ * lists for the dead request that we found.
+ */ static struct message_send *search_list(struct scoutfs_net_connection *conn, struct list_head *list, u8 cmd, u64 id) @@ -343,6 +358,7 @@ static int submit_send(struct super_block *sb, struct net_info *ninf = SCOUTFS_SB(sb)->net_info; struct scoutfs_net_connection *acc_conn; struct message_send *msend; + u64 seq; if (WARN_ON_ONCE(cmd >= SCOUTFS_NET_CMD_UNKNOWN) || WARN_ON_ONCE(flags & SCOUTFS_NET_FLAGS_UNKNOWN) || @@ -378,12 +394,16 @@ static int submit_send(struct super_block *sb, } } + seq = conn->next_send_seq++; + if (id == 0) + id = conn->next_send_id++; + msend->resp_func = resp_func; msend->resp_data = resp_data; msend->dead = 0; - if (id == 0) - id = conn->next_send_id++; + msend->nh.seq = cpu_to_le64(seq); + msend->nh.recv_seq = 0; /* set when sent, not when queued */ msend->nh.id = cpu_to_le64(id); msend->nh.cmd = cmd; msend->nh.flags = flags; @@ -411,36 +431,7 @@ static int submit_send(struct super_block *sb, } /* - * Messages can flow once we receive and process a valid greeting from - * our peer. - * - * At this point recv processing has queued the greeting response - * message on the send queue. Any request messages waiting to be resent - * need to be added to the end of the send queue after the greeting - * response. - * - * Update the conn's node_id so that servers can send to specific - * clients. - */ -static void saw_valid_greeting(struct scoutfs_net_connection *conn, u64 node_id) -{ - struct super_block *sb = conn->sb; - - spin_lock(&conn->lock); - - conn->valid_greeting = 1; - conn->node_id = node_id; - list_splice_tail_init(&conn->resend_queue, &conn->send_queue); - queue_work(conn->workq, &conn->send_work); - - spin_unlock(&conn->lock); - - if (conn->notify_up) - conn->notify_up(sb, conn, conn->info, node_id); -} - -/* - * Process an incoming response. The greeting should ensure that the + * Process an incoming request. The greeting should ensure that the * sender won't send us unknown commands. We return an error if we see * an unknown command because the greeting should agree on an understood * protocol. The request function sends a response and returns an error @@ -451,8 +442,6 @@ static int process_request(struct scoutfs_net_connection *conn, { struct super_block *sb = conn->sb; scoutfs_net_request_t req_func; - struct scoutfs_net_greeting *gr; - int ret; if (mrecv->nh.cmd < SCOUTFS_NET_CMD_UNKNOWN) req_func = conn->req_funcs[mrecv->nh.cmd]; @@ -464,21 +453,8 @@ static int process_request(struct scoutfs_net_connection *conn, return -EINVAL; } - ret = req_func(sb, conn, mrecv->nh.cmd, le64_to_cpu(mrecv->nh.id), - mrecv->nh.data, le16_to_cpu(mrecv->nh.data_len)); - - /* - * Greeting response updates our *request* node_id so that - * we can consume a new allocation without callbacks. We're - * about to free the recv in the caller anyway. 
- */ - if (!conn->valid_greeting && - mrecv->nh.cmd == SCOUTFS_NET_CMD_GREETING && ret == 0) { - gr = (void *)mrecv->nh.data; - saw_valid_greeting(conn, le64_to_cpu(gr->node_id)); - } - - return ret; + return req_func(sb, conn, mrecv->nh.cmd, le64_to_cpu(mrecv->nh.id), + mrecv->nh.data, le16_to_cpu(mrecv->nh.data_len)); } /* @@ -514,11 +490,6 @@ static int process_response(struct scoutfs_net_connection *conn, ret = resp_func(sb, conn, mrecv->nh.data, le16_to_cpu(mrecv->nh.data_len), net_err_to_host(mrecv->nh.error), resp_data); - - if (!conn->valid_greeting && - mrecv->nh.cmd == SCOUTFS_NET_CMD_GREETING && msend && ret == 0) - saw_valid_greeting(conn, 0); - return ret; } @@ -553,6 +524,49 @@ static void scoutfs_net_proc_worker(struct work_struct *work) trace_scoutfs_net_proc_work_exit(sb, 0, ret); } +/* + * Free live responses up to and including the seq by marking them dead + * and moving them to the send queue to be freed. + */ +static int move_acked_responses(struct scoutfs_net_connection *conn, + struct list_head *list, u64 seq) +{ + struct message_send *msend; + struct message_send *tmp; + int ret = 0; + + assert_spin_locked(&conn->lock); + + list_for_each_entry_safe(msend, tmp, list, head) { + if (le64_to_cpu(msend->nh.seq) > seq) + break; + if (!nh_is_response(&msend->nh) || msend->dead) + continue; + + msend->dead = 1; + list_move(&msend->head, &conn->send_queue); + ret = 1; + } + + return ret; +} + +/* acks are processed inline in the recv worker */ +static void free_acked_responses(struct scoutfs_net_connection *conn, u64 seq) +{ + int moved; + + spin_lock(&conn->lock); + + moved = move_acked_responses(conn, &conn->send_queue, seq) + + move_acked_responses(conn, &conn->resend_queue, seq); + + spin_unlock(&conn->lock); + + if (moved) + queue_work(conn->workq, &conn->send_work); +} + static int recvmsg_full(struct socket *sock, void *buf, unsigned len) { struct msghdr msg; @@ -578,10 +592,11 @@ static int recvmsg_full(struct socket *sock, void *buf, unsigned len) return 0; } -static bool invalid_message(struct scoutfs_net_header *nh) +static bool invalid_message(struct scoutfs_net_connection *conn, + struct scoutfs_net_header *nh) { - /* ids must be non-zero */ - if (nh->id == 0) + /* seq and id must be non-zero */ + if (nh->seq == 0 || nh->id == 0) return true; /* greeting should negotiate understood protocol */ @@ -598,6 +613,16 @@ static bool invalid_message(struct scoutfs_net_header *nh) if (nh_is_request(nh) && nh->error != SCOUTFS_NET_ERR_NONE) return true; + if (nh->cmd == SCOUTFS_NET_CMD_GREETING) { + /* each endpoint can only receive one greeting per socket */ + if (conn->saw_greeting) + return true; + + /* servers get greeting requests, clients get responses */ + if (!!conn->listening_conn != !!nh_is_request(nh)) + return true; + } + return false; } @@ -625,7 +650,7 @@ static void scoutfs_net_recv_worker(struct work_struct *work) break; /* receiving an invalid message breaks the connection */ - if (invalid_message(&nh)) { + if (invalid_message(conn, &nh)) { scoutfs_inc_counter(sb, net_recv_invalid_message); ret = -EBADMSG; break; @@ -657,8 +682,31 @@ static void scoutfs_net_recv_worker(struct work_struct *work) break; } + if (nh.cmd == SCOUTFS_NET_CMD_GREETING) { + /* greetings are out of band, no seq mechanics */ + conn->saw_greeting = 1; + + } else if (le64_to_cpu(nh.seq) <= + atomic64_read(&conn->recv_seq)) { + /* drop any resent duplicated messages */ + scoutfs_inc_counter(sb, net_recv_dropped_duplicate); + kfree(mrecv); + continue; + + } else { + /* record that we've 
received sender's seq */ + atomic64_set(&conn->recv_seq, le64_to_cpu(nh.seq)); + /* and free our responses that sender has received */ + free_acked_responses(conn, le64_to_cpu(nh.recv_seq)); + } + scoutfs_tseq_add(&ninf->msg_tseq_tree, &mrecv->tseq_entry); - queue_work(conn->workq, &mrecv->proc_work); + + /* synchronously process greeting before next recvmsg */ + if (nh.cmd == SCOUTFS_NET_CMD_GREETING) + scoutfs_net_proc_worker(&mrecv->proc_work); + else + queue_work(conn->workq, &mrecv->proc_work); } if (ret) @@ -712,6 +760,10 @@ static void free_msend(struct net_info *ninf, struct message_send *msend) * The worker is responsible for freeing messages so that other contexts * don't have to worry about freeing a message while we're blocked * sending it without the lock held. + * + * We set the current recv_seq on every outgoing frame as it represents + * the current connection state, not the state back when each message + * was first queued. */ static void scoutfs_net_send_worker(struct work_struct *work) { @@ -734,6 +786,9 @@ static void scoutfs_net_send_worker(struct work_struct *work) continue; } + msend->nh.recv_seq = + cpu_to_le64(atomic64_read(&conn->recv_seq)); + spin_unlock(&conn->lock); len = nh_bytes(le16_to_cpu(msend->nh.data_len)); @@ -747,14 +802,14 @@ static void scoutfs_net_send_worker(struct work_struct *work) spin_lock(&conn->lock); + msend->nh.recv_seq = 0; + if (ret) break; - /* active requests are resent, everything else is freed */ - if (nh_is_request(&msend->nh) && !msend->dead) + /* resend if it wasn't freed while we sent */ + if (!msend->dead) list_move_tail(&msend->head, &conn->resend_queue); - else - msend->dead = 1; } spin_unlock(&conn->lock); @@ -778,6 +833,10 @@ static void destroy_conn(struct scoutfs_net_connection *conn) WARN_ON_ONCE(conn->sock != NULL); WARN_ON_ONCE(!list_empty(&conn->accepted_list)); + /* tell callers that accepted connection finally done */ + if (conn->listening_conn && conn->notify_down) + conn->notify_down(sb, conn, conn->info, conn->node_id); + /* free all messages, refactor and complete for forced unmount? 
*/ list_splice_init(&conn->resend_queue, &conn->send_queue); list_for_each_entry_safe(msend, tmp, &conn->send_queue, head) { @@ -797,6 +856,7 @@ static void destroy_conn(struct scoutfs_net_connection *conn) destroy_workqueue(conn->workq); scoutfs_tseq_del(&ninf->conn_tseq_tree, &conn->tseq_entry); + kfree(conn->info); kfree(conn); } @@ -1025,9 +1085,11 @@ static void scoutfs_net_shutdown_worker(struct work_struct *work) DEFINE_CONN_FROM_WORK(conn, work, shutdown_work); struct super_block *sb = conn->sb; struct net_info *ninf = SCOUTFS_SB(sb)->net_info; + struct scoutfs_net_connection *listener; struct scoutfs_net_connection *acc_conn; struct message_send *msend; struct message_send *tmp; + unsigned long delay; trace_scoutfs_net_shutdown_work_enter(sb, 0, 0); @@ -1062,39 +1124,119 @@ static void scoutfs_net_shutdown_worker(struct work_struct *work) spin_unlock(&acc_conn->lock); } spin_unlock(&conn->lock); + + /* free any conns waiting for reconnection */ + cancel_delayed_work_sync(&conn->reconn_free_dwork); + queue_delayed_work(conn->workq, &conn->reconn_free_dwork, 0); + /* relies on delay 0 scheduling immediately so no timer to cancel */ + flush_delayed_work(&conn->reconn_free_dwork); + + /* and wait for accepted conn shutdown work to finish */ wait_event(conn->waitq, empty_accepted_list(conn)); spin_lock(&conn->lock); - /* resend any pending requests, drop responses or greetings */ + /* greetings aren't resent across sockets */ list_splice_tail_init(&conn->send_queue, &conn->resend_queue); list_for_each_entry_safe(msend, tmp, &conn->resend_queue, head) { - if (nh_is_response(&msend->nh) || - msend->nh.cmd == SCOUTFS_NET_CMD_GREETING) + if (msend->nh.cmd == SCOUTFS_NET_CMD_GREETING) free_msend(ninf, msend); } + conn->saw_greeting = 0; + /* signal connect failure */ memset(&conn->connect_sin, 0, sizeof(conn->connect_sin)); wake_up(&conn->waitq); - spin_unlock(&conn->lock); - /* tell the caller that the connection is down */ - if (conn->notify_down) - conn->notify_down(sb, conn, conn->info, conn->node_id); + /* resolve racing with listener shutdown with locked shutting_down */ + if (conn->listening_conn && + (conn->listening_conn->shutting_down || conn->saw_farewell)) { - /* accepted conns are destroyed */ - if (conn->listening_conn) { + /* free accepted sockets after farewell or listener shutdown */ + spin_unlock(&conn->lock); destroy_conn(conn); + } else { - spin_lock(&conn->lock); - conn->shutting_down = 0; + + if (conn->listening_conn) { + /* server accepted sockets wait for reconnect */ + listener = conn->listening_conn; + delay = msecs_to_jiffies(CLIENT_RECONNECT_TIMEOUT_MS); + conn->reconn_wait = 1; + conn->reconn_deadline = jiffies + delay; + queue_delayed_work(listener->workq, + &listener->reconn_free_dwork, delay); + } else { + /* clients and listeners can retry */ + conn->shutting_down = 0; + if (conn->notify_down) + conn->notify_down(sb, conn, conn->info, + conn->node_id); + } + spin_unlock(&conn->lock); } trace_scoutfs_net_shutdown_work_exit(sb, 0, 0); } +/* + * Free any connections that have been shutdown for too long without the + * client reconnecting. This runs in work on the listening connection. + * It's racing with connection attempts searching for shutdown + * connections to steal state from. Shutdown cancels the work and waits + * for it to finish. + * + * Connections are currently freed without the lock held so this walks + * the entire list every time it frees a connection. 
This is irritating + * but timed out connections are rare and client counts are relatively + * low given a cpu's ability to burn through the list. + */ +static void scoutfs_net_reconn_free_worker(struct work_struct *work) +{ + DEFINE_CONN_FROM_WORK(conn, work, reconn_free_dwork.work); + struct super_block *sb = conn->sb; + struct scoutfs_net_connection *acc; + unsigned long now = jiffies; + unsigned long deadline = 0; + bool requeue = false; + + trace_scoutfs_net_reconn_free_work_enter(sb, 0, 0); + +restart: + spin_lock(&conn->lock); + list_for_each_entry(acc, &conn->accepted_list, accepted_head) { + + if (acc->reconn_wait && !acc->reconn_freeing && + (conn->shutting_down || + time_after_eq(now, acc->reconn_deadline))) { + acc->reconn_freeing = 1; + spin_unlock(&conn->lock); + if (!conn->shutting_down) + scoutfs_info(sb, "client timed out "SIN_FMT" -> "SIN_FMT", can not reconnect", + SIN_ARG(&acc->sockname), + SIN_ARG(&acc->peername)); + destroy_conn(acc); + goto restart; + } + + /* calc delay of next work, can drift a bit */ + if (acc->reconn_wait && !acc->reconn_freeing && + (!requeue || time_before(now, deadline))) { + requeue = true; + deadline = acc->reconn_deadline; + } + } + spin_unlock(&conn->lock); + + if (requeue) + queue_delayed_work(conn->workq, &conn->reconn_free_dwork, + deadline - now); + + trace_scoutfs_net_reconn_free_work_exit(sb, 0, 0); +} + /* * Accepted connections inherit the callbacks from their listening * connection. @@ -1116,15 +1258,21 @@ scoutfs_net_alloc_conn(struct super_block *sb, struct net_info *ninf = SCOUTFS_SB(sb)->net_info; struct scoutfs_net_connection *conn; - conn = kzalloc(offsetof(struct scoutfs_net_connection, - info[info_size]), GFP_NOFS); + conn = kzalloc(sizeof(struct scoutfs_net_connection), GFP_NOFS); if (!conn) return NULL; + conn->info = kzalloc(info_size, GFP_NOFS); + if (!conn->info) { + kfree(conn); + return NULL; + } + conn->workq = alloc_workqueue("scoutfs_net_%s", WQ_UNBOUND | WQ_NON_REENTRANT, 0, name_suffix); if (!conn->workq) { + kfree(conn->info); kfree(conn); return NULL; } @@ -1140,7 +1288,9 @@ scoutfs_net_alloc_conn(struct super_block *sb, conn->peername.sin_family = AF_INET; INIT_LIST_HEAD(&conn->accepted_head); INIT_LIST_HEAD(&conn->accepted_list); + conn->next_send_seq = 1; conn->next_send_id = 1; + atomic64_set(&conn->recv_seq, 0); INIT_LIST_HEAD(&conn->send_queue); INIT_LIST_HEAD(&conn->resend_queue); INIT_WORK(&conn->listen_work, scoutfs_net_listen_worker); @@ -1148,6 +1298,8 @@ scoutfs_net_alloc_conn(struct super_block *sb, INIT_WORK(&conn->send_work, scoutfs_net_send_worker); INIT_WORK(&conn->recv_work, scoutfs_net_recv_worker); INIT_WORK(&conn->shutdown_work, scoutfs_net_shutdown_worker); + INIT_DELAYED_WORK(&conn->reconn_free_dwork, + scoutfs_net_reconn_free_worker); scoutfs_tseq_add(&ninf->conn_tseq_tree, &conn->tseq_entry); @@ -1302,6 +1454,185 @@ int scoutfs_net_connect(struct super_block *sb, return ret ?: error; } +static void set_valid_greeting(struct scoutfs_net_connection *conn) +{ + assert_spin_locked(&conn->lock); + + /* recv should have dropped invalid duplicate greeting messages */ + BUG_ON(conn->valid_greeting); + + conn->valid_greeting = 1; + list_splice_tail_init(&conn->resend_queue, &conn->send_queue); + queue_work(conn->workq, &conn->send_work); +} + +/* + * The client has received a valid greeting from the server. Send + * can proceed and we might need to reset our recv state if we reconnected + * to a new server. 
+ */ +void scoutfs_net_client_greeting(struct super_block *sb, + struct scoutfs_net_connection *conn, + bool new_server) +{ + struct net_info *ninf = SCOUTFS_SB(sb)->net_info; + struct message_send *msend; + struct message_send *tmp; + + /* only called on client connections :/ */ + BUG_ON(conn->listening_conn); + + spin_lock(&conn->lock); + + if (new_server) { + atomic64_set(&conn->recv_seq, 0); + list_for_each_entry_safe(msend, tmp, &conn->resend_queue, head){ + if (nh_is_response(&msend->nh)) + free_msend(ninf, msend); + } + } + + set_valid_greeting(conn); + + spin_unlock(&conn->lock); + + /* client up/down drives reconnect */ + if (conn->notify_up) + conn->notify_up(sb, conn, conn->info, 0); +} + +/* + * The calling server has received a valid greeting from a client. If + * the server is reconnecting to us then we need to find its old + * connection that held its state and transfer it to this connection + * (connection and socket life cycles make this easier than migrating + * the socket between the connections). + * + * The previous connection that holds the client's state might still be + * in active use depending on network failure and work processing races. + * We shut it down before migrating its message state. We can be + * processing greetings from multiple reconnecting sockets that are all + * referring to the same original connection. We use the increasing + * greeting id to have the most recent connection attempt win. + * + * A node can be reconnecting to us for the first time. In that case we + * just trust its node_id. It will notice the new server term and take + * steps to recover. + * + * A client can be reconnecting to us after we've destroyed their state. + * This is fatal for the client if they just took too long to reconnect. + * But this can also happen if something disconnects the socket after + * we've sent a farewell response before the client received it. In + * this case we let the client reconnect so we can resend the farewell + * response and they can disconnect cleanly. + * + * At this point our connection is idle except for send submissions and + * shutdown being queued. Once we shut down a We completely own a We + * have exclusive access to a previous conn once its shutdown and we set + * _freeing. 
+ */ +void scoutfs_net_server_greeting(struct super_block *sb, + struct scoutfs_net_connection *conn, + u64 node_id, u64 greeting_id, + bool sent_node_id, bool first_contact, + bool farewell) +{ + struct scoutfs_net_connection *listener; + struct scoutfs_net_connection *reconn; + struct scoutfs_net_connection *acc; + + /* only called on accepted server connections :/ */ + BUG_ON(!conn->listening_conn); + + /* see if we have a previous conn for the client's sent node_id */ + reconn = NULL; + if (sent_node_id) { + listener = conn->listening_conn; +restart: + spin_lock_nested(&listener->lock, CONN_LOCK_LISTENER); + list_for_each_entry(acc, &listener->accepted_list, + accepted_head) { + if (acc->node_id != node_id || + acc->greeting_id >= greeting_id || + acc->reconn_freeing) + continue; + + if (!acc->reconn_wait) { + spin_lock_nested(&acc->lock, + CONN_LOCK_ACCEPTED); + shutdown_conn_locked(acc); + spin_unlock(&acc->lock); + spin_unlock(&listener->lock); + msleep(10); /* XXX might be freed :/ */ + goto restart; + } + + reconn = acc; + reconn->reconn_freeing = 1; + break; + } + spin_unlock(&listener->lock); + } + + /* drop a connection if we can't find its necessary old conn */ + if (sent_node_id && !reconn && !first_contact && !farewell) { + shutdown_conn(conn); + return; + } + + /* migrate state from previous conn for this reconnecting node_id */ + if (reconn) { + spin_lock(&conn->lock); + + conn->saw_farewell = reconn->saw_farewell; + conn->next_send_seq = reconn->next_send_seq; + conn->next_send_id = reconn->next_send_id; + atomic64_set(&conn->recv_seq, atomic64_read(&reconn->recv_seq)); + + /* greeting response/ack will be on conn send queue */ + BUG_ON(!list_empty(&reconn->send_queue)); + BUG_ON(!list_empty(&conn->resend_queue)); + list_splice_init(&reconn->resend_queue, &conn->resend_queue); + + /* new conn info is unused, swap, old won't call down */ + swap(conn->info, reconn->info); + reconn->notify_down = NULL; + + spin_unlock(&conn->lock); + + /* we set _freeing */ + destroy_conn(reconn); + } + + spin_lock(&conn->lock); + + conn->node_id = node_id; + conn->greeting_id = greeting_id; + set_valid_greeting(conn); + + spin_unlock(&conn->lock); + + /* only call notify_up the first time we see the node_id */ + if (conn->notify_up && first_contact) + conn->notify_up(sb, conn, conn->info, node_id); +} + +/* + * The server has received a farewell message and is sending a response. + * All we do is mark the connection so that it is freed the next time it + * is shutdown, presumably as the client disconnects after receiving the + * response. The server caller has cleaned up all the state it had + * associated with the client. + */ +void scoutfs_net_server_farewell(struct super_block *sb, + struct scoutfs_net_connection *conn) +{ + spin_lock(&conn->lock); + conn->saw_farewell = 1; + spin_unlock(&conn->lock); +} + + /* * Submit a request down the connection. It's up to the caller to * ensure that the conn is allocated. 
Sends submitted when the
@@ -1455,10 +1786,13 @@ static void net_tseq_show_conn(struct seq_file *m,
 	struct scoutfs_net_connection *conn =
 		container_of(ent, struct scoutfs_net_connection, tseq_entry);

-	seq_printf(m, "name "SIN_FMT" peer "SIN_FMT" vg %u est %u sd %u cto_ms %lu nsi %llu\n",
+	seq_printf(m, "name "SIN_FMT" peer "SIN_FMT" node_id %llu greeting_id %llu vg %u est %u sd %u sg %u sf %u rw %u rf %u cto_ms %lu rdl_j %lu nss %llu rs %llu nsi %llu\n",
 		   SIN_ARG(&conn->sockname), SIN_ARG(&conn->peername),
-		   conn->valid_greeting, conn->established,
-		   conn->shutting_down, conn->connect_timeout_ms,
+		   conn->node_id, conn->greeting_id, conn->valid_greeting,
+		   conn->established, conn->shutting_down, conn->saw_greeting,
+		   conn->saw_farewell, conn->reconn_wait, conn->reconn_freeing,
+		   conn->connect_timeout_ms, conn->reconn_deadline,
+		   conn->next_send_seq, (u64)atomic64_read(&conn->recv_seq),
 		   conn->next_send_id);
 }

diff --git a/kmod/src/net.h b/kmod/src/net.h
index b59db537..82da64bf 100644
--- a/kmod/src/net.h
+++ b/kmod/src/net.h
@@ -67,6 +67,19 @@ void scoutfs_net_shutdown(struct super_block *sb,
 void scoutfs_net_free_conn(struct super_block *sb,
			    struct scoutfs_net_connection *conn);

+void scoutfs_net_client_greeting(struct super_block *sb,
+				 struct scoutfs_net_connection *conn,
+				 bool new_server);
+void scoutfs_net_server_greeting(struct super_block *sb,
+				 struct scoutfs_net_connection *conn,
+				 u64 node_id, u64 greeting_id,
+				 bool sent_node_id, bool first_contact,
+				 bool farewell);
+void scoutfs_net_server_farewell(struct super_block *sb,
+				 struct scoutfs_net_connection *conn);
+void scoutfs_net_farewell(struct super_block *sb,
+			  struct scoutfs_net_connection *conn);
+
 int scoutfs_net_setup(struct super_block *sb);
 void scoutfs_net_destroy(struct super_block *sb);

diff --git a/kmod/src/scoutfs_trace.h b/kmod/src/scoutfs_trace.h
index 26e244ba..0693f1ce 100644
--- a/kmod/src/scoutfs_trace.h
+++ b/kmod/src/scoutfs_trace.h
@@ -1840,6 +1840,14 @@ DEFINE_EVENT(scoutfs_work_class, scoutfs_net_shutdown_work_exit,
	TP_PROTO(struct super_block *sb, u64 data, int ret),
	TP_ARGS(sb, data, ret)
 );
+DEFINE_EVENT(scoutfs_work_class, scoutfs_net_reconn_free_work_enter,
+	TP_PROTO(struct super_block *sb, u64 data, int ret),
+	TP_ARGS(sb, data, ret)
+);
+DEFINE_EVENT(scoutfs_work_class, scoutfs_net_reconn_free_work_exit,
+	TP_PROTO(struct super_block *sb, u64 data, int ret),
+	TP_ARGS(sb, data, ret)
+);
 DEFINE_EVENT(scoutfs_work_class, scoutfs_net_send_work_enter,
	TP_PROTO(struct super_block *sb, u64 data, int ret),
	TP_ARGS(sb, data, ret)
 );
diff --git a/kmod/src/server.c b/kmod/src/server.c
index e8d4d42f..1077afe5 100644
--- a/kmod/src/server.c
+++ b/kmod/src/server.c
@@ -56,6 +56,7 @@ struct server_info {
	bool shutting_down;
	struct completion start_comp;
	struct sockaddr_in listen_sin;
+	u64 term;
	struct scoutfs_net_connection *conn;

	/* request processing coordinates committing manifest and alloc */
@@ -1072,11 +1073,16 @@ int scoutfs_server_lock_response(struct super_block *sb, u64 node_id,
  * response shuts down the connection.
  *
  * We allocate a new node_id for the first connect attempt from a
- * client.  We update the request node_id for the calling net layer to
- * consume.
+ * client.
  *
  * If a client reconnects they'll send their initially assigned node_id
  * in their greeting request.
+ *
+ * XXX We can lose allocated node_ids here as we record the node_id as
+ * live as we send a valid greeting response.  The client might
+ * disconnect before they receive the response and resend an initial
+ * blank greeting.  We could use a client uuid to associate with
+ * allocated node_ids.
  */
 static int server_greeting(struct super_block *sb,
			    struct scoutfs_net_connection *conn,
@@ -1088,6 +1094,9 @@
	DECLARE_SERVER_INFO(sb, server);
	struct commit_waiter cw;
	__le64 node_id = 0;
+	bool sent_node_id;
+	bool first_contact;
+	bool farewell;
	int ret = 0;

	if (arg_len != sizeof(struct scoutfs_net_greeting)) {
@@ -1122,24 +1131,61 @@
		queue_commit_work(server, &cw);
		up_read(&server->commit_rwsem);
		ret = wait_for_commit(&cw);
-		if (ret)
+		if (ret) {
+			node_id = 0;
			goto out;
+		}
	} else {
		node_id = gr->node_id;
	}

	greet.fsid = super->hdr.fsid;
	greet.format_hash = super->format_hash;
+	greet.server_term = cpu_to_le64(server->term);
	greet.node_id = node_id;
+	greet.flags = 0;
 out:
	ret = scoutfs_net_response(sb, conn, cmd, id, ret,
				   &greet, sizeof(greet));

-	/* give net caller client's new node_id :/ */
-	if (ret == 0 && node_id != 0)
-		gr->node_id = node_id;
+	if (node_id != 0 && ret == 0) {
+		sent_node_id = gr->node_id != 0;
+		first_contact = le64_to_cpu(gr->server_term) != server->term;
+		if (gr->flags & cpu_to_le64(SCOUTFS_NET_GREETING_FLAG_FAREWELL))
+			farewell = true;
+		else
+			farewell = false;
+
+		scoutfs_net_server_greeting(sb, conn, le64_to_cpu(node_id), id,
+					    sent_node_id, first_contact,
+					    farewell);
+	}
+
	return ret;
 }

+/*
+ * The server is receiving a farewell message from a client that is
+ * unmounting.  It won't send any more requests and once it receives our
+ * response it will not reconnect.
+ *
+ * XXX we should make sure that all our requests to the client have finished
+ * before we respond.  Locking will have its own messaging for orderly
+ * shutdown.  That leaves compaction which will be addressed as part of
+ * the larger work of recovering compactions that were in flight when
+ * a client crashed.
+ */
+static int server_farewell(struct super_block *sb,
+			   struct scoutfs_net_connection *conn,
+			   u8 cmd, u64 id, void *arg, u16 arg_len)
+{
+	if (arg_len != 0)
+		return -EINVAL;
+
+	scoutfs_net_server_farewell(sb, conn);
+
+	return scoutfs_net_response(sb, conn, cmd, id, 0, NULL, 0);
+}
+
 /* requests sent to clients are tracked so we can free resources */
 struct compact_request {
	struct list_head head;
@@ -1743,6 +1789,7 @@ static scoutfs_net_request_t server_req_funcs[] = {
	[SCOUTFS_NET_CMD_GET_MANIFEST_ROOT]	= server_get_manifest_root,
	[SCOUTFS_NET_CMD_STATFS]		= server_statfs,
	[SCOUTFS_NET_CMD_LOCK]			= server_lock,
+	[SCOUTFS_NET_CMD_FAREWELL]		= server_farewell,
 };

 static void server_notify_up(struct super_block *sb,
@@ -1880,13 +1927,15 @@ out:
 }

 /* XXX can we call start multiple times?
*/ -int scoutfs_server_start(struct super_block *sb, struct sockaddr_in *sin) +int scoutfs_server_start(struct super_block *sb, struct sockaddr_in *sin, + u64 term) { DECLARE_SERVER_INFO(sb, server); server->err = 0; server->shutting_down = false; server->listen_sin = *sin; + server->term = term; init_completion(&server->start_comp); queue_work(server->wq, &server->work); diff --git a/kmod/src/server.h b/kmod/src/server.h index 01aed49b..a843e437 100644 --- a/kmod/src/server.h +++ b/kmod/src/server.h @@ -22,11 +22,16 @@ do { \ __entry->name##_addr & 255, \ __entry->name##_port -#define SNH_FMT "id %llu data_len %u cmd %u flags 0x%x error %u" -#define SNH_ARG(nh) le64_to_cpu((nh)->id), le16_to_cpu((nh)->data_len), \ - (nh)->cmd, (nh)->flags, (nh)->error +#define SNH_FMT \ + "seq %llu recv_seq %llu id %llu data_len %u cmd %u flags 0x%x error %u" +#define SNH_ARG(nh) \ + le64_to_cpu((nh)->seq), le64_to_cpu((nh)->recv_seq), \ + le64_to_cpu((nh)->id), le16_to_cpu((nh)->data_len), (nh)->cmd, \ + (nh)->flags, (nh)->error #define snh_trace_define(name) \ + __field(__u64, name##_seq) \ + __field(__u64, name##_recv_seq) \ __field(__u64, name##_id) \ __field(__u16, name##_data_len) \ __field(__u8, name##_cmd) \ @@ -37,6 +42,8 @@ do { \ do { \ __typeof__(nh) _nh = (nh); \ \ + __entry->name##_seq = le64_to_cpu(_nh->seq); \ + __entry->name##_recv_seq = le64_to_cpu(_nh->recv_seq); \ __entry->name##_id = le64_to_cpu(_nh->id); \ __entry->name##_data_len = le16_to_cpu(_nh->data_len); \ __entry->name##_cmd = _nh->cmd; \ @@ -44,9 +51,10 @@ do { \ __entry->name##_error = _nh->error; \ } while (0) -#define snh_trace_args(name) \ - __entry->name##_id, __entry->name##_data_len, __entry->name##_cmd, \ - __entry->name##_flags, __entry->name##_error +#define snh_trace_args(name) \ + __entry->name##_seq, __entry->name##_recv_seq, __entry->name##_id, \ + __entry->name##_data_len, __entry->name##_cmd, __entry->name##_flags, \ + __entry->name##_error struct scoutfs_net_manifest_entry; struct scoutfs_manifest_entry; @@ -62,7 +70,8 @@ int scoutfs_server_lock_response(struct super_block *sb, u64 node_id, u64 id, struct scoutfs_net_lock *nl); struct sockaddr_in; -int scoutfs_server_start(struct super_block *sb, struct sockaddr_in *sin); +int scoutfs_server_start(struct super_block *sb, struct sockaddr_in *sin, + u64 term); void scoutfs_server_stop(struct super_block *sb); int scoutfs_server_setup(struct super_block *sb);
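
The seq/recv_seq mechanics this patch threads through net.c are the heart of the change, so a compact model may help review. Below is a standalone userspace sketch of the receiver-side rule; net_header, conn_state, and recv_should_process are invented names for illustration, not symbols from the patch. It shows why a resend across a reconnected socket is dropped, which is exactly the case the new net_recv_dropped_duplicate counter counts.

#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>

/* simplified stand-in for the on-the-wire header fields used here */
struct net_header {
	uint64_t seq;      /* sender's position in its send stream */
	uint64_t recv_seq; /* highest seq the sender has received */
};

/* receiver-side connection state, mirroring conn->recv_seq */
struct conn_state {
	uint64_t recv_seq; /* highest seq we've processed */
};

/*
 * Return true if the message should be processed.  A message whose seq
 * is at or below the highest seq we've already seen must be a resend
 * across a newly connected socket, so it is dropped as a duplicate.
 */
static bool recv_should_process(struct conn_state *conn,
				const struct net_header *nh)
{
	if (nh->seq <= conn->recv_seq)
		return false; /* duplicate: already processed */

	conn->recv_seq = nh->seq;
	return true;
}

int main(void)
{
	struct conn_state conn = { .recv_seq = 0 };
	/* messages 2 and 3 arrive again after a socket reconnect */
	uint64_t seqs[] = { 1, 2, 3, 2, 3, 4 };

	for (unsigned int i = 0; i < sizeof(seqs) / sizeof(seqs[0]); i++) {
		struct net_header nh = { .seq = seqs[i], .recv_seq = 0 };

		printf("seq %llu: %s\n", (unsigned long long)nh.seq,
		       recv_should_process(&conn, &nh) ?
				"processed" : "dropped duplicate");
	}
	return 0;
}

The same per-connection high-water mark is what each outgoing frame echoes back in recv_seq, letting the sender free responses the peer has acknowledged, as free_acked_responses() does in the patch.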
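The client side of the greeting exchange keys off server_term in logic that is spread across client.c and net.c; here is a sketch under the same caveat (client_state and client_saw_greeting are illustrative names). A greeting reply carrying an unfamiliar term means a newly elected server, so per-server receive state is reset and queued responses would be dropped, mirroring the new_server path of scoutfs_net_client_greeting().

#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>

/*
 * Hypothetical distillation of the client-side check: the client
 * remembers the term of the last server that greeted it, and a greeting
 * with a different term means any receive-sequence state and queued
 * responses belong to a dead server and must be reset.
 */
struct client_state {
	uint64_t server_term; /* 0 until the first greeting reply */
	uint64_t recv_seq;
};

static void client_saw_greeting(struct client_state *cl, uint64_t term)
{
	bool new_server = term != cl->server_term;

	if (new_server) {
		/* a new server has no requests pending against us */
		cl->recv_seq = 0; /* queued responses would be dropped too */
	}
	cl->server_term = term;

	printf("greeting term %llu: %s\n", (unsigned long long)term,
	       new_server ? "new server, state reset" : "same server");
}

int main(void)
{
	struct client_state cl = { 0, 0 };

	client_saw_greeting(&cl, 7); /* first contact */
	client_saw_greeting(&cl, 7); /* reconnect to the same server */
	client_saw_greeting(&cl, 9); /* reconnect after re-election */
	return 0;
}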