From f57ba6902b044ed06db8d83c2eb0ee9acebd125e Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 25 Dec 2015 10:40:31 +0800 Subject: [PATCH] storage_service: Introduce ring_delay_ms option It is hard-coded as 30 seconds at the moment. Usage: $ scylla --ring-delay-ms 5000 Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra. --- db/batchlog_manager.cc | 7 +++---- db/config.hh | 3 ++- gms/gossiper.cc | 19 +++++++++---------- service/storage_service.cc | 26 +++++++++++++++----------- service/storage_service.hh | 16 ++-------------- 5 files changed, 31 insertions(+), 40 deletions(-) diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index e78a1c0077..1cbe037395 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -56,6 +56,7 @@ #include "unimplemented.hh" #include "db/config.hh" #include "gms/failure_detector.hh" +#include "service/storage_service.hh" static logging::logger logger("batchlog_manager"); @@ -87,10 +88,8 @@ future<> db::batchlog_manager::start() { ); }); }); - _timer.arm( - lowres_clock::now() - + std::chrono::milliseconds( - service::storage_service::RING_DELAY)); + auto ring_delay = service::get_local_storage_service().get_ring_delay(); + _timer.arm(lowres_clock::now() + ring_delay); } return make_ready_future<>(); } diff --git a/db/config.hh b/db/config.hh index 10628c3458..a2144b4ac8 100644 --- a/db/config.hh +++ b/db/config.hh @@ -750,7 +750,8 @@ public: val(replace_token, sstring, "", Used, "The tokens of the node to replace. Same as -Dcassandra.replace_token in cassandra.") \ val(replace_address, sstring, "", Used, "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.") \ val(replace_address_first_boot, sstring, "", Used, "Like replace_address option, but if the node has been bootstrapped sucessfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.") \ - val(override_decommission, bool, false, Used, "Set true to force a decommissioned node to join the cluster") + val(override_decommission, bool, false, Used, "Set true to force a decommissioned node to join the cluster") \ + val(ring_delay_ms, uint32_t, 30 * 1000, Used, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.") /* done! */ #define _make_value_member(name, type, deflt, status, desc, ...) \ diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 2bf632998a..fa6dc93f23 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -91,11 +91,8 @@ void gossiper::set_seeds(std::set _seeds) { } std::chrono::milliseconds gossiper::quarantine_delay() { - return std::chrono::milliseconds(service::storage_service::RING_DELAY * 2); -} - -static auto storage_service_ring_delay() { - return std::chrono::milliseconds(service::storage_service::RING_DELAY); + auto& ss = service::get_local_storage_service(); + return ss.get_ring_delay() * 2; } auto& storage_service_value_factory() { @@ -762,8 +759,9 @@ future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id // remember this node's generation int generation = state.get_heart_beat_state().get_generation(); logger.info("Removing host: {}", host_id); - logger.info("Sleeping for {}ms to ensure {} does not change", service::storage_service::RING_DELAY, endpoint); - sleep(storage_service_ring_delay()).get(); + auto ring_delay = service::get_local_storage_service().get_ring_delay(); + logger.info("Sleeping for {}ms to ensure {} does not change", ring_delay.count(), endpoint); + sleep(ring_delay).get(); // make sure it did not change auto& eps = endpoint_state_map.at(endpoint); if (eps.get_heart_beat_state().get_generation() != generation) { @@ -823,9 +821,9 @@ future<> gossiper::assassinate_endpoint(sstring address) { int generation = ep_state.get_heart_beat_state().get_generation(); int heartbeat = ep_state.get_heart_beat_state().get_heart_beat_version(); - logger.info("Sleeping for {} ms to ensure {} does not change", service::storage_service::RING_DELAY, endpoint); + logger.info("Sleeping for {} ms to ensure {} does not change", ss.get_ring_delay().count(), endpoint); // make sure it did not change - sleep(storage_service_ring_delay()).get(); + sleep(ss.get_ring_delay()).get(); auto it = endpoint_state_map.find(endpoint); if (it == endpoint_state_map.end()) { @@ -1335,7 +1333,8 @@ future<> gossiper::do_shadow_round() { return make_ready_future<>(); }).get(); } - if (clk::now() > t + storage_service_ring_delay() * 60) { + auto& ss = service::get_local_storage_service(); + if (clk::now() > t + ss.get_ring_delay() * 60) { throw std::runtime_error(sprint("Unable to gossip with any seeds (ShadowRound)")); } if (this->_in_shadow_round) { diff --git a/service/storage_service.cc b/service/storage_service.cc index 02b616f4f4..6581ba7389 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -76,8 +76,6 @@ namespace service { static logging::logger logger("storage_service"); -int storage_service::RING_DELAY = storage_service::get_ring_delay(); - distributed _the_storage_service; static int get_generation_number() { @@ -325,7 +323,7 @@ void storage_service::join_token_ring(int delay) { } } } else { - sleep(std::chrono::milliseconds(RING_DELAY)).get(); + sleep(get_ring_delay()).get(); } std::stringstream ss; ss << _bootstrap_tokens; @@ -420,8 +418,8 @@ void storage_service::bootstrap(std::unordered_set tokens) { // if not an existing token then bootstrap gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(tokens)).get(); gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.bootstrapping(tokens)).get(); - set_mode(mode::JOINING, sprint("sleeping %s ms for pending range setup", RING_DELAY), true); - sleep(std::chrono::milliseconds(RING_DELAY)).get(); + set_mode(mode::JOINING, sprint("sleeping %s ms for pending range setup", get_ring_delay().count()), true); + sleep(get_ring_delay()).get(); } else { // Dont set any state for the node which is bootstrapping the existing token... _token_metadata.update_normal_tokens(tokens, get_broadcast_address()); @@ -1640,9 +1638,9 @@ future<> storage_service::decommission() { logger.debug("DECOMMISSIONING"); ss.start_leaving().get(); // FIXME: long timeout = Math.max(RING_DELAY, BatchlogManager.instance.getBatchlogTimeout()); - long timeout = ss.get_ring_delay(); - ss.set_mode(mode::LEAVING, sprint("sleeping %s ms for batch processing and pending range setup", timeout), true); - sleep(std::chrono::milliseconds(timeout)).get(); + auto timeout = ss.get_ring_delay(); + ss.set_mode(mode::LEAVING, sprint("sleeping %s ms for batch processing and pending range setup", timeout.count()), true); + sleep(timeout).get(); logger.debug("DECOMMISSIONING: unbootstrap starts"); ss.unbootstrap(); @@ -2139,7 +2137,7 @@ void storage_service::leave_ring() { auto& gossiper = gms::get_local_gossiper(); auto expire_time = gossiper.compute_expire_time().time_since_epoch().count(); gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.left(get_local_tokens(), expire_time)).get(); - auto delay = std::max(std::chrono::milliseconds(RING_DELAY), gms::gossiper::INTERVAL); + auto delay = std::max(get_ring_delay(), gms::gossiper::INTERVAL); logger.info("Announcing that I have left the ring for {}ms", delay.count()); sleep(delay).get(); } @@ -2600,8 +2598,8 @@ future<> storage_service::move(token new_token) { gms::get_local_gossiper().add_local_application_state(application_state::STATUS, ss.value_factory.moving(new_token)).get(); ss.set_mode(mode::MOVING, sprint("Moving %s from %s to %s.", local_address, *(ss.get_local_tokens().begin()), new_token), true); - ss.set_mode(mode::MOVING, sprint("Sleeping %d ms before start streaming/fetching ranges", RING_DELAY), true); - sleep(std::chrono::milliseconds(RING_DELAY)).get(); + ss.set_mode(mode::MOVING, sprint("Sleeping %d ms before start streaming/fetching ranges", ss.get_ring_delay().count()), true); + sleep(ss.get_ring_delay()).get(); storage_service::range_relocator relocator(std::unordered_set{new_token}, keyspaces_to_process); @@ -2627,5 +2625,11 @@ std::map storage_service::get_token_to_endpoint_map() { return _token_metadata.get_normal_and_bootstrapping_token_to_endpoint_map(); } +std::chrono::milliseconds storage_service::get_ring_delay() { + auto ring_delay = _db.local().get_config().ring_delay_ms(); + logger.trace("Set RING_DELAY to {}ms", ring_delay); + return std::chrono::milliseconds(ring_delay); +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 88918de9ca..39694ae14d 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -123,7 +123,6 @@ public: storage_service(distributed& db) : _db(db) { } - static int RING_DELAY; // delay after which we assume ring has stablized // Needed by distributed<> future<> stop(); @@ -153,21 +152,10 @@ private: inet_address get_broadcast_address() const { return utils::fb_utilities::get_broadcast_address(); } - static int get_ring_delay() { -#if 0 - String newdelay = System.getProperty("cassandra.ring_delay_ms"); - if (newdelay != null) - { - logger.info("Overriding RING_DELAY to {}ms", newdelay); - return Integer.parseInt(newdelay); - } - else -#endif - return 30 * 1000; - } /* This abstraction maintains the token/endpoint metadata information */ token_metadata _token_metadata; public: + std::chrono::milliseconds get_ring_delay(); gms::versioned_value::factory value_factory; #if 0 public volatile VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(getPartitioner()); @@ -363,7 +351,7 @@ public: #endif public: future<> init_server() { - return init_server(RING_DELAY); + return init_server(get_ring_delay().count()); } future<> init_server(int delay);