mirror of
https://github.com/versity/scoutfs.git
synced 2026-02-06 18:50:45 +00:00
scoutfs: make networking more reliable
The current networking code has loose reliability guarantees. If a connection between the client and server is broken then the client reconnects as though it's an entirely new connection. The client resends requests but no responses are resent. A client's requests could be processed twice on the same server. The server throws away disconnected client state. This was fine, sort of, for the simple requests we had implemented so far. It's not good enough for the locking service which would prefer to let networking worry about reliable message delivery so it doesn't have to track and replay partial state across reconnection between the same client and server. This adds the infrastructure to ensure that requests and responses between a given client and server will be delivered across reconnected sockets and will only be processed once. The server keeps track of disconnected clients and restores state if the same client reconnects. This required some work around the greetings so that clients and servers can recognize each other. Now that the server remembers disconnected clients we add a farewell request so that servers can forget about clients that are shutting down and won't be reconnecting. Now that connections between the client and server are preserved we can resend responses across reconnection. We add outgoing message sequence numbers which are used to drop duplicates and communicate the received sequence back to the sender to free responses once they're received. When the client is reconnecting to a new server it resets its receive state that was dependent on the old server and it drops responses which were being sent to a server instance which no longer exists. This stronger reliable messaging guarantee will make it much easier to implement lock recovery which can now rewind state relative to requests that are in flight and replay existing state on a new server instance. Signed-off-by: Zach Brown <zab@versity.com>
This commit is contained in:
@@ -57,6 +57,12 @@ struct client_info {
|
||||
|
||||
struct scoutfs_quorum_elected_info qei;
|
||||
u64 old_elected_nr;
|
||||
|
||||
u64 server_term;
|
||||
|
||||
bool sending_farewell;
|
||||
int farewell_error;
|
||||
struct completion farewell_comp;
|
||||
};
|
||||
|
||||
/*
|
||||
@@ -281,7 +287,8 @@ static int client_greeting(struct super_block *sb,
|
||||
struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
|
||||
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
|
||||
struct scoutfs_net_greeting *gr = resp;
|
||||
int ret = 0;
|
||||
bool new_server;
|
||||
int ret;
|
||||
|
||||
if (error) {
|
||||
ret = error;
|
||||
@@ -328,6 +335,11 @@ static int client_greeting(struct super_block *sb,
|
||||
complete(&client->node_id_comp);
|
||||
}
|
||||
|
||||
new_server = le64_to_cpu(gr->server_term) != client->server_term;
|
||||
scoutfs_net_client_greeting(sb, conn, new_server);
|
||||
|
||||
client->server_term = le64_to_cpu(gr->server_term);
|
||||
ret = 0;
|
||||
out:
|
||||
return ret;
|
||||
}
|
||||
@@ -398,7 +410,7 @@ static void scoutfs_client_connect_worker(struct work_struct *work)
|
||||
goto out;
|
||||
|
||||
if (qei->run_server) {
|
||||
ret = scoutfs_server_start(sb, &qei->sin);
|
||||
ret = scoutfs_server_start(sb, &qei->sin, qei->elected_nr);
|
||||
if (ret) {
|
||||
/* forget that we tried to start the server */
|
||||
memset(qei, 0, sizeof(*qei));
|
||||
@@ -423,7 +435,11 @@ static void scoutfs_client_connect_worker(struct work_struct *work)
|
||||
/* send a greeting to verify endpoints of each connection */
|
||||
greet.fsid = super->hdr.fsid;
|
||||
greet.format_hash = super->format_hash;
|
||||
greet.server_term = cpu_to_le64(client->server_term);
|
||||
greet.node_id = cpu_to_le64(sbi->node_id);
|
||||
greet.flags = 0;
|
||||
if (client->sending_farewell)
|
||||
greet.flags |= cpu_to_le64(SCOUTFS_NET_GREETING_FLAG_FAREWELL);
|
||||
|
||||
ret = scoutfs_net_submit_request(sb, client->conn,
|
||||
SCOUTFS_NET_CMD_GREETING,
|
||||
@@ -537,6 +553,7 @@ int scoutfs_client_setup(struct super_block *sb)
|
||||
init_completion(&client->node_id_comp);
|
||||
atomic_set(&client->shutting_down, 0);
|
||||
INIT_WORK(&client->connect_work, scoutfs_client_connect_worker);
|
||||
init_completion(&client->farewell_comp);
|
||||
|
||||
client->conn = scoutfs_net_alloc_conn(sb, NULL, client_notify_down, 0,
|
||||
client_req_funcs, "client");
|
||||
@@ -560,34 +577,89 @@ out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Once we get a response from the server we can shut down */
|
||||
static int client_farewell_response(struct super_block *sb,
|
||||
struct scoutfs_net_connection *conn,
|
||||
void *resp, unsigned int resp_len,
|
||||
int error, void *data)
|
||||
{
|
||||
struct client_info *client = SCOUTFS_SB(sb)->client_info;
|
||||
|
||||
if (resp_len != 0)
|
||||
return -EINVAL;
|
||||
|
||||
client->farewell_error = error;
|
||||
complete(&client->farewell_comp);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* There must be no more callers to the client request functions by the
|
||||
* time we get here.
|
||||
*
|
||||
* If we've connected to a server then we send them a farewell request
|
||||
* so that they don't wait for us to reconnect and trigger a timeout.
|
||||
*
|
||||
* This decision is a little racy. The server considers us connected
|
||||
* when it assigns us a node_id as it processes the greeting. We can
|
||||
* disconnect before receiving the response and leave without sending a
|
||||
* farewell. So given that awkward initial race, we also have a bit of
|
||||
* a race where we just test the server_term to see if we've ever gotten
|
||||
* a greeting reply from any server. We don't try to synchronize with
|
||||
* pending connection attempts.
|
||||
*
|
||||
* The consequences of aborting a mount at just the wrong time and
|
||||
* disconnecting without the farewell handshake depend on what the
|
||||
* server does to timed out clients. At best it'll spit out a warning
|
||||
* message that a client disconnected but it won't fence us if we didn't
|
||||
* have any persistent state.
|
||||
*/
|
||||
void scoutfs_client_destroy(struct super_block *sb)
|
||||
{
|
||||
struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
|
||||
struct client_info *client = SCOUTFS_SB(sb)->client_info;
|
||||
struct scoutfs_net_connection *conn;
|
||||
int ret;
|
||||
|
||||
if (client) {
|
||||
/* stop notify_down from queueing connect work */
|
||||
atomic_set(&client->shutting_down, 1);
|
||||
if (client == NULL)
|
||||
return;
|
||||
|
||||
/* make sure worker isn't using the conn */
|
||||
cancel_work_sync(&client->connect_work);
|
||||
|
||||
/* make racing conn use explode */
|
||||
conn = client->conn;
|
||||
client->conn = NULL;
|
||||
scoutfs_net_free_conn(sb, conn);
|
||||
|
||||
/* stop running the server if we were, harmless otherwise */
|
||||
stop_our_server(sb, &client->qei);
|
||||
|
||||
if (client->workq)
|
||||
destroy_workqueue(client->workq);
|
||||
kfree(client);
|
||||
sbi->client_info = NULL;
|
||||
if (client->server_term != 0) {
|
||||
client->sending_farewell = true;
|
||||
ret = scoutfs_net_submit_request(sb, client->conn,
|
||||
SCOUTFS_NET_CMD_FAREWELL,
|
||||
NULL, 0,
|
||||
client_farewell_response,
|
||||
NULL, NULL);
|
||||
if (ret == 0) {
|
||||
ret = wait_for_completion_interruptible(
|
||||
&client->farewell_comp);
|
||||
if (ret == 0)
|
||||
ret = client->farewell_error;
|
||||
}
|
||||
if (ret) {
|
||||
scoutfs_inc_counter(sb, client_farewell_error);
|
||||
scoutfs_warn(sb, "client saw farewell error %d, server might see client connection time out\n", ret);
|
||||
}
|
||||
}
|
||||
|
||||
/* stop notify_down from queueing connect work */
|
||||
atomic_set(&client->shutting_down, 1);
|
||||
|
||||
/* make sure worker isn't using the conn */
|
||||
cancel_work_sync(&client->connect_work);
|
||||
|
||||
/* make racing conn use explode */
|
||||
conn = client->conn;
|
||||
client->conn = NULL;
|
||||
scoutfs_net_free_conn(sb, conn);
|
||||
|
||||
/* stop running the server if we were, harmless otherwise */
|
||||
stop_our_server(sb, &client->qei);
|
||||
|
||||
if (client->workq)
|
||||
destroy_workqueue(client->workq);
|
||||
kfree(client);
|
||||
sbi->client_info = NULL;
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
EXPAND_COUNTER(btree_read_error) \
|
||||
EXPAND_COUNTER(btree_stale_read) \
|
||||
EXPAND_COUNTER(btree_write_error) \
|
||||
EXPAND_COUNTER(client_farewell_error) \
|
||||
EXPAND_COUNTER(compact_invalid_request) \
|
||||
EXPAND_COUNTER(compact_operations) \
|
||||
EXPAND_COUNTER(compact_segment_busy) \
|
||||
@@ -110,6 +111,7 @@
|
||||
EXPAND_COUNTER(net_send_error) \
|
||||
EXPAND_COUNTER(net_send_messages) \
|
||||
EXPAND_COUNTER(net_recv_bytes) \
|
||||
EXPAND_COUNTER(net_recv_dropped_duplicate) \
|
||||
EXPAND_COUNTER(net_recv_error) \
|
||||
EXPAND_COUNTER(net_recv_invalid_message) \
|
||||
EXPAND_COUNTER(net_recv_messages) \
|
||||
|
||||
@@ -571,23 +571,53 @@ enum {
|
||||
*/
|
||||
|
||||
/*
|
||||
* Greetings verify identity of communicating nodes. The sender
|
||||
* sends their credentials and the receiver verifies them.
|
||||
* Greetings verify identity of communicating nodes. The sender sends
|
||||
* their credentials and the receiver verifies them.
|
||||
*
|
||||
* @server_term: The raft term that elected the server. Initially 0
|
||||
* from the client, sent by the server, then sent by the client as it
|
||||
* tries to reconnect. Used to identify a client reconnecting to a
|
||||
* server that has timed out its connection.
|
||||
*
|
||||
* @node_id: The id of the client. Initially 0 from the client,
|
||||
* assigned by the server, and sent by the client as it reconnects.
|
||||
* Used by the server to identify reconnecting clients whose existing
|
||||
* state must be dealt with.
|
||||
*/
|
||||
struct scoutfs_net_greeting {
|
||||
__le64 fsid;
|
||||
__le64 format_hash;
|
||||
__le64 server_term;
|
||||
__le64 node_id;
|
||||
__le64 flags;
|
||||
} __packed;
|
||||
|
||||
#define SCOUTFS_NET_GREETING_FLAG_FAREWELL (1 << 0)
|
||||
#define SCOUTFS_NET_GREETING_FLAG_INVALID (~(__u64)0 << 1)
|
||||
|
||||
/*
|
||||
* This header precedes and describes all network messages sent over
|
||||
* sockets. The id is set by the request and sent in the response.
|
||||
* sockets.
|
||||
*
|
||||
* @seq: A sequence number that is increased for each message queued for
|
||||
* send on the sender. The sender will never reorder messages in the
|
||||
* send queue so this will always increase in recv on the receiver. The
|
||||
* receiver can use this to drop messages that arrived twice after being
|
||||
* resent across a newly connected socket for a given connection.
|
||||
*
|
||||
* @recv_seq: The sequence number of the last received message. The
|
||||
* receiver is sending this to the sender in every message. The sender
|
||||
* uses them to drop responses which have been delivered.
|
||||
*
|
||||
* @id: An increasing identifier that is set in each request. Responses
|
||||
* specify the request that they're responding to.
|
||||
*
|
||||
* Error is only set to a translated errno and will only be found in
|
||||
* response messages.
|
||||
*/
|
||||
struct scoutfs_net_header {
|
||||
__le64 seq;
|
||||
__le64 recv_seq;
|
||||
__le64 id;
|
||||
__le16 data_len;
|
||||
__u8 cmd;
|
||||
@@ -612,6 +642,7 @@ enum {
|
||||
SCOUTFS_NET_CMD_STATFS,
|
||||
SCOUTFS_NET_CMD_COMPACT,
|
||||
SCOUTFS_NET_CMD_LOCK,
|
||||
SCOUTFS_NET_CMD_FAREWELL,
|
||||
SCOUTFS_NET_CMD_UNKNOWN,
|
||||
};
|
||||
|
||||
|
||||
536
kmod/src/net.c
536
kmod/src/net.c
@@ -41,38 +41,39 @@
|
||||
* Both set up a connection and specify the set of request commands they
|
||||
* can process.
|
||||
*
|
||||
* Requests are tracked on a connection and sent to its peer. They're
|
||||
* Request and response messages are queued on a connection. They're
|
||||
* resent down newly established sockets on a long lived connection.
|
||||
* Queued requests are removed as a response is processed or if the
|
||||
* request is canceled by the sender.
|
||||
* request is canceled by the sender. Queued responses are removed as
|
||||
* the receiver acknowledges their delivery.
|
||||
*
|
||||
* Request processing sends a response down the socket that received a
|
||||
* connection. Processing is stopped as a socket is shutdown so
|
||||
* responses are only send down sockets that received a request.
|
||||
* Request and response resending is asymmetrical because of the
|
||||
* client/server relationship. If a client connects to a new server it
|
||||
* drops responses because the new server doesn't have any requests
|
||||
* pending. If a server times out a client it drops everything because
|
||||
* that client is never coming back.
|
||||
*
|
||||
* Thus requests can be received multiple times as sockets are shutdown
|
||||
* and reconnected. Responses are only processed once for a given
|
||||
* request. It is up to request and response implementations to ensure
|
||||
* that duplicate requests are safely handled.
|
||||
* Requests and responses are only processed once for a given client and
|
||||
* server pair. Callers have to deal with the possibility that two
|
||||
* servers might both process the same client request, even though the
|
||||
* client may only see the most recent response.
|
||||
*
|
||||
* It turns out that we have to deal with duplicate request processing
|
||||
* at the layer above networking anyway. Request processing can make
|
||||
* persistent changes that are committed on the server before it
|
||||
* crashes. The client then reconnects to before it crashes and the
|
||||
* client reconnects to a server who must detect that the persistent
|
||||
* work on behalf of the resent request has already been committed. If
|
||||
* we have to deal with that duplicate processing we may as well
|
||||
* simplify networking by allowing it between reconnecting peers as
|
||||
* well.
|
||||
* The functional core of this implementation is solid, but boy are the
|
||||
* interface boundaries getting fuzzy. The core knows too much about
|
||||
* clients and servers and the communications across the net interface
|
||||
* boundary are questionable. We probably want to pull more client and
|
||||
* server specific behaviour up into the client and server and turn the
|
||||
* "net" code into more passive shared framing helpers.
|
||||
*
|
||||
* XXX:
|
||||
* - defer accepted conn destruction until reconnect timeout
|
||||
* - trace command and response data payloads
|
||||
* - checksum message contents?
|
||||
* - explicit shutdown message to free accepted, timeout and fence otherwise
|
||||
* - shutdown server if accept can't alloc resources for new conn?
|
||||
*/
|
||||
|
||||
/* reasonable multiple of max client reconnect attempt interval */
|
||||
#define CLIENT_RECONNECT_TIMEOUT_MS (20 * MSEC_PER_SEC)
|
||||
|
||||
/*
|
||||
* A connection's shutdown work executes in its own workqueue so that the
|
||||
* work can free the connection's workq.
|
||||
@@ -97,13 +98,19 @@ struct scoutfs_net_connection {
|
||||
|
||||
unsigned long valid_greeting:1, /* other commands can proceed */
|
||||
established:1, /* added sends queue send work */
|
||||
shutting_down:1; /* shutdown work has been queued */
|
||||
shutting_down:1, /* shutdown work has been queued */
|
||||
saw_greeting:1, /* saw greeting on this sock */
|
||||
saw_farewell:1, /* saw farewell request from client */
|
||||
reconn_wait:1, /* shutdown, waiting for reconnect */
|
||||
reconn_freeing:1; /* waiting done, setter frees */
|
||||
unsigned long reconn_deadline;
|
||||
|
||||
struct sockaddr_in connect_sin;
|
||||
unsigned long connect_timeout_ms;
|
||||
|
||||
struct socket *sock;
|
||||
u64 node_id; /* assigned during greeting */
|
||||
u64 greeting_id;
|
||||
struct sockaddr_in sockname;
|
||||
struct sockaddr_in peername;
|
||||
|
||||
@@ -111,21 +118,25 @@ struct scoutfs_net_connection {
|
||||
struct scoutfs_net_connection *listening_conn;
|
||||
struct list_head accepted_list;
|
||||
|
||||
u64 next_send_seq;
|
||||
u64 next_send_id;
|
||||
struct list_head send_queue;
|
||||
struct list_head resend_queue;
|
||||
|
||||
atomic64_t recv_seq;
|
||||
|
||||
struct workqueue_struct *workq;
|
||||
struct work_struct listen_work;
|
||||
struct work_struct connect_work;
|
||||
struct work_struct send_work;
|
||||
struct work_struct recv_work;
|
||||
struct work_struct shutdown_work;
|
||||
struct delayed_work reconn_free_dwork;
|
||||
/* message_recv proc_work also executes in the conn workq */
|
||||
|
||||
struct scoutfs_tseq_entry tseq_entry;
|
||||
|
||||
u8 info[0] __aligned(sizeof(u64));
|
||||
void *info;
|
||||
};
|
||||
|
||||
/* listening and their accepting sockets have a fixed locking order */
|
||||
@@ -186,6 +197,10 @@ static bool nh_is_request(struct scoutfs_net_header *nh)
|
||||
return !nh_is_response(nh);
|
||||
}
|
||||
|
||||
/*
|
||||
* We return dead requests so that the caller can stop searching other
|
||||
* lists for the dead request that we found.
|
||||
*/
|
||||
static struct message_send *search_list(struct scoutfs_net_connection *conn,
|
||||
struct list_head *list,
|
||||
u8 cmd, u64 id)
|
||||
@@ -343,6 +358,7 @@ static int submit_send(struct super_block *sb,
|
||||
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
|
||||
struct scoutfs_net_connection *acc_conn;
|
||||
struct message_send *msend;
|
||||
u64 seq;
|
||||
|
||||
if (WARN_ON_ONCE(cmd >= SCOUTFS_NET_CMD_UNKNOWN) ||
|
||||
WARN_ON_ONCE(flags & SCOUTFS_NET_FLAGS_UNKNOWN) ||
|
||||
@@ -378,12 +394,16 @@ static int submit_send(struct super_block *sb,
|
||||
}
|
||||
}
|
||||
|
||||
seq = conn->next_send_seq++;
|
||||
if (id == 0)
|
||||
id = conn->next_send_id++;
|
||||
|
||||
msend->resp_func = resp_func;
|
||||
msend->resp_data = resp_data;
|
||||
msend->dead = 0;
|
||||
|
||||
if (id == 0)
|
||||
id = conn->next_send_id++;
|
||||
msend->nh.seq = cpu_to_le64(seq);
|
||||
msend->nh.recv_seq = 0; /* set when sent, not when queued */
|
||||
msend->nh.id = cpu_to_le64(id);
|
||||
msend->nh.cmd = cmd;
|
||||
msend->nh.flags = flags;
|
||||
@@ -411,36 +431,7 @@ static int submit_send(struct super_block *sb,
|
||||
}
|
||||
|
||||
/*
|
||||
* Messages can flow once we receive and process a valid greeting from
|
||||
* our peer.
|
||||
*
|
||||
* At this point recv processing has queued the greeting response
|
||||
* message on the send queue. Any request messages waiting to be resent
|
||||
* need to be added to the end of the send queue after the greeting
|
||||
* response.
|
||||
*
|
||||
* Update the conn's node_id so that servers can send to specific
|
||||
* clients.
|
||||
*/
|
||||
static void saw_valid_greeting(struct scoutfs_net_connection *conn, u64 node_id)
|
||||
{
|
||||
struct super_block *sb = conn->sb;
|
||||
|
||||
spin_lock(&conn->lock);
|
||||
|
||||
conn->valid_greeting = 1;
|
||||
conn->node_id = node_id;
|
||||
list_splice_tail_init(&conn->resend_queue, &conn->send_queue);
|
||||
queue_work(conn->workq, &conn->send_work);
|
||||
|
||||
spin_unlock(&conn->lock);
|
||||
|
||||
if (conn->notify_up)
|
||||
conn->notify_up(sb, conn, conn->info, node_id);
|
||||
}
|
||||
|
||||
/*
|
||||
* Process an incoming response. The greeting should ensure that the
|
||||
* Process an incoming request. The greeting should ensure that the
|
||||
* sender won't send us unknown commands. We return an error if we see
|
||||
* an unknown command because the greeting should agree on an understood
|
||||
* protocol. The request function sends a response and returns an error
|
||||
@@ -451,8 +442,6 @@ static int process_request(struct scoutfs_net_connection *conn,
|
||||
{
|
||||
struct super_block *sb = conn->sb;
|
||||
scoutfs_net_request_t req_func;
|
||||
struct scoutfs_net_greeting *gr;
|
||||
int ret;
|
||||
|
||||
if (mrecv->nh.cmd < SCOUTFS_NET_CMD_UNKNOWN)
|
||||
req_func = conn->req_funcs[mrecv->nh.cmd];
|
||||
@@ -464,21 +453,8 @@ static int process_request(struct scoutfs_net_connection *conn,
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
ret = req_func(sb, conn, mrecv->nh.cmd, le64_to_cpu(mrecv->nh.id),
|
||||
mrecv->nh.data, le16_to_cpu(mrecv->nh.data_len));
|
||||
|
||||
/*
|
||||
* Greeting response updates our *request* node_id so that
|
||||
* we can consume a new allocation without callbacks. We're
|
||||
* about to free the recv in the caller anyway.
|
||||
*/
|
||||
if (!conn->valid_greeting &&
|
||||
mrecv->nh.cmd == SCOUTFS_NET_CMD_GREETING && ret == 0) {
|
||||
gr = (void *)mrecv->nh.data;
|
||||
saw_valid_greeting(conn, le64_to_cpu(gr->node_id));
|
||||
}
|
||||
|
||||
return ret;
|
||||
return req_func(sb, conn, mrecv->nh.cmd, le64_to_cpu(mrecv->nh.id),
|
||||
mrecv->nh.data, le16_to_cpu(mrecv->nh.data_len));
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -514,11 +490,6 @@ static int process_response(struct scoutfs_net_connection *conn,
|
||||
ret = resp_func(sb, conn, mrecv->nh.data,
|
||||
le16_to_cpu(mrecv->nh.data_len),
|
||||
net_err_to_host(mrecv->nh.error), resp_data);
|
||||
|
||||
if (!conn->valid_greeting &&
|
||||
mrecv->nh.cmd == SCOUTFS_NET_CMD_GREETING && msend && ret == 0)
|
||||
saw_valid_greeting(conn, 0);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -553,6 +524,49 @@ static void scoutfs_net_proc_worker(struct work_struct *work)
|
||||
trace_scoutfs_net_proc_work_exit(sb, 0, ret);
|
||||
}
|
||||
|
||||
/*
|
||||
* Free live responses up to and including the seq by marking them dead
|
||||
* and moving them to the send queue to be freed.
|
||||
*/
|
||||
static int move_acked_responses(struct scoutfs_net_connection *conn,
|
||||
struct list_head *list, u64 seq)
|
||||
{
|
||||
struct message_send *msend;
|
||||
struct message_send *tmp;
|
||||
int ret = 0;
|
||||
|
||||
assert_spin_locked(&conn->lock);
|
||||
|
||||
list_for_each_entry_safe(msend, tmp, list, head) {
|
||||
if (le64_to_cpu(msend->nh.seq) > seq)
|
||||
break;
|
||||
if (!nh_is_response(&msend->nh) || msend->dead)
|
||||
continue;
|
||||
|
||||
msend->dead = 1;
|
||||
list_move(&msend->head, &conn->send_queue);
|
||||
ret = 1;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* acks are processed inline in the recv worker */
|
||||
static void free_acked_responses(struct scoutfs_net_connection *conn, u64 seq)
|
||||
{
|
||||
int moved;
|
||||
|
||||
spin_lock(&conn->lock);
|
||||
|
||||
moved = move_acked_responses(conn, &conn->send_queue, seq) +
|
||||
move_acked_responses(conn, &conn->resend_queue, seq);
|
||||
|
||||
spin_unlock(&conn->lock);
|
||||
|
||||
if (moved)
|
||||
queue_work(conn->workq, &conn->send_work);
|
||||
}
|
||||
|
||||
static int recvmsg_full(struct socket *sock, void *buf, unsigned len)
|
||||
{
|
||||
struct msghdr msg;
|
||||
@@ -578,10 +592,11 @@ static int recvmsg_full(struct socket *sock, void *buf, unsigned len)
|
||||
return 0;
|
||||
}
|
||||
|
||||
static bool invalid_message(struct scoutfs_net_header *nh)
|
||||
static bool invalid_message(struct scoutfs_net_connection *conn,
|
||||
struct scoutfs_net_header *nh)
|
||||
{
|
||||
/* ids must be non-zero */
|
||||
if (nh->id == 0)
|
||||
/* seq and id must be non-zero */
|
||||
if (nh->seq == 0 || nh->id == 0)
|
||||
return true;
|
||||
|
||||
/* greeting should negotiate understood protocol */
|
||||
@@ -598,6 +613,16 @@ static bool invalid_message(struct scoutfs_net_header *nh)
|
||||
if (nh_is_request(nh) && nh->error != SCOUTFS_NET_ERR_NONE)
|
||||
return true;
|
||||
|
||||
if (nh->cmd == SCOUTFS_NET_CMD_GREETING) {
|
||||
/* each endpoint can only receive one greeting per socket */
|
||||
if (conn->saw_greeting)
|
||||
return true;
|
||||
|
||||
/* servers get greeting requests, clients get responses */
|
||||
if (!!conn->listening_conn != !!nh_is_request(nh))
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -625,7 +650,7 @@ static void scoutfs_net_recv_worker(struct work_struct *work)
|
||||
break;
|
||||
|
||||
/* receiving an invalid message breaks the connection */
|
||||
if (invalid_message(&nh)) {
|
||||
if (invalid_message(conn, &nh)) {
|
||||
scoutfs_inc_counter(sb, net_recv_invalid_message);
|
||||
ret = -EBADMSG;
|
||||
break;
|
||||
@@ -657,8 +682,31 @@ static void scoutfs_net_recv_worker(struct work_struct *work)
|
||||
break;
|
||||
}
|
||||
|
||||
if (nh.cmd == SCOUTFS_NET_CMD_GREETING) {
|
||||
/* greetings are out of band, no seq mechanics */
|
||||
conn->saw_greeting = 1;
|
||||
|
||||
} else if (le64_to_cpu(nh.seq) <=
|
||||
atomic64_read(&conn->recv_seq)) {
|
||||
/* drop any resent duplicated messages */
|
||||
scoutfs_inc_counter(sb, net_recv_dropped_duplicate);
|
||||
kfree(mrecv);
|
||||
continue;
|
||||
|
||||
} else {
|
||||
/* record that we've received sender's seq */
|
||||
atomic64_set(&conn->recv_seq, le64_to_cpu(nh.seq));
|
||||
/* and free our responses that sender has received */
|
||||
free_acked_responses(conn, le64_to_cpu(nh.recv_seq));
|
||||
}
|
||||
|
||||
scoutfs_tseq_add(&ninf->msg_tseq_tree, &mrecv->tseq_entry);
|
||||
queue_work(conn->workq, &mrecv->proc_work);
|
||||
|
||||
/* synchronously process greeting before next recvmsg */
|
||||
if (nh.cmd == SCOUTFS_NET_CMD_GREETING)
|
||||
scoutfs_net_proc_worker(&mrecv->proc_work);
|
||||
else
|
||||
queue_work(conn->workq, &mrecv->proc_work);
|
||||
}
|
||||
|
||||
if (ret)
|
||||
@@ -712,6 +760,10 @@ static void free_msend(struct net_info *ninf, struct message_send *msend)
|
||||
* The worker is responsible for freeing messages so that other contexts
|
||||
* don't have to worry about freeing a message while we're blocked
|
||||
* sending it without the lock held.
|
||||
*
|
||||
* We set the current recv_seq on every outgoing frame as it represents
|
||||
* the current connection state, not the state back when each message
|
||||
* was first queued.
|
||||
*/
|
||||
static void scoutfs_net_send_worker(struct work_struct *work)
|
||||
{
|
||||
@@ -734,6 +786,9 @@ static void scoutfs_net_send_worker(struct work_struct *work)
|
||||
continue;
|
||||
}
|
||||
|
||||
msend->nh.recv_seq =
|
||||
cpu_to_le64(atomic64_read(&conn->recv_seq));
|
||||
|
||||
spin_unlock(&conn->lock);
|
||||
|
||||
len = nh_bytes(le16_to_cpu(msend->nh.data_len));
|
||||
@@ -747,14 +802,14 @@ static void scoutfs_net_send_worker(struct work_struct *work)
|
||||
|
||||
spin_lock(&conn->lock);
|
||||
|
||||
msend->nh.recv_seq = 0;
|
||||
|
||||
if (ret)
|
||||
break;
|
||||
|
||||
/* active requests are resent, everything else is freed */
|
||||
if (nh_is_request(&msend->nh) && !msend->dead)
|
||||
/* resend if it wasn't freed while we sent */
|
||||
if (!msend->dead)
|
||||
list_move_tail(&msend->head, &conn->resend_queue);
|
||||
else
|
||||
msend->dead = 1;
|
||||
}
|
||||
|
||||
spin_unlock(&conn->lock);
|
||||
@@ -778,6 +833,10 @@ static void destroy_conn(struct scoutfs_net_connection *conn)
|
||||
WARN_ON_ONCE(conn->sock != NULL);
|
||||
WARN_ON_ONCE(!list_empty(&conn->accepted_list));
|
||||
|
||||
/* tell callers that accepted connection finally done */
|
||||
if (conn->listening_conn && conn->notify_down)
|
||||
conn->notify_down(sb, conn, conn->info, conn->node_id);
|
||||
|
||||
/* free all messages, refactor and complete for forced unmount? */
|
||||
list_splice_init(&conn->resend_queue, &conn->send_queue);
|
||||
list_for_each_entry_safe(msend, tmp, &conn->send_queue, head) {
|
||||
@@ -797,6 +856,7 @@ static void destroy_conn(struct scoutfs_net_connection *conn)
|
||||
|
||||
destroy_workqueue(conn->workq);
|
||||
scoutfs_tseq_del(&ninf->conn_tseq_tree, &conn->tseq_entry);
|
||||
kfree(conn->info);
|
||||
kfree(conn);
|
||||
}
|
||||
|
||||
@@ -1025,9 +1085,11 @@ static void scoutfs_net_shutdown_worker(struct work_struct *work)
|
||||
DEFINE_CONN_FROM_WORK(conn, work, shutdown_work);
|
||||
struct super_block *sb = conn->sb;
|
||||
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
|
||||
struct scoutfs_net_connection *listener;
|
||||
struct scoutfs_net_connection *acc_conn;
|
||||
struct message_send *msend;
|
||||
struct message_send *tmp;
|
||||
unsigned long delay;
|
||||
|
||||
trace_scoutfs_net_shutdown_work_enter(sb, 0, 0);
|
||||
|
||||
@@ -1062,39 +1124,119 @@ static void scoutfs_net_shutdown_worker(struct work_struct *work)
|
||||
spin_unlock(&acc_conn->lock);
|
||||
}
|
||||
spin_unlock(&conn->lock);
|
||||
|
||||
/* free any conns waiting for reconnection */
|
||||
cancel_delayed_work_sync(&conn->reconn_free_dwork);
|
||||
queue_delayed_work(conn->workq, &conn->reconn_free_dwork, 0);
|
||||
/* relies on delay 0 scheduling immediately so no timer to cancel */
|
||||
flush_delayed_work(&conn->reconn_free_dwork);
|
||||
|
||||
/* and wait for accepted conn shutdown work to finish */
|
||||
wait_event(conn->waitq, empty_accepted_list(conn));
|
||||
|
||||
spin_lock(&conn->lock);
|
||||
|
||||
/* resend any pending requests, drop responses or greetings */
|
||||
/* greetings aren't resent across sockets */
|
||||
list_splice_tail_init(&conn->send_queue, &conn->resend_queue);
|
||||
list_for_each_entry_safe(msend, tmp, &conn->resend_queue, head) {
|
||||
if (nh_is_response(&msend->nh) ||
|
||||
msend->nh.cmd == SCOUTFS_NET_CMD_GREETING)
|
||||
if (msend->nh.cmd == SCOUTFS_NET_CMD_GREETING)
|
||||
free_msend(ninf, msend);
|
||||
}
|
||||
|
||||
conn->saw_greeting = 0;
|
||||
|
||||
/* signal connect failure */
|
||||
memset(&conn->connect_sin, 0, sizeof(conn->connect_sin));
|
||||
wake_up(&conn->waitq);
|
||||
spin_unlock(&conn->lock);
|
||||
|
||||
/* tell the caller that the connection is down */
|
||||
if (conn->notify_down)
|
||||
conn->notify_down(sb, conn, conn->info, conn->node_id);
|
||||
/* resolve racing with listener shutdown with locked shutting_down */
|
||||
if (conn->listening_conn &&
|
||||
(conn->listening_conn->shutting_down || conn->saw_farewell)) {
|
||||
|
||||
/* accepted conns are destroyed */
|
||||
if (conn->listening_conn) {
|
||||
/* free accepted sockets after farewell or listener shutdown */
|
||||
spin_unlock(&conn->lock);
|
||||
destroy_conn(conn);
|
||||
|
||||
} else {
|
||||
spin_lock(&conn->lock);
|
||||
conn->shutting_down = 0;
|
||||
|
||||
if (conn->listening_conn) {
|
||||
/* server accepted sockets wait for reconnect */
|
||||
listener = conn->listening_conn;
|
||||
delay = msecs_to_jiffies(CLIENT_RECONNECT_TIMEOUT_MS);
|
||||
conn->reconn_wait = 1;
|
||||
conn->reconn_deadline = jiffies + delay;
|
||||
queue_delayed_work(listener->workq,
|
||||
&listener->reconn_free_dwork, delay);
|
||||
} else {
|
||||
/* clients and listeners can retry */
|
||||
conn->shutting_down = 0;
|
||||
if (conn->notify_down)
|
||||
conn->notify_down(sb, conn, conn->info,
|
||||
conn->node_id);
|
||||
}
|
||||
|
||||
spin_unlock(&conn->lock);
|
||||
}
|
||||
|
||||
trace_scoutfs_net_shutdown_work_exit(sb, 0, 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Free any connections that have been shutdown for too long without the
|
||||
* client reconnecting. This runs in work on the listening connection.
|
||||
* It's racing with connection attempts searching for shutdown
|
||||
* connections to steal state from. Shutdown cancels the work and waits
|
||||
* for it to finish.
|
||||
*
|
||||
* Connections are currently freed without the lock held so this walks
|
||||
* the entire list every time it frees a connection. This is irritating
|
||||
* but timed out connections are rare and client counts are relatively
|
||||
* low given a cpu's ability to burn through the list.
|
||||
*/
|
||||
static void scoutfs_net_reconn_free_worker(struct work_struct *work)
|
||||
{
|
||||
DEFINE_CONN_FROM_WORK(conn, work, reconn_free_dwork.work);
|
||||
struct super_block *sb = conn->sb;
|
||||
struct scoutfs_net_connection *acc;
|
||||
unsigned long now = jiffies;
|
||||
unsigned long deadline = 0;
|
||||
bool requeue = false;
|
||||
|
||||
trace_scoutfs_net_reconn_free_work_enter(sb, 0, 0);
|
||||
|
||||
restart:
|
||||
spin_lock(&conn->lock);
|
||||
list_for_each_entry(acc, &conn->accepted_list, accepted_head) {
|
||||
|
||||
if (acc->reconn_wait && !acc->reconn_freeing &&
|
||||
(conn->shutting_down ||
|
||||
time_after_eq(now, acc->reconn_deadline))) {
|
||||
acc->reconn_freeing = 1;
|
||||
spin_unlock(&conn->lock);
|
||||
if (!conn->shutting_down)
|
||||
scoutfs_info(sb, "client timed out "SIN_FMT" -> "SIN_FMT", can not reconnect",
|
||||
SIN_ARG(&acc->sockname),
|
||||
SIN_ARG(&acc->peername));
|
||||
destroy_conn(acc);
|
||||
goto restart;
|
||||
}
|
||||
|
||||
/* calc delay of next work, can drift a bit */
|
||||
if (acc->reconn_wait && !acc->reconn_freeing &&
|
||||
(!requeue || time_before(now, deadline))) {
|
||||
requeue = true;
|
||||
deadline = acc->reconn_deadline;
|
||||
}
|
||||
}
|
||||
spin_unlock(&conn->lock);
|
||||
|
||||
if (requeue)
|
||||
queue_delayed_work(conn->workq, &conn->reconn_free_dwork,
|
||||
deadline - now);
|
||||
|
||||
trace_scoutfs_net_reconn_free_work_exit(sb, 0, 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Accepted connections inherit the callbacks from their listening
|
||||
* connection.
|
||||
@@ -1116,15 +1258,21 @@ scoutfs_net_alloc_conn(struct super_block *sb,
|
||||
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
|
||||
struct scoutfs_net_connection *conn;
|
||||
|
||||
conn = kzalloc(offsetof(struct scoutfs_net_connection,
|
||||
info[info_size]), GFP_NOFS);
|
||||
conn = kzalloc(sizeof(struct scoutfs_net_connection), GFP_NOFS);
|
||||
if (!conn)
|
||||
return NULL;
|
||||
|
||||
conn->info = kzalloc(info_size, GFP_NOFS);
|
||||
if (!conn->info) {
|
||||
kfree(conn);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
conn->workq = alloc_workqueue("scoutfs_net_%s",
|
||||
WQ_UNBOUND | WQ_NON_REENTRANT, 0,
|
||||
name_suffix);
|
||||
if (!conn->workq) {
|
||||
kfree(conn->info);
|
||||
kfree(conn);
|
||||
return NULL;
|
||||
}
|
||||
@@ -1140,7 +1288,9 @@ scoutfs_net_alloc_conn(struct super_block *sb,
|
||||
conn->peername.sin_family = AF_INET;
|
||||
INIT_LIST_HEAD(&conn->accepted_head);
|
||||
INIT_LIST_HEAD(&conn->accepted_list);
|
||||
conn->next_send_seq = 1;
|
||||
conn->next_send_id = 1;
|
||||
atomic64_set(&conn->recv_seq, 0);
|
||||
INIT_LIST_HEAD(&conn->send_queue);
|
||||
INIT_LIST_HEAD(&conn->resend_queue);
|
||||
INIT_WORK(&conn->listen_work, scoutfs_net_listen_worker);
|
||||
@@ -1148,6 +1298,8 @@ scoutfs_net_alloc_conn(struct super_block *sb,
|
||||
INIT_WORK(&conn->send_work, scoutfs_net_send_worker);
|
||||
INIT_WORK(&conn->recv_work, scoutfs_net_recv_worker);
|
||||
INIT_WORK(&conn->shutdown_work, scoutfs_net_shutdown_worker);
|
||||
INIT_DELAYED_WORK(&conn->reconn_free_dwork,
|
||||
scoutfs_net_reconn_free_worker);
|
||||
|
||||
scoutfs_tseq_add(&ninf->conn_tseq_tree, &conn->tseq_entry);
|
||||
|
||||
@@ -1302,6 +1454,185 @@ int scoutfs_net_connect(struct super_block *sb,
|
||||
return ret ?: error;
|
||||
}
|
||||
|
||||
/*
 * Mark the connection as having exchanged a valid greeting with its
 * peer.  Messages submitted before this point were parked on the
 * resend queue; splice them onto the send queue in order and kick send
 * work so they hit the wire.  Caller must hold conn->lock.
 */
static void set_valid_greeting(struct scoutfs_net_connection *conn)
{
	assert_spin_locked(&conn->lock);

	/* recv should have dropped invalid duplicate greeting messages */
	BUG_ON(conn->valid_greeting);

	conn->valid_greeting = 1;
	list_splice_tail_init(&conn->resend_queue, &conn->send_queue);
	queue_work(conn->workq, &conn->send_work);
}
|
||||
|
||||
/*
 * The client has received a valid greeting from the server.  Send
 * can proceed and we might need to reset our recv state if we reconnected
 * to a new server.
 */
void scoutfs_net_client_greeting(struct super_block *sb,
				 struct scoutfs_net_connection *conn,
				 bool new_server)
{
	struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
	struct message_send *msend;
	struct message_send *tmp;

	/* only called on client connections :/ */
	BUG_ON(conn->listening_conn);

	spin_lock(&conn->lock);

	if (new_server) {
		/*
		 * A new server instance has no memory of our previous
		 * exchange: restart recv sequence tracking and drop
		 * queued responses that were destined for the old
		 * server, which will never consume them.  Queued
		 * requests are left for resend to the new server.
		 */
		atomic64_set(&conn->recv_seq, 0);
		list_for_each_entry_safe(msend, tmp, &conn->resend_queue, head){
			if (nh_is_response(&msend->nh))
				free_msend(ninf, msend);
		}
	}

	set_valid_greeting(conn);

	spin_unlock(&conn->lock);

	/* client up/down drives reconnect */
	if (conn->notify_up)
		conn->notify_up(sb, conn, conn->info, 0);
}
|
||||
|
||||
/*
 * The calling server has received a valid greeting from a client.  If
 * the client is reconnecting to us then we need to find its old
 * connection that held its state and transfer it to this connection
 * (connection and socket life cycles make this easier than migrating
 * the socket between the connections).
 *
 * The previous connection that holds the client's state might still be
 * in active use depending on network failure and work processing races.
 * We shut it down before migrating its message state.  We can be
 * processing greetings from multiple reconnecting sockets that are all
 * referring to the same original connection.  We use the increasing
 * greeting id to have the most recent connection attempt win.
 *
 * A node can be reconnecting to us for the first time.  In that case we
 * just trust its node_id.  It will notice the new server term and take
 * steps to recover.
 *
 * A client can be reconnecting to us after we've destroyed their state.
 * This is fatal for the client if they just took too long to reconnect.
 * But this can also happen if something disconnects the socket after
 * we've sent a farewell response before the client received it.  In
 * this case we let the client reconnect so we can resend the farewell
 * response and they can disconnect cleanly.
 *
 * At this point our connection is idle except for send submissions and
 * shutdown being queued.  We have exclusive access to a previous conn
 * once it has been shut down and we set _freeing.
 */
void scoutfs_net_server_greeting(struct super_block *sb,
				 struct scoutfs_net_connection *conn,
				 u64 node_id, u64 greeting_id,
				 bool sent_node_id, bool first_contact,
				 bool farewell)
{
	struct scoutfs_net_connection *listener;
	struct scoutfs_net_connection *reconn;
	struct scoutfs_net_connection *acc;

	/* only called on accepted server connections :/ */
	BUG_ON(!conn->listening_conn);

	/* see if we have a previous conn for the client's sent node_id */
	reconn = NULL;
	if (sent_node_id) {
		listener = conn->listening_conn;
restart:
		spin_lock_nested(&listener->lock, CONN_LOCK_LISTENER);
		list_for_each_entry(acc, &listener->accepted_list,
				    accepted_head) {
			/* skip conns for other nodes, older attempts, or
			 * ones another greeting is already freeing */
			if (acc->node_id != node_id ||
			    acc->greeting_id >= greeting_id ||
			    acc->reconn_freeing)
				continue;

			/* still active: shut it down, then retry the scan */
			if (!acc->reconn_wait) {
				spin_lock_nested(&acc->lock,
						 CONN_LOCK_ACCEPTED);
				shutdown_conn_locked(acc);
				spin_unlock(&acc->lock);
				spin_unlock(&listener->lock);
				msleep(10); /* XXX might be freed :/ */
				goto restart;
			}

			reconn = acc;
			reconn->reconn_freeing = 1;
			break;
		}
		spin_unlock(&listener->lock);
	}

	/* drop a connection if we can't find its necessary old conn */
	if (sent_node_id && !reconn && !first_contact && !farewell) {
		shutdown_conn(conn);
		return;
	}

	/* migrate state from previous conn for this reconnecting node_id */
	if (reconn) {
		spin_lock(&conn->lock);

		/* carry over farewell, sequence, and id state */
		conn->saw_farewell = reconn->saw_farewell;
		conn->next_send_seq = reconn->next_send_seq;
		conn->next_send_id = reconn->next_send_id;
		atomic64_set(&conn->recv_seq, atomic64_read(&reconn->recv_seq));

		/* greeting response/ack will be on conn send queue */
		BUG_ON(!list_empty(&reconn->send_queue));
		BUG_ON(!list_empty(&conn->resend_queue));
		list_splice_init(&reconn->resend_queue, &conn->resend_queue);

		/* new conn info is unused, swap, old won't call down */
		swap(conn->info, reconn->info);
		reconn->notify_down = NULL;

		spin_unlock(&conn->lock);

		/* we set _freeing */
		destroy_conn(reconn);
	}

	spin_lock(&conn->lock);

	conn->node_id = node_id;
	conn->greeting_id = greeting_id;
	set_valid_greeting(conn);

	spin_unlock(&conn->lock);

	/* only call notify_up the first time we see the node_id */
	if (conn->notify_up && first_contact)
		conn->notify_up(sb, conn, conn->info, node_id);
}
|
||||
|
||||
/*
 * The server has received a farewell message and is sending a response.
 * All we do is mark the connection so that it is freed the next time it
 * is shutdown, presumably as the client disconnects after receiving the
 * response.  The server caller has cleaned up all the state it had
 * associated with the client.
 */
void scoutfs_net_server_farewell(struct super_block *sb,
				 struct scoutfs_net_connection *conn)
{
	spin_lock(&conn->lock);
	conn->saw_farewell = 1;
	spin_unlock(&conn->lock);
}
|
||||
|
||||
|
||||
/*
|
||||
* Submit a request down the connection. It's up to the caller to
|
||||
* ensure that the conn is allocated. Sends submitted when the
|
||||
@@ -1455,10 +1786,13 @@ static void net_tseq_show_conn(struct seq_file *m,
|
||||
struct scoutfs_net_connection *conn =
|
||||
container_of(ent, struct scoutfs_net_connection, tseq_entry);
|
||||
|
||||
seq_printf(m, "name "SIN_FMT" peer "SIN_FMT" vg %u est %u sd %u cto_ms %lu nsi %llu\n",
|
||||
seq_printf(m, "name "SIN_FMT" peer "SIN_FMT" node_id %llu greeting_id %llu vg %u est %u sd %u sg %u sf %u rw %u rf %u cto_ms rdl_j %lu %lu nss %llu rs %llu nsi %llu\n",
|
||||
SIN_ARG(&conn->sockname), SIN_ARG(&conn->peername),
|
||||
conn->valid_greeting, conn->established,
|
||||
conn->shutting_down, conn->connect_timeout_ms,
|
||||
conn->node_id, conn->greeting_id, conn->valid_greeting,
|
||||
conn->established, conn->shutting_down, conn->saw_greeting,
|
||||
conn->saw_farewell, conn->reconn_wait, conn->reconn_freeing,
|
||||
conn->connect_timeout_ms, conn->reconn_deadline,
|
||||
conn->next_send_seq, (u64)atomic64_read(&conn->recv_seq),
|
||||
conn->next_send_id);
|
||||
}
|
||||
|
||||
|
||||
@@ -67,6 +67,19 @@ void scoutfs_net_shutdown(struct super_block *sb,
|
||||
void scoutfs_net_free_conn(struct super_block *sb,
|
||||
struct scoutfs_net_connection *conn);
|
||||
|
||||
void scoutfs_net_client_greeting(struct super_block *sb,
|
||||
struct scoutfs_net_connection *conn,
|
||||
bool new_server);
|
||||
void scoutfs_net_server_greeting(struct super_block *sb,
|
||||
struct scoutfs_net_connection *conn,
|
||||
u64 node_id, u64 greeting_id,
|
||||
bool sent_node_id, bool first_contact,
|
||||
bool farewell);
|
||||
void scoutfs_net_server_farewell(struct super_block *sb,
|
||||
struct scoutfs_net_connection *conn);
|
||||
void scoutfs_net_farewell(struct super_block *sb,
|
||||
struct scoutfs_net_connection *conn);
|
||||
|
||||
int scoutfs_net_setup(struct super_block *sb);
|
||||
void scoutfs_net_destroy(struct super_block *sb);
|
||||
|
||||
|
||||
@@ -1840,6 +1840,14 @@ DEFINE_EVENT(scoutfs_work_class, scoutfs_net_shutdown_work_exit,
|
||||
TP_PROTO(struct super_block *sb, u64 data, int ret),
|
||||
TP_ARGS(sb, data, ret)
|
||||
);
|
||||
DEFINE_EVENT(scoutfs_work_class, scoutfs_net_reconn_free_work_enter,
|
||||
TP_PROTO(struct super_block *sb, u64 data, int ret),
|
||||
TP_ARGS(sb, data, ret)
|
||||
);
|
||||
DEFINE_EVENT(scoutfs_work_class, scoutfs_net_reconn_free_work_exit,
|
||||
TP_PROTO(struct super_block *sb, u64 data, int ret),
|
||||
TP_ARGS(sb, data, ret)
|
||||
);
|
||||
DEFINE_EVENT(scoutfs_work_class, scoutfs_net_send_work_enter,
|
||||
TP_PROTO(struct super_block *sb, u64 data, int ret),
|
||||
TP_ARGS(sb, data, ret)
|
||||
|
||||
@@ -56,6 +56,7 @@ struct server_info {
|
||||
bool shutting_down;
|
||||
struct completion start_comp;
|
||||
struct sockaddr_in listen_sin;
|
||||
u64 term;
|
||||
struct scoutfs_net_connection *conn;
|
||||
|
||||
/* request processing coordinates committing manifest and alloc */
|
||||
@@ -1072,11 +1073,16 @@ int scoutfs_server_lock_response(struct super_block *sb, u64 node_id,
|
||||
* response shuts down the connection.
|
||||
*
|
||||
* We allocate a new node_id for the first connect attempt from a
|
||||
* client. We update the request node_id for the calling net layer to
|
||||
* consume.
|
||||
* client.
|
||||
*
|
||||
* If a client reconnects they'll send their initially assigned node_id
|
||||
* in their greeting request.
|
||||
*
|
||||
* XXX We can lose allocated node_ids here as we record the node_id as
|
||||
* live as we send a valid greeting response. The client might
|
||||
* disconnect before they receive the response and resend an initial
|
||||
* blank greeting. We could use a client uuid to associate with
|
||||
* allocated node_ids.
|
||||
*/
|
||||
static int server_greeting(struct super_block *sb,
|
||||
struct scoutfs_net_connection *conn,
|
||||
@@ -1088,6 +1094,9 @@ static int server_greeting(struct super_block *sb,
|
||||
DECLARE_SERVER_INFO(sb, server);
|
||||
struct commit_waiter cw;
|
||||
__le64 node_id = 0;
|
||||
bool sent_node_id;
|
||||
bool first_contact;
|
||||
bool farewell;
|
||||
int ret = 0;
|
||||
|
||||
if (arg_len != sizeof(struct scoutfs_net_greeting)) {
|
||||
@@ -1122,24 +1131,61 @@ static int server_greeting(struct super_block *sb,
|
||||
queue_commit_work(server, &cw);
|
||||
up_read(&server->commit_rwsem);
|
||||
ret = wait_for_commit(&cw);
|
||||
if (ret)
|
||||
if (ret) {
|
||||
node_id = 0;
|
||||
goto out;
|
||||
}
|
||||
} else {
|
||||
node_id = gr->node_id;
|
||||
}
|
||||
|
||||
greet.fsid = super->hdr.fsid;
|
||||
greet.format_hash = super->format_hash;
|
||||
greet.server_term = cpu_to_le64(server->term);
|
||||
greet.node_id = node_id;
|
||||
greet.flags = 0;
|
||||
out:
|
||||
ret = scoutfs_net_response(sb, conn, cmd, id, ret,
|
||||
&greet, sizeof(greet));
|
||||
/* give net caller client's new node_id :/ */
|
||||
if (ret == 0 && node_id != 0)
|
||||
gr->node_id = node_id;
|
||||
if (node_id != 0 && ret == 0) {
|
||||
sent_node_id = gr->node_id != 0;
|
||||
first_contact = le64_to_cpu(gr->server_term) != server->term;
|
||||
if (gr->flags & cpu_to_le64(SCOUTFS_NET_GREETING_FLAG_FAREWELL))
|
||||
farewell = true;
|
||||
else
|
||||
farewell = false;
|
||||
|
||||
scoutfs_net_server_greeting(sb, conn, le64_to_cpu(node_id), id,
|
||||
sent_node_id, first_contact,
|
||||
farewell);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
 * The server is receiving a farewell message from a client that is
 * unmounting.  It won't send any more requests and once it receives our
 * response it will not reconnect.
 *
 * XXX we should make sure that all our requests to the client have finished
 * before we respond.  Locking will have its own messaging for orderly
 * shutdown.  That leaves compaction which will be addressed as part of
 * the larger work of recovering compactions that were in flight when
 * a client crashed.
 */
static int server_farewell(struct super_block *sb,
			   struct scoutfs_net_connection *conn,
			   u8 cmd, u64 id, void *arg, u16 arg_len)
{
	/* farewell requests carry no payload */
	if (arg_len != 0)
		return -EINVAL;

	/* mark the conn so net frees it once the client disconnects */
	scoutfs_net_server_farewell(sb, conn);

	return scoutfs_net_response(sb, conn, cmd, id, 0, NULL, 0);
}
|
||||
|
||||
/* requests sent to clients are tracked so we can free resources */
|
||||
struct compact_request {
|
||||
struct list_head head;
|
||||
@@ -1743,6 +1789,7 @@ static scoutfs_net_request_t server_req_funcs[] = {
|
||||
[SCOUTFS_NET_CMD_GET_MANIFEST_ROOT] = server_get_manifest_root,
|
||||
[SCOUTFS_NET_CMD_STATFS] = server_statfs,
|
||||
[SCOUTFS_NET_CMD_LOCK] = server_lock,
|
||||
[SCOUTFS_NET_CMD_FAREWELL] = server_farewell,
|
||||
};
|
||||
|
||||
static void server_notify_up(struct super_block *sb,
|
||||
@@ -1880,13 +1927,15 @@ out:
|
||||
}
|
||||
|
||||
/* XXX can we call start multiple times? */
|
||||
int scoutfs_server_start(struct super_block *sb, struct sockaddr_in *sin)
|
||||
int scoutfs_server_start(struct super_block *sb, struct sockaddr_in *sin,
|
||||
u64 term)
|
||||
{
|
||||
DECLARE_SERVER_INFO(sb, server);
|
||||
|
||||
server->err = 0;
|
||||
server->shutting_down = false;
|
||||
server->listen_sin = *sin;
|
||||
server->term = term;
|
||||
init_completion(&server->start_comp);
|
||||
|
||||
queue_work(server->wq, &server->work);
|
||||
|
||||
@@ -22,11 +22,16 @@ do { \
|
||||
__entry->name##_addr & 255, \
|
||||
__entry->name##_port
|
||||
|
||||
#define SNH_FMT "id %llu data_len %u cmd %u flags 0x%x error %u"
|
||||
#define SNH_ARG(nh) le64_to_cpu((nh)->id), le16_to_cpu((nh)->data_len), \
|
||||
(nh)->cmd, (nh)->flags, (nh)->error
|
||||
#define SNH_FMT \
|
||||
"seq %llu recv_seq %llu id %llu data_len %u cmd %u flags 0x%x error %u"
|
||||
#define SNH_ARG(nh) \
|
||||
le64_to_cpu((nh)->seq), le64_to_cpu((nh)->recv_seq), \
|
||||
le64_to_cpu((nh)->id), le16_to_cpu((nh)->data_len), (nh)->cmd, \
|
||||
(nh)->flags, (nh)->error
|
||||
|
||||
#define snh_trace_define(name) \
|
||||
__field(__u64, name##_seq) \
|
||||
__field(__u64, name##_recv_seq) \
|
||||
__field(__u64, name##_id) \
|
||||
__field(__u16, name##_data_len) \
|
||||
__field(__u8, name##_cmd) \
|
||||
@@ -37,6 +42,8 @@ do { \
|
||||
do { \
|
||||
__typeof__(nh) _nh = (nh); \
|
||||
\
|
||||
__entry->name##_seq = le64_to_cpu(_nh->seq); \
|
||||
__entry->name##_recv_seq = le64_to_cpu(_nh->recv_seq); \
|
||||
__entry->name##_id = le64_to_cpu(_nh->id); \
|
||||
__entry->name##_data_len = le16_to_cpu(_nh->data_len); \
|
||||
__entry->name##_cmd = _nh->cmd; \
|
||||
@@ -44,9 +51,10 @@ do { \
|
||||
__entry->name##_error = _nh->error; \
|
||||
} while (0)
|
||||
|
||||
#define snh_trace_args(name) \
|
||||
__entry->name##_id, __entry->name##_data_len, __entry->name##_cmd, \
|
||||
__entry->name##_flags, __entry->name##_error
|
||||
#define snh_trace_args(name) \
|
||||
__entry->name##_seq, __entry->name##_recv_seq, __entry->name##_id, \
|
||||
__entry->name##_data_len, __entry->name##_cmd, __entry->name##_flags, \
|
||||
__entry->name##_error
|
||||
|
||||
struct scoutfs_net_manifest_entry;
|
||||
struct scoutfs_manifest_entry;
|
||||
@@ -62,7 +70,8 @@ int scoutfs_server_lock_response(struct super_block *sb, u64 node_id,
|
||||
u64 id, struct scoutfs_net_lock *nl);
|
||||
|
||||
struct sockaddr_in;
|
||||
int scoutfs_server_start(struct super_block *sb, struct sockaddr_in *sin);
|
||||
int scoutfs_server_start(struct super_block *sb, struct sockaddr_in *sin,
|
||||
u64 term);
|
||||
void scoutfs_server_stop(struct super_block *sb);
|
||||
|
||||
int scoutfs_server_setup(struct super_block *sb);
|
||||
|
||||
Reference in New Issue
Block a user