The net layer was initially built around send queue lists with the presumption that there wouldn't be many messages in flight and that responses would be sent roughly in order. In the modern era, we can have tens of thousands of lock request messages in flight. This led to O(n^2) processing in quite a few places as recv processing searched for either requests to complete or responses to free.

This adds messages to two rbtrees, indexing either requests by their id or responses by their send sequence. Recv processing can find messages in O(log n).

This patch intends to be minimally disruptive. It's only replacing the search of the send and resend queues in the recv path with rbtrees. Other uses of the two queue lists are untouched.

On a single node, with ~40k lock shrink attempts in flight, we go from processing ~800 total request/grant request/response pairs per second to ~60,000 per second.

Signed-off-by: Zach Brown <zab@versity.com>
/*
* Copyright (C) 2018 Versity Software, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public
* License v2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/slab.h>
#include <asm/ioctls.h>
#include <linux/net.h>
#include <linux/inet.h>
#include <linux/in.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/log2.h>
#include <linux/jhash.h>
#include <linux/rbtree.h>

#include "format.h"
#include "counters.h"
#include "inode.h"
#include "btree.h"
#include "scoutfs_trace.h"
#include "msg.h"
#include "net.h"
#include "endian_swap.h"
#include "tseq.h"
#include "fence.h"
#include "options.h"

/*
* scoutfs networking delivers requests and responses between nodes.
*
* Nodes decide to be either a connecting client or a listening server.
* Both set up a connection and specify the set of request commands they
* can process.
*
* Request and response messages are queued on a connection. They're
* resent down newly established sockets on a long lived connection.
* Queued requests are removed as a response is processed or if the
* request is canceled by the sender. Queued responses are removed as
* the receiver acknowledges their delivery.
*
* Request and response resending is asymmetrical because of the
* client/server relationship. If a client connects to a new server it
* drops responses because the new server doesn't have any requests
* pending. If a server times out a client it drops everything because
* that client is never coming back.
*
* Requests and responses are only processed once for a given client and
* server pair. Callers have to deal with the possibility that two
* servers might both process the same client request, even though the
* client may only see the most recent response.
*
* The functional core of this implementation is solid, but boy are the
* interface boundaries getting fuzzy. The core knows too much about
* clients and servers and the communications across the net interface
* boundary are questionable. We probably want to pull more client and
* server specific behaviour up into the client and server and turn the
* "net" code into more passive shared framing helpers.
*
* XXX:
* - trace command and response data payloads
* - checksum message contents?
* - shutdown server if accept can't alloc resources for new conn?
*/
|
|
|
|
/* reasonable multiple of max client reconnect attempt interval */
|
|
#define CLIENT_RECONNECT_TIMEOUT_MS (20 * MSEC_PER_SEC)
|
|
|
|
/*
|
|
* A connection's shutdown work executes in its own workqueue so that the
|
|
* work can free the connection's workq.
|
|
*/
|
|
struct net_info {
|
|
struct workqueue_struct *shutdown_workq;
|
|
struct workqueue_struct *destroy_workq;
|
|
struct dentry *conn_tseq_dentry;
|
|
struct scoutfs_tseq_tree conn_tseq_tree;
|
|
struct dentry *msg_tseq_dentry;
|
|
struct scoutfs_tseq_tree msg_tseq_tree;
|
|
};
|
|
|
|
/* flags enum is in net.h */
|
|
#define test_conn_fl(conn, which) (!!((conn)->flags & CONN_FL_##which))
|
|
#define set_conn_fl(conn, which) \
|
|
do { \
|
|
(conn)->flags |= CONN_FL_##which; \
|
|
} while (0)
|
|
#define clear_conn_fl(conn, which) \
|
|
do { \
|
|
(conn)->flags &= ~CONN_FL_##which; \
|
|
} while (0)
|
|
#define assign_conn_fl(dst, src, which) \
do { \
(dst)->flags |= ((src)->flags & CONN_FL_##which); \
} while (0)
|
|
|
|
/* listening conns and their accepted conns have a fixed locking order */
|
|
enum spin_lock_subtype {
|
|
CONN_LOCK_LISTENER,
|
|
CONN_LOCK_ACCEPTED,
|
|
};
|
|
|
|
/*
* Messages to be sent are allocated and put on the send queue.
*
* Request messages are put on the resend queue until their response
* message is received and they can be freed.
*
* The send worker is the only context that references messages while
* not holding the lock. It does this while blocking sending the
* message down the socket. To free messages we mark them dead and have
* the send worker free them while under the lock so that we don't have
* to risk freeing messages from under the unlocked send worker.
*/
|
|
struct message_send {
|
|
struct scoutfs_tseq_entry tseq_entry;
|
|
unsigned long dead:1;
|
|
struct list_head head;
|
|
scoutfs_net_response_t resp_func;
|
|
struct rb_node node;
|
|
void *resp_data;
|
|
struct scoutfs_net_header nh;
|
|
};
|
|
|
|
/*
|
|
* Incoming received messages are processed in concurrent blocking work
|
|
* contexts.
|
|
*/
|
|
struct message_recv {
|
|
struct scoutfs_tseq_entry tseq_entry;
|
|
struct work_struct proc_work;
|
|
struct list_head ordered_head;
|
|
struct scoutfs_net_connection *conn;
|
|
struct scoutfs_net_header nh;
|
|
};
|
|
|
|
#define DEFINE_CONN_FROM_WORK(name, work, member) \
|
|
struct scoutfs_net_connection *name = \
|
|
container_of(work, struct scoutfs_net_connection, member)
|
|
|
|
/* Total message bytes including header and payload */
|
|
static int nh_bytes(unsigned int data_len)
|
|
{
|
|
return offsetof(struct scoutfs_net_header, data[data_len]);
|
|
}
|
|
|
|
static bool nh_is_response(struct scoutfs_net_header *nh)
|
|
{
|
|
return !!(nh->flags & SCOUTFS_NET_FLAG_RESPONSE);
|
|
}
|
|
|
|
static bool nh_is_request(struct scoutfs_net_header *nh)
|
|
{
|
|
return !nh_is_response(nh);
|
|
}
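
/*
* Messages in flight are also indexed in two per-connection rbtrees:
* requests sorted by their id and responses sorted by their send seq.
* Recv processing uses these to find a matching request or an acked
* response in O(log n) instead of searching the send and resend
* queues, which still order messages for sending and resending.
*/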
|
|
|
|
static int cmp_sorted_msend(u64 pos, struct message_send *msend)
|
|
{
|
|
if (nh_is_request(&msend->nh))
|
|
return pos < le64_to_cpu(msend->nh.id) ? -1 :
|
|
pos > le64_to_cpu(msend->nh.id) ? 1 : 0;
|
|
else
|
|
return pos < le64_to_cpu(msend->nh.seq) ? -1 :
|
|
pos > le64_to_cpu(msend->nh.seq) ? 1 : 0;
|
|
}
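
/*
* Return the first sorted message at or after pos, or NULL. If ins is
* given it is linked into the tree at the position where the search
* ended; inserting a key that already exists is a bug.
*/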
|
|
|
|
static struct message_send *search_sorted_msends(struct rb_root *root, u64 pos, struct rb_node *ins)
|
|
{
|
|
struct rb_node **node = &root->rb_node;
|
|
struct rb_node *parent = NULL;
|
|
struct message_send *msend = NULL;
|
|
struct message_send *next = NULL;
|
|
int cmp = -1;
|
|
|
|
while (*node) {
|
|
parent = *node;
|
|
msend = container_of(*node, struct message_send, node);
|
|
|
|
cmp = cmp_sorted_msend(pos, msend);
|
|
if (cmp < 0) {
|
|
next = msend;
|
|
node = &(*node)->rb_left;
|
|
} else if (cmp > 0) {
|
|
node = &(*node)->rb_right;
|
|
} else {
|
|
next = msend;
|
|
break;
|
|
}
|
|
}
|
|
|
|
BUG_ON(cmp == 0 && ins);
|
|
|
|
if (ins) {
|
|
rb_link_node(ins, parent, node);
|
|
rb_insert_color(ins, root);
|
|
}
|
|
|
|
return next;
|
|
}
|
|
|
|
static struct message_send *next_sorted_msend(struct message_send *msend)
|
|
{
|
|
struct rb_node *node = rb_next(&msend->node);
|
|
|
|
return node ? rb_entry(node, struct message_send, node) : NULL;
|
|
}
|
|
|
|
#define for_each_sorted_msend(MSEND_, TMP_, ROOT_, POS_) \
|
|
for (MSEND_ = search_sorted_msends(ROOT_, POS_, NULL); \
|
|
MSEND_ != NULL && ({ TMP_ = next_sorted_msend(MSEND_); true; }); \
|
|
MSEND_ = TMP_)
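
/*
* Iteration starts at the first message at or after POS_ and samples
* the next message in TMP_ before the body runs, so the current
* message can be safely erased or freed. A sketch of typical use, as
* in free_acked_responses() below:
*
*	for_each_sorted_msend(msend, tmp, &conn->resp_root, 0) {
*		if (le64_to_cpu(msend->nh.seq) > seq)
*			break;
*		queue_dead_free(conn, msend);
*	}
*/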
|
|
|
|
static void insert_sorted_msend(struct scoutfs_net_connection *conn, struct message_send *msend)
|
|
{
|
|
BUG_ON(!RB_EMPTY_NODE(&msend->node));
|
|
|
|
if (nh_is_request(&msend->nh))
|
|
search_sorted_msends(&conn->req_root, le64_to_cpu(msend->nh.id), &msend->node);
|
|
else
|
|
search_sorted_msends(&conn->resp_root, le64_to_cpu(msend->nh.seq), &msend->node);
|
|
}
|
|
|
|
static void erase_sorted_msend(struct scoutfs_net_connection *conn, struct message_send *msend)
|
|
{
|
|
if (!RB_EMPTY_NODE(&msend->node)) {
|
|
if (nh_is_request(&msend->nh))
|
|
rb_erase(&msend->node, &conn->req_root);
|
|
else
|
|
rb_erase(&msend->node, &conn->resp_root);
|
|
RB_CLEAR_NODE(&msend->node);
|
|
}
|
|
}
|
|
|
|
static void move_sorted_msends(struct scoutfs_net_connection *dst_conn, struct rb_root *dst_root,
|
|
struct scoutfs_net_connection *src_conn, struct rb_root *src_root)
|
|
{
|
|
struct message_send *msend;
|
|
struct message_send *tmp;
|
|
|
|
for_each_sorted_msend(msend, tmp, src_root, 0) {
|
|
erase_sorted_msend(src_conn, msend);
|
|
insert_sorted_msend(dst_conn, msend);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Pending requests are uniquely identified by the id they were assigned
|
|
* as they were first put on the send queue.
|
|
*/
|
|
static struct message_send *find_request(struct scoutfs_net_connection *conn, u8 cmd, u64 id)
|
|
{
|
|
struct message_send *msend;
|
|
|
|
assert_spin_locked(&conn->lock);
|
|
|
|
msend = search_sorted_msends(&conn->req_root, id, NULL);
|
|
if (msend && !(msend->nh.cmd == cmd && le64_to_cpu(msend->nh.id) == id))
|
|
msend = NULL;
|
|
|
|
return msend;
|
|
}
|
|
|
|
/*
|
|
* Free a send message by moving it to the send queue and marking it
|
|
* dead. It is removed from the sorted rb roots so it won't be visible
|
|
* as a request for response processing.
|
|
*/
|
|
static void queue_dead_free(struct scoutfs_net_connection *conn, struct message_send *msend)
|
|
{
|
|
assert_spin_locked(&conn->lock);
|
|
|
|
if (WARN_ON_ONCE(msend->dead) ||
|
|
WARN_ON_ONCE(list_empty(&msend->head)))
|
|
return;
|
|
|
|
msend->dead = 1;
|
|
list_move(&msend->head, &conn->send_queue);
|
|
erase_sorted_msend(conn, msend);
|
|
queue_work(conn->workq, &conn->send_work);
|
|
}
|
|
|
|
/*
|
|
* Translate a positive error on the wire to a negative host errno.
|
|
*/
|
|
static inline int net_err_to_host(u8 net_err)
|
|
{
|
|
#undef EXPAND_NET_ERRNO
|
|
#define EXPAND_NET_ERRNO(which) [SCOUTFS_NET_ERR_##which] = which,
|
|
static u8 host_errnos[] = {
|
|
EXPAND_EACH_NET_ERRNO
|
|
};
|
|
|
|
if (net_err == SCOUTFS_NET_ERR_NONE)
|
|
return 0;
|
|
|
|
if (net_err < ARRAY_SIZE(host_errnos) && host_errnos[net_err])
|
|
return -host_errnos[net_err];
|
|
|
|
return -EINVAL;
|
|
}
|
|
|
|
/*
* Translate a negative host errno to a positive error on the wire.
*
* The caller is our kernel runtime, which should have been careful with
* errnos. But mistakes happen, so let's holler and translate unknown
* errors. A fun bit of trivia: sparse's array bounds detection once
* got confused by conditions in WARN_ON_ONCE().
*/
|
|
static inline u8 net_err_from_host(struct super_block *sb, int error)
|
|
{
|
|
#undef EXPAND_NET_ERRNO
|
|
#define EXPAND_NET_ERRNO(which) [which] = SCOUTFS_NET_ERR_##which,
|
|
static u8 net_errs[] = {
|
|
EXPAND_EACH_NET_ERRNO
|
|
};
|
|
int ind = -error;
|
|
|
|
if (error == 0)
|
|
return SCOUTFS_NET_ERR_NONE;
|
|
|
|
if (error > 0 || ind >= ARRAY_SIZE(net_errs) || net_errs[ind] == 0) {
|
|
static bool warned;
|
|
if (!warned) {
|
|
warned = 1;
|
|
scoutfs_warn(sb, "host errno %d sent as EINVAL",
|
|
error);
|
|
}
|
|
|
|
return SCOUTFS_NET_ERR_EINVAL;
|
|
}
|
|
|
|
return net_errs[ind];
|
|
}
|
|
|
|
/*
* Shut down the connection. This is called by many contexts, including
* work that must complete to finish shutting down. We queue specific
* shutdown work that can wait on all the connection's other work.
* We're sure to only queue the shutdown work once.
*/
|
|
static void shutdown_conn_locked(struct scoutfs_net_connection *conn)
|
|
{
|
|
struct super_block *sb = conn->sb;
|
|
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
|
|
|
|
assert_spin_locked(&conn->lock);
|
|
|
|
if (!test_conn_fl(conn, shutting_down)) {
|
|
clear_conn_fl(conn, established);
|
|
set_conn_fl(conn, shutting_down);
|
|
trace_scoutfs_conn_shutdown_queued(conn);
|
|
queue_work(ninf->shutdown_workq, &conn->shutdown_work);
|
|
}
|
|
}
|
|
|
|
static void shutdown_conn(struct scoutfs_net_connection *conn)
|
|
{
|
|
spin_lock(&conn->lock);
|
|
shutdown_conn_locked(conn);
|
|
spin_unlock(&conn->lock);
|
|
}
|
|
|
|
/*
* Allocate a message and put it on the send queue.
*
* An id of 0 means that we'll assign the next id from the connection
* once we hold the lock, and is only valid when sending requests.
*
* This can race with connections that are either starting up or
* shutting down. We only directly queue the send work if the
* connection has passed the greeting and isn't being shut down. At all
* other times we add new sends to the resend queue.
*
* If a non-zero rid is specified then the conn argument is a listening
* connection and the connection to send the message down is found by
* searching for the rid in its accepted connections.
*/
|
|
static int submit_send(struct super_block *sb,
|
|
struct scoutfs_net_connection *conn, u64 rid,
|
|
u8 cmd, u8 flags, u64 id, u8 net_err,
|
|
void *data, u16 data_len,
|
|
scoutfs_net_response_t resp_func, void *resp_data,
|
|
u64 *id_ret)
|
|
{
|
|
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
|
|
struct scoutfs_net_connection *acc_conn;
|
|
struct message_send *msend;
|
|
u64 seq;
|
|
|
|
if (WARN_ON_ONCE(cmd >= SCOUTFS_NET_CMD_UNKNOWN) ||
|
|
WARN_ON_ONCE(flags & SCOUTFS_NET_FLAGS_UNKNOWN) ||
|
|
WARN_ON_ONCE(net_err >= SCOUTFS_NET_ERR_UNKNOWN) ||
|
|
WARN_ON_ONCE(data_len > SCOUTFS_NET_MAX_DATA_LEN) ||
|
|
WARN_ON_ONCE(data_len && data == NULL) ||
|
|
WARN_ON_ONCE(net_err && (!(flags & SCOUTFS_NET_FLAG_RESPONSE))) ||
|
|
WARN_ON_ONCE(id == 0 && (flags & SCOUTFS_NET_FLAG_RESPONSE)))
|
|
return -EINVAL;
|
|
|
|
if (scoutfs_forcing_unmount(sb))
|
|
return -ENOLINK;
|
|
|
|
msend = kmalloc(offsetof(struct message_send,
|
|
nh.data[data_len]), GFP_NOFS);
|
|
if (!msend)
|
|
return -ENOMEM;
|
|
|
|
spin_lock_nested(&conn->lock, CONN_LOCK_LISTENER);
|
|
|
|
if (rid != 0) {
|
|
list_for_each_entry(acc_conn, &conn->accepted_list,
|
|
accepted_head) {
|
|
if (acc_conn->rid == rid) {
|
|
spin_lock_nested(&acc_conn->lock,
|
|
CONN_LOCK_ACCEPTED);
|
|
spin_unlock(&conn->lock);
|
|
conn = acc_conn;
|
|
rid = 0;
|
|
break;
|
|
}
|
|
}
|
|
if (rid != 0) {
|
|
spin_unlock(&conn->lock);
|
|
kfree(msend);
|
|
return -ENOTCONN;
|
|
}
|
|
}
|
|
|
|
seq = conn->next_send_seq++;
|
|
if (id == 0)
|
|
id = conn->next_send_id++;
|
|
|
|
msend->resp_func = resp_func;
|
|
msend->resp_data = resp_data;
|
|
msend->dead = 0;
|
|
RB_CLEAR_NODE(&msend->node);
|
|
|
|
msend->nh.seq = cpu_to_le64(seq);
|
|
msend->nh.recv_seq = 0; /* set when sent, not when queued */
|
|
msend->nh.id = cpu_to_le64(id);
|
|
msend->nh.cmd = cmd;
|
|
msend->nh.flags = flags;
|
|
msend->nh.error = net_err;
|
|
memset(msend->nh.__pad, 0, sizeof(msend->nh.__pad));
|
|
msend->nh.data_len = cpu_to_le16(data_len);
|
|
if (data_len)
|
|
memcpy(msend->nh.data, data, data_len);
|
|
|
|
if (test_conn_fl(conn, established) &&
|
|
(test_conn_fl(conn, valid_greeting) ||
|
|
cmd == SCOUTFS_NET_CMD_GREETING)) {
|
|
list_add_tail(&msend->head, &conn->send_queue);
|
|
queue_work(conn->workq, &conn->send_work);
|
|
} else {
|
|
list_add_tail(&msend->head, &conn->resend_queue);
|
|
}
|
|
insert_sorted_msend(conn, msend);
|
|
|
|
if (id_ret)
|
|
*id_ret = le64_to_cpu(msend->nh.id);
|
|
|
|
scoutfs_tseq_add(&ninf->msg_tseq_tree, &msend->tseq_entry);
|
|
|
|
spin_unlock(&conn->lock);
|
|
|
|
return 0;
|
|
}
|
|
|
|
/*
* Process an incoming request. The greeting should ensure that the
* sender won't send us unknown commands. We return an error if we see
* an unknown command because the greeting should agree on an understood
* protocol. The request function sends a response and returns an error
* if it couldn't.
*/
|
|
static int process_request(struct scoutfs_net_connection *conn,
|
|
struct message_recv *mrecv)
|
|
{
|
|
struct super_block *sb = conn->sb;
|
|
scoutfs_net_request_t req_func;
|
|
|
|
if (mrecv->nh.cmd < SCOUTFS_NET_CMD_UNKNOWN)
|
|
req_func = conn->req_funcs[mrecv->nh.cmd];
|
|
else
|
|
req_func = NULL;
|
|
|
|
if (req_func == NULL) {
|
|
scoutfs_inc_counter(sb, net_unknown_request);
|
|
return -EINVAL;
|
|
}
|
|
|
|
return req_func(sb, conn, mrecv->nh.cmd, le64_to_cpu(mrecv->nh.id),
|
|
mrecv->nh.data, le16_to_cpu(mrecv->nh.data_len));
|
|
}
|
|
|
|
static int call_resp_func(struct super_block *sb, struct scoutfs_net_connection *conn,
|
|
scoutfs_net_response_t resp_func, void *resp_data,
|
|
void *resp, unsigned int resp_len, int error)
|
|
{
|
|
if (resp_func)
|
|
return resp_func(sb, conn, resp, resp_len, error, resp_data);
|
|
else
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* An incoming response finds the queued request and calls its response
|
|
* function. The response function for a given request will only be
|
|
* called once. Requests can be canceled while a response is in flight.
|
|
* It's not an error to receive a response to a request that no longer
|
|
* exists.
|
|
*/
|
|
static int process_response(struct scoutfs_net_connection *conn,
|
|
struct message_recv *mrecv)
|
|
{
|
|
struct super_block *sb = conn->sb;
|
|
struct message_send *msend;
|
|
scoutfs_net_response_t resp_func = NULL;
|
|
void *resp_data;
|
|
|
|
spin_lock(&conn->lock);
|
|
|
|
msend = find_request(conn, mrecv->nh.cmd, le64_to_cpu(mrecv->nh.id));
|
|
if (msend) {
|
|
resp_func = msend->resp_func;
|
|
resp_data = msend->resp_data;
|
|
queue_dead_free(conn, msend);
|
|
} else {
|
|
scoutfs_inc_counter(sb, net_dropped_response);
|
|
}
|
|
|
|
spin_unlock(&conn->lock);
|
|
|
|
return call_resp_func(sb, conn, resp_func, resp_data, mrecv->nh.data,
|
|
le16_to_cpu(mrecv->nh.data_len), net_err_to_host(mrecv->nh.error));
|
|
}
|
|
|
|
/*
|
|
* Process an incoming received message in its own concurrent blocking
|
|
* work context.
|
|
*/
|
|
static void scoutfs_net_proc_worker(struct work_struct *work)
|
|
{
|
|
struct message_recv *mrecv = container_of(work, struct message_recv,
|
|
proc_work);
|
|
struct scoutfs_net_connection *conn = mrecv->conn;
|
|
struct super_block *sb = conn->sb;
|
|
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
|
|
int ret;
|
|
|
|
trace_scoutfs_net_proc_work_enter(sb, 0, 0);
|
|
|
|
if (nh_is_request(&mrecv->nh))
|
|
ret = process_request(conn, mrecv);
|
|
else
|
|
ret = process_response(conn, mrecv);
|
|
|
|
/* process_one_work explicitly allows freeing work in its func */
|
|
scoutfs_tseq_del(&ninf->msg_tseq_tree, &mrecv->tseq_entry);
|
|
kfree(mrecv);
|
|
|
|
/* shut down the connection if processing returns fatal errors */
|
|
if (ret)
|
|
shutdown_conn(conn);
|
|
|
|
trace_scoutfs_net_proc_work_exit(sb, 0, ret);
|
|
}
|
|
|
|
static void scoutfs_net_ordered_proc_worker(struct work_struct *work)
|
|
{
|
|
struct scoutfs_work_list *wlist = container_of(work, struct scoutfs_work_list, work);
|
|
struct message_recv *mrecv;
|
|
struct message_recv *mrecv__;
|
|
LIST_HEAD(list);
|
|
|
|
spin_lock(&wlist->lock);
|
|
list_splice_init(&wlist->list, &list);
|
|
spin_unlock(&wlist->lock);
|
|
|
|
list_for_each_entry_safe(mrecv, mrecv__, &list, ordered_head) {
|
|
list_del_init(&mrecv->ordered_head);
|
|
scoutfs_net_proc_worker(&mrecv->proc_work);
|
|
}
|
|
}
|
|
|
|
/*
* Some messages require in-order processing, but the scope of the
* ordering isn't global. In the case of lock messages, it's per lock.
* So for these messages we hash them to one of a number of ordered
* workers, each of which walks a list and calls the usual work function
* in order. This replaced earlier schemes where the proc work detected
* out-of-order messages and re-ordered them, and then where proc was
* only called from the single recv work context.
*/
|
|
static void queue_ordered_proc(struct scoutfs_net_connection *conn, struct message_recv *mrecv)
|
|
{
|
|
struct scoutfs_work_list *wlist;
|
|
struct scoutfs_net_lock *nl;
|
|
u32 h;
|
|
|
|
if (WARN_ON_ONCE(mrecv->nh.cmd != SCOUTFS_NET_CMD_LOCK ||
|
|
le16_to_cpu(mrecv->nh.data_len) != sizeof(struct scoutfs_net_lock)))
|
|
return scoutfs_net_proc_worker(&mrecv->proc_work);
|
|
|
|
nl = (void *)mrecv->nh.data;
|
|
h = jhash(&nl->key, sizeof(struct scoutfs_key), 0x6fdd3cd5);
|
|
wlist = &conn->ordered_proc_wlists[h % conn->ordered_proc_nr];
|
|
|
|
spin_lock(&wlist->lock);
|
|
list_add_tail(&mrecv->ordered_head, &wlist->list);
|
|
spin_unlock(&wlist->lock);
|
|
queue_work(conn->workq, &wlist->work);
|
|
}
|
|
|
|
/*
|
|
* Free live responses up to and including the seq by marking them dead
|
|
* and moving them to the send queue to be freed.
|
|
*/
|
|
static void free_acked_responses(struct scoutfs_net_connection *conn, u64 seq)
|
|
{
|
|
struct message_send *msend;
|
|
struct message_send *tmp;
|
|
|
|
spin_lock(&conn->lock);
|
|
|
|
for_each_sorted_msend(msend, tmp, &conn->resp_root, 0) {
|
|
if (le64_to_cpu(msend->nh.seq) > seq)
|
|
break;
|
|
|
|
queue_dead_free(conn, msend);
|
|
}
|
|
|
|
spin_unlock(&conn->lock);
|
|
}
|
|
|
|
static int k_recvmsg(struct socket *sock, void *buf, unsigned len)
|
|
{
|
|
struct kvec kv = {
|
|
.iov_base = buf,
|
|
.iov_len = len,
|
|
};
|
|
struct msghdr msg = {
|
|
.msg_flags = MSG_NOSIGNAL,
|
|
};
|
|
|
|
return kernel_recvmsg(sock, &msg, &kv, 1, len, msg.msg_flags);
|
|
}
|
|
|
|
static bool invalid_message(struct scoutfs_net_connection *conn,
|
|
struct scoutfs_net_header *nh)
|
|
{
|
|
/* seq and id must be non-zero */
|
|
if (nh->seq == 0 || nh->id == 0)
|
|
return true;
|
|
|
|
/* greeting should negotiate understood protocol */
|
|
if (nh->cmd >= SCOUTFS_NET_CMD_UNKNOWN ||
|
|
(nh->flags & SCOUTFS_NET_FLAGS_UNKNOWN) ||
|
|
nh->error >= SCOUTFS_NET_ERR_UNKNOWN)
|
|
return true;
|
|
|
|
/* payloads have a limit */
|
|
if (le16_to_cpu(nh->data_len) > SCOUTFS_NET_MAX_DATA_LEN)
|
|
return true;
|
|
|
|
/* only responses can carry errors */
|
|
if (nh_is_request(nh) && nh->error != SCOUTFS_NET_ERR_NONE)
|
|
return true;
|
|
|
|
if (nh->cmd == SCOUTFS_NET_CMD_GREETING) {
|
|
/* each endpoint can only receive one greeting per socket */
|
|
if (test_conn_fl(conn, saw_greeting))
|
|
return true;
|
|
|
|
/* servers get greeting requests, clients get responses */
|
|
if (!!conn->listening_conn != !!nh_is_request(nh))
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
static int recv_one_message(struct super_block *sb, struct net_info *ninf,
|
|
struct scoutfs_net_connection *conn, struct scoutfs_net_header *nh,
|
|
unsigned int data_len)
|
|
{
|
|
struct message_recv *mrecv;
|
|
int ret;
|
|
|
|
scoutfs_inc_counter(sb, net_recv_messages);
|
|
scoutfs_add_counter(sb, net_recv_bytes, nh_bytes(data_len));
|
|
trace_scoutfs_net_recv_message(sb, &conn->sockname, &conn->peername, nh);
|
|
|
|
/* caller's invalid message checked data len */
|
|
mrecv = kmalloc(offsetof(struct message_recv, nh.data[data_len]), GFP_NOFS);
|
|
if (!mrecv) {
|
|
ret = -ENOMEM;
|
|
goto out;
|
|
}
|
|
|
|
mrecv->conn = conn;
|
|
INIT_WORK(&mrecv->proc_work, scoutfs_net_proc_worker);
|
|
INIT_LIST_HEAD(&mrecv->ordered_head);
|
|
mrecv->nh = *nh;
|
|
if (data_len)
|
|
memcpy(mrecv->nh.data, (nh + 1), data_len);
|
|
|
|
if (nh->cmd == SCOUTFS_NET_CMD_GREETING) {
|
|
/* greetings are out of band, no seq mechanics */
|
|
set_conn_fl(conn, saw_greeting);
|
|
|
|
} else if (le64_to_cpu(nh->seq) <=
|
|
atomic64_read(&conn->recv_seq)) {
|
|
/* drop any resent duplicated messages */
|
|
scoutfs_inc_counter(sb, net_recv_dropped_duplicate);
|
|
kfree(mrecv);
|
|
ret = 0;
|
|
goto out;
|
|
|
|
} else {
|
|
/* record that we've received sender's seq */
|
|
atomic64_set(&conn->recv_seq, le64_to_cpu(nh->seq));
|
|
/* and free our responses that sender has received */
|
|
free_acked_responses(conn, le64_to_cpu(nh->recv_seq));
|
|
}
|
|
|
|
scoutfs_tseq_add(&ninf->msg_tseq_tree, &mrecv->tseq_entry);
|
|
|
|
/*
|
|
* Initial received greetings are processed inline
|
|
* before any other incoming messages.
|
|
*
|
|
* Incoming requests or responses to the lock client
|
|
* can't handle re-ordering, so they're queued to
|
|
* ordered receive processing work.
|
|
*/
|
|
if (nh->cmd == SCOUTFS_NET_CMD_GREETING)
|
|
scoutfs_net_proc_worker(&mrecv->proc_work);
|
|
else if (nh->cmd == SCOUTFS_NET_CMD_LOCK && !conn->listening_conn)
|
|
queue_ordered_proc(conn, mrecv);
|
|
else
|
|
queue_work(conn->workq, &mrecv->proc_work);
|
|
ret = 0;
|
|
|
|
out:
|
|
return ret;
|
|
}
|
|
|
|
/*
|
|
* Always block receiving from the socket. Errors trigger shutting down
|
|
* the connection.
|
|
*/
|
|
static void scoutfs_net_recv_worker(struct work_struct *work)
|
|
{
|
|
DEFINE_CONN_FROM_WORK(conn, work, recv_work);
|
|
struct super_block *sb = conn->sb;
|
|
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
|
|
struct socket *sock = conn->sock;
|
|
struct scoutfs_net_header *nh;
|
|
struct page *page = NULL;
|
|
unsigned int data_len;
|
|
int hdr_off;
|
|
int rx_off;
|
|
int size;
|
|
int ret;
|
|
|
|
trace_scoutfs_net_recv_work_enter(sb, 0, 0);
|
|
|
|
page = alloc_page(GFP_NOFS);
|
|
if (!page) {
|
|
ret = -ENOMEM;
|
|
goto out;
|
|
}
|
|
|
|
hdr_off = 0;
|
|
rx_off = 0;
|
|
|
|
for (;;) {
|
|
/* receive the header */
|
|
ret = k_recvmsg(sock, page_address(page) + rx_off, PAGE_SIZE - rx_off);
|
|
if (ret <= 0) {
|
|
ret = -ECONNABORTED;
|
|
goto out;
|
|
}
|
|
|
|
rx_off += ret;
|
|
|
|
for (;;) {
|
|
size = rx_off - hdr_off;
|
|
if (size < sizeof(struct scoutfs_net_header))
|
|
break;
|
|
|
|
nh = page_address(page) + hdr_off;
|
|
|
|
/* receiving an invalid message breaks the connection */
|
|
if (invalid_message(conn, nh)) {
|
|
scoutfs_inc_counter(sb, net_recv_invalid_message);
|
|
ret = -EBADMSG;
|
|
goto out;
|
|
}
|
|
|
|
data_len = le16_to_cpu(nh->data_len);
|
|
if (sizeof(struct scoutfs_net_header) + data_len > size)
|
|
break;
|
|
|
|
ret = recv_one_message(sb, ninf, conn, nh, data_len);
|
|
if (ret < 0)
|
|
goto out;
|
|
|
|
hdr_off += sizeof(struct scoutfs_net_header) + data_len;
|
|
}
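
/*
* If the remaining space in the page can't hold a max size message
* then slide any partial frame back to the front of the page.
*/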
|
|
|
|
if ((PAGE_SIZE - rx_off) <
|
|
(sizeof(struct scoutfs_net_header) + SCOUTFS_NET_MAX_DATA_LEN)) {
|
|
if (size)
|
|
memmove(page_address(page), page_address(page) + hdr_off, size);
|
|
hdr_off = 0;
|
|
rx_off = size;
|
|
}
|
|
}
|
|
|
|
out:
|
|
__free_page(page);
|
|
|
|
if (ret)
|
|
scoutfs_inc_counter(sb, net_recv_error);
|
|
|
|
/* recv stopping always shuts down the connection */
|
|
shutdown_conn(conn);
|
|
|
|
trace_scoutfs_net_recv_work_exit(sb, 0, ret);
|
|
}
|
|
|
|
/*
|
|
* This consumes the kvec.
|
|
*/
|
|
static int k_sendmsg_full(struct socket *sock, struct kvec *kv, unsigned long nr_segs, size_t count)
|
|
{
|
|
int ret = 0;
|
|
|
|
while (count > 0) {
|
|
struct msghdr msg = {
|
|
.msg_flags = MSG_NOSIGNAL,
|
|
};
|
|
|
|
ret = kernel_sendmsg(sock, &msg, kv, nr_segs, count);
|
|
if (ret <= 0) {
|
|
ret = -ECONNABORTED;
|
|
break;
|
|
}
|
|
|
|
count -= ret;
|
|
if (count) {
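/* advance the kvec past fully sent segments and into a partial one */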
|
|
while (nr_segs > 0 && ret >= kv->iov_len) {
|
|
ret -= kv->iov_len;
|
|
kv++;
|
|
nr_segs--;
|
|
}
|
|
if (nr_segs > 0 && ret > 0) {
|
|
kv->iov_base += ret;
|
|
kv->iov_len -= ret;
|
|
}
|
|
BUG_ON(nr_segs == 0);
|
|
}
|
|
ret = 0;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static void free_msend(struct net_info *ninf, struct scoutfs_net_connection *conn,
|
|
struct message_send *msend)
|
|
{
|
|
list_del_init(&msend->head);
|
|
erase_sorted_msend(conn, msend);
|
|
scoutfs_tseq_del(&ninf->msg_tseq_tree, &msend->tseq_entry);
|
|
kfree(msend);
|
|
}
|
|
|
|
/*
|
|
* Each connection has a single worker that sends queued messages down
|
|
* the connection's socket. The work is queued whenever a message is
|
|
* put on the send queue. The worker uses blocking sends so that we
|
|
* don't have to worry about resuming partial sends or hooking into
|
|
* data_ready. Send errors shut down the connection.
|
|
*
|
|
* The worker is responsible for freeing messages so that other contexts
|
|
* don't have to worry about freeing a message while we're blocked
|
|
* sending it without the lock held.
|
|
*
|
|
* We set the current recv_seq on every outgoing frame as it represents
|
|
* the current connection state, not the state back when each message
|
|
* was first queued.
|
|
*/
|
|
static void scoutfs_net_send_worker(struct work_struct *work)
|
|
{
|
|
DEFINE_CONN_FROM_WORK(conn, work, send_work);
|
|
struct super_block *sb = conn->sb;
|
|
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
|
|
struct message_send *msend;
|
|
struct message_send *_msend_;
|
|
struct kvec kv[16];
|
|
unsigned long nr_segs;
|
|
size_t count;
|
|
int len;
|
|
int ret;
|
|
|
|
trace_scoutfs_net_send_work_enter(sb, 0, 0);
|
|
|
|
for (;;) {
|
|
nr_segs = 0;
|
|
count = 0;
|
|
|
|
spin_lock(&conn->lock);
|
|
|
|
list_for_each_entry_safe(msend, _msend_, &conn->send_queue, head) {
|
|
if (msend->dead) {
|
|
free_msend(ninf, conn, msend);
|
|
continue;
|
|
}
|
|
|
|
len = nh_bytes(le16_to_cpu(msend->nh.data_len));
|
|
|
|
if ((msend->nh.cmd == SCOUTFS_NET_CMD_FAREWELL) &&
|
|
nh_is_response(&msend->nh)) {
|
|
set_conn_fl(conn, saw_farewell);
|
|
}
|
|
|
|
msend->nh.recv_seq = cpu_to_le64(atomic64_read(&conn->recv_seq));
|
|
|
|
scoutfs_inc_counter(sb, net_send_messages);
|
|
scoutfs_add_counter(sb, net_send_bytes, len);
|
|
trace_scoutfs_net_send_message(sb, &conn->sockname,
|
|
&conn->peername, &msend->nh);
|
|
|
|
count += len;
|
|
kv[nr_segs].iov_base = &msend->nh;
|
|
kv[nr_segs].iov_len = len;
|
|
if (++nr_segs == ARRAY_SIZE(kv))
|
|
break;
|
|
|
|
}
|
|
spin_unlock(&conn->lock);
|
|
|
|
if (nr_segs == 0) {
|
|
ret = 0;
|
|
goto out;
|
|
}
|
|
|
|
ret = k_sendmsg_full(conn->sock, kv, nr_segs, count);
|
|
if (ret < 0)
|
|
goto out;
|
|
|
|
spin_lock(&conn->lock);
|
|
list_for_each_entry_safe(msend, _msend_, &conn->send_queue, head) {
|
|
msend->nh.recv_seq = 0;
|
|
|
|
/* resend if it wasn't freed while we sent */
|
|
if (!msend->dead)
|
|
list_move_tail(&msend->head, &conn->resend_queue);
|
|
|
|
if (--nr_segs == 0)
|
|
break;
|
|
}
|
|
spin_unlock(&conn->lock);
|
|
}
|
|
|
|
out:
|
|
if (ret) {
|
|
scoutfs_inc_counter(sb, net_send_error);
|
|
shutdown_conn(conn);
|
|
}
|
|
|
|
trace_scoutfs_net_send_work_exit(sb, 0, ret);
|
|
}
|
|
|
|
/*
* Listening conns try to destroy accepted conns. Lockdep models
* flushing a workqueue as acquiring a workqueue class lock, so it
* thinks that this is a deadlock because it doesn't know about our
* hierarchy of workqueues. The workqueue lockdep_map is private so we
* can't set a subclass to differentiate between listening and accepted
* conn workqueues. Instead we queue final conn destruction off to a
* longer lived specific workqueue that has a different class.
*/
|
|
static void scoutfs_net_destroy_worker(struct work_struct *work)
|
|
{
|
|
DEFINE_CONN_FROM_WORK(conn, work, destroy_work);
|
|
struct super_block *sb = conn->sb;
|
|
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
|
|
struct scoutfs_net_connection *listener;
|
|
struct message_send *msend;
|
|
struct message_send *tmp;
|
|
|
|
trace_scoutfs_net_destroy_work_enter(sb, 0, 0);
|
|
trace_scoutfs_conn_destroy_start(conn);
|
|
|
|
WARN_ON_ONCE(conn->sock != NULL);
|
|
WARN_ON_ONCE(!list_empty(&conn->accepted_list));
|
|
|
|
/* tell callers that accepted connection finally done */
|
|
if (conn->listening_conn && conn->notify_down)
|
|
conn->notify_down(sb, conn, conn->info, conn->rid);
|
|
|
|
list_splice_init(&conn->resend_queue, &conn->send_queue);
|
|
list_for_each_entry_safe(msend, tmp, &conn->send_queue, head)
|
|
free_msend(ninf, conn, msend);
|
|
|
|
/* accepted sockets are removed from their listener's list */
|
|
if (conn->listening_conn) {
|
|
listener = conn->listening_conn;
|
|
|
|
spin_lock(&listener->lock);
|
|
list_del_init(&conn->accepted_head);
|
|
if (list_empty(&listener->accepted_list))
|
|
wake_up(&listener->waitq);
|
|
spin_unlock(&listener->lock);
|
|
}
|
|
|
|
destroy_workqueue(conn->workq);
|
|
scoutfs_tseq_del(&ninf->conn_tseq_tree, &conn->tseq_entry);
|
|
kfree(conn->info);
|
|
kfree(conn->ordered_proc_wlists);
|
|
trace_scoutfs_conn_destroy_free(conn);
|
|
kfree(conn);
|
|
|
|
trace_scoutfs_net_destroy_work_exit(sb, 0, 0);
|
|
}
|
|
|
|
static void destroy_conn(struct scoutfs_net_connection *conn)
|
|
{
|
|
struct net_info *ninf = SCOUTFS_SB(conn->sb)->net_info;
|
|
|
|
queue_work(ninf->destroy_workq, &conn->destroy_work);
|
|
}
|
|
|
|
/*
|
|
* By default, TCP would maintain a connection to an unresponsive peer
|
|
* for a very long time indeed. We can't do that because quorum
|
|
* members will only participate in an election when they don't have a
|
|
* healthy connection to a server. We use the KEEPALIVE* and
|
|
* TCP_USER_TIMEOUT options to ensure that we'll break an unresponsive
|
|
* connection and return to the quorum and client connection paths to
|
|
* try and establish a new connection to an active server.
|
|
*
|
|
* The TCP_KEEP* and TCP_USER_TIMEOUT option interaction is subtle.
|
|
* TCP_USER_TIMEOUT only applies if there is unacked written data in the
|
|
* send queue. It doesn't work if the connection is idle. Adding
|
|
* keepalive probes with user_timeout set changes how the keepalive
|
|
* timeout is calculated. TCP_KEEPCNT no longer matters. Each time
|
|
* additional probes (not the first) are sent the user timeout is
|
|
* checked against the last time data was received. If none of the
|
|
* keepalives are responded to then eventually the user timeout applies.
|
|
*
|
|
* Given all this, we start with the overall unresponsive timeout. Then
|
|
* we set the probes to start sending towards the end of the timeout.
|
|
* We give it a few tries for a successful response before the timeout
|
|
* elapses during the probe timer processing after the unsuccessful
|
|
* probes.
|
|
*/
|
|
static int sock_opts_and_names(struct super_block *sb,
|
|
struct scoutfs_net_connection *conn,
|
|
struct socket *sock)
|
|
{
|
|
struct scoutfs_mount_options opts;
|
|
int optval;
|
|
int ret;
|
|
|
|
scoutfs_options_read(sb, &opts);
|
|
|
|
/* we use a keepalive timeout instead of send timeout */
|
|
ret = kc_sock_set_sndtimeo(sock, 0);
|
|
if (ret)
|
|
goto out;
|
|
|
|
/* not checked when user_timeout != 0, but for clarity */
|
|
optval = UNRESPONSIVE_PROBES;
|
|
ret = kc_sock_setsockopt(sock, SOL_TCP, TCP_KEEPCNT,
|
|
&optval, sizeof(optval));
|
|
if (ret)
|
|
goto out;
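
/*
* Start sending probes UNRESPONSIVE_PROBES seconds before the user
* timeout would elapse on an idle connection, then probe each second.
*/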
|
|
|
|
optval = (opts.tcp_keepalive_timeout_ms / MSEC_PER_SEC) - UNRESPONSIVE_PROBES;
|
|
ret = kc_tcp_sock_set_keepidle(sock, optval);
|
|
if (ret)
|
|
goto out;
|
|
|
|
optval = 1;
|
|
ret = kc_tcp_sock_set_keepintvl(sock, optval);
|
|
if (ret)
|
|
goto out;
|
|
|
|
optval = opts.tcp_keepalive_timeout_ms;
|
|
ret = kc_tcp_sock_set_user_timeout(sock, optval);
|
|
if (ret)
|
|
goto out;
|
|
|
|
optval = 1;
|
|
ret = kc_sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
|
|
&optval, sizeof(optval));
|
|
if (ret)
|
|
goto out;
|
|
|
|
ret = kc_tcp_sock_set_nodelay(sock);
|
|
if (ret)
|
|
goto out;
|
|
|
|
ret = kc_kernel_getsockname(sock, (struct sockaddr *)&conn->sockname);
|
|
if (ret < 0)
|
|
goto out;
|
|
|
|
ret = kc_kernel_getpeername(sock, (struct sockaddr *)&conn->peername);
|
|
if (ret < 0)
|
|
goto out;
|
|
|
|
ret = 0;
|
|
|
|
conn->last_peername = conn->peername;
|
|
|
|
out:
|
|
return ret;
|
|
}
|
|
|
|
/*
|
|
* Each bound and listening connection has long running work that blocks
|
|
* accepting new connections. The listening socket has been setup by
|
|
* the time this is queued.
|
|
*
|
|
* Any errors on the listening sock tear down all the connections that
|
|
* were accepted.
|
|
*/
|
|
static void scoutfs_net_listen_worker(struct work_struct *work)
|
|
{
|
|
DEFINE_CONN_FROM_WORK(conn, work, listen_work);
|
|
struct super_block *sb = conn->sb;
|
|
struct scoutfs_net_connection *acc_conn;
|
|
DECLARE_WAIT_QUEUE_HEAD(waitq);
|
|
struct socket *acc_sock;
|
|
int ret;
|
|
|
|
trace_scoutfs_net_listen_work_enter(sb, 0, 0);
|
|
|
|
for (;;) {
|
|
ret = kernel_accept(conn->sock, &acc_sock, 0);
|
|
if (ret < 0)
|
|
break;
|
|
|
|
acc_sock->sk->sk_allocation = GFP_NOFS;
|
|
|
|
/* inherit accepted request funcs from listening conn */
|
|
acc_conn = scoutfs_net_alloc_conn(sb, conn->notify_up,
|
|
conn->notify_down,
|
|
conn->info_size,
|
|
conn->req_funcs, "accepted");
|
|
/*
* scoutfs_net_alloc_conn() can fail due to ENOMEM. If that's the only
* failure, there's no harm in looping back to see if kernel_accept()
* can get enough memory to accept a new connection. If that then
* fails with ENOMEM it'll shut down the conn anyway, so just retry
* here.
*/
|
|
if (!acc_conn) {
|
|
sock_release(acc_sock);
|
|
continue;
|
|
}
|
|
|
|
ret = sock_opts_and_names(sb, acc_conn, acc_sock);
|
|
if (ret) {
|
|
sock_release(acc_sock);
|
|
destroy_conn(acc_conn);
|
|
continue;
|
|
}
|
|
|
|
scoutfs_info(sb, "server accepted "SIN_FMT" -> "SIN_FMT,
|
|
SIN_ARG(&acc_conn->sockname),
|
|
SIN_ARG(&acc_conn->peername));
|
|
|
|
/* acc_conn isn't visible, conn unlock orders stores */
|
|
spin_lock(&conn->lock);
|
|
|
|
acc_conn->sock = acc_sock;
|
|
acc_conn->listening_conn = conn;
|
|
set_conn_fl(acc_conn, established);
|
|
list_add_tail(&acc_conn->accepted_head, &conn->accepted_list);
|
|
|
|
trace_scoutfs_conn_accept(acc_conn);
|
|
|
|
spin_unlock(&conn->lock);
|
|
|
|
queue_work(acc_conn->workq, &acc_conn->recv_work);
|
|
}
|
|
|
|
/* listening stopping shuts down connection */
|
|
shutdown_conn(conn);
|
|
|
|
trace_scoutfs_net_listen_work_exit(sb, 0, ret);
|
|
}
|
|
|
|
/*
|
|
* Try once to connect to the caller's address. This is racing with
|
|
* shutdown if the caller frees the connection while we're connecting.
|
|
* Shutdown will wait for our executing work to finish.
|
|
*/
|
|
static void scoutfs_net_connect_worker(struct work_struct *work)
|
|
{
|
|
DEFINE_CONN_FROM_WORK(conn, work, connect_work);
|
|
struct super_block *sb = conn->sb;
|
|
struct socket *sock;
|
|
int ret;
|
|
|
|
trace_scoutfs_net_connect_work_enter(sb, 0, 0);
|
|
|
|
ret = kc_sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
|
|
if (ret)
|
|
goto out;
|
|
|
|
sock->sk->sk_allocation = GFP_NOFS;
|
|
|
|
/* caller specified connect timeout, defaults to 1 sec */
|
|
ret = kc_sock_set_sndtimeo(sock, conn->connect_timeout_ms / MSEC_PER_SEC);
|
|
if (ret) {
|
|
sock_release(sock);
|
|
goto out;
|
|
}
|
|
|
|
/* shutdown now owns sock, can break blocking connect */
|
|
spin_lock(&conn->lock);
|
|
conn->sock = sock;
|
|
spin_unlock(&conn->lock);
|
|
|
|
trace_scoutfs_conn_connect_start(conn);
|
|
|
|
ret = kernel_connect(sock, (struct sockaddr *)&conn->connect_sin,
|
|
sizeof(struct sockaddr_in), 0);
|
|
if (ret)
|
|
goto out;
|
|
|
|
ret = sock_opts_and_names(sb, conn, sock);
|
|
if (ret)
|
|
goto out;
|
|
|
|
scoutfs_info(sb, "client connected "SIN_FMT" -> "SIN_FMT,
|
|
SIN_ARG(&conn->sockname),
|
|
SIN_ARG(&conn->peername));
|
|
|
|
spin_lock(&conn->lock);
|
|
|
|
/* clear greeting state for next negotiation */
|
|
clear_conn_fl(conn, valid_greeting);
|
|
set_conn_fl(conn, established);
|
|
wake_up(&conn->waitq);
|
|
|
|
trace_scoutfs_conn_connect_complete(conn);
|
|
|
|
spin_unlock(&conn->lock);
|
|
|
|
queue_work(conn->workq, &conn->recv_work);
|
|
out:
|
|
if (ret)
|
|
shutdown_conn(conn);
|
|
|
|
trace_scoutfs_net_connect_work_exit(sb, 0, ret);
|
|
}
|
|
|
|
static bool empty_accepted_list(struct scoutfs_net_connection *conn)
|
|
{
|
|
bool empty;
|
|
|
|
spin_lock(&conn->lock);
|
|
empty = list_empty(&conn->accepted_list);
|
|
spin_unlock(&conn->lock);
|
|
|
|
return empty;
|
|
}
|
|
|
|
/*
|
|
* Safely shut down an active connection. This can be triggered by
|
|
* errors in workers or by an external call to free the connection. The
|
|
* shutting down flag ensures that this only executes once for each live
|
|
* socket.
|
|
*/
|
|
static void scoutfs_net_shutdown_worker(struct work_struct *work)
|
|
{
|
|
DEFINE_CONN_FROM_WORK(conn, work, shutdown_work);
|
|
struct super_block *sb = conn->sb;
|
|
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
|
|
struct scoutfs_net_connection *listener;
|
|
struct scoutfs_net_connection *acc_conn;
|
|
scoutfs_net_response_t resp_func;
|
|
struct message_send *msend;
|
|
struct message_send *tmp;
|
|
unsigned long delay;
|
|
void *resp_data;
|
|
|
|
trace_scoutfs_net_shutdown_work_enter(sb, 0, 0);
|
|
trace_scoutfs_conn_shutdown_start(conn);
|
|
|
|
/* connected and accepted conns print a message */
|
|
if (conn->peername.sin_port != 0)
|
|
scoutfs_info(sb, "%s "SIN_FMT" -> "SIN_FMT,
|
|
conn->listening_conn ? "server closing" :
|
|
"client disconnected",
|
|
SIN_ARG(&conn->sockname),
|
|
SIN_ARG(&conn->peername));
|
|
|
|
/* ensure that sockets return errors, wakes blocked socket work */
|
|
if (conn->sock)
|
|
kernel_sock_shutdown(conn->sock, SHUT_RDWR);
|
|
|
|
/* wait for socket and proc work to finish, includes chained work */
|
|
drain_workqueue(conn->workq);
|
|
|
|
/* tear down the sock now that all work is done */
|
|
if (conn->sock) {
|
|
sock_release(conn->sock);
|
|
conn->sock = NULL;
|
|
}
|
|
|
|
memset(&conn->peername, 0, sizeof(conn->peername));
|
|
|
|
/* listening connections shut down all the connections they accepted */
|
|
spin_lock_nested(&conn->lock, CONN_LOCK_LISTENER);
|
|
list_for_each_entry(acc_conn, &conn->accepted_list, accepted_head) {
|
|
spin_lock_nested(&acc_conn->lock, CONN_LOCK_ACCEPTED);
|
|
shutdown_conn_locked(acc_conn);
|
|
spin_unlock(&acc_conn->lock);
|
|
}
|
|
spin_unlock(&conn->lock);
|
|
|
|
/* free any conns waiting for reconnection */
|
|
cancel_delayed_work_sync(&conn->reconn_free_dwork);
|
|
queue_delayed_work(conn->workq, &conn->reconn_free_dwork, 0);
|
|
/* relies on delay 0 scheduling immediately so no timer to cancel */
|
|
flush_delayed_work(&conn->reconn_free_dwork);
|
|
|
|
/* and wait for accepted conn shutdown work to finish */
|
|
wait_event(conn->waitq, empty_accepted_list(conn));
|
|
|
|
/*
|
|
* Forced unmount will cause net submit to fail once it's
|
|
* started and it calls shutdown to interrupt any previous
|
|
* senders waiting for a response. The response callbacks can
|
|
* do quite a lot of work so we're careful to call them outside
|
|
* the lock.
|
|
*/
|
|
if (scoutfs_forcing_unmount(sb)) {
|
|
spin_lock(&conn->lock);
|
|
list_splice_tail_init(&conn->send_queue, &conn->resend_queue);
|
|
while ((msend = list_first_entry_or_null(&conn->resend_queue,
|
|
struct message_send, head))) {
|
|
resp_func = msend->resp_func;
|
|
resp_data = msend->resp_data;
|
|
free_msend(ninf, conn, msend);
|
|
spin_unlock(&conn->lock);
|
|
|
|
call_resp_func(sb, conn, resp_func, resp_data, NULL, 0, -ECONNABORTED);
|
|
|
|
spin_lock(&conn->lock);
|
|
}
|
|
spin_unlock(&conn->lock);
|
|
}
|
|
|
|
spin_lock(&conn->lock);
|
|
|
|
/* greetings aren't resent across sockets */
|
|
list_splice_tail_init(&conn->send_queue, &conn->resend_queue);
|
|
list_for_each_entry_safe(msend, tmp, &conn->resend_queue, head) {
|
|
if (msend->nh.cmd == SCOUTFS_NET_CMD_GREETING)
|
|
free_msend(ninf, conn, msend);
|
|
}
|
|
|
|
clear_conn_fl(conn, saw_greeting);
|
|
|
|
/* signal connect failure */
|
|
memset(&conn->connect_sin, 0, sizeof(conn->connect_sin));
|
|
wake_up(&conn->waitq);
|
|
|
|
/* resolve racing with listener shutdown with locked shutting_down */
|
|
if (conn->listening_conn &&
|
|
(test_conn_fl(conn->listening_conn, shutting_down) ||
|
|
test_conn_fl(conn, saw_farewell))) {
|
|
|
|
/* free accepted sockets after farewell or listener shutdown */
|
|
spin_unlock(&conn->lock);
|
|
destroy_conn(conn);
|
|
|
|
} else {
|
|
|
|
if (conn->listening_conn) {
|
|
/* server accepted sockets wait for reconnect */
|
|
listener = conn->listening_conn;
|
|
delay = msecs_to_jiffies(CLIENT_RECONNECT_TIMEOUT_MS);
|
|
set_conn_fl(conn, reconn_wait);
|
|
conn->reconn_deadline = jiffies + delay;
|
|
queue_delayed_work(listener->workq,
|
|
&listener->reconn_free_dwork, delay);
|
|
} else {
|
|
/* clients and listeners can retry */
|
|
clear_conn_fl(conn, shutting_down);
|
|
if (conn->notify_down)
|
|
conn->notify_down(sb, conn, conn->info,
|
|
conn->rid);
|
|
}
|
|
|
|
trace_scoutfs_conn_shutdown_complete(conn);
|
|
spin_unlock(&conn->lock);
|
|
}
|
|
|
|
trace_scoutfs_net_shutdown_work_exit(sb, 0, 0);
|
|
}
|
|
|
|
/*
|
|
* Free any connections that have been shutdown for too long without the
|
|
* client reconnecting. This runs in work on the listening connection.
|
|
* It's racing with connection attempts searching for shutdown
|
|
* connections to steal state from. Shutdown cancels the work and waits
|
|
* for it to finish.
|
|
*
|
|
* Connections are currently freed without the lock held so this walks
|
|
* the entire list every time it frees a connection. This is irritating
|
|
* but timed out connections are rare and client counts are relatively
|
|
* low given a cpu's ability to burn through the list.
|
|
*/
|
|
static void scoutfs_net_reconn_free_worker(struct work_struct *work)
|
|
{
|
|
DEFINE_CONN_FROM_WORK(conn, work, reconn_free_dwork.work);
|
|
struct super_block *sb = conn->sb;
|
|
struct scoutfs_net_connection *acc;
|
|
unsigned long now = jiffies;
|
|
unsigned long deadline = 0;
|
|
bool requeue = false;
|
|
int ret;
|
|
|
|
trace_scoutfs_net_reconn_free_work_enter(sb, 0, 0);
|
|
|
|
restart:
|
|
spin_lock(&conn->lock);
|
|
list_for_each_entry(acc, &conn->accepted_list, accepted_head) {
|
|
|
|
if (test_conn_fl(acc, reconn_wait) &&
|
|
!test_conn_fl(acc, reconn_freeing) &&
|
|
(test_conn_fl(conn, shutting_down) ||
|
|
time_after_eq(now, acc->reconn_deadline))) {
|
|
set_conn_fl(acc, reconn_freeing);
|
|
spin_unlock(&conn->lock);
|
|
if (!test_conn_fl(conn, shutting_down)) {
|
|
scoutfs_info(sb, "client "SIN_FMT" reconnect timed out, fencing",
|
|
SIN_ARG(&acc->last_peername));
|
|
ret = scoutfs_fence_start(sb, acc->rid,
|
|
acc->last_peername.sin_addr.s_addr,
|
|
SCOUTFS_FENCE_CLIENT_RECONNECT);
|
|
if (ret) {
|
|
scoutfs_err(sb, "client fence returned err %d, shutting down server",
|
|
ret);
|
|
scoutfs_server_stop(sb);
|
|
}
|
|
}
|
|
destroy_conn(acc);
|
|
goto restart;
|
|
}
|
|
|
|
/* calc delay of next work, can drift a bit */
|
|
if (test_conn_fl(acc, reconn_wait) &&
|
|
!test_conn_fl(acc, reconn_freeing) &&
|
|
(!requeue || time_before(now, deadline))) {
|
|
requeue = true;
|
|
deadline = acc->reconn_deadline;
|
|
}
|
|
}
|
|
spin_unlock(&conn->lock);
|
|
|
|
if (requeue)
|
|
queue_delayed_work(conn->workq, &conn->reconn_free_dwork,
|
|
deadline - now);
|
|
|
|
trace_scoutfs_net_reconn_free_work_exit(sb, 0, 0);
|
|
}
|
|
|
|
/*
|
|
* Accepted connections inherit the callbacks from their listening
|
|
* connection.
|
|
*
|
|
* notify_up is called once a valid greeting is received. rid is
|
|
* non-zero on accepted sockets once they've seen a valid greeting.
|
|
* Connected and listening connections have a rid of 0.
|
|
*
|
|
* notify_down is always called as connections are shut down. It can be
|
|
* called without notify_up ever being called. The rid is only
|
|
* non-zero for accepted connections.
|
|
*/
|
|
struct scoutfs_net_connection *
|
|
scoutfs_net_alloc_conn(struct super_block *sb,
|
|
scoutfs_net_notify_t notify_up,
|
|
scoutfs_net_notify_t notify_down, size_t info_size,
|
|
scoutfs_net_request_t *req_funcs, char *name_suffix)
|
|
{
|
|
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
|
|
struct scoutfs_net_connection *conn;
|
|
unsigned int nr;
|
|
unsigned int i;
|
|
|
|
nr = min_t(unsigned int, num_possible_cpus(),
|
|
PAGE_SIZE / sizeof(struct scoutfs_work_list));
|
|
|
|
conn = kzalloc(sizeof(struct scoutfs_net_connection), GFP_NOFS);
|
|
if (conn) {
|
|
if (info_size)
|
|
conn->info = kzalloc(info_size, GFP_NOFS);
|
|
conn->ordered_proc_wlists = kmalloc_array(nr, sizeof(struct scoutfs_work_list),
|
|
GFP_NOFS);
|
|
conn->workq = alloc_workqueue("scoutfs_net_%s",
|
|
WQ_UNBOUND | WQ_NON_REENTRANT, 0,
|
|
name_suffix);
|
|
}
|
|
if (!conn || (info_size && !conn->info) || !conn->workq || !conn->ordered_proc_wlists) {
|
|
if (conn) {
|
|
kfree(conn->info);
|
|
kfree(conn->ordered_proc_wlists);
|
|
if (conn->workq)
|
|
destroy_workqueue(conn->workq);
|
|
kfree(conn);
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
conn->sb = sb;
|
|
conn->notify_up = notify_up;
|
|
conn->notify_down = notify_down;
|
|
conn->info_size = info_size;
|
|
conn->req_funcs = req_funcs;
|
|
spin_lock_init(&conn->lock);
|
|
init_waitqueue_head(&conn->waitq);
|
|
conn->sockname.sin_family = AF_INET;
|
|
conn->peername.sin_family = AF_INET;
|
|
conn->last_peername.sin_family = AF_INET;
|
|
INIT_LIST_HEAD(&conn->accepted_head);
|
|
INIT_LIST_HEAD(&conn->accepted_list);
|
|
conn->next_send_seq = 1;
|
|
conn->next_send_id = 1;
|
|
atomic64_set(&conn->recv_seq, 0);
|
|
INIT_LIST_HEAD(&conn->send_queue);
|
|
INIT_LIST_HEAD(&conn->resend_queue);
|
|
conn->req_root = RB_ROOT;
|
|
conn->resp_root = RB_ROOT;
|
|
INIT_WORK(&conn->listen_work, scoutfs_net_listen_worker);
|
|
INIT_WORK(&conn->connect_work, scoutfs_net_connect_worker);
|
|
INIT_WORK(&conn->send_work, scoutfs_net_send_worker);
|
|
INIT_WORK(&conn->recv_work, scoutfs_net_recv_worker);
|
|
INIT_WORK(&conn->shutdown_work, scoutfs_net_shutdown_worker);
|
|
INIT_WORK(&conn->destroy_work, scoutfs_net_destroy_worker);
|
|
INIT_DELAYED_WORK(&conn->reconn_free_dwork,
|
|
scoutfs_net_reconn_free_worker);
|
|
|
|
conn->ordered_proc_nr = nr;
|
|
for (i = 0; i < nr; i++) {
|
|
INIT_WORK(&conn->ordered_proc_wlists[i].work, scoutfs_net_ordered_proc_worker);
|
|
spin_lock_init(&conn->ordered_proc_wlists[i].lock);
|
|
INIT_LIST_HEAD(&conn->ordered_proc_wlists[i].list);
|
|
}
|
|
|
|
scoutfs_tseq_add(&ninf->conn_tseq_tree, &conn->tseq_entry);
|
|
trace_scoutfs_conn_alloc(conn);
|
|
|
|
return conn;
|
|
}
|
|
|
|
/*
* Give the caller the client rid of the connection. This is used by
* rare server processing callers who want to send async responses
* after request processing has returned. We didn't want the churn of
* providing the requesting rid to all the request handlers, but we
* probably should.
*/
|
|
u64 scoutfs_net_client_rid(struct scoutfs_net_connection *conn)
|
|
{
|
|
return conn->rid;
|
|
}
|
|
|
|
/*
* Shut down the connection. Once this returns no network traffic
* or work will be executing. The caller can then connect or bind and
* listen again. Additional shutdown calls will find it already shut down.
*/
|
|
void scoutfs_net_shutdown(struct super_block *sb,
|
|
struct scoutfs_net_connection *conn)
|
|
{
|
|
shutdown_conn(conn);
|
|
flush_work(&conn->shutdown_work);
|
|
flush_work(&conn->destroy_work);
|
|
}
|
|
|
|
/*
|
|
* Destroy the connection after the shutdown work has stopped all concurrent
|
|
* processing on the connection.
|
|
*/
|
|
void scoutfs_net_free_conn(struct super_block *sb,
|
|
struct scoutfs_net_connection *conn)
|
|
{
|
|
if (conn) {
|
|
scoutfs_net_shutdown(sb, conn);
|
|
destroy_conn(conn);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Associate a bound socket with the caller's connection. We call bind
|
|
* and listen to assign the listening address and give it to the caller.
|
|
*
|
|
* If this returns success then the caller has to call either listen or
|
|
* free_conn.
|
|
*/
|
|
int scoutfs_net_bind(struct super_block *sb,
|
|
struct scoutfs_net_connection *conn,
|
|
struct sockaddr_in *sin)
|
|
{
|
|
struct socket *sock = NULL;
|
|
int addrlen;
|
|
int optval;
|
|
int ret;
|
|
|
|
/* caller state machine shouldn't let this happen */
|
|
if (WARN_ON_ONCE(conn->sock))
|
|
return -EINVAL;
|
|
|
|
ret = kc_sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
|
|
if (ret)
|
|
goto out;
|
|
|
|
sock->sk->sk_allocation = GFP_NOFS;
|
|
|
|
optval = 1;
|
|
ret = kc_sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
|
|
&optval, sizeof(optval));
|
|
if (ret)
|
|
goto out;
|
|
|
|
addrlen = sizeof(struct sockaddr_in);
|
|
ret = kernel_bind(sock, (struct sockaddr *)sin, addrlen);
|
|
if (ret)
|
|
goto out;
|
|
|
|
ret = kernel_listen(sock, 255);
|
|
if (ret < 0)
|
|
goto out;
|
|
|
|
ret = kc_kernel_getsockname(sock, (struct sockaddr *)&conn->sockname);
|
|
if (ret < 0)
|
|
goto out;
|
|
|
|
ret = 0;
|
|
|
|
conn->sock = sock;
|
|
*sin = conn->sockname;
|
|
|
|
out:
|
|
if (ret < 0 && sock)
|
|
sock_release(sock);
|
|
return ret;
|
|
}
|
|
|
|
/*
|
|
* Kick off blocking background work to accept connections from the
|
|
* connection's listening socket that was created with a previous bind
|
|
* call.
|
|
*
|
|
* The callback notify_down will be called once the listening socket is
|
|
* shut down either by errors or the caller freeing the conn.
|
|
*/
|
|
void scoutfs_net_listen(struct super_block *sb,
|
|
struct scoutfs_net_connection *conn)
|
|
{
|
|
queue_work(conn->workq, &conn->listen_work);
|
|
}
|
|
|
|
/*
|
|
* Return once a connection attempt has completed either successfully
|
|
* or in error.
|
|
*/
|
|
static bool connect_result(struct scoutfs_net_connection *conn, int *error)
|
|
{
|
|
bool done = false;
|
|
|
|
spin_lock(&conn->lock);
|
|
if (test_conn_fl(conn, established)) {
|
|
done = true;
|
|
*error = 0;
|
|
} else if (test_conn_fl(conn, shutting_down) ||
|
|
conn->connect_sin.sin_family == 0) {
|
|
done = true;
|
|
*error = -ESHUTDOWN;
|
|
}
|
|
trace_scoutfs_conn_connect_result(conn);
|
|
spin_unlock(&conn->lock);
|
|
|
|
return done;
|
|
}
|
|
|
|
/*
|
|
* Connect to the given address. An error is returned if the socket was
|
|
* not connected before the given timeout. The connection isn't fully
|
|
* active until the connecting caller starts greeting negotiation by
|
|
* sending the initial greeting request.
|
|
*
|
|
* The conn notify_down callback can be called as the connection is
|
|
* shutdown before this returns.
|
|
*/
|
|
int scoutfs_net_connect(struct super_block *sb,
|
|
struct scoutfs_net_connection *conn,
|
|
struct sockaddr_in *sin, unsigned long timeout_ms)
|
|
{
|
|
int ret = 0;
|
|
|
|
spin_lock(&conn->lock);
|
|
conn->connect_sin = *sin;
|
|
conn->connect_timeout_ms = timeout_ms;
|
|
spin_unlock(&conn->lock);
|
|
|
|
queue_work(conn->workq, &conn->connect_work);
|
|
wait_event(conn->waitq, connect_result(conn, &ret));
|
|
return ret;
|
|
}
|
|
|
|
static void set_valid_greeting(struct scoutfs_net_connection *conn)
|
|
{
|
|
assert_spin_locked(&conn->lock);
|
|
|
|
/* recv should have dropped invalid duplicate greeting messages */
|
|
BUG_ON(test_conn_fl(conn, valid_greeting));
|
|
|
|
set_conn_fl(conn, valid_greeting);
|
|
list_splice_tail_init(&conn->resend_queue, &conn->send_queue);
|
|
queue_work(conn->workq, &conn->send_work);
|
|
}
|
|
|
|
/*
|
|
* The client has received a valid greeting from the server. Send
|
|
* can proceed and we might need to reset our recv state if we reconnected
|
|
* to a new server.
|
|
*/
|
|
void scoutfs_net_client_greeting(struct super_block *sb,
|
|
struct scoutfs_net_connection *conn,
|
|
bool new_server)
|
|
{
|
|
struct net_info *ninf = SCOUTFS_SB(sb)->net_info;
|
|
struct message_send *msend;
|
|
struct message_send *tmp;
|
|
|
|
/* only called on client connections :/ */
|
|
BUG_ON(conn->listening_conn);
|
|
|
|
spin_lock(&conn->lock);
|
|
|
|
if (new_server) {
|
|
atomic64_set(&conn->recv_seq, 0);
|
|
list_for_each_entry_safe(msend, tmp, &conn->resend_queue, head){
|
|
if (nh_is_response(&msend->nh))
|
|
free_msend(ninf, conn, msend);
|
|
}
|
|
}
|
|
|
|
set_valid_greeting(conn);
|
|
|
|
spin_unlock(&conn->lock);
|
|
|
|
/* client up/down drives reconnect */
|
|
if (conn->notify_up)
|
|
conn->notify_up(sb, conn, conn->info, 0);
|
|
}
|
|
|
|
/*
 * The calling server has received a valid greeting from a client. If
 * the client is reconnecting to us then we need to find its old
 * connection that held its state and transfer it to this connection
 * (connection and socket life cycles make this easier than migrating
 * the socket between the connections).
 *
 * The previous connection that holds the client's state might still be
 * in active use depending on network failure and work processing races.
 * We shut it down before migrating its message state. We can be
 * processing greetings from multiple reconnecting sockets that are all
 * referring to the same original connection. We use the increasing
 * greeting id to have the most recent connection attempt win.
 *
 * A node can be reconnecting to us for the first time. It will notice
 * the new server term and take steps to recover.
 *
 * A client can be reconnecting to us after we've destroyed their state.
 * This is fatal for the client if they just took too long to reconnect.
 * But this can also happen if something disconnects the socket after
 * we've sent a farewell response but before the client received it. In
 * this case we let the client reconnect so we can resend the farewell
 * response and they can disconnect cleanly.
 *
 * At this point our connection is idle except for send submissions and
 * shutdown being queued. We have exclusive access to the previous conn
 * once it's shut down and we set _freeing.
 */
void scoutfs_net_server_greeting(struct super_block *sb,
				 struct scoutfs_net_connection *conn,
				 u64 rid, u64 greeting_id,
				 bool reconnecting, bool first_contact,
				 bool farewell)
{
	struct scoutfs_net_connection *listener;
	struct scoutfs_net_connection *reconn;
	struct scoutfs_net_connection *acc;

	/* only called on accepted server connections :/ */
	BUG_ON(!conn->listening_conn);

	/* see if we have a previous conn for the client's sent rid */
	reconn = NULL;
	if (reconnecting) {
		listener = conn->listening_conn;
restart:
		spin_lock_nested(&listener->lock, CONN_LOCK_LISTENER);
		list_for_each_entry(acc, &listener->accepted_list,
				    accepted_head) {
			if (acc->rid != rid ||
			    acc->greeting_id >= greeting_id ||
			    test_conn_fl(acc, reconn_freeing))
				continue;

			if (!test_conn_fl(acc, reconn_wait)) {
				spin_lock_nested(&acc->lock,
						 CONN_LOCK_ACCEPTED);
				shutdown_conn_locked(acc);
				spin_unlock(&acc->lock);
				spin_unlock(&listener->lock);
				msleep(10); /* XXX might be freed :/ */
				goto restart;
			}

			reconn = acc;
			set_conn_fl(reconn, reconn_freeing);
			break;
		}
		spin_unlock(&listener->lock);
	}

	/* drop a connection if we can't find its necessary old conn */
	if (reconnecting && !reconn && !first_contact && !farewell) {
		shutdown_conn(conn);
		return;
	}

	/* migrate state from previous conn for this reconnecting rid */
	if (reconn) {
		spin_lock(&conn->lock);

		assign_conn_fl(conn, reconn, saw_farewell);
		conn->next_send_seq = reconn->next_send_seq;
		conn->next_send_id = reconn->next_send_id;
		atomic64_set(&conn->recv_seq, atomic64_read(&reconn->recv_seq));

		/* reconn should be idle while in reconn_wait */
		BUG_ON(!list_empty(&reconn->send_queue));
		/* queued greeting response is racing, can be in send or resend queue */
		list_splice_tail_init(&reconn->resend_queue, &conn->resend_queue);
		move_sorted_msends(conn, &conn->req_root, reconn, &reconn->req_root);
		move_sorted_msends(conn, &conn->resp_root, reconn, &reconn->resp_root);

		/* new conn info is unused, swap, old won't call down */
		swap(conn->info, reconn->info);
		reconn->notify_down = NULL;

		trace_scoutfs_conn_reconn_migrate(conn);
		spin_unlock(&conn->lock);

		/* we set _freeing */
		destroy_conn(reconn);
	}

	spin_lock(&conn->lock);

	conn->rid = rid;
	conn->greeting_id = greeting_id;
	set_valid_greeting(conn);

	spin_unlock(&conn->lock);

	/* only call notify_up the first time we see the rid */
	if (conn->notify_up && first_contact)
		conn->notify_up(sb, conn, conn->info, rid);
}

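/*
 * The move_sorted_msends() calls above carry the rbtree-indexed
 * messages (requests by id, responses by send seq) from the old conn to
 * the new one along with the resend queue.  The helper below is a
 * hypothetical, self-contained sketch of that kind of tree-to-tree move
 * using the kernel rbtree API; example_msg, its u64 key, and
 * example_move_tree() are illustrative stand-ins, not the real message
 * structures or helper.
 */
struct example_msg {
	struct rb_node node;
	u64 key;			/* e.g. request id or send seq */
};

static void __maybe_unused example_move_tree(struct rb_root *to,
					     struct rb_root *from)
{
	struct example_msg *em;
	struct example_msg *cur;
	struct rb_node **p;
	struct rb_node *parent;
	struct rb_node *node;

	/* pop each node off the source tree and re-insert it by key */
	while ((node = rb_first(from))) {
		em = rb_entry(node, struct example_msg, node);
		rb_erase(node, from);

		parent = NULL;
		p = &to->rb_node;
		while (*p) {
			parent = *p;
			cur = rb_entry(parent, struct example_msg, node);
			if (em->key < cur->key)
				p = &parent->rb_left;
			else
				p = &parent->rb_right;
		}
		rb_link_node(&em->node, parent, p);
		rb_insert_color(&em->node, to);
	}
}
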
/*
 * Submit a request down the connection. It's up to the caller to
 * ensure that the conn is allocated. Sends submitted when the
 * connection isn't established will be resent in order the next time
 * it's established.
 */
int scoutfs_net_submit_request(struct super_block *sb,
			       struct scoutfs_net_connection *conn,
			       u8 cmd, void *arg, u16 arg_len,
			       scoutfs_net_response_t resp_func,
			       void *resp_data, u64 *id_ret)
{
	return submit_send(sb, conn, 0, cmd, 0, 0, 0, arg, arg_len,
			   resp_func, resp_data, id_ret);
}

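/*
 * Illustrative only: a hypothetical caller pairing a submitted request
 * with a response callback.  example_ping_resp(), example_send_ping(),
 * and the bare cmd/completion arguments are stand-ins; real callers
 * pass their own command and payload, and the _node variant below
 * additionally targets a specific accepted rid.
 */
static int example_ping_resp(struct super_block *sb,
			     struct scoutfs_net_connection *conn,
			     void *resp, unsigned int resp_len, int error,
			     void *data)
{
	struct completion *comp = data;

	/* error carries the remote errno or a local send failure */
	complete(comp);
	return 0;
}

static int __maybe_unused example_send_ping(struct super_block *sb,
					struct scoutfs_net_connection *conn,
					u8 cmd, struct completion *comp)
{
	u64 id;

	/* queued immediately; sent, and resent, once a socket is established */
	return scoutfs_net_submit_request(sb, conn, cmd, NULL, 0,
					  example_ping_resp, comp, &id);
}
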
/*
 * Send a request to a specific rid that was accepted by this listening
 * connection.
 */
int scoutfs_net_submit_request_node(struct super_block *sb,
				    struct scoutfs_net_connection *conn,
				    u64 rid, u8 cmd,
				    void *arg, u16 arg_len,
				    scoutfs_net_response_t resp_func,
				    void *resp_data, u64 *id_ret)
{
	return submit_send(sb, conn, rid, cmd, 0, 0, 0, arg, arg_len,
			   resp_func, resp_data, id_ret);
}

/*
 * Send a response. Responses don't get callbacks and use the request's
 * id, so callers don't need to get an id in return.
 *
 * An error is returned if the response could not be sent.
 */
int scoutfs_net_response(struct super_block *sb,
			 struct scoutfs_net_connection *conn,
			 u8 cmd, u64 id, int error, void *resp, u16 resp_len)
{
	if (error) {
		resp = NULL;
		resp_len = 0;
	}

	return submit_send(sb, conn, 0, cmd, SCOUTFS_NET_FLAG_RESPONSE, id,
			   net_err_from_host(sb, error), resp, resp_len,
			   NULL, NULL, NULL);
}

int scoutfs_net_response_node(struct super_block *sb,
			      struct scoutfs_net_connection *conn,
			      u64 rid, u8 cmd, u64 id, int error,
			      void *resp, u16 resp_len)
{
	if (error) {
		resp = NULL;
		resp_len = 0;
	}

	return submit_send(sb, conn, rid, cmd, SCOUTFS_NET_FLAG_RESPONSE,
			   id, net_err_from_host(sb, error), resp, resp_len,
			   NULL, NULL, NULL);
}

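/*
 * Illustrative only: a hypothetical helper a request handler might use
 * to send its reply.  example_reply_u64() and its arguments are
 * stand-ins; a real handler gets cmd and id from the request it's
 * answering, and when err is set the payload is dropped and only the
 * mapped errno travels back to the sender.
 */
static int __maybe_unused example_reply_u64(struct super_block *sb,
					struct scoutfs_net_connection *conn,
					u64 rid, u8 cmd, u64 id,
					int err, u64 val)
{
	__le64 lev = cpu_to_le64(val);

	/* payload is ignored when err != 0; only the errno is returned */
	return scoutfs_net_response_node(sb, conn, rid, cmd, id, err,
					 &lev, sizeof(lev));
}
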
struct sync_request_completion {
	struct completion comp;
	void *resp;
	unsigned int resp_len;
	int error;
};

static int sync_response(struct super_block *sb,
			 struct scoutfs_net_connection *conn,
			 void *resp, unsigned int resp_len,
			 int error, void *data)
{
	struct sync_request_completion *sreq = data;

	if (error == 0 && resp_len != sreq->resp_len)
		error = -EMSGSIZE;

	if (error)
		sreq->error = error;
	else if (resp_len)
		memcpy(sreq->resp, resp, resp_len);

	complete(&sreq->comp);

	return 0;
}

/*
 * Send a request and wait for a response to be copied into the given
 * buffer. Errors returned can come from the remote request processing
 * or local failure to send.
 *
 * The wait for the response is uninterruptible; it returns once the
 * response callback fires, whether with a response or an error.
 *
 * -EMSGSIZE is returned if the response message's data_length doesn't
 * match the caller's resp_len buffer.
 */
int scoutfs_net_sync_request(struct super_block *sb,
			     struct scoutfs_net_connection *conn,
			     u8 cmd, void *arg, unsigned arg_len,
			     void *resp, size_t resp_len)
{
	struct sync_request_completion sreq;
	int ret;
	u64 id;

	init_completion(&sreq.comp);
	sreq.resp = resp;
	sreq.resp_len = resp_len;
	sreq.error = 0;

	ret = scoutfs_net_submit_request(sb, conn, cmd, arg, arg_len,
					 sync_response, &sreq, &id);

	if (ret == 0) {
		wait_for_completion(&sreq.comp);
		ret = sreq.error;
	}

	return ret;
}

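/*
 * Illustrative only: a hypothetical synchronous caller that expects
 * exactly a __le64 payload back.  example_sync_get_u64() and the bare
 * cmd are stand-ins; note that resp_len must match the response's data
 * length exactly or the sync callback flags -EMSGSIZE.
 */
static int __maybe_unused example_sync_get_u64(struct super_block *sb,
					struct scoutfs_net_connection *conn,
					u8 cmd, u64 *val_ret)
{
	__le64 lev;
	int ret;

	/* blocks until the response callback fires or the send fails */
	ret = scoutfs_net_sync_request(sb, conn, cmd, NULL, 0,
				       &lev, sizeof(lev));
	if (ret == 0)
		*val_ret = le64_to_cpu(lev);
	return ret;
}
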
static void net_tseq_show_conn(struct seq_file *m,
			       struct scoutfs_tseq_entry *ent)
{
	struct scoutfs_net_connection *conn =
		container_of(ent, struct scoutfs_net_connection, tseq_entry);

	seq_printf(m, "name "SIN_FMT" peer "SIN_FMT" rid %016llx greeting_id %llu vg %u est %u sd %u sg %u sf %u rw %u rf %u cto_ms %lu rdl_j %lu nss %llu rs %llu nsi %llu\n",
		   SIN_ARG(&conn->sockname), SIN_ARG(&conn->peername),
		   conn->rid, conn->greeting_id,
		   test_conn_fl(conn, valid_greeting),
		   test_conn_fl(conn, established),
		   test_conn_fl(conn, shutting_down),
		   test_conn_fl(conn, saw_greeting),
		   test_conn_fl(conn, saw_farewell),
		   test_conn_fl(conn, reconn_wait),
		   test_conn_fl(conn, reconn_freeing),
		   conn->connect_timeout_ms, conn->reconn_deadline,
		   conn->next_send_seq, (u64)atomic64_read(&conn->recv_seq),
		   conn->next_send_id);
}

/*
 * How's this for sneaky?! We line up the structs so that the entries
 * and function pointers are at the same offsets. recv's function
 * pointer value is known and can't be found in send's.
 */
static bool tseq_entry_is_recv(struct scoutfs_tseq_entry *ent)
{
	struct message_recv *mrecv =
		container_of(ent, struct message_recv, tseq_entry);

	BUILD_BUG_ON(offsetof(struct message_recv, tseq_entry) !=
		     offsetof(struct message_send, tseq_entry));
	BUILD_BUG_ON(offsetof(struct message_recv, proc_work.func) !=
		     offsetof(struct message_send, resp_func));

	return mrecv->proc_work.func == scoutfs_net_proc_worker;
}

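/*
 * A reduced model of the layout trick above, with hypothetical
 * example_a/example_b containers: as long as both embed the shared
 * entry and a distinguishing function pointer at identical offsets, a
 * single pointer comparison classifies any entry.  These types and
 * example_entry_is_a() are illustrative stand-ins, not scoutfs types.
 */
struct example_a {
	struct scoutfs_tseq_entry ent;
	work_func_t func;
};

struct example_b {
	struct scoutfs_tseq_entry ent;
	scoutfs_net_response_t func;
};

static bool __maybe_unused example_entry_is_a(struct scoutfs_tseq_entry *ent,
					      work_func_t known_a_func)
{
	struct example_a *a = container_of(ent, struct example_a, ent);

	/* matching offsets make this read valid for either container */
	BUILD_BUG_ON(offsetof(struct example_a, ent) !=
		     offsetof(struct example_b, ent));
	BUILD_BUG_ON(offsetof(struct example_a, func) !=
		     offsetof(struct example_b, func));

	return a->func == known_a_func;
}
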
static void net_tseq_show_msg(struct seq_file *m,
			      struct scoutfs_tseq_entry *ent)
{
	struct message_send *msend;
	struct message_recv *mrecv;

	if (tseq_entry_is_recv(ent)) {
		mrecv = container_of(ent, struct message_recv, tseq_entry);

		seq_printf(m, "recv "SNH_FMT"\n", SNH_ARG(&mrecv->nh));
	} else {
		msend = container_of(ent, struct message_send, tseq_entry);

		seq_printf(m, "send "SNH_FMT"\n", SNH_ARG(&msend->nh));
	}
}

int scoutfs_net_setup(struct super_block *sb)
{
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct net_info *ninf;
	int ret;

	/* fail the build if host errnos don't fit in the u8 mapping arrays */
#undef EXPAND_NET_ERRNO
#define EXPAND_NET_ERRNO(which) BUILD_BUG_ON(which >= U8_MAX);
	EXPAND_EACH_NET_ERRNO

	ninf = kzalloc(sizeof(struct net_info), GFP_KERNEL);
	if (!ninf) {
		ret = -ENOMEM;
		goto out;
	}
	sbi->net_info = ninf;

	scoutfs_tseq_tree_init(&ninf->conn_tseq_tree, net_tseq_show_conn);
	scoutfs_tseq_tree_init(&ninf->msg_tseq_tree, net_tseq_show_msg);

	ninf->shutdown_workq = alloc_workqueue("scoutfs_net_shutdown",
					       WQ_UNBOUND | WQ_NON_REENTRANT,
					       0);
	ninf->destroy_workq = alloc_workqueue("scoutfs_net_destroy",
					      WQ_UNBOUND | WQ_NON_REENTRANT,
					      0);
	if (!ninf->shutdown_workq || !ninf->destroy_workq) {
		ret = -ENOMEM;
		goto out;
	}

	ninf->conn_tseq_dentry = scoutfs_tseq_create("connections",
						     sbi->debug_root,
						     &ninf->conn_tseq_tree);
	if (!ninf->conn_tseq_dentry) {
		ret = -ENOMEM;
		goto out;
	}

	ninf->msg_tseq_dentry = scoutfs_tseq_create("messages",
						    sbi->debug_root,
						    &ninf->msg_tseq_tree);
	if (!ninf->msg_tseq_dentry) {
		ret = -ENOMEM;
		goto out;
	}

	ret = 0;
out:
	if (ret)
		scoutfs_net_destroy(sb);
	return ret;
}

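/*
 * For context, EXPAND_EACH_NET_ERRNO is an X-macro list defined with
 * the net headers; the EXAMPLE_* macros below are hypothetical
 * stand-ins that only mirror the usual shape of that pattern, where
 * each site defines the per-entry macro before expanding the list (as
 * the BUILD_BUG_ON() at the top of scoutfs_net_setup() does).
 */
#define EXAMPLE_EACH_ERRNO	\
	EXAMPLE_ERRNO(ENOENT)	\
	EXAMPLE_ERRNO(ENOMEM)	\
	EXAMPLE_ERRNO(EIO)

#define EXAMPLE_ERRNO(which) BUILD_BUG_ON(which >= U8_MAX);
static void __maybe_unused example_check_errnos(void)
{
	EXAMPLE_EACH_ERRNO
}
#undef EXAMPLE_ERRNO
#undef EXAMPLE_EACH_ERRNO
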
void scoutfs_net_destroy(struct super_block *sb)
{
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct net_info *ninf = sbi->net_info;

	if (ninf) {
		if (ninf->shutdown_workq)
			destroy_workqueue(ninf->shutdown_workq);
		if (ninf->destroy_workq)
			destroy_workqueue(ninf->destroy_workq);
		debugfs_remove(ninf->conn_tseq_dentry);
		debugfs_remove(ninf->msg_tseq_dentry);
		kfree(ninf);
		sbi->net_info = NULL;
	}
}