diff --git a/init.cc b/init.cc index 3bbedb1b76..071bf7fe02 100644 --- a/init.cc +++ b/init.cc @@ -8,8 +8,8 @@ #include "gms/gossiper.hh" #include "service/storage_service.hh" -future<> init_storage_service() { - return service::init_storage_service().then([] { +future<> init_storage_service(distributed& db) { + return service::init_storage_service(db).then([] { engine().at_exit([] { return service::deinit_storage_service(); }); }); } diff --git a/init.hh b/init.hh index 39a5414eb0..8d7b42b5ec 100644 --- a/init.hh +++ b/init.hh @@ -5,7 +5,9 @@ #include #include -#include +#include +#include "db/config.hh" +#include "database.hh" -future<> init_storage_service(); +future<> init_storage_service(distributed& db); future<> init_ms_fd_gossiper(sstring listen_address, db::seed_provider_type seed_provider); diff --git a/main.cc b/main.cc index b5f31fc404..e2d0db8fb3 100644 --- a/main.cc +++ b/main.cc @@ -114,8 +114,8 @@ int main(int ac, char** av) { using namespace locator; return i_endpoint_snitch::create_snitch(cfg->endpoint_snitch()).then([] { engine().at_exit([] { return i_endpoint_snitch::stop_snitch(); }); - }).then([] { - return init_storage_service(); + }).then([&db] { + return init_storage_service(db); }).then([&db, cfg] { return db.start(std::move(*cfg)).then([&db] { engine().at_exit([&db] { return db.stop(); }); diff --git a/service/storage_service.cc b/service/storage_service.cc index ecc9326710..22cfadca73 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -11,6 +11,13 @@ #include "log.hh" #include "service/migration_manager.hh" #include "to_string.hh" +#include "gms/gossiper.hh" +#include +#include + +using token = dht::token; +using UUID = utils::UUID; +using inet_address = gms::inet_address; namespace service { @@ -20,94 +27,133 @@ int storage_service::RING_DELAY = storage_service::get_ring_delay(); distributed _the_storage_service; +bool is_replacing() { + // FIXME: DatabaseDescriptor.isReplacing() + return false; +} + +bool is_auto_bootstrap() { + // FIXME: DatabaseDescriptor.isAutoBootstrap() + return true; +} + +std::set get_seeds() { + // FIXME: DatabaseDescriptor.getSeeds() + auto& gossiper = gms::get_local_gossiper(); + return gossiper.get_seeds(); +} + +std::set get_replace_tokens() { + // FIXME: DatabaseDescriptor.getReplaceTokens() + return {}; +} + +std::experimental::optional get_replace_node() { + // FIXME: DatabaseDescriptor.getReplaceNode() + return {}; +} + +std::experimental::optional get_replace_address() { + // FIXME: DatabaseDescriptor.getReplaceAddress() + return {}; +} + +bool get_property_join_ring() { + // FIXME: Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))) + return true; +} + +bool get_property_rangemovement() { + // FIXME: Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true") + return true; +} + +bool get_property_load_ring_state() { + // FIXME: Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")) + return true; +} + bool storage_service::should_bootstrap() { // FIXME: Currently, we do boostrap if we are not a seed node. // return DatabaseDescriptor.isAutoBootstrap() && !SystemKeyspace.bootstrapComplete() && !DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()); - auto& gossiper = gms::get_local_gossiper(); - auto seeds = gossiper.get_seeds(); - return !seeds.count(get_broadcast_address()); + return is_auto_bootstrap() && !get_seeds().count(get_broadcast_address()); } future<> storage_service::prepare_to_join() { - if (!_joined) { -#if 0 - if (DatabaseDescriptor.isReplacing() && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))) - throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node"); - if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null) - throw new RuntimeException("Replace method removed; use cassandra.replace_address instead"); - if (DatabaseDescriptor.isReplacing()) - { - if (SystemKeyspace.bootstrapComplete()) - throw new RuntimeException("Cannot replace address with a node that is already bootstrapped"); - if (!DatabaseDescriptor.isAutoBootstrap()) - throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration"); - _bootstrap_tokens = prepareReplacementInfo(); - appStates.put(ApplicationState.TOKENS, valueFactory.tokens(_bootstrap_tokens)); - appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true)); - } - else if (should_bootstrap()) - { - checkForEndpointCollision(); - } -#endif - auto f = make_ready_future<>(); - if (should_bootstrap()) { - f = check_for_endpoint_collision(); - } + if (_joined) { + return make_ready_future<>(); + } - // have to start the gossip service before we can see any info on other nodes. this is necessary - // for bootstrap to get the load info it needs. - // (we won't be part of the storage ring though until we add a counterId to our state, below.) - // Seed the host ID-to-endpoint map with our own ID. - return f.then([] { - return db::system_keyspace::get_local_host_id(); - }).then([this] (auto local_host_id) { - std::map app_states; + std::map app_states; + auto f = make_ready_future<>(); + if (is_replacing() && !get_property_join_ring()) { + throw std::runtime_error("Cannot set both join_ring=false and attempt to replace a node"); + } + if (get_replace_tokens().size() > 0 || get_replace_node()) { + throw std::runtime_error("Replace method removed; use cassandra.replace_address instead"); + } + if (is_replacing()) { + // if (SystemKeyspace.bootstrapComplete()) + // throw new RuntimeException("Cannot replace address with a node that is already bootstrapped"); + if (!is_auto_bootstrap()) { + throw std::runtime_error("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration"); + } + _bootstrap_tokens = prepare_replacement_info(); + app_states.emplace(gms::application_state::TOKENS, value_factory.tokens(_bootstrap_tokens)); + app_states.emplace(gms::application_state::STATUS, value_factory.hibernate(true)); + } else if (should_bootstrap()) { + f = check_for_endpoint_collision(); + } - _token_metadata.update_host_id(local_host_id, this->get_broadcast_address()); - // FIXME: DatabaseDescriptor.getBroadcastRpcAddress() - auto broadcast_rpc_address = this->get_broadcast_address(); - app_states.emplace(gms::application_state::NET_VERSION, value_factory.network_version()); - app_states.emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id)); - app_states.emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address)); - app_states.emplace(gms::application_state::RELEASE_VERSION, value_factory.release_version()); - logger.info("Starting up server gossip"); + // have to start the gossip service before we can see any info on other nodes. this is necessary + // for bootstrap to get the load info it needs. + // (we won't be part of the storage ring though until we add a counterId to our state, below.) + // Seed the host ID-to-endpoint map with our own ID. + return f.then([app_states = std::move(app_states)] { + return db::system_keyspace::get_local_host_id(); + }).then([this, app_states = std::move(app_states)] (auto local_host_id) mutable { + _token_metadata.update_host_id(local_host_id, this->get_broadcast_address()); + // FIXME: DatabaseDescriptor.getBroadcastRpcAddress() + auto broadcast_rpc_address = this->get_broadcast_address(); + app_states.emplace(gms::application_state::NET_VERSION, value_factory.network_version()); + app_states.emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id)); + app_states.emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address)); + app_states.emplace(gms::application_state::RELEASE_VERSION, value_factory.release_version()); + logger.info("Starting up server gossip"); - auto& gossiper = gms::get_local_gossiper(); - gossiper.register_(this); - using namespace std::chrono; - auto now = high_resolution_clock::now().time_since_epoch(); - int generation_number = duration_cast(now).count(); - // FIXME: SystemKeyspace.incrementAndGetGeneration() - print("Start gossiper service ...\n"); - return gossiper.start(generation_number, app_states).then([this] { + auto& gossiper = gms::get_local_gossiper(); + gossiper.register_(this); + using namespace std::chrono; + auto now = high_resolution_clock::now().time_since_epoch(); + int generation_number = duration_cast(now).count(); + // FIXME: SystemKeyspace.incrementAndGetGeneration() + print("Start gossiper service ...\n"); + return gossiper.start(generation_number, app_states).then([this] { #if SS_DEBUG - gms::get_local_gossiper().debug_show(); - _token_metadata.debug_show(); -#endif - }); - }).then([this] { - // gossip snitch infos (local DC and rack) - gossip_snitch_info(); - auto& proxy = service::get_local_storage_proxy(); - // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull) - return update_schema_version_and_announce(proxy); // Ensure we know our own actual Schema UUID in preparation for updates - -#if 0 - if (!MessagingService.instance().isListening()) - MessagingService.instance().listen(FBUtilities.getLocalAddress()); - LoadBroadcaster.instance.startBroadcasting(); - - HintedHandOffManager.instance.start(); - BatchlogManager.instance.start(); + gms::get_local_gossiper().debug_show(); + _token_metadata.debug_show(); #endif }); - } - return make_ready_future<>(); + }).then([this] { + // gossip snitch infos (local DC and rack) + gossip_snitch_info(); + auto& proxy = service::get_local_storage_proxy(); + // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull) + return update_schema_version_and_announce(proxy); // Ensure we know our own actual Schema UUID in preparation for updates + +#if 0 + if (!MessagingService.instance().isListening()) + MessagingService.instance().listen(FBUtilities.getLocalAddress()); + LoadBroadcaster.instance.startBroadcasting(); + + HintedHandOffManager.instance.start(); + BatchlogManager.instance.start(); +#endif + }); } future<> storage_service::join_token_ring(int delay) { - auto f = make_ready_future<>(); + return seastar::async([this, delay] { _joined = true; #if 0 // We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed. @@ -125,64 +171,42 @@ future<> storage_service::join_token_ring(int delay) { SystemKeyspace.bootstrapInProgress(), SystemKeyspace.bootstrapComplete(), DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())); - if (DatabaseDescriptor.isAutoBootstrap() && !SystemKeyspace.bootstrapComplete() && DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())) - logger.info("This node will not auto bootstrap because it is configured to be a seed node."); #endif + if (is_auto_bootstrap() && /* !SystemKeyspace.bootstrapComplete() && */ get_seeds().count(get_broadcast_address())) { + logger.info("This node will not auto bootstrap because it is configured to be a seed node."); + } if (should_bootstrap()) { - auto elapsed = make_shared(0); - auto stop_cond = [elapsed, delay] { - // FIXME - // if we see schema, we can proceed to the next check directly - // if (!Schema.instance.getVersion().equals(Schema.emptyVersion)) { - // return true; - // } - if (*elapsed < delay) { - return false; - } - return true; - }; - f = do_until(stop_cond, [elapsed] { - auto t = 1000; - return sleep(std::chrono::milliseconds(t)).then([elapsed, t] { - *elapsed += t; - }); - }).then([this] { - _bootstrap_tokens = boot_strapper::get_bootstrap_tokens(_token_metadata); - return bootstrap(_bootstrap_tokens); - }); #if 0 if (SystemKeyspace.bootstrapInProgress()) logger.warn("Detected previous bootstrap failure; retrying"); else SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.IN_PROGRESS); - setMode(Mode.JOINING, "waiting for ring information", true); +#endif + set_mode(mode::JOINING, "waiting for ring information", true); // first sleep the delay to make sure we see all our peers - for (int i = 0; i < delay; i += 1000) - { + for (int i = 0; i < delay; i += 1000) { // if we see schema, we can proceed to the next check directly - if (!Schema.instance.getVersion().equals(Schema.emptyVersion)) - { - logger.debug("got schema: {}", Schema.instance.getVersion()); + if (_db.local().get_version() != database::empty_version) { + logger.debug("got schema: {}", _db.local().get_version()); break; } - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + sleep(std::chrono::seconds(1)).get(); } +#if 0 // if our schema hasn't matched yet, keep sleeping until it does // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful) while (!MigrationManager.isReadyForBootstrap()) { - setMode(Mode.JOINING, "waiting for schema information to complete", true); + set_mode(mode::JOINING, "waiting for schema information to complete", true); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); } - setMode(Mode.JOINING, "schema complete, ready to bootstrap", true); - setMode(Mode.JOINING, "waiting for pending range calculation", true); - PendingRangeCalculatorService.instance.blockUntilFinished(); - setMode(Mode.JOINING, "calculation complete, ready to bootstrap", true); - - - if (logger.isDebugEnabled()) - logger.debug("... got ring + schema info"); - +#endif + set_mode(mode::JOINING, "schema complete, ready to bootstrap", true); + set_mode(mode::JOINING, "waiting for pending range calculation", true); + //PendingRangeCalculatorService.instance.blockUntilFinished(); + set_mode(mode::JOINING, "calculation complete, ready to bootstrap", true); + logger.debug("... got ring + schema info"); +#if 0 if (Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true")) && ( _token_metadata.getBootstrapTokens().valueSet().size() > 0 || @@ -190,71 +214,52 @@ future<> storage_service::join_token_ring(int delay) { _token_metadata.getMovingEndpoints().size() > 0 )) throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true"); +#endif - if (!DatabaseDescriptor.isReplacing()) - { - if (_token_metadata.isMember(FBUtilities.getBroadcastAddress())) - { - String s = "This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)"; - throw new UnsupportedOperationException(s); + if (!is_replacing()) { + if (_token_metadata.is_member(get_broadcast_address())) { + throw std::runtime_error("This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)"); } - setMode(Mode.JOINING, "getting bootstrap token", true); - _bootstrap_tokens = BootStrapper.getBootstrapTokens(_token_metadata); - } - else - { - if (!DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress())) - { - try - { - // Sleep additionally to make sure that the server actually is not alive - // and giving it more time to gossip if alive. - Thread.sleep(LoadBroadcaster.BROADCAST_INTERVAL); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } + set_mode(mode::JOINING, "getting bootstrap token", true); + _bootstrap_tokens = boot_strapper::get_bootstrap_tokens(_token_metadata); + } else { + auto replace_addr = get_replace_address(); + if (replace_addr && *replace_addr != get_broadcast_address()) { + // Sleep additionally to make sure that the server actually is not alive + // and giving it more time to gossip if alive. + // FIXME: LoadBroadcaster.BROADCAST_INTERVAL + std::chrono::milliseconds broadcast_interval(60 * 1000); + sleep(broadcast_interval).get(); // check for operator errors... - for (Token token : _bootstrap_tokens) - { - InetAddress existing = _token_metadata.getEndpoint(token); - if (existing != null) - { + for (auto token : _bootstrap_tokens) { + auto existing = _token_metadata.get_endpoint(token); + if (existing) { +#if 0 long nanoDelay = delay * 1000000L; if (Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > (System.nanoTime() - nanoDelay)) throw new UnsupportedOperationException("Cannot replace a live node... "); current.add(existing); - } - else - { - throw new UnsupportedOperationException("Cannot replace token " + token + " which does not exist!"); - } - } - } - else - { - try - { - Thread.sleep(RING_DELAY); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - - } - setMode(Mode.JOINING, "Replacing a node with token(s): " + _bootstrap_tokens, true); - } - - bootstrap(_bootstrap_tokens); - assert !_is_bootstrap_mode; // bootstrap will block until finished #endif + } else { + throw std::runtime_error(sprint("Cannot replace token %s which does not exist!", token)); + } + } + } else { + sleep(std::chrono::milliseconds(RING_DELAY)).get(); + } + std::stringstream ss; + ss << _bootstrap_tokens; + set_mode(mode::JOINING, sprint("Replacing a node with token(s): %s", ss.str()), true); + } + bootstrap(_bootstrap_tokens).get(); + // FIXME: _is_bootstrap_mode is set to fasle in BootStrapper::bootstrap + // assert(!_is_bootstrap_mode); // bootstrap will block until finished } else { // FIXME: DatabaseDescriptor.getNumTokens() size_t num_tokens = 3; _bootstrap_tokens = boot_strapper::get_random_tokens(_token_metadata, num_tokens); + logger.info("Generated random tokens. tokens are {}", _bootstrap_tokens); #if 0 _bootstrap_tokens = SystemKeyspace.getSavedTokens(); if (_bootstrap_tokens.isEmpty()) @@ -285,9 +290,7 @@ future<> storage_service::join_token_ring(int delay) { } #endif } - - return f.then([this] { - return set_tokens(_bootstrap_tokens); + set_tokens(_bootstrap_tokens).get(); #if 0 // if we don't have system_traces keyspace at this point, then create it manually if (Schema.instance.getKSMetaData(TraceKeyspace.NAME) == null) @@ -352,18 +355,18 @@ future<> storage_service::bootstrap(std::unordered_set tokens) { gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(tokens)); gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.bootstrapping(tokens)); sleep_time = std::chrono::milliseconds(RING_DELAY); - // setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true); + set_mode(mode::JOINING, sprint("sleeping %s ms for pending range setup", RING_DELAY), true); } else { // Dont set any state for the node which is bootstrapping the existing token... _token_metadata.update_normal_tokens(tokens, get_broadcast_address()); // SystemKeyspace.removeEndpoint(DatabaseDescriptor.getReplaceAddress()); } - return sleep(sleep_time).then([tokens = std::move(tokens)] { + return sleep(sleep_time).then([this, tokens = std::move(tokens)] { auto& gossiper = gms::get_local_gossiper(); if (!gossiper.seen_any_seed()) { throw std::runtime_error("Unable to contact any seeds!"); } - // setMode(Mode.JOINING, "Starting to bootstrap...", true); + this->set_mode(mode::JOINING, "Starting to bootstrap...", true); // new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, _token_metadata).bootstrap(); // handles token update logger.info("Bootstrap completed! for the tokens {}", tokens); return make_ready_future<>(); @@ -812,7 +815,7 @@ future<> storage_service::set_tokens(std::unordered_set tokens) { auto& gossiper = gms::get_local_gossiper(); gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(local_tokens)); gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.normal(local_tokens)); - //setMode(Mode.NORMAL, false); + set_mode(mode::NORMAL, false); replicate_to_all_cores(); }); } @@ -1026,5 +1029,65 @@ void storage_service::remove_endpoint(inet_address endpoint) { }); } +std::unordered_set storage_service::prepare_replacement_info() { + return {}; +#if 0 + logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress()); + if (!MessagingService.instance().isListening()) + MessagingService.instance().listen(FBUtilities.getLocalAddress()); + + // make magic happen + Gossiper.instance.doShadowRound(); + + UUID hostId = null; + // now that we've gossiped at least once, we should be able to find the node we're replacing + if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null) + throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip"); + hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()); + try + { + if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null) + throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace"); + Collection tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(get_application_state_value(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS)))); + + SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc + Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need + return tokens; + } + catch (IOException e) + { + throw new RuntimeException(e); + } +#endif +} + +static const std::map mode_names = { + {storage_service::mode::STARTING, "STARTING"}, + {storage_service::mode::NORMAL, "NORMAL"}, + {storage_service::mode::JOINING, "JOINING"}, + {storage_service::mode::LEAVING, "LEAVING"}, + {storage_service::mode::DECOMMISSIONED, "DECOMMISSIONED"}, + {storage_service::mode::MOVING, "MOVING"}, + {storage_service::mode::DRAINING, "DRAINING"}, + {storage_service::mode::DRAINED, "DRAINED"}, +}; + +std::ostream& operator<<(std::ostream& os, const storage_service::mode& m) { + os << mode_names.at(m); + return os; +} + +void storage_service::set_mode(mode m, bool log) { + set_mode(m, "", log); +} + +void storage_service::set_mode(mode m, sstring msg, bool log) { + _operation_mode = m; + if (log) { + logger.info("{}: {}", m, msg); + } else { + logger.debug("{}: {}", m, msg); + } +} } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 01dac318ad..a126faf934 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -34,6 +34,8 @@ #include "db/system_keyspace.hh" #include "core/semaphore.hh" #include "utils/fb_utilities.hh" +#include "database.hh" +#include namespace service { @@ -57,7 +59,11 @@ class storage_service : public gms::i_endpoint_state_change_subscriber /* JMX notification serial number counter */ private final AtomicLong notificationSerialNumber = new AtomicLong(); #endif + distributed& _db; public: + storage_service(distributed& db) + : _db(db) { + } static int RING_DELAY; // delay after which we assume ring has stablized // Needed by distributed<> @@ -139,13 +145,15 @@ private: bool _joined = false; +public: + enum class mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }; +private: + mode _operation_mode = mode::STARTING; + friend std::ostream& operator<<(std::ostream& os, const mode& mode); #if 0 /* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */ private double traceProbability = 0.0; - private static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED } - private Mode operationMode = Mode.STARTING; - /* Used for tracking drain progress */ private volatile int totalCFs, remainingCFs; @@ -325,37 +333,9 @@ public: throw new IllegalStateException("No configured daemon"); daemon.deactivate(); } - - public synchronized Collection prepareReplacementInfo() throws ConfigurationException - { - logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress()); - if (!MessagingService.instance().isListening()) - MessagingService.instance().listen(FBUtilities.getLocalAddress()); - - // make magic happen - Gossiper.instance.doShadowRound(); - - UUID hostId = null; - // now that we've gossiped at least once, we should be able to find the node we're replacing - if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null) - throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip"); - hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()); - try - { - if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null) - throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace"); - Collection tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(get_application_state_value(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS)))); - - SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc - Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need - return tokens; - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } #endif +public: + std::unordered_set prepare_replacement_info(); future<> check_for_endpoint_collision(); #if 0 @@ -455,23 +435,12 @@ public: { DatabaseDescriptor.setIncrementalBackupsEnabled(value); } - - private void setMode(Mode m, boolean log) - { - setMode(m, null, log); - } - - private void setMode(Mode m, String msg, boolean log) - { - operationMode = m; - String logMsg = msg == null ? m.toString() : String.format("%s: %s", m, msg); - if (log) - logger.info(logMsg); - else - logger.debug(logMsg); - } #endif +private: + void set_mode(mode m, bool log); + void set_mode(mode m, sstring msg, bool log); +public: future<> bootstrap(std::unordered_set tokens); bool is_bootstrap_mode() { @@ -3310,8 +3279,8 @@ inline future> get_token_to_endpoint() { }); } -inline future<> init_storage_service() { - return service::get_storage_service().start().then([] { +inline future<> init_storage_service(distributed& db) { + return service::get_storage_service().start(std::ref(db)).then([] { print("Start Storage service ...\n"); }); } diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index 140a07d6f4..44764e0435 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -192,11 +192,11 @@ public: } }; -future<> init_once() { +future<> init_once(distributed& db) { static bool done = false; if (!done) { done = true; - return init_storage_service().then([] { + return init_storage_service(db).then([] { return init_ms_fd_gossiper("127.0.0.1", db::config::seed_provider_type()); }); } else { @@ -206,9 +206,9 @@ future<> init_once() { future<::shared_ptr> make_env_for_test() { return locator::i_endpoint_snitch::create_snitch("SimpleSnitch").then([] { - return init_once().then([] { - return seastar::async([] { - auto db = ::make_shared>(); + auto db = ::make_shared>(); + return init_once(*db).then([db] { + return seastar::async([db] { auto cfg = make_lw_shared(); cfg->data_file_directories() = {"."}; db->start(std::move(*cfg)).get(); diff --git a/tests/gossip.cc b/tests/gossip.cc index abaa56d1a8..711eec45dc 100644 --- a/tests/gossip.cc +++ b/tests/gossip.cc @@ -11,15 +11,16 @@ namespace bpo = boost::program_options; int main(int ac, char ** av) { + distributed db; app_template app; app.add_options() ("seed", bpo::value>(), "IP address of seed node") ("listen-address", bpo::value()->default_value("0.0.0.0"), "IP address to listen"); - return app.run(ac, av, [&app] { + return app.run(ac, av, [&db, &app] { auto config = app.configuration(); logging::logger_registry().set_logger_level("gossip", logging::log_level::trace); const gms::inet_address listen = gms::inet_address(config["listen-address"].as()); - service::init_storage_service().then([listen, config] { + service::init_storage_service(db).then([listen, config] { return net::get_messaging_service().start(listen); }).then([config] { auto& server = net::get_local_messaging_service();