storage_service: Introduce ring_delay_ms option

It is hard-coded as 30 seconds at the moment.

Usage:
$ scylla --ring-delay-ms 5000

Time a node waits to hear from other nodes before joining the ring in
milliseconds.

Same as -Dcassandra.ring_delay_ms in cassandra.
This commit is contained in:
Asias He
2015-12-25 10:40:31 +08:00
parent 9c07ed8db6
commit f57ba6902b
5 changed files with 31 additions and 40 deletions

View File

@@ -56,6 +56,7 @@
#include "unimplemented.hh"
#include "db/config.hh"
#include "gms/failure_detector.hh"
#include "service/storage_service.hh"
static logging::logger logger("batchlog_manager");
@@ -87,10 +88,8 @@ future<> db::batchlog_manager::start() {
);
});
});
_timer.arm(
lowres_clock::now()
+ std::chrono::milliseconds(
service::storage_service::RING_DELAY));
auto ring_delay = service::get_local_storage_service().get_ring_delay();
_timer.arm(lowres_clock::now() + ring_delay);
}
return make_ready_future<>();
}

View File

@@ -750,7 +750,8 @@ public:
val(replace_token, sstring, "", Used, "The tokens of the node to replace. Same as -Dcassandra.replace_token in cassandra.") \
val(replace_address, sstring, "", Used, "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.") \
val(replace_address_first_boot, sstring, "", Used, "Like replace_address option, but if the node has been bootstrapped sucessfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.") \
val(override_decommission, bool, false, Used, "Set true to force a decommissioned node to join the cluster")
val(override_decommission, bool, false, Used, "Set true to force a decommissioned node to join the cluster") \
val(ring_delay_ms, uint32_t, 30 * 1000, Used, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
/* done! */
#define _make_value_member(name, type, deflt, status, desc, ...) \

View File

@@ -91,11 +91,8 @@ void gossiper::set_seeds(std::set<inet_address> _seeds) {
}
std::chrono::milliseconds gossiper::quarantine_delay() {
return std::chrono::milliseconds(service::storage_service::RING_DELAY * 2);
}
static auto storage_service_ring_delay() {
return std::chrono::milliseconds(service::storage_service::RING_DELAY);
auto& ss = service::get_local_storage_service();
return ss.get_ring_delay() * 2;
}
auto& storage_service_value_factory() {
@@ -762,8 +759,9 @@ future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id
// remember this node's generation
int generation = state.get_heart_beat_state().get_generation();
logger.info("Removing host: {}", host_id);
logger.info("Sleeping for {}ms to ensure {} does not change", service::storage_service::RING_DELAY, endpoint);
sleep(storage_service_ring_delay()).get();
auto ring_delay = service::get_local_storage_service().get_ring_delay();
logger.info("Sleeping for {}ms to ensure {} does not change", ring_delay.count(), endpoint);
sleep(ring_delay).get();
// make sure it did not change
auto& eps = endpoint_state_map.at(endpoint);
if (eps.get_heart_beat_state().get_generation() != generation) {
@@ -823,9 +821,9 @@ future<> gossiper::assassinate_endpoint(sstring address) {
int generation = ep_state.get_heart_beat_state().get_generation();
int heartbeat = ep_state.get_heart_beat_state().get_heart_beat_version();
logger.info("Sleeping for {} ms to ensure {} does not change", service::storage_service::RING_DELAY, endpoint);
logger.info("Sleeping for {} ms to ensure {} does not change", ss.get_ring_delay().count(), endpoint);
// make sure it did not change
sleep(storage_service_ring_delay()).get();
sleep(ss.get_ring_delay()).get();
auto it = endpoint_state_map.find(endpoint);
if (it == endpoint_state_map.end()) {
@@ -1335,7 +1333,8 @@ future<> gossiper::do_shadow_round() {
return make_ready_future<>();
}).get();
}
if (clk::now() > t + storage_service_ring_delay() * 60) {
auto& ss = service::get_local_storage_service();
if (clk::now() > t + ss.get_ring_delay() * 60) {
throw std::runtime_error(sprint("Unable to gossip with any seeds (ShadowRound)"));
}
if (this->_in_shadow_round) {

View File

@@ -76,8 +76,6 @@ namespace service {
static logging::logger logger("storage_service");
int storage_service::RING_DELAY = storage_service::get_ring_delay();
distributed<storage_service> _the_storage_service;
static int get_generation_number() {
@@ -325,7 +323,7 @@ void storage_service::join_token_ring(int delay) {
}
}
} else {
sleep(std::chrono::milliseconds(RING_DELAY)).get();
sleep(get_ring_delay()).get();
}
std::stringstream ss;
ss << _bootstrap_tokens;
@@ -420,8 +418,8 @@ void storage_service::bootstrap(std::unordered_set<token> tokens) {
// if not an existing token then bootstrap
gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(tokens)).get();
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.bootstrapping(tokens)).get();
set_mode(mode::JOINING, sprint("sleeping %s ms for pending range setup", RING_DELAY), true);
sleep(std::chrono::milliseconds(RING_DELAY)).get();
set_mode(mode::JOINING, sprint("sleeping %s ms for pending range setup", get_ring_delay().count()), true);
sleep(get_ring_delay()).get();
} else {
// Dont set any state for the node which is bootstrapping the existing token...
_token_metadata.update_normal_tokens(tokens, get_broadcast_address());
@@ -1640,9 +1638,9 @@ future<> storage_service::decommission() {
logger.debug("DECOMMISSIONING");
ss.start_leaving().get();
// FIXME: long timeout = Math.max(RING_DELAY, BatchlogManager.instance.getBatchlogTimeout());
long timeout = ss.get_ring_delay();
ss.set_mode(mode::LEAVING, sprint("sleeping %s ms for batch processing and pending range setup", timeout), true);
sleep(std::chrono::milliseconds(timeout)).get();
auto timeout = ss.get_ring_delay();
ss.set_mode(mode::LEAVING, sprint("sleeping %s ms for batch processing and pending range setup", timeout.count()), true);
sleep(timeout).get();
logger.debug("DECOMMISSIONING: unbootstrap starts");
ss.unbootstrap();
@@ -2139,7 +2137,7 @@ void storage_service::leave_ring() {
auto& gossiper = gms::get_local_gossiper();
auto expire_time = gossiper.compute_expire_time().time_since_epoch().count();
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.left(get_local_tokens(), expire_time)).get();
auto delay = std::max(std::chrono::milliseconds(RING_DELAY), gms::gossiper::INTERVAL);
auto delay = std::max(get_ring_delay(), gms::gossiper::INTERVAL);
logger.info("Announcing that I have left the ring for {}ms", delay.count());
sleep(delay).get();
}
@@ -2600,8 +2598,8 @@ future<> storage_service::move(token new_token) {
gms::get_local_gossiper().add_local_application_state(application_state::STATUS, ss.value_factory.moving(new_token)).get();
ss.set_mode(mode::MOVING, sprint("Moving %s from %s to %s.", local_address, *(ss.get_local_tokens().begin()), new_token), true);
ss.set_mode(mode::MOVING, sprint("Sleeping %d ms before start streaming/fetching ranges", RING_DELAY), true);
sleep(std::chrono::milliseconds(RING_DELAY)).get();
ss.set_mode(mode::MOVING, sprint("Sleeping %d ms before start streaming/fetching ranges", ss.get_ring_delay().count()), true);
sleep(ss.get_ring_delay()).get();
storage_service::range_relocator relocator(std::unordered_set<token>{new_token}, keyspaces_to_process);
@@ -2627,5 +2625,11 @@ std::map<token, inet_address> storage_service::get_token_to_endpoint_map() {
return _token_metadata.get_normal_and_bootstrapping_token_to_endpoint_map();
}
std::chrono::milliseconds storage_service::get_ring_delay() {
auto ring_delay = _db.local().get_config().ring_delay_ms();
logger.trace("Set RING_DELAY to {}ms", ring_delay);
return std::chrono::milliseconds(ring_delay);
}
} // namespace service

View File

@@ -123,7 +123,6 @@ public:
storage_service(distributed<database>& db)
: _db(db) {
}
static int RING_DELAY; // delay after which we assume ring has stablized
// Needed by distributed<>
future<> stop();
@@ -153,21 +152,10 @@ private:
inet_address get_broadcast_address() const {
return utils::fb_utilities::get_broadcast_address();
}
static int get_ring_delay() {
#if 0
String newdelay = System.getProperty("cassandra.ring_delay_ms");
if (newdelay != null)
{
logger.info("Overriding RING_DELAY to {}ms", newdelay);
return Integer.parseInt(newdelay);
}
else
#endif
return 30 * 1000;
}
/* This abstraction maintains the token/endpoint metadata information */
token_metadata _token_metadata;
public:
std::chrono::milliseconds get_ring_delay();
gms::versioned_value::factory value_factory;
#if 0
public volatile VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(getPartitioner());
@@ -363,7 +351,7 @@ public:
#endif
public:
future<> init_server() {
return init_server(RING_DELAY);
return init_server(get_ring_delay().count());
}
future<> init_server(int delay);