/*
 * Copyright (C) 2018 Versity Software, Inc.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 */
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>
#include <linux/list.h>
#include <linux/llist.h>
#include <linux/wait.h>
#include <linux/workqueue.h>
#include <linux/completion.h>
#include <linux/seqlock.h>
#include <linux/in.h>

#include "format.h"
#include "counters.h"
#include "inode.h"
#include "block.h"
#include "btree.h"
#include "scoutfs_trace.h"
#include "msg.h"
#include "server.h"
#include "net.h"
#include "lock_server.h"
#include "endian_swap.h"
#include "quorum.h"
#include "trans.h"
#include "srch.h"
#include "alloc.h"
#include "forest.h"

/*
 * Every active mount can act as the server that listens on a net
 * connection and accepts connections from all the other mounts acting
 * as clients.
 *
 * The server is started by the mount that is elected leader by quorum.
 * If it sees errors it shuts down the server in the hopes that another
 * mount will become the leader and have less trouble.
 */

struct server_info {
	struct super_block *sb;
	spinlock_t lock;
	wait_queue_head_t waitq;

	struct workqueue_struct *wq;
	struct work_struct work;
	int err;
	bool shutting_down;
	struct completion start_comp;
	struct sockaddr_in listen_sin;
	u64 term;

	struct scoutfs_net_connection *conn;

	/* request processing coordinates shared commits */
	struct rw_semaphore commit_rwsem;
	struct llist_head commit_waiters;
	struct work_struct commit_work;

	/* server tracks seq use */
	struct rw_semaphore seq_rwsem;

	struct list_head clients;
	unsigned long nr_clients;

	/* track clients waiting in unmount for farewell response */
	struct mutex farewell_mutex;
	struct list_head farewell_requests;
	struct work_struct farewell_work;

	struct mutex alloc_mutex;
	/* swap between two fs meta roots to increase time to reuse */
	struct scoutfs_alloc_root *meta_avail;
	struct scoutfs_alloc_root *meta_freed;
	/* server's meta allocators alternate between persistent heads */
	struct scoutfs_alloc alloc;
	int other_ind;
	struct scoutfs_alloc_list_head *other_avail;
	struct scoutfs_alloc_list_head *other_freed;
	struct scoutfs_block_writer wri;

	struct mutex logs_mutex;

	struct mutex srch_mutex;

	/* stable versions stored from commits, given in locks and rpcs */
	seqcount_t roots_seqcount;
	struct scoutfs_net_roots roots;
};

#define DECLARE_SERVER_INFO(sb, name) \
	struct server_info *name = SCOUTFS_SB(sb)->server_info

/*
 * The server tracks each connected client.
 */
struct server_client_info {
	u64 rid;
	struct list_head head;
};

struct commit_waiter {
	struct completion comp;
	struct llist_node node;
	int ret;
};

static void stop_server(struct server_info *server)
{
	/* wait_event/wake_up provide barriers */
	server->shutting_down = true;
	wake_up(&server->waitq);
}
/*
 * Hold the shared rwsem that lets multiple holders modify blocks in
 * the current commit and prevents the commit worker from acquiring the
 * exclusive write lock to write the commit.
 *
 * This is exported for server components that are isolated in their
 * own files (lock_server) and that are not called directly by the
 * server core (async timeout work).
 */
int scoutfs_server_hold_commit(struct super_block *sb)
{
	DECLARE_SERVER_INFO(sb, server);

	scoutfs_inc_counter(sb, server_commit_hold);

	down_read(&server->commit_rwsem);

	return 0;
}

/*
 * This is called while holding the commit and returns once the commit
 * is successfully written.  Many holders can all wait for all holders
 * to drain before their shared commit is applied and they're all
 * woken.
 *
 * It's important to realize that our commit_waiter list node might be
 * serviced by a currently executing commit work that is blocked
 * waiting for the holders to release the commit_rwsem.  This caller
 * can return from wait_for_completion() while another future
 * commit_work is still queued.
 *
 * This could queue delayed work but we're first trying to have
 * batching work by having concurrent modification line up behind a
 * commit in flight.  Once the commit finishes it'll unlock and
 * hopefully everyone will race to make their changes and they'll all
 * be applied by the next commit after that.
 */
int scoutfs_server_apply_commit(struct super_block *sb, int err)
{
	DECLARE_SERVER_INFO(sb, server);
	struct commit_waiter cw;

	if (err == 0) {
		cw.ret = 0;
		init_completion(&cw.comp);
		llist_add(&cw.node, &server->commit_waiters);
		scoutfs_inc_counter(sb, server_commit_queue);
		queue_work(server->wq, &server->commit_work);
	}

	up_read(&server->commit_rwsem);

	if (err == 0) {
		wait_for_completion(&cw.comp);
		err = cw.ret;
	}

	return err;
}

void scoutfs_server_get_roots(struct super_block *sb,
			      struct scoutfs_net_roots *roots)
{
	DECLARE_SERVER_INFO(sb, server);
	unsigned int seq;

	do {
		seq = read_seqcount_begin(&server->roots_seqcount);
		*roots = server->roots;
	} while (read_seqcount_retry(&server->roots_seqcount, seq));
}

static void set_roots(struct server_info *server,
		      struct scoutfs_btree_root *fs_root,
		      struct scoutfs_btree_root *logs_root,
		      struct scoutfs_btree_root *srch_root)
{
	preempt_disable();
	write_seqcount_begin(&server->roots_seqcount);
	server->roots.fs_root = *fs_root;
	server->roots.logs_root = *logs_root;
	server->roots.srch_root = *srch_root;
	write_seqcount_end(&server->roots_seqcount);
	preempt_enable();
}
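
/*
 * For orientation, a sketch (not a real handler in this file) of how
 * the pieces above compose in a request handler: modifications are
 * bracketed by hold/apply so that many concurrent handlers can share
 * one commit write, and the committed roots then flow out to clients
 * through set_roots()/scoutfs_server_get_roots():
 *
 *	ret = scoutfs_server_hold_commit(sb);
 *	if (ret == 0) {
 *		ret = modify_some_btree_items(sb);	// hypothetical
 *		ret = scoutfs_server_apply_commit(sb, ret);
 *	}
 */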
/*
 * Concurrent request processing dirties blocks in a commit and makes
 * the modifications persistent before replying.  We'd like to batch
 * these commits as much as is reasonable so that we don't degrade to a
 * few IO round trips per request.
 *
 * Getting that batching right is bound up in the concurrency of
 * request processing so a clear way to implement the batched commits
 * is to implement commits with a single pending work func like the
 * processing.
 *
 * Processing paths acquire the rwsem for reading while they're making
 * multiple dependent changes.  When they're done and want it
 * persistent they add themselves to the list of waiters and queue the
 * commit work.  This work runs, acquires the lock to exclude other
 * writers, and performs the commit.  Readers can run concurrently with
 * these commits.
 */
static void scoutfs_server_commit_func(struct work_struct *work)
{
	struct server_info *server = container_of(work, struct server_info,
						  commit_work);
	struct super_block *sb = server->sb;
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	struct commit_waiter *cw;
	struct commit_waiter *pos;
	struct llist_node *node;
	int ret;

	trace_scoutfs_server_commit_work_enter(sb, 0, 0);
	scoutfs_inc_counter(sb, server_commit_worker);

	down_write(&server->commit_rwsem);

	/* make sure next avail has sufficient blocks */
	ret = scoutfs_alloc_fill_list(sb, &server->alloc, &server->wri,
				      server->other_avail,
				      server->meta_avail,
				      SCOUTFS_SERVER_META_FILL_LO,
				      SCOUTFS_SERVER_META_FILL_TARGET);
	if (ret) {
		scoutfs_err(sb, "server error refilling avail: %d", ret);
		goto out;
	}

	/* merge freed blocks into extents, might be partial */
	ret = scoutfs_alloc_empty_list(sb, &server->alloc, &server->wri,
				       server->meta_freed,
				       server->other_freed);
	if (ret) {
		scoutfs_err(sb, "server error emptying freed: %d", ret);
		goto out;
	}

	ret = scoutfs_alloc_prepare_commit(sb, &server->alloc, &server->wri);
	if (ret < 0) {
		scoutfs_err(sb, "server error preparing alloc commit: %d",
			    ret);
		goto out;
	}

	ret = scoutfs_block_writer_write(sb, &server->wri);
	if (ret) {
		scoutfs_err(sb, "server error writing btree blocks: %d", ret);
		goto out;
	}

	super->server_meta_avail[server->other_ind ^ 1] = server->alloc.avail;
	super->server_meta_freed[server->other_ind ^ 1] = server->alloc.freed;

	ret = scoutfs_write_super(sb, super);
	if (ret) {
		scoutfs_err(sb, "server error writing super block: %d", ret);
		goto out;
	}

	set_roots(server, &super->fs_root, &super->logs_root,
		  &super->srch_root);

	/* swizzle the active and idle server alloc/freed heads */
	server->other_ind ^= 1;
	server->alloc.avail = super->server_meta_avail[server->other_ind ^ 1];
	server->alloc.freed = super->server_meta_freed[server->other_ind ^ 1];
	server->other_avail = &super->server_meta_avail[server->other_ind];
	server->other_freed = &super->server_meta_freed[server->other_ind];

	/* swap avail/freed if avail gets low and freed is high */
	if (le64_to_cpu(server->meta_avail->total_len) <=
			SCOUTFS_SERVER_META_ALLOC_MIN &&
	    le64_to_cpu(server->meta_freed->total_len) >
			SCOUTFS_SERVER_META_ALLOC_MIN)
		swap(server->meta_avail, server->meta_freed);

	ret = 0;
out:
	node = llist_del_all(&server->commit_waiters);

	/* waiters always wait on completion, cw could be freed after complete */
	llist_for_each_entry_safe(cw, pos, node, node) {
		cw->ret = ret;
		complete(&cw->comp);
	}

	up_write(&server->commit_rwsem);
	trace_scoutfs_server_commit_work_exit(sb, 0, ret);
}

static int server_alloc_inodes(struct super_block *sb,
			       struct scoutfs_net_connection *conn,
			       u8 cmd, u64 id, void *arg, u16 arg_len)
{
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct scoutfs_super_block *super = &sbi->super;
	struct scoutfs_net_inode_alloc ial = { 0, };
	__le64 lecount;
	u64 ino;
	u64 nr;
	int ret;

	if (arg_len != sizeof(lecount)) {
		ret = -EINVAL;
		goto out;
	}

	memcpy(&lecount, arg, arg_len);

	ret = scoutfs_server_hold_commit(sb);
	if (ret)
		goto out;

	spin_lock(&sbi->next_ino_lock);
	ino = le64_to_cpu(super->next_ino);
	nr = min(le64_to_cpu(lecount), U64_MAX - ino);
	le64_add_cpu(&super->next_ino, nr);
	spin_unlock(&sbi->next_ino_lock);

	ret = scoutfs_server_apply_commit(sb, ret);
	if (ret == 0) {
		ial.ino = cpu_to_le64(ino);
		ial.nr = cpu_to_le64(nr);
	}

out:
	return scoutfs_net_response(sb, conn, cmd, id, ret,
				    &ial, sizeof(ial));
}
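
/*
 * A worked sketch of the head swizzling in the commit worker above.
 * Suppose other_ind is 1 entering a commit: dirty allocator state is
 * written into the super's heads [0] (other_ind ^ 1) while heads [1]
 * sit idle.  After the super commits, other_ind flips to 0 and the
 * roles swap:
 *
 *	commit N:	active heads [0], idle heads [1]
 *	commit N+1:	active heads [1], idle heads [0]
 *
 * Like the paired fs meta roots, alternating between two persistent
 * heads increases the time before just-freed blocks can be reused.
 */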
/*
 * Refill the destination root if it's fallen below the lo threshold by
 * moving from the src root to bring it up to the target.
 */
static int alloc_move_refill(struct super_block *sb,
			     struct scoutfs_alloc_root *dst,
			     struct scoutfs_alloc_root *src,
			     u64 lo, u64 target)
{
	DECLARE_SERVER_INFO(sb, server);

	if (le64_to_cpu(dst->total_len) >= lo)
		return 0;

	return scoutfs_alloc_move(sb, &server->alloc, &server->wri, dst, src,
				  min(target - le64_to_cpu(dst->total_len),
				      le64_to_cpu(src->total_len)));
}

static int alloc_move_empty(struct super_block *sb,
			    struct scoutfs_alloc_root *dst,
			    struct scoutfs_alloc_root *src)
{
	DECLARE_SERVER_INFO(sb, server);

	return scoutfs_alloc_move(sb, &server->alloc, &server->wri, dst, src,
				  le64_to_cpu(src->total_len));
}

/*
 * Give the client roots to all the trees that they'll use to build
 * their transaction.
 *
 * We make sure that their alloc trees have sufficient blocks to
 * allocate metadata and data for the transaction.  We merge their
 * freed trees back into the core allocators.  They were committed with
 * the previous transaction so they're stable and can now be reused,
 * even by the server in this commit.
 */
static int server_get_log_trees(struct super_block *sb,
				struct scoutfs_net_connection *conn,
				u8 cmd, u64 id, void *arg, u16 arg_len)
{
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	u64 rid = scoutfs_net_client_rid(conn);
	DECLARE_SERVER_INFO(sb, server);
	SCOUTFS_BTREE_ITEM_REF(iref);
	struct scoutfs_log_trees lt;
	struct scoutfs_key key;
	int ret;

	if (arg_len != 0) {
		ret = -EINVAL;
		goto out;
	}

	ret = scoutfs_server_hold_commit(sb);
	if (ret)
		goto out;

	mutex_lock(&server->logs_mutex);

	scoutfs_key_init_log_trees(&key, rid, U64_MAX);
	ret = scoutfs_btree_prev(sb, &super->logs_root, &key, &iref);
	if (ret < 0 && ret != -ENOENT)
		goto unlock;
	if (ret == 0) {
		if (iref.val_len == sizeof(struct scoutfs_log_trees)) {
			key = *iref.key;
			memcpy(&lt, iref.val, iref.val_len);
			if (le64_to_cpu(key.sklt_rid) != rid)
				ret = -ENOENT;
		} else {
			ret = -EIO;
		}
		scoutfs_btree_put_iref(&iref);
		if (ret == -EIO)
			goto unlock;
	}

	/* initialize new roots if we don't have any */
	if (ret == -ENOENT) {
		key.sklt_rid = cpu_to_le64(rid);
		key.sklt_nr = cpu_to_le64(1);
		memset(&lt, 0, sizeof(lt));
		lt.rid = key.sklt_rid;
		lt.nr = key.sklt_nr;
	}

	/* return freed to server for emptying, refill avail */
	mutex_lock(&server->alloc_mutex);
	ret = scoutfs_alloc_splice_list(sb, &server->alloc, &server->wri,
					server->other_freed,
					&lt.meta_freed) ?:
	      alloc_move_empty(sb, &super->data_alloc, &lt.data_freed) ?:
	      scoutfs_alloc_fill_list(sb, &server->alloc, &server->wri,
				      &lt.meta_avail, server->meta_avail,
				      SCOUTFS_SERVER_META_FILL_LO,
				      SCOUTFS_SERVER_META_FILL_TARGET) ?:
	      alloc_move_refill(sb, &lt.data_avail, &super->data_alloc,
				SCOUTFS_SERVER_DATA_FILL_LO,
				SCOUTFS_SERVER_DATA_FILL_TARGET);
	mutex_unlock(&server->alloc_mutex);
	if (ret < 0)
		goto unlock;

	/* update client's log tree's item */
	ret = scoutfs_btree_force(sb, &server->alloc, &server->wri,
				  &super->logs_root, &key, &lt, sizeof(lt));

unlock:
	mutex_unlock(&server->logs_mutex);

	ret = scoutfs_server_apply_commit(sb, ret);
out:
	WARN_ON_ONCE(ret < 0);
	return scoutfs_net_response(sb, conn, cmd, id, ret,
				    &lt, sizeof(lt));
}
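
/*
 * The chained "a ?: b ?: c" calls above lean on the GCC conditional
 * operator extension: each call returns 0 on success, so evaluation
 * stops at the first nonzero (negative errno) result.  A minimal
 * equivalent of the pattern:
 *
 *	int ret = step_one() ?: step_two() ?: step_three();
 *
 * is shorthand for calling each step in order and keeping the first
 * nonzero return value.
 */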
/*
 * The client is sending the roots of all the btree blocks that they
 * wrote to their free space for their transaction.  Make it persistent
 * by referencing the roots from their log item in the logs root and
 * committing.
 */
static int server_commit_log_trees(struct super_block *sb,
				   struct scoutfs_net_connection *conn,
				   u8 cmd, u64 id, void *arg, u16 arg_len)
{
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	DECLARE_SERVER_INFO(sb, server);
	SCOUTFS_BTREE_ITEM_REF(iref);
	struct scoutfs_log_trees lt;
	struct scoutfs_key key;
	int ret;

	if (arg_len != sizeof(struct scoutfs_log_trees)) {
		ret = -EINVAL;
		goto out;
	}

	/* don't modify the caller's log_trees */
	memcpy(&lt, arg, sizeof(struct scoutfs_log_trees));

	ret = scoutfs_server_hold_commit(sb);
	if (ret < 0) {
		scoutfs_err(sb, "server error preparing commit: %d", ret);
		goto out;
	}

	mutex_lock(&server->logs_mutex);

	/* find the client's existing item */
	scoutfs_key_init_log_trees(&key, le64_to_cpu(lt.rid),
				   le64_to_cpu(lt.nr));
	ret = scoutfs_btree_lookup(sb, &super->logs_root, &key, &iref);
	if (ret < 0) {
		scoutfs_err(sb, "server error finding client logs: %d", ret);
		goto unlock;
	}
	if (ret == 0)
		scoutfs_btree_put_iref(&iref);

	/* try to rotate the srch log when it's big enough */
	mutex_lock(&server->srch_mutex);
	ret = scoutfs_srch_rotate_log(sb, &server->alloc, &server->wri,
				      &super->srch_root, &lt.srch_file);
	mutex_unlock(&server->srch_mutex);
	if (ret < 0) {
		scoutfs_err(sb, "server error rotating srch log: %d", ret);
		goto unlock;
	}

	ret = scoutfs_btree_update(sb, &server->alloc, &server->wri,
				   &super->logs_root, &key, &lt, sizeof(lt));
	if (ret < 0)
		scoutfs_err(sb, "server error updating client logs: %d", ret);

unlock:
	mutex_unlock(&server->logs_mutex);

	ret = scoutfs_server_apply_commit(sb, ret);
	if (ret < 0)
		scoutfs_err(sb, "server error committing client logs: %d",
			    ret);
out:
	WARN_ON_ONCE(ret < 0);
	return scoutfs_net_response(sb, conn, cmd, id, ret, NULL, 0);
}

/*
 * Give the client the most recent version of the fs btrees that are
 * visible in persistent storage.  We don't want to accidentally give
 * them our in-memory dirty version.  This can be racing with commits.
 */
static int server_get_roots(struct super_block *sb,
			    struct scoutfs_net_connection *conn,
			    u8 cmd, u64 id, void *arg, u16 arg_len)
{
	struct scoutfs_net_roots roots;
	int ret;

	if (arg_len != 0) {
		memset(&roots, 0, sizeof(roots));
		ret = -EINVAL;
	} else {
		scoutfs_server_get_roots(sb, &roots);
		ret = 0;
	}

	return scoutfs_net_response(sb, conn, cmd, id, ret,
				    &roots, sizeof(roots));
}
/*
 * A client is being evicted so we want to reclaim resources from their
 * log tree items.  The item trees and bloom refs stay around to be
 * read and eventually merged and we reclaim all the allocator items.
 *
 * The caller holds the commit rwsem which means we do all this work in
 * one server commit.  We'll need to keep the total amount of blocks in
 * trees in check.
 *
 * By the time we're evicting a client they've either synced their data
 * or have been forcefully removed.  The free blocks in the allocator
 * roots are stable and can be merged back into allocator items for use
 * without risking overwriting stable data.
 *
 * We can return an error without fully reclaiming all the log item's
 * referenced data.
 */
static int reclaim_log_trees(struct super_block *sb, u64 rid)
{
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	DECLARE_SERVER_INFO(sb, server);
	SCOUTFS_BTREE_ITEM_REF(iref);
	struct scoutfs_log_trees lt;
	struct scoutfs_key key;
	int ret;
	int err;

	mutex_lock(&server->logs_mutex);

	/* find the client's existing item */
	scoutfs_key_init_log_trees(&key, rid, 0);
	ret = scoutfs_btree_next(sb, &super->logs_root, &key, &iref);
	if (ret == 0) {
		if (iref.val_len == sizeof(struct scoutfs_log_trees)) {
			key = *iref.key;
			memcpy(&lt, iref.val, iref.val_len);
			if (le64_to_cpu(key.sklt_rid) != rid)
				ret = -ENOENT;
		} else {
			ret = -EIO;
		}
		scoutfs_btree_put_iref(&iref);
	}
	if (ret < 0) {
		if (ret == -ENOENT)
			ret = 0;
		goto out;
	}

	/*
	 * All of these can return errors after having modified the
	 * allocator trees.  We have to try and update the roots in the
	 * log item.
	 */
	mutex_lock(&server->alloc_mutex);
	ret = scoutfs_alloc_splice_list(sb, &server->alloc, &server->wri,
					server->other_freed,
					&lt.meta_freed) ?:
	      scoutfs_alloc_splice_list(sb, &server->alloc, &server->wri,
					server->other_freed,
					&lt.meta_avail) ?:
	      alloc_move_empty(sb, &super->data_alloc, &lt.data_avail) ?:
	      alloc_move_empty(sb, &super->data_alloc, &lt.data_freed);
	mutex_unlock(&server->alloc_mutex);

	err = scoutfs_btree_update(sb, &server->alloc, &server->wri,
				   &super->logs_root, &key, &lt, sizeof(lt));
	BUG_ON(err != 0); /* alloc and log item roots out of sync */

out:
	mutex_unlock(&server->logs_mutex);
	return ret;
}

static void init_trans_seq_key(struct scoutfs_key *key, u64 seq, u64 rid)
{
	*key = (struct scoutfs_key) {
		.sk_zone = SCOUTFS_TRANS_SEQ_ZONE,
		.skts_trans_seq = cpu_to_le64(seq),
		.skts_rid = cpu_to_le64(rid),
	};
}
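
/*
 * Trans seq items sort by sequence number first and rid second, so the
 * btree holds one item per open transaction, oldest first.  A
 * hypothetical population might look like:
 *
 *	{ TRANS_SEQ_ZONE, seq = 17, rid = A }
 *	{ TRANS_SEQ_ZONE, seq = 18, rid = B }
 *	{ TRANS_SEQ_ZONE, seq = 21, rid = A }
 *
 * The first item gives the oldest outstanding transaction, which is
 * why the readers below only need _btree_next() from a zeroed key.
 */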
/*
 * Give the client the next sequence number for their transaction.
 * They provide their previous transaction sequence number that they've
 * committed.
 *
 * We track the sequence numbers of transactions that clients have
 * open.  This limits the transaction sequence numbers that can be
 * returned in the index of inodes by meta and data transaction
 * numbers.  We communicate the largest possible sequence number to
 * clients via an rpc.
 *
 * The transaction sequence tracking is stored in a btree so it is
 * shared across servers.  Final entries are removed when processing a
 * client's farewell or when it's removed.
 */
static int server_advance_seq(struct super_block *sb,
			      struct scoutfs_net_connection *conn,
			      u8 cmd, u64 id, void *arg, u16 arg_len)
{
	DECLARE_SERVER_INFO(sb, server);
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct scoutfs_super_block *super = &sbi->super;
	__le64 their_seq;
	__le64 next_seq;
	u64 rid = scoutfs_net_client_rid(conn);
	struct scoutfs_key key;
	int ret;

	if (arg_len != sizeof(__le64)) {
		ret = -EINVAL;
		goto out;
	}
	memcpy(&their_seq, arg, sizeof(their_seq));

	ret = scoutfs_server_hold_commit(sb);
	if (ret)
		goto out;

	down_write(&server->seq_rwsem);

	if (their_seq != 0) {
		init_trans_seq_key(&key, le64_to_cpu(their_seq), rid);
		ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri,
					   &super->trans_seqs, &key);
		if (ret < 0 && ret != -ENOENT)
			goto unlock;
	}

	next_seq = super->next_trans_seq;
	le64_add_cpu(&super->next_trans_seq, 1);

	trace_scoutfs_trans_seq_advance(sb, rid, le64_to_cpu(their_seq),
					le64_to_cpu(next_seq));

	init_trans_seq_key(&key, le64_to_cpu(next_seq), rid);
	ret = scoutfs_btree_insert(sb, &server->alloc, &server->wri,
				   &super->trans_seqs, &key, NULL, 0);
unlock:
	up_write(&server->seq_rwsem);
	ret = scoutfs_server_apply_commit(sb, ret);
out:
	return scoutfs_net_response(sb, conn, cmd, id, ret,
				    &next_seq, sizeof(next_seq));
}

/*
 * Remove any transaction sequences owned by the client.  They must
 * have committed any final transaction by the time they get here via
 * sending their farewell message.  This can be called multiple times
 * as the client's farewell is retransmitted so it's OK to not find any
 * entries.  This is called with the server commit rwsem held.
 */
static int remove_trans_seq(struct super_block *sb, u64 rid)
{
	DECLARE_SERVER_INFO(sb, server);
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct scoutfs_super_block *super = &sbi->super;
	SCOUTFS_BTREE_ITEM_REF(iref);
	struct scoutfs_key key;
	int ret = 0;

	down_write(&server->seq_rwsem);

	init_trans_seq_key(&key, 0, 0);
	for (;;) {
		ret = scoutfs_btree_next(sb, &super->trans_seqs, &key, &iref);
		if (ret < 0) {
			if (ret == -ENOENT)
				ret = 0;
			break;
		}

		key = *iref.key;
		scoutfs_btree_put_iref(&iref);

		if (le64_to_cpu(key.skts_rid) == rid) {
			trace_scoutfs_trans_seq_farewell(sb, rid,
					le64_to_cpu(key.skts_trans_seq));
			ret = scoutfs_btree_delete(sb, &server->alloc,
						   &server->wri,
						   &super->trans_seqs, &key);
			break;
		}

		scoutfs_key_inc(&key);
	}

	up_write(&server->seq_rwsem);

	return ret;
}
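
/*
 * Continuing the hypothetical population sketched above: if the oldest
 * open transaction has seq 17 then the handler below returns 16, since
 * index entries for seqs 17 and up might still change.  With no open
 * transactions at all it returns next_trans_seq - 1, the largest
 * sequence that could possibly be visible.
 */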
/*
 * Give the calling client the last valid trans_seq that it can return
 * in results from the indices of trans seqs to inodes.  These indices
 * promise to only advance so we can't return results past those that
 * are still outstanding and not yet visible in the indices.  If there
 * are no outstanding transactions (what?  how?) we give them the max
 * possible sequence.
 */
static int server_get_last_seq(struct super_block *sb,
			       struct scoutfs_net_connection *conn,
			       u8 cmd, u64 id, void *arg, u16 arg_len)
{
	DECLARE_SERVER_INFO(sb, server);
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct scoutfs_super_block *super = &sbi->super;
	SCOUTFS_BTREE_ITEM_REF(iref);
	u64 rid = scoutfs_net_client_rid(conn);
	struct scoutfs_key key;
	__le64 last_seq = 0;
	int ret;

	if (arg_len != 0) {
		ret = -EINVAL;
		goto out;
	}

	down_read(&server->seq_rwsem);

	init_trans_seq_key(&key, 0, 0);
	ret = scoutfs_btree_next(sb, &super->trans_seqs, &key, &iref);
	if (ret == 0) {
		key = *iref.key;
		scoutfs_btree_put_iref(&iref);
		last_seq = key.skts_trans_seq;
	} else if (ret == -ENOENT) {
		last_seq = super->next_trans_seq;
		ret = 0;
	}

	le64_add_cpu(&last_seq, -1ULL);
	trace_scoutfs_trans_seq_last(sb, rid, le64_to_cpu(last_seq));

	up_read(&server->seq_rwsem);
out:
	return scoutfs_net_response(sb, conn, cmd, id, ret,
				    &last_seq, sizeof(last_seq));
}

static int server_lock(struct super_block *sb,
		       struct scoutfs_net_connection *conn,
		       u8 cmd, u64 id, void *arg, u16 arg_len)
{
	u64 rid = scoutfs_net_client_rid(conn);

	if (arg_len != sizeof(struct scoutfs_net_lock))
		return -EINVAL;

	return scoutfs_lock_server_request(sb, rid, id, arg);
}

static int lock_response(struct super_block *sb,
			 struct scoutfs_net_connection *conn,
			 void *resp, unsigned int resp_len,
			 int error, void *data)
{
	u64 rid = scoutfs_net_client_rid(conn);

	if (resp_len != sizeof(struct scoutfs_net_lock))
		return -EINVAL;

	return scoutfs_lock_server_response(sb, rid, resp);
}

int scoutfs_server_lock_request(struct super_block *sb, u64 rid,
				struct scoutfs_net_lock *nl)
{
	struct server_info *server = SCOUTFS_SB(sb)->server_info;

	return scoutfs_net_submit_request_node(sb, server->conn, rid,
					       SCOUTFS_NET_CMD_LOCK,
					       nl, sizeof(*nl),
					       lock_response, NULL, NULL);
}

int scoutfs_server_lock_response(struct super_block *sb, u64 rid, u64 id,
				 struct scoutfs_net_lock_grant_response *gr)
{
	struct server_info *server = SCOUTFS_SB(sb)->server_info;

	return scoutfs_net_response_node(sb, server->conn, rid,
					 SCOUTFS_NET_CMD_LOCK, id, 0,
					 gr, sizeof(*gr));
}

static bool invalid_recover(struct scoutfs_net_lock_recover *nlr,
			    unsigned long bytes)
{
	return ((bytes < sizeof(*nlr)) ||
		(bytes != offsetof(struct scoutfs_net_lock_recover,
				   locks[le16_to_cpu(nlr->nr)])));
}

static int lock_recover_response(struct super_block *sb,
				 struct scoutfs_net_connection *conn,
				 void *resp, unsigned int resp_len,
				 int error, void *data)
{
	u64 rid = scoutfs_net_client_rid(conn);

	if (invalid_recover(resp, resp_len))
		return -EINVAL;

	return scoutfs_lock_server_recover_response(sb, rid, resp);
}

int scoutfs_server_lock_recover_request(struct super_block *sb, u64 rid,
					struct scoutfs_key *key)
{
	struct server_info *server = SCOUTFS_SB(sb)->server_info;

	return scoutfs_net_submit_request_node(sb, server->conn, rid,
					       SCOUTFS_NET_CMD_LOCK_RECOVER,
					       key, sizeof(*key),
					       lock_recover_response,
					       NULL, NULL);
}

static int server_srch_get_compact(struct super_block *sb,
				   struct scoutfs_net_connection *conn,
				   u8 cmd, u64 id, void *arg, u16 arg_len)
{
	DECLARE_SERVER_INFO(sb, server);
	u64 rid = scoutfs_net_client_rid(conn);
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct scoutfs_super_block *super = &sbi->super;
	struct scoutfs_srch_compact *sc = NULL;
	int ret;

	if (arg_len != 0) {
		ret = -EINVAL;
		goto out;
	}

	sc = kzalloc(sizeof(struct scoutfs_srch_compact), GFP_NOFS);
	if (sc == NULL) {
		ret = -ENOMEM;
		goto out;
	}

	ret = scoutfs_server_hold_commit(sb);
	if (ret)
		goto out;

	mutex_lock(&server->srch_mutex);
	ret = scoutfs_srch_get_compact(sb, &server->alloc, &server->wri,
				       &super->srch_root, rid, sc);
	mutex_unlock(&server->srch_mutex);
	if (ret == 0 && sc->nr == 0)
		ret = -ENOENT;
	if (ret < 0)
		goto apply;

	mutex_lock(&server->alloc_mutex);
	ret = scoutfs_alloc_fill_list(sb, &server->alloc, &server->wri,
				      &sc->meta_avail, server->meta_avail,
				      SCOUTFS_SERVER_META_FILL_LO,
				      SCOUTFS_SERVER_META_FILL_TARGET) ?:
	      scoutfs_alloc_splice_list(sb, &server->alloc, &server->wri,
					server->other_freed,
					&sc->meta_freed);
	mutex_unlock(&server->alloc_mutex);
	if (ret < 0)
		goto apply;

	mutex_lock(&server->srch_mutex);
	ret = scoutfs_srch_update_compact(sb, &server->alloc, &server->wri,
					  &super->srch_root, rid, sc);
	mutex_unlock(&server->srch_mutex);

apply:
	ret = scoutfs_server_apply_commit(sb, ret);
	WARN_ON_ONCE(ret < 0 && ret != -ENOENT); /* XXX leaked busy item */
out:
	ret = scoutfs_net_response(sb, conn, cmd, id, ret, sc,
				   sizeof(struct scoutfs_srch_compact));
	kfree(sc);
	return ret;
}

/*
 * Commit the client's compaction.  Their freed allocator contains the
 * source srch files' blocks that are currently in use and which can't
 * be available for allocation until after the commit.  We move them
 * into freed so they won't satisfy allocations.
 */
static int server_srch_commit_compact(struct super_block *sb,
				      struct scoutfs_net_connection *conn,
				      u8 cmd, u64 id, void *arg, u16 arg_len)
{
	DECLARE_SERVER_INFO(sb, server);
	u64 rid = scoutfs_net_client_rid(conn);
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct scoutfs_super_block *super = &sbi->super;
	struct scoutfs_srch_compact *sc;
	struct scoutfs_alloc_list_head av;
	struct scoutfs_alloc_list_head fr;
	int ret;

	if (arg_len != sizeof(struct scoutfs_srch_compact)) {
		ret = -EINVAL;
		goto out;
	}
	sc = arg;

	ret = scoutfs_server_hold_commit(sb);
	if (ret)
		goto out;

	mutex_lock(&server->srch_mutex);
	ret = scoutfs_srch_commit_compact(sb, &server->alloc, &server->wri,
					  &super->srch_root, rid, sc,
					  &av, &fr);
	mutex_unlock(&server->srch_mutex);
	if (ret < 0) /* XXX very bad, leaks allocators */
		goto apply;

	/* reclaim allocators if they were set by _srch_commit_ */
	mutex_lock(&server->alloc_mutex);
	ret = scoutfs_alloc_splice_list(sb, &server->alloc, &server->wri,
					server->other_freed, &av) ?:
	      scoutfs_alloc_splice_list(sb, &server->alloc, &server->wri,
					server->other_freed, &fr);
	mutex_unlock(&server->alloc_mutex);

apply:
	ret = scoutfs_server_apply_commit(sb, ret);
out:
	WARN_ON(ret < 0); /* XXX leaks allocators */
	return scoutfs_net_response(sb, conn, cmd, id, ret, NULL, 0);
}

static void init_mounted_client_key(struct scoutfs_key *key, u64 rid)
{
	*key = (struct scoutfs_key) {
		.sk_zone = SCOUTFS_MOUNTED_CLIENT_ZONE,
		.skmc_rid = cpu_to_le64(rid),
	};
}

static int insert_mounted_client(struct super_block *sb, u64 rid,
				 u64 gr_flags)
{
	DECLARE_SERVER_INFO(sb, server);
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	struct scoutfs_mounted_client_btree_val mcv;
	struct scoutfs_key key;

	init_mounted_client_key(&key, rid);
	mcv.flags = 0;
	if (gr_flags & SCOUTFS_NET_GREETING_FLAG_VOTER)
		mcv.flags |= SCOUTFS_MOUNTED_CLIENT_VOTER;

	return scoutfs_btree_insert(sb, &server->alloc, &server->wri,
				    &super->mounted_clients, &key,
				    &mcv, sizeof(mcv));
}
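
/*
 * The mounted client items written above follow a simple lifecycle,
 * sketched here for reference: a first greeting (server_term == 0)
 * inserts the item, possibly flagged as a quorum voter, and farewell
 * processing deletes it.  A rough timeline:
 *
 *	greeting(rid, term 0)	-> insert_mounted_client()
 *	... client runs ...
 *	farewell(rid)		-> delete_mounted_client()
 */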
/*
 * Remove the record of a mounted client.  The record can already be
 * removed if we're processing a farewell on behalf of a client that
 * already had a previous server process its farewell.
 *
 * When we remove the last mounted client that's voting we write a new
 * quorum block with the updated unmount_barrier.
 *
 * The caller has to serialize with farewell processing.
 */
static int delete_mounted_client(struct super_block *sb, u64 rid)
{
	DECLARE_SERVER_INFO(sb, server);
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	struct scoutfs_key key;
	int ret;

	init_mounted_client_key(&key, rid);

	ret = scoutfs_btree_delete(sb, &server->alloc, &server->wri,
				   &super->mounted_clients, &key);
	if (ret == -ENOENT)
		ret = 0;

	return ret;
}

/*
 * Remove all the busy items for srch compactions that the mount might
 * have been responsible for and reclaim all their allocators.  The
 * freed allocator could still contain stable srch file blknos.
 */
static int cancel_srch_compact(struct super_block *sb, u64 rid)
{
	DECLARE_SERVER_INFO(sb, server);
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	struct scoutfs_alloc_list_head av;
	struct scoutfs_alloc_list_head fr;
	int ret;

	for (;;) {
		mutex_lock(&server->srch_mutex);
		ret = scoutfs_srch_cancel_compact(sb, &server->alloc,
						  &server->wri,
						  &super->srch_root, rid,
						  &av, &fr);
		mutex_unlock(&server->srch_mutex);
		if (ret < 0) {
			if (ret == -ENOENT)
				ret = 0;
			break;
		}

		mutex_lock(&server->alloc_mutex);
		ret = scoutfs_alloc_splice_list(sb, &server->alloc,
						&server->wri,
						server->other_freed, &av) ?:
		      scoutfs_alloc_splice_list(sb, &server->alloc,
						&server->wri,
						server->other_freed, &fr);
		mutex_unlock(&server->alloc_mutex);
		if (WARN_ON_ONCE(ret < 0))
			break;
	}

	return ret;
}
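
/*
 * Before the greeting handler below, a sketch of the exchange it
 * implements (field names from struct scoutfs_net_greeting, values
 * hypothetical):
 *
 *	client -> server: { fsid, format_hash, rid, server_term = 0 }
 *	server -> client: { fsid, format_hash, rid, server_term = T }
 *
 * A reconnecting client echoes the nonzero term T it last saw, which
 * is how the server tells a brand new mount from a reconnect.
 */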
/*
 * Process an incoming greeting request in the server from the client.
 * We try to send responses to failed greetings so that the sender can
 * log some detail before shutting down.  A failure to send a greeting
 * response shuts down the connection.
 *
 * If a client reconnects they'll send their previously received
 * server_term in their greeting request.
 *
 * XXX The logic of this has gotten convoluted.  The lock server can
 * send a recovery request so it needs to be called after the core net
 * greeting call enables messages.  But we want the greeting reply to
 * be sent first, so we currently queue it on the send queue before
 * enabling messages.  That means that a lot of errors that happen
 * after the reply can't be sent to the client.  They'll just see a
 * disconnect and won't know what's happened.  This all needs to be
 * refactored.
 */
static int server_greeting(struct super_block *sb,
			   struct scoutfs_net_connection *conn,
			   u8 cmd, u64 id, void *arg, u16 arg_len)
{
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	struct scoutfs_net_greeting *gr = arg;
	struct scoutfs_net_greeting greet;
	DECLARE_SERVER_INFO(sb, server);
	__le64 umb = 0;
	bool reconnecting;
	bool first_contact;
	bool farewell;
	int ret = 0;
	int err;

	if (arg_len != sizeof(struct scoutfs_net_greeting)) {
		ret = -EINVAL;
		goto send_err;
	}

	if (gr->fsid != super->hdr.fsid) {
		scoutfs_warn(sb, "client sent fsid 0x%llx, server has 0x%llx",
			     le64_to_cpu(gr->fsid),
			     le64_to_cpu(super->hdr.fsid));
		ret = -EINVAL;
		goto send_err;
	}

	if (gr->format_hash != super->format_hash) {
		scoutfs_warn(sb, "client sent format 0x%llx, server has 0x%llx",
			     le64_to_cpu(gr->format_hash),
			     le64_to_cpu(super->format_hash));
		ret = -EINVAL;
		goto send_err;
	}

	if (gr->server_term == 0) {
		ret = scoutfs_server_hold_commit(sb);
		if (ret < 0)
			goto send_err;

		spin_lock(&server->lock);
		umb = super->unmount_barrier;
		spin_unlock(&server->lock);

		mutex_lock(&server->farewell_mutex);
		ret = insert_mounted_client(sb, le64_to_cpu(gr->rid),
					    le64_to_cpu(gr->flags));
		mutex_unlock(&server->farewell_mutex);

		ret = scoutfs_server_apply_commit(sb, ret);
		queue_work(server->wq, &server->farewell_work);
	} else {
		umb = gr->unmount_barrier;
	}

send_err:
	err = ret;

	greet.fsid = super->hdr.fsid;
	greet.format_hash = super->format_hash;
	greet.server_term = cpu_to_le64(server->term);
	greet.unmount_barrier = umb;
	greet.rid = gr->rid;
	greet.flags = 0;

	/* queue greeting response to be sent first once messaging enabled */
	ret = scoutfs_net_response(sb, conn, cmd, id, err,
				   &greet, sizeof(greet));
	if (ret == 0 && err)
		ret = err;
	if (ret)
		goto out;

	/* have the net core enable messaging and resend */
	reconnecting = gr->server_term != 0;
	first_contact = le64_to_cpu(gr->server_term) != server->term;
	if (gr->flags & cpu_to_le64(SCOUTFS_NET_GREETING_FLAG_FAREWELL))
		farewell = true;
	else
		farewell = false;

	scoutfs_net_server_greeting(sb, conn, le64_to_cpu(gr->rid), id,
				    reconnecting, first_contact, farewell);

	/* lock server might send recovery request */
	if (le64_to_cpu(gr->server_term) != server->term) {
		/* we're now doing two commits per greeting, not great */
		ret = scoutfs_server_hold_commit(sb);
		if (ret)
			goto out;

		ret = scoutfs_lock_server_greeting(sb, le64_to_cpu(gr->rid),
						   gr->server_term != 0);
		ret = scoutfs_server_apply_commit(sb, ret);
		if (ret)
			goto out;
	}

out:
	return ret;
}

struct farewell_request {
	struct list_head entry;
	u64 net_id;
	u64 rid;
};

static bool invalid_mounted_client_item(struct scoutfs_btree_item_ref *iref)
{
	return (iref->val_len !=
		sizeof(struct scoutfs_mounted_client_btree_val));
}
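
/*
 * A worked example of the release condition enforced by the worker
 * below, using hypothetical numbers.  Say quorum_count is 3 and five
 * voters are mounted, four of them unmounting.  Responses are released
 * while nr_mounted > 3, so two unmounters drain immediately, leaving
 * nr_mounted at 3 and nr_unmounting at 2, and the rest wait.  Once the
 * final voter also sends its farewell, nr_unmounting >= nr_mounted
 * holds and everyone is released.
 */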
/*
 * This work processes farewell requests asynchronously.  Requests from
 * voting clients can be held until only the final quorum remains and
 * they've all sent farewell requests.
 *
 * When we remove the last mounted client record for the last voting
 * client then we increase the unmount_barrier and write it to the
 * super block.  If voting clients don't get their farewell response
 * they'll see the greater unmount_barrier in the super and will know
 * that their farewell has been processed and that they can exit.
 *
 * Responses that are waiting for clients who aren't voting are
 * immediately sent.  Clients that don't have a mounted client record
 * have already had their farewell processed by another server and can
 * proceed.
 *
 * Farewell responses are unique in that sending them causes the server
 * to shutdown the connection to the client next time the socket
 * disconnects.  If the socket is destroyed before the client gets the
 * response they'll reconnect and we'll see them as a brand new client
 * who immediately sends a farewell.  It'll be processed and it all
 * works out.
 *
 * If this worker sees an error it assumes that this server is done for
 * and that another had better take its place.
 */
static void farewell_worker(struct work_struct *work)
{
	struct server_info *server = container_of(work, struct server_info,
						  farewell_work);
	struct super_block *sb = server->sb;
	struct scoutfs_super_block *super = &SCOUTFS_SB(sb)->super;
	struct scoutfs_mounted_client_btree_val *mcv;
	struct farewell_request *tmp;
	struct farewell_request *fw;
	SCOUTFS_BTREE_ITEM_REF(iref);
	unsigned int nr_unmounting = 0;
	unsigned int nr_mounted = 0;
	struct scoutfs_key key;
	LIST_HEAD(reqs);
	LIST_HEAD(send);
	bool deleted = false;
	bool voting;
	bool more_reqs;
	int ret;

	/* grab all the requests that are waiting */
	mutex_lock(&server->farewell_mutex);
	list_splice_init(&server->farewell_requests, &reqs);
	mutex_unlock(&server->farewell_mutex);

	/* count how many requests are from voting clients */
	nr_unmounting = 0;
	list_for_each_entry_safe(fw, tmp, &reqs, entry) {
		init_mounted_client_key(&key, fw->rid);
		ret = scoutfs_btree_lookup(sb, &super->mounted_clients, &key,
					   &iref);
		if (ret == 0 && invalid_mounted_client_item(&iref)) {
			scoutfs_btree_put_iref(&iref);
			ret = -EIO;
		}
		if (ret < 0) {
			if (ret == -ENOENT) {
				list_move_tail(&fw->entry, &send);
				continue;
			}
			goto out;
		}

		mcv = iref.val;
		voting = (mcv->flags & SCOUTFS_MOUNTED_CLIENT_VOTER) != 0;
		scoutfs_btree_put_iref(&iref);

		if (!voting) {
			list_move_tail(&fw->entry, &send);
			continue;
		}

		nr_unmounting++;
	}

	/* see how many mounted clients could vote for quorum */
	init_mounted_client_key(&key, 0);
	for (;;) {
		ret = scoutfs_btree_next(sb, &super->mounted_clients, &key,
					 &iref);
		if (ret == 0 && invalid_mounted_client_item(&iref)) {
			scoutfs_btree_put_iref(&iref);
			ret = -EIO;
		}
		if (ret != 0) {
			if (ret == -ENOENT)
				break;
			goto out;
		}

		key = *iref.key;
		mcv = iref.val;

		if (mcv->flags & SCOUTFS_MOUNTED_CLIENT_VOTER)
			nr_mounted++;

		scoutfs_btree_put_iref(&iref);
		scoutfs_key_inc(&key);
	}

	/* send as many responses as we can to maintain quorum */
	while ((fw = list_first_entry_or_null(&reqs, struct farewell_request,
					      entry)) &&
	       (nr_mounted > super->quorum_count ||
		nr_unmounting >= nr_mounted)) {
		list_move_tail(&fw->entry, &send);
		nr_mounted--;
		nr_unmounting--;
		deleted = true;
	}

	/* process and send farewell responses */
	list_for_each_entry_safe(fw, tmp, &send, entry) {
		ret = scoutfs_server_hold_commit(sb);
		if (ret)
			goto out;

		ret = scoutfs_lock_server_farewell(sb, fw->rid) ?:
		      remove_trans_seq(sb, fw->rid) ?:
		      reclaim_log_trees(sb, fw->rid) ?:
		      delete_mounted_client(sb, fw->rid) ?:
		      cancel_srch_compact(sb, fw->rid);

		ret = scoutfs_server_apply_commit(sb, ret);
		if (ret)
			goto out;
	}

	/* update the unmount barrier if we deleted all voting clients */
	if (deleted && nr_mounted == 0) {
		ret = scoutfs_server_hold_commit(sb);
		if (ret)
			goto out;

		le64_add_cpu(&super->unmount_barrier, 1);

		ret = scoutfs_server_apply_commit(sb, ret);
		if (ret)
			goto out;
	}

	/* and finally send all the responses */
	list_for_each_entry_safe(fw, tmp, &send, entry) {
		ret = scoutfs_net_response_node(sb, server->conn, fw->rid,
						SCOUTFS_NET_CMD_FAREWELL,
						fw->net_id, 0, NULL, 0);
		if (ret)
			break;
		list_del_init(&fw->entry);
		kfree(fw);
	}

	ret = 0;
out:
	mutex_lock(&server->farewell_mutex);
	more_reqs = !list_empty(&server->farewell_requests);
	list_splice_init(&reqs, &server->farewell_requests);
	list_splice_init(&send, &server->farewell_requests);
	mutex_unlock(&server->farewell_mutex);

	if (ret < 0)
		stop_server(server);
	else if (more_reqs && !server->shutting_down)
		queue_work(server->wq, &server->farewell_work);
}

static void free_farewell_requests(struct super_block *sb, u64 rid)
{
	struct server_info *server = SCOUTFS_SB(sb)->server_info;
	struct farewell_request *tmp;
	struct farewell_request *fw;

	mutex_lock(&server->farewell_mutex);
	list_for_each_entry_safe(fw, tmp, &server->farewell_requests,
				 entry) {
		if (rid == 0 || fw->rid == rid) {
			list_del_init(&fw->entry);
			kfree(fw);
		}
	}
	mutex_unlock(&server->farewell_mutex);
}

/*
 * The server is receiving a farewell message from a client that is
 * unmounting.  It won't send any more requests and once it receives
 * our response it will not reconnect.
 *
 * XXX we should make sure that all our requests to the client have
 * finished before we respond.  Locking will have its own messaging for
 * orderly shutdown.  That leaves compaction which will be addressed as
 * part of the larger work of recovering compactions that were in
 * flight when a client crashed.
 */
static int server_farewell(struct super_block *sb,
			   struct scoutfs_net_connection *conn,
			   u8 cmd, u64 id, void *arg, u16 arg_len)
{
	struct server_info *server = SCOUTFS_SB(sb)->server_info;
	u64 rid = scoutfs_net_client_rid(conn);
	struct farewell_request *fw;

	if (arg_len != 0)
		return -EINVAL;

	/* XXX tear down if we fence, or if we shut down */

	fw = kmalloc(sizeof(struct farewell_request), GFP_NOFS);
	if (fw == NULL)
		return -ENOMEM;

	fw->rid = rid;
	fw->net_id = id;

	mutex_lock(&server->farewell_mutex);
	list_add_tail(&fw->entry, &server->farewell_requests);
	mutex_unlock(&server->farewell_mutex);

	queue_work(server->wq, &server->farewell_work);

	/* response will be sent later */
	return 0;
}

static scoutfs_net_request_t server_req_funcs[] = {
	[SCOUTFS_NET_CMD_GREETING]		= server_greeting,
	[SCOUTFS_NET_CMD_ALLOC_INODES]		= server_alloc_inodes,
	[SCOUTFS_NET_CMD_GET_LOG_TREES]		= server_get_log_trees,
	[SCOUTFS_NET_CMD_COMMIT_LOG_TREES]	= server_commit_log_trees,
	[SCOUTFS_NET_CMD_GET_ROOTS]		= server_get_roots,
	[SCOUTFS_NET_CMD_ADVANCE_SEQ]		= server_advance_seq,
	[SCOUTFS_NET_CMD_GET_LAST_SEQ]		= server_get_last_seq,
	[SCOUTFS_NET_CMD_LOCK]			= server_lock,
	[SCOUTFS_NET_CMD_SRCH_GET_COMPACT]	= server_srch_get_compact,
	[SCOUTFS_NET_CMD_SRCH_COMMIT_COMPACT]	= server_srch_commit_compact,
	[SCOUTFS_NET_CMD_FAREWELL]		= server_farewell,
};

static void server_notify_up(struct super_block *sb,
			     struct scoutfs_net_connection *conn,
			     void *info, u64 rid)
{
	struct server_client_info *sci = info;
	DECLARE_SERVER_INFO(sb, server);

	if (rid != 0) {
		sci->rid = rid;
		spin_lock(&server->lock);
		list_add_tail(&sci->head, &server->clients);
		server->nr_clients++;
		trace_scoutfs_server_client_up(sb, rid, server->nr_clients);
		spin_unlock(&server->lock);
	}
}

static void server_notify_down(struct super_block *sb,
			       struct scoutfs_net_connection *conn,
			       void *info, u64 rid)
{
	struct server_client_info *sci = info;
	DECLARE_SERVER_INFO(sb, server);

	if (rid != 0) {
		spin_lock(&server->lock);
		list_del_init(&sci->head);
		server->nr_clients--;
		trace_scoutfs_server_client_down(sb, rid,
						 server->nr_clients);
		spin_unlock(&server->lock);

		free_farewell_requests(sb, rid);
	} else {
		stop_server(server);
	}
}
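
/*
 * The net core presumably indexes the server_req_funcs table above by
 * the command byte of each incoming request, along the lines of this
 * hypothetical sketch:
 *
 *	if (cmd < ARRAY_SIZE(server_req_funcs) && server_req_funcs[cmd])
 *		ret = server_req_funcs[cmd](sb, conn, cmd, id,
 *					    arg, arg_len);
 *
 * which is why every handler shares the (sb, conn, cmd, id, arg,
 * arg_len) signature and replies through scoutfs_net_response().
 */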
static void scoutfs_server_worker(struct work_struct *work)
{
	struct server_info *server = container_of(work, struct server_info,
						  work);
	struct super_block *sb = server->sb;
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct scoutfs_super_block *super = &sbi->super;
	struct scoutfs_net_connection *conn = NULL;
	DECLARE_WAIT_QUEUE_HEAD(waitq);
	struct sockaddr_in sin;
	LIST_HEAD(conn_list);
	u64 max_vers;
	int ret;
	int err;

	trace_scoutfs_server_work_enter(sb, 0, 0);

	sin = server->listen_sin;

	scoutfs_info(sb, "server setting up at "SIN_FMT, SIN_ARG(&sin));

	conn = scoutfs_net_alloc_conn(sb, server_notify_up,
				      server_notify_down,
				      sizeof(struct server_client_info),
				      server_req_funcs, "server");
	if (!conn) {
		ret = -ENOMEM;
		goto out;
	}

	ret = scoutfs_net_bind(sb, conn, &sin);
	if (ret) {
		scoutfs_err(sb, "server failed to bind to "SIN_FMT", err %d%s",
			    SIN_ARG(&sin), ret,
			    ret == -EADDRNOTAVAIL ? " (Bad address?)" : "");
		goto out;
	}

	/* start up the server subsystems before accepting */
	ret = scoutfs_read_super(sb, super);
	if (ret < 0)
		goto shutdown;

	set_roots(server, &super->fs_root, &super->logs_root,
		  &super->srch_root);
	scoutfs_block_writer_init(sb, &server->wri);

	/* prepare server alloc for this transaction, larger first */
	if (le64_to_cpu(super->server_meta_avail[0].total_nr) <
	    le64_to_cpu(super->server_meta_avail[1].total_nr))
		server->other_ind = 0;
	else
		server->other_ind = 1;
	scoutfs_alloc_init(&server->alloc,
			   &super->server_meta_avail[server->other_ind ^ 1],
			   &super->server_meta_freed[server->other_ind ^ 1]);
	server->other_avail = &super->server_meta_avail[server->other_ind];
	server->other_freed = &super->server_meta_freed[server->other_ind];

	/* use largest meta_alloc to start */
	server->meta_avail = &super->meta_alloc[0];
	server->meta_freed = &super->meta_alloc[1];
	if (le64_to_cpu(server->meta_freed->total_len) >
	    le64_to_cpu(server->meta_avail->total_len))
		swap(server->meta_avail, server->meta_freed);

	ret = scoutfs_forest_get_max_vers(sb, super, &max_vers);
	if (ret) {
		scoutfs_err(sb, "server couldn't find max item vers: %d",
			    ret);
		goto shutdown;
	}

	ret = scoutfs_lock_server_setup(sb, &server->alloc, &server->wri,
					max_vers);
	if (ret)
		goto shutdown;

	/*
	 * Write our address in the super before it's possible for net
	 * processing to start writing the super as part of
	 * transactions.  In theory clients could be trying to connect
	 * to our address without having seen it in the super (maybe
	 * they saw it a long time ago).
	 */
	scoutfs_addr_from_sin(&super->server_addr, &sin);
	super->quorum_server_term = cpu_to_le64(server->term);
	ret = scoutfs_write_super(sb, super);
	if (ret < 0)
		goto shutdown;

	/* start accepting connections and processing work */
	server->conn = conn;
	scoutfs_net_listen(sb, conn);

	scoutfs_info(sb, "server ready at "SIN_FMT, SIN_ARG(&sin));
	complete(&server->start_comp);

	/* wait_event/wake_up provide barriers */
	wait_event_interruptible(server->waitq, server->shutting_down);

shutdown:
	scoutfs_info(sb, "server shutting down at "SIN_FMT, SIN_ARG(&sin));

	/* wait for request processing */
	scoutfs_net_shutdown(sb, conn);
	/* wait for commit queued by request processing */
	flush_work(&server->commit_work);
	server->conn = NULL;

	scoutfs_lock_server_destroy(sb);

out:
	scoutfs_quorum_clear_leader(sb);
	scoutfs_net_free_conn(sb, conn);

	scoutfs_info(sb, "server stopped at "SIN_FMT, SIN_ARG(&sin));
	trace_scoutfs_server_work_exit(sb, 0, ret);
	/*
	 * Always try to clear our presence in the super so that we're
	 * not fenced.  We do this last because other mounts will try
	 * to reach quorum the moment they see zero here.  The later we
	 * do this the longer we have to finish shutdown while clients
	 * time out.
	 */
	err = scoutfs_read_super(sb, super);
	if (err == 0) {
		super->quorum_fenced_term = cpu_to_le64(server->term);
		memset(&super->server_addr, 0, sizeof(super->server_addr));
		err = scoutfs_write_super(sb, super);
	}
	if (err < 0) {
		scoutfs_err(sb, "failed to clear election term %llu at "SIN_FMT", this mount could be fenced",
			    server->term, SIN_ARG(&sin));
	}

	server->err = ret;
	complete(&server->start_comp);
}

/*
 * Wait for the server to successfully start.  If this returns an error
 * then the super block's fenced term has been set to the new server's
 * term so that it won't be fenced.
 */
int scoutfs_server_start(struct super_block *sb, struct sockaddr_in *sin,
			 u64 term)
{
	DECLARE_SERVER_INFO(sb, server);

	server->err = 0;
	server->shutting_down = false;
	server->listen_sin = *sin;
	server->term = term;
	init_completion(&server->start_comp);

	queue_work(server->wq, &server->work);

	wait_for_completion(&server->start_comp);
	return server->err;
}

/*
 * Start shutdown on the server but don't wait for it to finish.
 */
void scoutfs_server_abort(struct super_block *sb)
{
	DECLARE_SERVER_INFO(sb, server);

	stop_server(server);
}

/*
 * Once the server is stopped we give the caller our election info
 * which might have been modified while we were running.
 */
void scoutfs_server_stop(struct super_block *sb)
{
	DECLARE_SERVER_INFO(sb, server);

	stop_server(server);

	/* XXX not sure both are needed */
	cancel_work_sync(&server->work);
	cancel_work_sync(&server->commit_work);
}

int scoutfs_server_setup(struct super_block *sb)
{
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct server_info *server;

	server = kzalloc(sizeof(struct server_info), GFP_KERNEL);
	if (!server)
		return -ENOMEM;

	server->sb = sb;
	spin_lock_init(&server->lock);
	init_waitqueue_head(&server->waitq);
	INIT_WORK(&server->work, scoutfs_server_worker);
	init_rwsem(&server->commit_rwsem);
	init_llist_head(&server->commit_waiters);
	INIT_WORK(&server->commit_work, scoutfs_server_commit_func);
	init_rwsem(&server->seq_rwsem);
	INIT_LIST_HEAD(&server->clients);
	mutex_init(&server->farewell_mutex);
	INIT_LIST_HEAD(&server->farewell_requests);
	INIT_WORK(&server->farewell_work, farewell_worker);
	mutex_init(&server->alloc_mutex);
	mutex_init(&server->logs_mutex);
	mutex_init(&server->srch_mutex);
	seqcount_init(&server->roots_seqcount);

	server->wq = alloc_workqueue("scoutfs_server",
				     WQ_UNBOUND | WQ_NON_REENTRANT, 0);
	if (!server->wq) {
		kfree(server);
		return -ENOMEM;
	}

	sbi->server_info = server;
	return 0;
}

/*
 * The caller should have already stopped the server but we do the same
 * just in case.
 */
void scoutfs_server_destroy(struct super_block *sb)
{
	struct scoutfs_sb_info *sbi = SCOUTFS_SB(sb);
	struct server_info *server = sbi->server_info;

	if (server) {
		stop_server(server);

		/* wait for server work to wait for everything to shut down */
		cancel_work_sync(&server->work);
		/* recv work/compaction could have left commit_work queued */
		cancel_work_sync(&server->commit_work);
		/* pending farewell requests are another server's problem */
		cancel_work_sync(&server->farewell_work);
		free_farewell_requests(sb, 0);

		trace_scoutfs_server_workqueue_destroy(sb, 0, 0);
		destroy_workqueue(server->wq);

		kfree(server);
		sbi->server_info = NULL;
	}
}
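
/*
 * Expected lifecycle of this subsystem as suggested by the functions
 * above (the callers live elsewhere; this is an orientation sketch,
 * not code copied from them):
 *
 *	scoutfs_server_setup(sb);		mount: allocate server_info
 *	scoutfs_server_start(sb, &sin, term);	quorum elected us leader
 *	... serve requests ...
 *	scoutfs_server_stop(sb);		lost leadership or unmount
 *	scoutfs_server_destroy(sb);		unmount: free everything
 */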