/* * Copyright (C) 2018 Versity Software, Inc. All rights reserved. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License v2 as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. */ #include #include #include #include #include #include #include #include #include #include #include #include "format.h" #include "counters.h" #include "inode.h" #include "block.h" #include "radix.h" #include "btree.h" #include "scoutfs_trace.h" #include "msg.h" #include "server.h" #include "net.h" #include "lock_server.h" #include "endian_swap.h" #include "quorum.h" /* * Every active mount can act as the server that listens on a net * connection and accepts connections from all the other mounts acting * as clients. * * The server is started by the mount that is elected leader by quorum. * If it sees errors it shuts down the server in the hopes that another * mount will become the leader and have less trouble. 
 */

/* Per-superblock state for the running server instance. */
struct server_info {
	struct super_block *sb;
	spinlock_t lock;
	wait_queue_head_t waitq;

	struct workqueue_struct *wq;
	struct work_struct work;
	int err;
	bool shutting_down;
	struct completion start_comp;
	struct sockaddr_in listen_sin;
	u64 term;

	struct scoutfs_net_connection *conn;

	/* request processing coordinates shared commits */
	struct rw_semaphore commit_rwsem;
	struct llist_head commit_waiters;
	struct work_struct commit_work;

	/* server tracks seq use */
	struct rw_semaphore seq_rwsem;

	struct rw_semaphore alloc_rwsem;

	/* connected clients, protected by ->lock */
	struct list_head clients;
	unsigned long nr_clients;

	/* track clients waiting in unmount for farewell response */
	struct mutex farewell_mutex;
	struct list_head farewell_requests;
	struct work_struct farewell_work;

	struct scoutfs_radix_allocator alloc;
	struct scoutfs_block_writer wri;
	struct mutex logs_mutex;
};

#define DECLARE_SERVER_INFO(sb, name) \
	struct server_info *name = SCOUTFS_SB(sb)->server_info

/*
 * The server tracks each connected client.
 */
struct server_client_info {
	u64 rid;
	struct list_head head;
};

/* one waiter per request that needs the next commit to become persistent */
struct commit_waiter {
	struct completion comp;
	struct llist_node node;
	int ret;
};

/*
 * Ask the server work loop to exit.  Only sets the flag and wakes the
 * waiter; the server work itself performs the teardown.
 */
static void stop_server(struct server_info *server)
{
	/* wait_event/wake_up provide barriers */
	server->shutting_down = true;
	wake_up(&server->waitq);
}

/*
 * Queue this waiter for the next commit.
 *
 * This is called while still holding the rwsem that prevents commits so
 * that the caller can be sure to be woken by the next commit after they
 * queue and release the lock.
 *
 * It's important to realize that the caller's commit_waiter list node
 * might be serviced by a currently running commit work while queueing
 * another work run in the future.  This caller can return from
 * wait_for_commit() while the commit_work is still queued.
 *
 * This could queue delayed work but we're first trying to have batching
 * work by having concurrent modification line up behind a commit in
 * flight.  Once the commit finishes it'll unlock and hopefully everyone
 * will race to make their changes and they'll all be applied by the
 * next commit after that.
 */
static void queue_commit_work(struct server_info *server,
			      struct commit_waiter *cw)
{
	lockdep_assert_held(&server->commit_rwsem);

	cw->ret = 0;
	init_completion(&cw->comp);
	llist_add(&cw->node, &server->commit_waiters);
	queue_work(server->wq, &server->commit_work);
}

/*
 * Wait for a commit during request processing and return its status.
 */
static inline int wait_for_commit(struct commit_waiter *cw)
{
	wait_for_completion(&cw->comp);
	return cw->ret;
}

/*
 * A core function of request processing is to modify the manifest and
 * allocator.  Often the processing needs to make the modifications
 * persistent before replying.  We'd like to batch these commits as much
 * as is reasonable so that we don't degrade to a few IO round trips per
 * request.
 *
 * Getting that batching right is bound up in the concurrency of request
 * processing so a clear way to implement the batched commits is to
 * implement commits with a single pending work func like the
 * processing.
 *
 * Processing paths acquire the rwsem for reading while they're making
 * multiple dependent changes.  When they're done and want it persistent
 * they add themselves to the list of waiters and queue the commit work.
 * This work runs, acquires the lock to exclude other writers, and
 * performs the commit.  Readers can run concurrently with these
 * commits.
 */
static void scoutfs_server_commit_func(struct work_struct *work)
{
	struct server_info *server = container_of(work, struct server_info,
						  commit_work);
	struct super_block *sb = server->sb;
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	struct commit_waiter *cw;
	struct commit_waiter *pos;
	struct llist_node *node;
	int ret;

	trace_scoutfs_server_commit_work_enter(sb, 0, 0);

	/* exclude request processing while blocks and super are written */
	down_write(&server->commit_rwsem);

	ret = scoutfs_block_writer_write(sb, &server->wri);
	if (ret) {
		scoutfs_err(sb, "server error writing btree blocks: %d", ret);
		goto out;
	}

	/* publish the allocator roots in the super before writing it */
	super->core_meta_avail = server->alloc.avail;
	super->core_meta_freed = server->alloc.freed;

	ret = scoutfs_write_super(sb, super);
	if (ret) {
		scoutfs_err(sb, "server error writing super block: %d", ret);
		goto out;
	}

	ret = 0;
out:
	node = llist_del_all(&server->commit_waiters);

	/* waiters always wait on completion, cw could be free after complete */
	llist_for_each_entry_safe(cw, pos, node, node) {
		cw->ret = ret;
		complete(&cw->comp);
	}

	up_write(&server->commit_rwsem);
	trace_scoutfs_server_commit_work_exit(sb, 0, ret);
}

/*
 * Hand the client a range of free inode numbers.  The count is clamped
 * so that next_ino never wraps past U64_MAX, and the advanced next_ino
 * is made persistent by a commit before the response is sent.
 */
static int server_alloc_inodes(struct super_block *sb,
			       struct scoutfs_net_connection *conn,
			       u8 cmd, u64 id, void *arg, u16 arg_len)
{
	DECLARE_SERVER_INFO(sb, server);
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct scoutfs_super_block *super = &sbi->super;
	struct scoutfs_net_inode_alloc ial = { 0, };
	struct commit_waiter cw;
	__le64 lecount;
	u64 ino;
	u64 nr;
	int ret;

	if (arg_len != sizeof(lecount)) {
		ret = -EINVAL;
		goto out;
	}

	memcpy(&lecount, arg, arg_len);

	down_read(&server->commit_rwsem);

	spin_lock(&sbi->next_ino_lock);
	ino = le64_to_cpu(super->next_ino);
	/* don't let next_ino advance past U64_MAX */
	nr = min(le64_to_cpu(lecount), U64_MAX - ino);
	le64_add_cpu(&super->next_ino, nr);
	spin_unlock(&sbi->next_ino_lock);

	queue_commit_work(server, &cw);
	up_read(&server->commit_rwsem);

	ial.ino = cpu_to_le64(ino);
	ial.nr = cpu_to_le64(nr);

	ret = wait_for_commit(&cw);
out:
	return scoutfs_net_response(sb, conn, cmd, id, ret, &ial, sizeof(ial));
}

/*
 * Give the client references to stable persistent trees that they'll
 * use to write their next transaction.
 */
static int server_get_log_trees(struct super_block *sb,
				struct scoutfs_net_connection *conn,
				u8 cmd, u64 id, void *arg, u16 arg_len)
{
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	u64 rid = scoutfs_net_client_rid(conn);
	DECLARE_SERVER_INFO(sb, server);
	SCOUTFS_BTREE_ITEM_REF(iref);
	struct scoutfs_log_trees_key ltk;
	struct scoutfs_log_trees_val ltv;
	struct scoutfs_log_trees lt;
	struct commit_waiter cw;
	u64 count;
	u64 target;
	int ret;

	if (arg_len != 0) {
		ret = -EINVAL;
		goto out;
	}

	down_read(&server->commit_rwsem);
	mutex_lock(&server->logs_mutex);

	/* find the client's most recent log tree item, if any */
	memset(&ltk, 0, sizeof(ltk));
	ltk.rid = cpu_to_be64(rid);
	ltk.nr = cpu_to_be64(U64_MAX);

	ret = scoutfs_btree_prev(sb, &super->logs_root, &ltk, sizeof(ltk),
				 &iref);
	if (ret < 0 && ret != -ENOENT)
		goto unlock;
	if (ret == 0) {
		if (iref.key_len == sizeof(struct scoutfs_log_trees_key) &&
		    iref.val_len == sizeof(struct scoutfs_log_trees_val)) {
			memcpy(&ltk, iref.key, iref.key_len);
			memcpy(&ltv, iref.val, iref.val_len);
			/* previous item may belong to another rid */
			if (be64_to_cpu(ltk.rid) != rid)
				ret = -ENOENT;
		} else {
			ret = -EIO;
		}
		scoutfs_btree_put_iref(&iref);
		if (ret == -EIO)
			goto unlock;
	}

	/* initialize new roots if we don't have any */
	if (ret == -ENOENT) {
		ltk.rid = cpu_to_be64(rid);
		ltk.nr = cpu_to_be64(1);
		memset(&ltv, 0, sizeof(ltv));
		scoutfs_radix_root_init(sb, &ltv.meta_avail, true);
		scoutfs_radix_root_init(sb, &ltv.meta_freed, true);
		scoutfs_radix_root_init(sb, &ltv.data_avail, false);
		scoutfs_radix_root_init(sb, &ltv.data_freed, false);
	}

	/* ensure client has enough free metadata blocks for a transaction */
	target = (64*1024*1024) / SCOUTFS_BLOCK_SIZE;
	if (le64_to_cpu(ltv.meta_avail.ref.sm_total) < target) {
		count = target - le64_to_cpu(ltv.meta_avail.ref.sm_total);
		ret = scoutfs_radix_merge(sb, &server->alloc, &server->wri,
					  &ltv.meta_avail,
					  &server->alloc.avail,
					  &server->alloc.avail, count);
		if (ret < 0)
			goto unlock;
	}

	/* ensure client has enough free data blocks for a transaction */
	target = (2ULL*1024*1024*1024) / SCOUTFS_BLOCK_SIZE;
	if (le64_to_cpu(ltv.data_avail.ref.sm_total) < target) {
		count = target - le64_to_cpu(ltv.data_avail.ref.sm_total);
		ret = scoutfs_radix_merge(sb, &server->alloc, &server->wri,
					  &ltv.data_avail,
					  &super->core_data_avail,
					  &super->core_data_avail, count);
		if (ret < 0)
			goto unlock;
	}

	/* update client's log tree's item */
	ret = scoutfs_btree_force(sb, &server->alloc, &server->wri,
				  &super->logs_root, &ltk, sizeof(ltk),
				  &ltv, sizeof(ltv));
unlock:
	mutex_unlock(&server->logs_mutex);
	if (ret == 0)
		queue_commit_work(server, &cw);
	up_read(&server->commit_rwsem);
	if (ret == 0)
		ret = wait_for_commit(&cw);

	/* only fill the reply once the roots are persistent */
	if (ret == 0) {
		lt.meta_avail = ltv.meta_avail;
		lt.meta_freed = ltv.meta_freed;
		lt.item_root = ltv.item_root;
		lt.bloom_ref = ltv.bloom_ref;
		lt.data_avail = ltv.data_avail;
		lt.data_freed = ltv.data_freed;
		lt.rid = be64_to_le64(ltk.rid);
		lt.nr = be64_to_le64(ltk.nr);
	}
out:
	WARN_ON_ONCE(ret < 0);
	/* NOTE(review): lt is uninitialized when ret < 0 — presumably the
	 * net layer drops the payload on error; confirm. */
	return scoutfs_net_response(sb, conn, cmd, id, ret, &lt, sizeof(lt));
}

/*
 * The client is sending the roots of all the btree blocks that they
 * wrote to their free space for their transaction.  Make it persistent
 * by referencing the roots from their log item in the logs root and
 * committing.
 */
static int server_commit_log_trees(struct super_block *sb,
				   struct scoutfs_net_connection *conn,
				   u8 cmd, u64 id, void *arg, u16 arg_len)
{
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	DECLARE_SERVER_INFO(sb, server);
	SCOUTFS_BTREE_ITEM_REF(iref);
	struct scoutfs_log_trees_key ltk;
	struct scoutfs_log_trees_val ltv;
	struct scoutfs_log_trees *lt;
	struct commit_waiter cw;
	int ret;

	if (arg_len != sizeof(struct scoutfs_log_trees)) {
		ret = -EINVAL;
		goto out;
	}
	lt = arg;

	down_read(&server->commit_rwsem);
	mutex_lock(&server->logs_mutex);

	/* find the client's existing item */
	memset(&ltk, 0, sizeof(ltk));
	ltk.rid = le64_to_be64(lt->rid);
	ltk.nr = le64_to_be64(lt->nr);

	ret = scoutfs_btree_lookup(sb, &super->logs_root, &ltk, sizeof(ltk),
				   &iref);
	if (ret < 0 && ret != -ENOENT)
		goto unlock;
	if (ret == 0) {
		if (iref.val_len == sizeof(struct scoutfs_log_trees_val)) {
			memcpy(&ltv, iref.val, iref.val_len);
		} else {
			ret = -EIO;
		}
		scoutfs_btree_put_iref(&iref);
		if (ret < 0)
			goto unlock;
	}

	/* XXX probably want to merge free blocks */
	ltv.meta_avail = lt->meta_avail;
	ltv.meta_freed = lt->meta_freed;
	ltv.item_root = lt->item_root;
	ltv.bloom_ref = lt->bloom_ref;
	ltv.data_avail = lt->data_avail;
	ltv.data_freed = lt->data_freed;

	ret = scoutfs_btree_update(sb, &server->alloc, &server->wri,
				   &super->logs_root, &ltk, sizeof(ltk),
				   &ltv, sizeof(ltv));
unlock:
	mutex_unlock(&server->logs_mutex);
	if (ret == 0)
		queue_commit_work(server, &cw);
	up_read(&server->commit_rwsem);
	if (ret == 0)
		ret = wait_for_commit(&cw);
out:
	WARN_ON_ONCE(ret < 0);
	return scoutfs_net_response(sb, conn, cmd, id, ret, NULL, 0);
}

/*
 * A client is being evicted so we want to reclaim resources from their
 * log tree items.  The item trees and bloom refs stay around to be read
 * and eventually merged and we reclaim all the allocator items.
 *
 * The caller holds the commit rwsem which means we do all this work
 * in one server commit.  We'll need to keep the total amount of blocks
 * in trees in check.
 *
 * By the time we're evicting a client they've either synced their data
 * or have been forcefully removed.  The free blocks in the allocator
 * roots are stable and can be merged back into allocator items for use
 * without risking overwriting stable data.
 */
static int reclaim_log_trees(struct super_block *sb, u64 rid)
{
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	DECLARE_SERVER_INFO(sb, server);
	SCOUTFS_BTREE_ITEM_REF(iref);
	struct scoutfs_log_trees_key ltk;
	struct scoutfs_log_trees_val ltv;
	int ret;

	mutex_lock(&server->logs_mutex);
	down_write(&server->alloc_rwsem);

	/* find the client's existing item */
	ltk.rid = cpu_to_be64(rid);
	ltk.nr = 0;

	ret = scoutfs_btree_next(sb, &super->logs_root, &ltk, sizeof(ltk),
				 &iref);
	if (ret == 0) {
		if (iref.key_len == sizeof(struct scoutfs_log_trees_key) &&
		    iref.val_len == sizeof(struct scoutfs_log_trees_val)) {
			memcpy(&ltk, iref.key, iref.key_len);
			memcpy(&ltv, iref.val, iref.val_len);
			/* next item may belong to another rid */
			if (be64_to_cpu(ltk.rid) != rid)
				ret = -ENOENT;
		} else {
			ret = -EIO;
		}
		scoutfs_btree_put_iref(&iref);
	}
	if (ret < 0) {
		/* no log item is fine, farewell can be retransmitted */
		if (ret == -ENOENT)
			ret = 0;
		goto out;
	}

	/* both avail and freed data blocks go back to the core data pool;
	 * NOTE(review): metadata roots are not reclaimed here — confirm
	 * that's intentional. */
	ret = scoutfs_radix_merge(sb, &server->alloc, &server->wri,
				  &super->core_data_avail, &ltv.data_avail,
				  &ltv.data_avail,
				  le64_to_cpu(ltv.data_avail.ref.sm_total));
	if (ret < 0)
		goto out;

	ret = scoutfs_radix_merge(sb, &server->alloc, &server->wri,
				  &super->core_data_avail, &ltv.data_freed,
				  &ltv.data_freed,
				  le64_to_cpu(ltv.data_freed.ref.sm_total));
out:
	up_write(&server->alloc_rwsem);
	mutex_unlock(&server->logs_mutex);
	return ret;
}

/*
 * Give the client the next sequence number for their transaction.  They
 * provide their previous transaction sequence number that they've
 * committed.
 *
 * We track the sequence numbers of transactions that clients have open.
 * This limits the transaction sequence numbers that can be returned in
 * the index of inodes by meta and data transaction numbers.  We
 * communicate the largest possible sequence number to clients via an
 * rpc.
* * The transaction sequence tracking is stored in a btree so it is * shared across servers. Final entries are removed when processing a * client's farewell or when it's removed. */ static int server_advance_seq(struct super_block *sb, struct scoutfs_net_connection *conn, u8 cmd, u64 id, void *arg, u16 arg_len) { DECLARE_SERVER_INFO(sb, server); struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct scoutfs_super_block *super = &sbi->super; struct commit_waiter cw; __le64 their_seq; __le64 next_seq; struct scoutfs_trans_seq_btree_key tsk; u64 rid = scoutfs_net_client_rid(conn); int ret; if (arg_len != sizeof(__le64)) { ret = -EINVAL; goto out; } memcpy(&their_seq, arg, sizeof(their_seq)); down_read(&server->commit_rwsem); down_write(&server->seq_rwsem); if (their_seq != 0) { tsk.trans_seq = le64_to_be64(their_seq); tsk.rid = cpu_to_be64(rid); ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, &super->trans_seqs, &tsk, sizeof(tsk)); if (ret < 0 && ret != -ENOENT) goto out; } next_seq = super->next_trans_seq; le64_add_cpu(&super->next_trans_seq, 1); trace_scoutfs_trans_seq_advance(sb, rid, le64_to_cpu(their_seq), le64_to_cpu(next_seq)); tsk.trans_seq = le64_to_be64(next_seq); tsk.rid = cpu_to_be64(rid); ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri, &super->trans_seqs, &tsk, sizeof(tsk), NULL, 0); out: up_write(&server->seq_rwsem); if (ret == 0) queue_commit_work(server, &cw); up_read(&server->commit_rwsem); if (ret == 0) ret = wait_for_commit(&cw); return scoutfs_net_response(sb, conn, cmd, id, ret, &next_seq, sizeof(next_seq)); } /* * Remove any transaction sequences owned by the client. They must have * committed any final transaction by the time they get here via sending * their farewell message. This can be called multiple times as the * client's farewell is retransmitted so it's OK to not find any * entries. This is called with the server commit rwsem held. 
 */
static int remove_trans_seq(struct super_block *sb, u64 rid)
{
	DECLARE_SERVER_INFO(sb, server);
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct scoutfs_super_block *super = &sbi->super;
	struct scoutfs_trans_seq_btree_key tsk;
	SCOUTFS_BTREE_ITEM_REF(iref);
	int ret = 0;

	down_write(&server->seq_rwsem);

	/* walk all trans_seq entries looking for the client's rid */
	tsk.trans_seq = 0;
	tsk.rid = 0;

	for (;;) {
		ret = scoutfs_btree_next(sb, &super->trans_seqs,
					 &tsk, sizeof(tsk), &iref);
		if (ret < 0) {
			if (ret == -ENOENT)
				ret = 0;
			break;
		}

		/* NOTE(review): key_len isn't checked against sizeof(tsk)
		 * before this copy — confirm keys in trans_seqs are fixed
		 * size. */
		memcpy(&tsk, iref.key, iref.key_len);
		scoutfs_btree_put_iref(&iref);

		if (be64_to_cpu(tsk.rid) == rid) {
			trace_scoutfs_trans_seq_farewell(sb, rid,
					be64_to_cpu(tsk.trans_seq));
			ret = scoutfs_btree_delete(sb, &server->alloc,
						   &server->wri,
						   &super->trans_seqs,
						   &tsk, sizeof(tsk));
			break;
		}

		/* advance past the entry we just saw */
		be64_add_cpu(&tsk.trans_seq, 1);
		tsk.rid = 0;
	}

	up_write(&server->seq_rwsem);

	return ret;
}

/*
 * Give the calling client the last valid trans_seq that it can return
 * in results from the indices of trans seqs to inodes.  These indices
 * promise to only advance so we can't return results past those that
 * are still outstanding and not yet visible in the indices.  If there
 * are no outstanding transactions (what? how?) we give them the max
 * possible sequence.
 */
static int server_get_last_seq(struct super_block *sb,
			       struct scoutfs_net_connection *conn,
			       u8 cmd, u64 id, void *arg, u16 arg_len)
{
	DECLARE_SERVER_INFO(sb, server);
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct scoutfs_super_block *super = &sbi->super;
	struct scoutfs_trans_seq_btree_key tsk;
	SCOUTFS_BTREE_ITEM_REF(iref);
	u64 rid = scoutfs_net_client_rid(conn);
	__le64 last_seq = 0;
	int ret;

	if (arg_len != 0) {
		ret = -EINVAL;
		goto out;
	}

	down_read(&server->seq_rwsem);

	/* the first tracked entry is the oldest outstanding trans_seq */
	tsk.trans_seq = 0;
	tsk.rid = 0;

	ret = scoutfs_btree_next(sb, &super->trans_seqs,
				 &tsk, sizeof(tsk), &iref);
	if (ret == 0) {
		if (iref.key_len != sizeof(tsk)) {
			ret = -EINVAL;
		} else {
			memcpy(&tsk, iref.key, iref.key_len);
			/* stop just short of the oldest open transaction */
			last_seq = cpu_to_le64(be64_to_cpu(tsk.trans_seq) - 1);
		}
		scoutfs_btree_put_iref(&iref);

	} else if (ret == -ENOENT) {
		/* nothing outstanding: everything before next is stable */
		last_seq = super->next_trans_seq;
		le64_add_cpu(&last_seq, -1ULL);
		ret = 0;
	}

	trace_scoutfs_trans_seq_last(sb, rid, le64_to_cpu(last_seq));

	up_read(&server->seq_rwsem);
out:
	return scoutfs_net_response(sb, conn, cmd, id, ret,
				    &last_seq, sizeof(last_seq));
}

/*
 * Sample the super stats that the client wants for statfs by serializing
 * with each component.
 */
static int server_statfs(struct super_block *sb,
			 struct scoutfs_net_connection *conn,
			 u8 cmd, u64 id, void *arg, u16 arg_len)
{
	DECLARE_SERVER_INFO(sb, server);
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct scoutfs_super_block *super = &sbi->super;
	struct scoutfs_net_statfs nstatfs;
	int ret;

	if (arg_len == 0) {
		/* uuid and total_segs are constant, so far */
		memcpy(nstatfs.uuid, super->uuid, sizeof(nstatfs.uuid));

		spin_lock(&sbi->next_ino_lock);
		nstatfs.next_ino = super->next_ino;
		spin_unlock(&sbi->next_ino_lock);

		down_read(&server->alloc_rwsem);
		nstatfs.total_blocks = super->total_meta_blocks;
		le64_add_cpu(&nstatfs.total_blocks,
			     le64_to_cpu(super->total_data_blocks));
		nstatfs.bfree = super->free_blocks;
		up_read(&server->alloc_rwsem);
		ret = 0;
	} else {
		/* NOTE(review): nstatfs is uninitialized on this path yet
		 * still handed to the response — presumably the payload is
		 * ignored on error; confirm. */
		ret = -EINVAL;
	}

	return scoutfs_net_response(sb, conn, cmd, id, ret,
				    &nstatfs, sizeof(nstatfs));
}

/* forward an incoming lock request to the lock server after validation */
static int server_lock(struct super_block *sb,
		       struct scoutfs_net_connection *conn,
		       u8 cmd, u64 id, void *arg, u16 arg_len)
{
	u64 rid = scoutfs_net_client_rid(conn);

	if (arg_len != sizeof(struct scoutfs_net_lock))
		return -EINVAL;

	return scoutfs_lock_server_request(sb, rid, id, arg);
}

/* hand a client's lock response back to the lock server */
static int lock_response(struct super_block *sb,
			 struct scoutfs_net_connection *conn,
			 void *resp, unsigned int resp_len,
			 int error, void *data)
{
	u64 rid = scoutfs_net_client_rid(conn);

	if (resp_len != sizeof(struct scoutfs_net_lock))
		return -EINVAL;

	return scoutfs_lock_server_response(sb, rid, resp);
}

/* the lock server asks us to send a lock request to a specific client */
int scoutfs_server_lock_request(struct super_block *sb, u64 rid,
				struct scoutfs_net_lock *nl)
{
	struct server_info *server = SCOUTFS_SB(sb)->server_info;

	return scoutfs_net_submit_request_node(sb, server->conn, rid,
					       SCOUTFS_NET_CMD_LOCK,
					       nl, sizeof(*nl),
					       lock_response, NULL, NULL);
}

/* the lock server asks us to send a lock response to a specific client */
int scoutfs_server_lock_response(struct super_block *sb, u64 rid, u64 id,
				 struct scoutfs_net_lock *nl)
{
	struct server_info *server = SCOUTFS_SB(sb)->server_info;

	return scoutfs_net_response_node(sb, server->conn, rid,
					 SCOUTFS_NET_CMD_LOCK, id, 0,
					 nl, sizeof(*nl));
}

/* a recover payload must exactly hold the nr locks it claims to carry */
static bool invalid_recover(struct scoutfs_net_lock_recover *nlr,
			    unsigned long bytes)
{
	return ((bytes < sizeof(*nlr)) ||
		(bytes != offsetof(struct scoutfs_net_lock_recover,
				   locks[le16_to_cpu(nlr->nr)])));
}

/* validate and forward a client's lock recovery response */
static int lock_recover_response(struct super_block *sb,
				 struct scoutfs_net_connection *conn,
				 void *resp, unsigned int resp_len,
				 int error, void *data)
{
	u64 rid = scoutfs_net_client_rid(conn);

	if (invalid_recover(resp, resp_len))
		return -EINVAL;

	return scoutfs_lock_server_recover_response(sb, rid, resp);
}

/* ask a client to report the locks it holds starting from key */
int scoutfs_server_lock_recover_request(struct super_block *sb, u64 rid,
					struct scoutfs_key *key)
{
	struct server_info *server = SCOUTFS_SB(sb)->server_info;

	return scoutfs_net_submit_request_node(sb, server->conn, rid,
					       SCOUTFS_NET_CMD_LOCK_RECOVER,
					       key, sizeof(*key),
					       lock_recover_response,
					       NULL, NULL);
}

/*
 * Record a newly greeted mount in the mounted_clients btree, noting
 * whether it participates in quorum voting.
 */
static int insert_mounted_client(struct super_block *sb, u64 rid,
				 u64 gr_flags)
{
	DECLARE_SERVER_INFO(sb, server);
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	struct scoutfs_mounted_client_btree_key mck;
	struct scoutfs_mounted_client_btree_val mcv;

	mck.rid = cpu_to_be64(rid);
	mcv.flags = 0;
	if (gr_flags & SCOUTFS_NET_GREETING_FLAG_VOTER)
		mcv.flags |= SCOUTFS_MOUNTED_CLIENT_VOTER;

	return scoutfs_btree_insert(sb, &server->alloc, &server->wri,
				    &super->mounted_clients,
				    &mck, sizeof(mck), &mcv, sizeof(mcv));
}

/*
 * Remove the record of a mounted client.  The record can already be
 * removed if we're processing a farewell on behalf of a client that
 * already had a previous server process its farewell.
 *
 * When we remove the last mounted client that's voting we write a new
 * quorum block with the updated unmount_barrier.
 *
 * The caller has to serialize with farewell processing.
*/ static int delete_mounted_client(struct super_block *sb, u64 rid) { DECLARE_SERVER_INFO(sb, server); struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super; struct scoutfs_mounted_client_btree_key mck; int ret; mck.rid = cpu_to_be64(rid); ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri, &super->mounted_clients, &mck, sizeof(mck)); if (ret == -ENOENT) ret = 0; return ret; } /* * Process an incoming greeting request in the server from the client. * We try to send responses to failed greetings so that the sender can * log some detail before shutting down. A failure to send a greeting * response shuts down the connection. * * If a client reconnects they'll send their previously received * serer_term in their greeting request. * * XXX The logic of this has gotten convoluted. The lock server can * send a recovery request so it needs to be called after the core net * greeting call enables messages. But we want the greeting reply to be * sent first, so we currently queue it on the send queue before * enabling messages. That means that a lot of errors that happen after * the reply can't be sent to the client. They'll just see a disconnect * and won't know what's happened. This all needs to be refactored. 
 */
static int server_greeting(struct super_block *sb,
			   struct scoutfs_net_connection *conn,
			   u8 cmd, u64 id, void *arg, u16 arg_len)
{
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	struct scoutfs_net_greeting *gr = arg;
	struct scoutfs_net_greeting greet;
	DECLARE_SERVER_INFO(sb, server);
	struct commit_waiter cw;
	__le64 umb = 0;
	bool reconnecting;
	bool first_contact;
	bool farewell;
	int ret = 0;
	int err;

	if (arg_len != sizeof(struct scoutfs_net_greeting)) {
		/* NOTE(review): the send_err path still reads gr->rid below
		 * even when arg_len is wrong — confirm the net layer always
		 * hands us a full-sized buffer. */
		ret = -EINVAL;
		goto send_err;
	}

	/* reject clients from a different filesystem */
	if (gr->fsid != super->hdr.fsid) {
		scoutfs_warn(sb, "client sent fsid 0x%llx, server has 0x%llx",
			     le64_to_cpu(gr->fsid),
			     le64_to_cpu(super->hdr.fsid));
		ret = -EINVAL;
		goto send_err;
	}

	/* reject clients built against a different format */
	if (gr->format_hash != super->format_hash) {
		scoutfs_warn(sb, "client sent format 0x%llx, server has 0x%llx",
			     le64_to_cpu(gr->format_hash),
			     le64_to_cpu(super->format_hash));
		ret = -EINVAL;
		goto send_err;
	}

	/* first greeting ever: record the mounted client persistently */
	if (gr->server_term == 0) {
		down_read(&server->commit_rwsem);

		spin_lock(&server->lock);
		umb = super->unmount_barrier;
		spin_unlock(&server->lock);

		mutex_lock(&server->farewell_mutex);
		ret = insert_mounted_client(sb, le64_to_cpu(gr->rid),
					    le64_to_cpu(gr->flags));
		mutex_unlock(&server->farewell_mutex);

		if (ret == 0)
			queue_commit_work(server, &cw);
		up_read(&server->commit_rwsem);
		if (ret == 0) {
			ret = wait_for_commit(&cw);
			/* client counts changed, re-evaluate farewells */
			queue_work(server->wq, &server->farewell_work);
		}
	} else {
		umb = gr->unmount_barrier;
	}

send_err:
	err = ret;

	greet.fsid = super->hdr.fsid;
	greet.format_hash = super->format_hash;
	greet.server_term = cpu_to_le64(server->term);
	greet.unmount_barrier = umb;
	greet.rid = gr->rid;
	greet.flags = 0;

	/* queue greeting response to be sent first once messaging enabled */
	ret = scoutfs_net_response(sb, conn, cmd, id, err,
				   &greet, sizeof(greet));
	if (ret == 0 && err)
		ret = err;
	if (ret)
		goto out;

	/* have the net core enable messaging and resend */
	reconnecting = gr->server_term != 0;
	first_contact = le64_to_cpu(gr->server_term) != server->term;
	if (gr->flags & cpu_to_le64(SCOUTFS_NET_GREETING_FLAG_FAREWELL))
		farewell = true;
	else
		farewell = false;

	scoutfs_net_server_greeting(sb, conn, le64_to_cpu(gr->rid), id,
				    reconnecting, first_contact, farewell);

	/* lock server might send recovery request */
	if (le64_to_cpu(gr->server_term) != server->term) {
		/* we're now doing two commits per greeting, not great */
		down_read(&server->commit_rwsem);
		ret = scoutfs_lock_server_greeting(sb, le64_to_cpu(gr->rid),
						   gr->server_term != 0);
		if (ret == 0)
			queue_commit_work(server, &cw);
		up_read(&server->commit_rwsem);
		if (ret == 0)
			ret = wait_for_commit(&cw);
		if (ret)
			goto out;
	}

out:
	return ret;
}

/* a farewell request waiting to be answered by farewell_worker */
struct farewell_request {
	struct list_head entry;
	u64 net_id;
	u64 rid;
};

/* sanity check key/val sizes of a mounted_clients btree item */
static bool invalid_mounted_client_item(struct scoutfs_btree_item_ref *iref)
{
	return (iref->key_len !=
		sizeof(struct scoutfs_mounted_client_btree_key)) ||
	       (iref->val_len !=
		sizeof(struct scoutfs_mounted_client_btree_val));
}

/*
 * This work processes farewell requests asynchronously.  Requests from
 * voting clients can be held until only the final quorum remains and
 * they've all sent farewell requests.
 *
 * When we remove the last mounted client record for the last voting
 * client then we increase the unmount_barrier and write it to the super
 * block.  If voting clients don't get their farewell response they'll
 * see the greater umount_barrier in the super and will know that their
 * farewell has been processed and that they can exit.
 *
 * Responses that are waiting for clients who aren't voting are
 * immediately sent.  Clients that don't have a mounted client record
 * have already had their farewell processed by another server and can
 * proceed.
 *
 * Farewell responses are unique in that sending them causes the server
 * to shutdown the connection to the client next time the socket
 * disconnects.  If the socket is destroyed before the client gets the
 * response they'll reconnect and we'll see them as a brand new client
 * who immediately sends a farewell.
 * It'll be processed and it all
 * works out.
 *
 * If this worker sees an error it assumes that this sever is done for
 * and that another had better take its place.
 */
static void farewell_worker(struct work_struct *work)
{
	struct server_info *server = container_of(work, struct server_info,
						  farewell_work);
	struct super_block *sb = server->sb;
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	struct scoutfs_mounted_client_btree_key mck;
	struct scoutfs_mounted_client_btree_val *mcv;
	struct farewell_request *tmp;
	struct farewell_request *fw;
	SCOUTFS_BTREE_ITEM_REF(iref);
	struct commit_waiter cw;
	unsigned int nr_unmounting = 0;
	unsigned int nr_mounted = 0;
	LIST_HEAD(reqs);
	LIST_HEAD(send);
	bool deleted = false;
	bool voting;
	bool more_reqs;
	int ret;

	/* grab all the requests that are waiting */
	mutex_lock(&server->farewell_mutex);
	list_splice_init(&server->farewell_requests, &reqs);
	mutex_unlock(&server->farewell_mutex);

	/* count how many reqs requests are from voting clients */
	nr_unmounting = 0;
	list_for_each_entry_safe(fw, tmp, &reqs, entry) {
		mck.rid = cpu_to_be64(fw->rid);
		ret = scoutfs_btree_lookup(sb, &super->mounted_clients,
					   &mck, sizeof(mck), &iref);
		if (ret == 0 && invalid_mounted_client_item(&iref)) {
			scoutfs_btree_put_iref(&iref);
			ret = -EIO;
		}
		if (ret < 0) {
			/* already processed elsewhere: answer right away */
			if (ret == -ENOENT) {
				list_move_tail(&fw->entry, &send);
				continue;
			}
			goto out;
		}

		mcv = iref.val;
		voting = (mcv->flags & SCOUTFS_MOUNTED_CLIENT_VOTER) != 0;
		scoutfs_btree_put_iref(&iref);

		/* non-voters never have to wait */
		if (!voting) {
			list_move_tail(&fw->entry, &send);
			continue;
		}

		nr_unmounting++;
	}

	/* see how many mounted clients could vote for quorum */
	memset(&mck, 0, sizeof(mck));
	for (;;) {
		ret = scoutfs_btree_next(sb, &super->mounted_clients,
					 &mck, sizeof(mck), &iref);
		if (ret == 0 && invalid_mounted_client_item(&iref)) {
			scoutfs_btree_put_iref(&iref);
			ret = -EIO;
		}
		if (ret != 0) {
			if (ret == -ENOENT)
				break;
			goto out;
		}

		memcpy(&mck, iref.key, sizeof(mck));
		mcv = iref.val;

		if (mcv->flags & SCOUTFS_MOUNTED_CLIENT_VOTER)
			nr_mounted++;

		scoutfs_btree_put_iref(&iref);
		be64_add_cpu(&mck.rid, 1);
	}

	/* send as many responses as we can to maintain quorum */
	while ((fw = list_first_entry_or_null(&reqs, struct farewell_request,
					      entry)) &&
	       (nr_mounted > super->quorum_count ||
		nr_unmounting >= nr_mounted)) {
		list_move_tail(&fw->entry, &send);
		nr_mounted--;
		nr_unmounting--;
		deleted = true;
	}

	/* process and send farewell responses */
	list_for_each_entry_safe(fw, tmp, &send, entry) {
		down_read(&server->commit_rwsem);

		/* reclaim everything the departing client owned */
		ret = scoutfs_lock_server_farewell(sb, fw->rid) ?:
		      remove_trans_seq(sb, fw->rid) ?:
		      reclaim_log_trees(sb, fw->rid) ?:
		      delete_mounted_client(sb, fw->rid);
		if (ret == 0)
			queue_commit_work(server, &cw);
		up_read(&server->commit_rwsem);
		if (ret == 0)
			ret = wait_for_commit(&cw);
		if (ret)
			goto out;
	}

	/* update the unmount barrier if we deleted all voting clients */
	if (deleted && nr_mounted == 0) {
		down_read(&server->commit_rwsem);
		le64_add_cpu(&super->unmount_barrier, 1);
		queue_commit_work(server, &cw);
		up_read(&server->commit_rwsem);
		ret = wait_for_commit(&cw);
		if (ret)
			goto out;
	}

	/* and finally send all the responses */
	list_for_each_entry_safe(fw, tmp, &send, entry) {
		ret = scoutfs_net_response_node(sb, server->conn, fw->rid,
						SCOUTFS_NET_CMD_FAREWELL,
						fw->net_id, 0, NULL, 0);
		if (ret)
			break;

		list_del_init(&fw->entry);
		kfree(fw);
	}

	ret = 0;
out:
	/* requeue anything we didn't finish; requests may have arrived */
	mutex_lock(&server->farewell_mutex);
	more_reqs = !list_empty(&server->farewell_requests);
	list_splice_init(&reqs, &server->farewell_requests);
	list_splice_init(&send, &server->farewell_requests);
	mutex_unlock(&server->farewell_mutex);

	if (ret < 0)
		stop_server(server);
	else if (more_reqs && !server->shutting_down)
		queue_work(server->wq, &server->farewell_work);
}

/*
 * Free pending farewell requests for one client, or for all clients
 * when rid is 0.
 */
static void free_farewell_requests(struct super_block *sb, u64 rid)
{
	struct server_info *server = SCOUTFS_SB(sb)->server_info;
	struct farewell_request *tmp;
	struct farewell_request *fw;

	mutex_lock(&server->farewell_mutex);
	list_for_each_entry_safe(fw, tmp, &server->farewell_requests,
entry) {
		if (rid == 0 || fw->rid == rid) {
			list_del_init(&fw->entry);
			kfree(fw);
		}
	}
	mutex_unlock(&server->farewell_mutex);
}

/*
 * The server is receiving a farewell message from a client that is
 * unmounting.  It won't send any more requests and once it receives our
 * response it will not reconnect.
 *
 * XXX we should make sure that all our requests to the client have finished
 * before we respond.  Locking will have its own messaging for orderly
 * shutdown.  That leaves compaction which will be addressed as part of
 * the larger work of recovering compactions that were in flight when
 * a client crashed.
 */
static int server_farewell(struct super_block *sb,
			   struct scoutfs_net_connection *conn,
			   u8 cmd, u64 id, void *arg, u16 arg_len)
{
	struct server_info *server = SCOUTFS_SB(sb)->server_info;
	u64 rid = scoutfs_net_client_rid(conn);
	struct farewell_request *fw;

	if (arg_len != 0)
		return -EINVAL;

	/* XXX tear down if we fence, or if we shut down */

	fw = kmalloc(sizeof(struct farewell_request), GFP_NOFS);
	if (fw == NULL)
		return -ENOMEM;

	fw->rid = rid;
	fw->net_id = id;

	mutex_lock(&server->farewell_mutex);
	list_add_tail(&fw->entry, &server->farewell_requests);
	mutex_unlock(&server->farewell_mutex);

	queue_work(server->wq, &server->farewell_work);

	/* response will be sent later */
	return 0;
}

/* dispatch table mapping incoming net commands to processing functions */
static scoutfs_net_request_t server_req_funcs[] = {
	[SCOUTFS_NET_CMD_GREETING]		= server_greeting,
	[SCOUTFS_NET_CMD_ALLOC_INODES]		= server_alloc_inodes,
	[SCOUTFS_NET_CMD_GET_LOG_TREES]		= server_get_log_trees,
	[SCOUTFS_NET_CMD_COMMIT_LOG_TREES]	= server_commit_log_trees,
	[SCOUTFS_NET_CMD_ADVANCE_SEQ]		= server_advance_seq,
	[SCOUTFS_NET_CMD_GET_LAST_SEQ]		= server_get_last_seq,
	[SCOUTFS_NET_CMD_STATFS]		= server_statfs,
	[SCOUTFS_NET_CMD_LOCK]			= server_lock,
	[SCOUTFS_NET_CMD_FAREWELL]		= server_farewell,
};

/*
 * The net layer tells us a client connection is up; track it in the
 * server's client list.  A rid of 0 is the listening connection itself.
 */
static void server_notify_up(struct super_block *sb,
			     struct scoutfs_net_connection *conn,
			     void *info, u64 rid)
{
	struct server_client_info *sci = info;
	DECLARE_SERVER_INFO(sb, server);

	if (rid !=
0) { sci->rid = rid; spin_lock(&server->lock); list_add_tail(&sci->head, &server->clients); server->nr_clients++; trace_scoutfs_server_client_up(sb, rid, server->nr_clients); spin_unlock(&server->lock); } } static void server_notify_down(struct super_block *sb, struct scoutfs_net_connection *conn, void *info, u64 rid) { struct server_client_info *sci = info; DECLARE_SERVER_INFO(sb, server); if (rid != 0) { spin_lock(&server->lock); list_del_init(&sci->head); server->nr_clients--; trace_scoutfs_server_client_down(sb, rid, server->nr_clients); spin_unlock(&server->lock); free_farewell_requests(sb, rid); } else { stop_server(server); } } static void scoutfs_server_worker(struct work_struct *work) { struct server_info *server = container_of(work, struct server_info, work); struct super_block *sb = server->sb; struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct scoutfs_super_block *super = &sbi->super; struct scoutfs_net_connection *conn = NULL; DECLARE_WAIT_QUEUE_HEAD(waitq); struct sockaddr_in sin; LIST_HEAD(conn_list); int ret; int err; trace_scoutfs_server_work_enter(sb, 0, 0); sin = server->listen_sin; scoutfs_info(sb, "server setting up at "SIN_FMT, SIN_ARG(&sin)); conn = scoutfs_net_alloc_conn(sb, server_notify_up, server_notify_down, sizeof(struct server_client_info), server_req_funcs, "server"); if (!conn) { ret = -ENOMEM; goto out; } ret = scoutfs_net_bind(sb, conn, &sin); if (ret) { scoutfs_err(sb, "server failed to bind to "SIN_FMT", err %d%s", SIN_ARG(&sin), ret, ret == -EADDRNOTAVAIL ? 
" (Bad address?)" : ""); goto out; } if (ret) goto out; /* start up the server subsystems before accepting */ ret = scoutfs_read_super(sb, super); if (ret < 0) goto shutdown; scoutfs_radix_init_alloc(&server->alloc, &super->core_meta_avail, &super->core_meta_freed); scoutfs_block_writer_init(sb, &server->wri); ret = scoutfs_lock_server_setup(sb, &server->alloc, &server->wri); if (ret) goto shutdown; /* * Write our address in the super before it's possible for net * processing to start writing the super as part of * transactions. In theory clients could be trying to connect * to our address without having seen it in the super (maybe * they saw it a long time ago). */ scoutfs_addr_from_sin(&super->server_addr, &sin); super->quorum_server_term = cpu_to_le64(server->term); ret = scoutfs_write_super(sb, super); if (ret < 0) goto shutdown; /* start accepting connections and processing work */ server->conn = conn; scoutfs_net_listen(sb, conn); scoutfs_info(sb, "server ready at "SIN_FMT, SIN_ARG(&sin)); complete(&server->start_comp); /* wait_event/wake_up provide barriers */ wait_event_interruptible(server->waitq, server->shutting_down); shutdown: scoutfs_info(sb, "server shutting down at "SIN_FMT, SIN_ARG(&sin)); /* wait for request processing */ scoutfs_net_shutdown(sb, conn); /* wait for commit queued by request processing */ flush_work(&server->commit_work); server->conn = NULL; scoutfs_lock_server_destroy(sb); out: scoutfs_quorum_clear_leader(sb); scoutfs_net_free_conn(sb, conn); scoutfs_info(sb, "server stopped at "SIN_FMT, SIN_ARG(&sin)); trace_scoutfs_server_work_exit(sb, 0, ret); /* * Always try to clear our presence in the super so that we're * not fenced. We do this last because other mounts will try to * reach quorum the moment they see zero here. The later we do * this the longer we have to finish shutdown while clients * timeout. 
*/ err = scoutfs_read_super(sb, super); if (err == 0) { super->quorum_fenced_term = cpu_to_le64(server->term); memset(&super->server_addr, 0, sizeof(super->server_addr)); err = scoutfs_write_super(sb, super); } if (err < 0) { scoutfs_err(sb, "failed to clear election term %llu at "SIN_FMT", this mount could be fenced", server->term, SIN_ARG(&sin)); } server->err = ret; complete(&server->start_comp); } /* * Wait for the server to successfully start. If this returns error then * the super block's fence_term has been set to the new server's term so * that it won't be fenced. */ int scoutfs_server_start(struct super_block *sb, struct sockaddr_in *sin, u64 term) { DECLARE_SERVER_INFO(sb, server); server->err = 0; server->shutting_down = false; server->listen_sin = *sin; server->term = term; init_completion(&server->start_comp); queue_work(server->wq, &server->work); wait_for_completion(&server->start_comp); return server->err; } /* * Start shutdown on the server but don't want for it to finish. */ void scoutfs_server_abort(struct super_block *sb) { DECLARE_SERVER_INFO(sb, server); stop_server(server); } /* * Once the server is stopped we give the caller our election info * which might have been modified while we were running. 
*/ void scoutfs_server_stop(struct super_block *sb) { DECLARE_SERVER_INFO(sb, server); stop_server(server); /* XXX not sure both are needed */ cancel_work_sync(&server->work); cancel_work_sync(&server->commit_work); } int scoutfs_server_setup(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct server_info *server; server = kzalloc(sizeof(struct server_info), GFP_KERNEL); if (!server) return -ENOMEM; server->sb = sb; spin_lock_init(&server->lock); init_waitqueue_head(&server->waitq); INIT_WORK(&server->work, scoutfs_server_worker); init_rwsem(&server->commit_rwsem); init_llist_head(&server->commit_waiters); INIT_WORK(&server->commit_work, scoutfs_server_commit_func); init_rwsem(&server->seq_rwsem); init_rwsem(&server->alloc_rwsem); INIT_LIST_HEAD(&server->clients); mutex_init(&server->farewell_mutex); INIT_LIST_HEAD(&server->farewell_requests); INIT_WORK(&server->farewell_work, farewell_worker); mutex_init(&server->logs_mutex); server->wq = alloc_workqueue("scoutfs_server", WQ_UNBOUND | WQ_NON_REENTRANT, 0); if (!server->wq) { kfree(server); return -ENOMEM; } sbi->server_info = server; return 0; } /* * The caller should have already stopped but we do the same just in * case. */ void scoutfs_server_destroy(struct super_block *sb) { struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb); struct server_info *server = sbi->server_info; if (server) { stop_server(server); /* wait for server work to wait for everything to shut down */ cancel_work_sync(&server->work); /* recv work/compaction could have left commit_work queued */ cancel_work_sync(&server->commit_work); /* pending farewell requests are another server's problem */ cancel_work_sync(&server->farewell_work); free_farewell_requests(sb, 0); trace_scoutfs_server_workqueue_destroy(sb, 0, 0); destroy_workqueue(server->wq); kfree(server); sbi->server_info = NULL; } }