Merge "Enable more code in storage_service" from Asias

"The most visiable change to user is, now it takes less time to boot up the
non-seed node."
This commit is contained in:
Avi Kivity
2015-08-06 11:07:00 +03:00
7 changed files with 270 additions and 235 deletions

View File

@@ -8,8 +8,8 @@
#include "gms/gossiper.hh"
#include "service/storage_service.hh"
future<> init_storage_service() {
return service::init_storage_service().then([] {
future<> init_storage_service(distributed<database>& db) {
return service::init_storage_service(db).then([] {
engine().at_exit([] { return service::deinit_storage_service(); });
});
}

View File

@@ -5,7 +5,9 @@
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <db/config.hh>
#include <seastar/core/distributed.hh>
#include "db/config.hh"
#include "database.hh"
future<> init_storage_service();
future<> init_storage_service(distributed<database>& db);
future<> init_ms_fd_gossiper(sstring listen_address, db::seed_provider_type seed_provider);

View File

@@ -114,8 +114,8 @@ int main(int ac, char** av) {
using namespace locator;
return i_endpoint_snitch::create_snitch(cfg->endpoint_snitch()).then([] {
engine().at_exit([] { return i_endpoint_snitch::stop_snitch(); });
}).then([] {
return init_storage_service();
}).then([&db] {
return init_storage_service(db);
}).then([&db, cfg] {
return db.start(std::move(*cfg)).then([&db] {
engine().at_exit([&db] { return db.stop(); });

View File

@@ -11,6 +11,13 @@
#include "log.hh"
#include "service/migration_manager.hh"
#include "to_string.hh"
#include "gms/gossiper.hh"
#include <seastar/core/thread.hh>
#include <sstream>
using token = dht::token;
using UUID = utils::UUID;
using inet_address = gms::inet_address;
namespace service {
@@ -20,94 +27,133 @@ int storage_service::RING_DELAY = storage_service::get_ring_delay();
distributed<storage_service> _the_storage_service;
bool is_replacing() {
// FIXME: DatabaseDescriptor.isReplacing()
return false;
}
bool is_auto_bootstrap() {
// FIXME: DatabaseDescriptor.isAutoBootstrap()
return true;
}
std::set<inet_address> get_seeds() {
// FIXME: DatabaseDescriptor.getSeeds()
auto& gossiper = gms::get_local_gossiper();
return gossiper.get_seeds();
}
std::set<inet_address> get_replace_tokens() {
// FIXME: DatabaseDescriptor.getReplaceTokens()
return {};
}
std::experimental::optional<UUID> get_replace_node() {
// FIXME: DatabaseDescriptor.getReplaceNode()
return {};
}
std::experimental::optional<inet_address> get_replace_address() {
// FIXME: DatabaseDescriptor.getReplaceAddress()
return {};
}
bool get_property_join_ring() {
// FIXME: Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))
return true;
}
bool get_property_rangemovement() {
// FIXME: Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true")
return true;
}
bool get_property_load_ring_state() {
// FIXME: Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true"))
return true;
}
bool storage_service::should_bootstrap() {
// FIXME: Currently, we do boostrap if we are not a seed node.
// return DatabaseDescriptor.isAutoBootstrap() && !SystemKeyspace.bootstrapComplete() && !DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress());
auto& gossiper = gms::get_local_gossiper();
auto seeds = gossiper.get_seeds();
return !seeds.count(get_broadcast_address());
return is_auto_bootstrap() && !get_seeds().count(get_broadcast_address());
}
future<> storage_service::prepare_to_join() {
if (!_joined) {
#if 0
if (DatabaseDescriptor.isReplacing() && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))))
throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null)
throw new RuntimeException("Replace method removed; use cassandra.replace_address instead");
if (DatabaseDescriptor.isReplacing())
{
if (SystemKeyspace.bootstrapComplete())
throw new RuntimeException("Cannot replace address with a node that is already bootstrapped");
if (!DatabaseDescriptor.isAutoBootstrap())
throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
_bootstrap_tokens = prepareReplacementInfo();
appStates.put(ApplicationState.TOKENS, valueFactory.tokens(_bootstrap_tokens));
appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
}
else if (should_bootstrap())
{
checkForEndpointCollision();
}
#endif
auto f = make_ready_future<>();
if (should_bootstrap()) {
f = check_for_endpoint_collision();
}
if (_joined) {
return make_ready_future<>();
}
// have to start the gossip service before we can see any info on other nodes. this is necessary
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a counterId to our state, below.)
// Seed the host ID-to-endpoint map with our own ID.
return f.then([] {
return db::system_keyspace::get_local_host_id();
}).then([this] (auto local_host_id) {
std::map<gms::application_state, gms::versioned_value> app_states;
std::map<gms::application_state, gms::versioned_value> app_states;
auto f = make_ready_future<>();
if (is_replacing() && !get_property_join_ring()) {
throw std::runtime_error("Cannot set both join_ring=false and attempt to replace a node");
}
if (get_replace_tokens().size() > 0 || get_replace_node()) {
throw std::runtime_error("Replace method removed; use cassandra.replace_address instead");
}
if (is_replacing()) {
// if (SystemKeyspace.bootstrapComplete())
// throw new RuntimeException("Cannot replace address with a node that is already bootstrapped");
if (!is_auto_bootstrap()) {
throw std::runtime_error("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
}
_bootstrap_tokens = prepare_replacement_info();
app_states.emplace(gms::application_state::TOKENS, value_factory.tokens(_bootstrap_tokens));
app_states.emplace(gms::application_state::STATUS, value_factory.hibernate(true));
} else if (should_bootstrap()) {
f = check_for_endpoint_collision();
}
_token_metadata.update_host_id(local_host_id, this->get_broadcast_address());
// FIXME: DatabaseDescriptor.getBroadcastRpcAddress()
auto broadcast_rpc_address = this->get_broadcast_address();
app_states.emplace(gms::application_state::NET_VERSION, value_factory.network_version());
app_states.emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id));
app_states.emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address));
app_states.emplace(gms::application_state::RELEASE_VERSION, value_factory.release_version());
logger.info("Starting up server gossip");
// have to start the gossip service before we can see any info on other nodes. this is necessary
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a counterId to our state, below.)
// Seed the host ID-to-endpoint map with our own ID.
return f.then([app_states = std::move(app_states)] {
return db::system_keyspace::get_local_host_id();
}).then([this, app_states = std::move(app_states)] (auto local_host_id) mutable {
_token_metadata.update_host_id(local_host_id, this->get_broadcast_address());
// FIXME: DatabaseDescriptor.getBroadcastRpcAddress()
auto broadcast_rpc_address = this->get_broadcast_address();
app_states.emplace(gms::application_state::NET_VERSION, value_factory.network_version());
app_states.emplace(gms::application_state::HOST_ID, value_factory.host_id(local_host_id));
app_states.emplace(gms::application_state::RPC_ADDRESS, value_factory.rpcaddress(broadcast_rpc_address));
app_states.emplace(gms::application_state::RELEASE_VERSION, value_factory.release_version());
logger.info("Starting up server gossip");
auto& gossiper = gms::get_local_gossiper();
gossiper.register_(this);
using namespace std::chrono;
auto now = high_resolution_clock::now().time_since_epoch();
int generation_number = duration_cast<seconds>(now).count();
// FIXME: SystemKeyspace.incrementAndGetGeneration()
print("Start gossiper service ...\n");
return gossiper.start(generation_number, app_states).then([this] {
auto& gossiper = gms::get_local_gossiper();
gossiper.register_(this);
using namespace std::chrono;
auto now = high_resolution_clock::now().time_since_epoch();
int generation_number = duration_cast<seconds>(now).count();
// FIXME: SystemKeyspace.incrementAndGetGeneration()
print("Start gossiper service ...\n");
return gossiper.start(generation_number, app_states).then([this] {
#if SS_DEBUG
gms::get_local_gossiper().debug_show();
_token_metadata.debug_show();
#endif
});
}).then([this] {
// gossip snitch infos (local DC and rack)
gossip_snitch_info();
auto& proxy = service::get_local_storage_proxy();
// gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
return update_schema_version_and_announce(proxy); // Ensure we know our own actual Schema UUID in preparation for updates
#if 0
if (!MessagingService.instance().isListening())
MessagingService.instance().listen(FBUtilities.getLocalAddress());
LoadBroadcaster.instance.startBroadcasting();
HintedHandOffManager.instance.start();
BatchlogManager.instance.start();
gms::get_local_gossiper().debug_show();
_token_metadata.debug_show();
#endif
});
}
return make_ready_future<>();
}).then([this] {
// gossip snitch infos (local DC and rack)
gossip_snitch_info();
auto& proxy = service::get_local_storage_proxy();
// gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
return update_schema_version_and_announce(proxy); // Ensure we know our own actual Schema UUID in preparation for updates
#if 0
if (!MessagingService.instance().isListening())
MessagingService.instance().listen(FBUtilities.getLocalAddress());
LoadBroadcaster.instance.startBroadcasting();
HintedHandOffManager.instance.start();
BatchlogManager.instance.start();
#endif
});
}
future<> storage_service::join_token_ring(int delay) {
auto f = make_ready_future<>();
return seastar::async([this, delay] {
_joined = true;
#if 0
// We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed.
@@ -125,64 +171,42 @@ future<> storage_service::join_token_ring(int delay) {
SystemKeyspace.bootstrapInProgress(),
SystemKeyspace.bootstrapComplete(),
DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()));
if (DatabaseDescriptor.isAutoBootstrap() && !SystemKeyspace.bootstrapComplete() && DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()))
logger.info("This node will not auto bootstrap because it is configured to be a seed node.");
#endif
if (is_auto_bootstrap() && /* !SystemKeyspace.bootstrapComplete() && */ get_seeds().count(get_broadcast_address())) {
logger.info("This node will not auto bootstrap because it is configured to be a seed node.");
}
if (should_bootstrap()) {
auto elapsed = make_shared<int>(0);
auto stop_cond = [elapsed, delay] {
// FIXME
// if we see schema, we can proceed to the next check directly
// if (!Schema.instance.getVersion().equals(Schema.emptyVersion)) {
// return true;
// }
if (*elapsed < delay) {
return false;
}
return true;
};
f = do_until(stop_cond, [elapsed] {
auto t = 1000;
return sleep(std::chrono::milliseconds(t)).then([elapsed, t] {
*elapsed += t;
});
}).then([this] {
_bootstrap_tokens = boot_strapper::get_bootstrap_tokens(_token_metadata);
return bootstrap(_bootstrap_tokens);
});
#if 0
if (SystemKeyspace.bootstrapInProgress())
logger.warn("Detected previous bootstrap failure; retrying");
else
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.IN_PROGRESS);
setMode(Mode.JOINING, "waiting for ring information", true);
#endif
set_mode(mode::JOINING, "waiting for ring information", true);
// first sleep the delay to make sure we see all our peers
for (int i = 0; i < delay; i += 1000)
{
for (int i = 0; i < delay; i += 1000) {
// if we see schema, we can proceed to the next check directly
if (!Schema.instance.getVersion().equals(Schema.emptyVersion))
{
logger.debug("got schema: {}", Schema.instance.getVersion());
if (_db.local().get_version() != database::empty_version) {
logger.debug("got schema: {}", _db.local().get_version());
break;
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
sleep(std::chrono::seconds(1)).get();
}
#if 0
// if our schema hasn't matched yet, keep sleeping until it does
// (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
while (!MigrationManager.isReadyForBootstrap())
{
setMode(Mode.JOINING, "waiting for schema information to complete", true);
set_mode(mode::JOINING, "waiting for schema information to complete", true);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
setMode(Mode.JOINING, "schema complete, ready to bootstrap", true);
setMode(Mode.JOINING, "waiting for pending range calculation", true);
PendingRangeCalculatorService.instance.blockUntilFinished();
setMode(Mode.JOINING, "calculation complete, ready to bootstrap", true);
if (logger.isDebugEnabled())
logger.debug("... got ring + schema info");
#endif
set_mode(mode::JOINING, "schema complete, ready to bootstrap", true);
set_mode(mode::JOINING, "waiting for pending range calculation", true);
//PendingRangeCalculatorService.instance.blockUntilFinished();
set_mode(mode::JOINING, "calculation complete, ready to bootstrap", true);
logger.debug("... got ring + schema info");
#if 0
if (Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true")) &&
(
_token_metadata.getBootstrapTokens().valueSet().size() > 0 ||
@@ -190,71 +214,52 @@ future<> storage_service::join_token_ring(int delay) {
_token_metadata.getMovingEndpoints().size() > 0
))
throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true");
#endif
if (!DatabaseDescriptor.isReplacing())
{
if (_token_metadata.isMember(FBUtilities.getBroadcastAddress()))
{
String s = "This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)";
throw new UnsupportedOperationException(s);
if (!is_replacing()) {
if (_token_metadata.is_member(get_broadcast_address())) {
throw std::runtime_error("This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)");
}
setMode(Mode.JOINING, "getting bootstrap token", true);
_bootstrap_tokens = BootStrapper.getBootstrapTokens(_token_metadata);
}
else
{
if (!DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress()))
{
try
{
// Sleep additionally to make sure that the server actually is not alive
// and giving it more time to gossip if alive.
Thread.sleep(LoadBroadcaster.BROADCAST_INTERVAL);
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}
set_mode(mode::JOINING, "getting bootstrap token", true);
_bootstrap_tokens = boot_strapper::get_bootstrap_tokens(_token_metadata);
} else {
auto replace_addr = get_replace_address();
if (replace_addr && *replace_addr != get_broadcast_address()) {
// Sleep additionally to make sure that the server actually is not alive
// and giving it more time to gossip if alive.
// FIXME: LoadBroadcaster.BROADCAST_INTERVAL
std::chrono::milliseconds broadcast_interval(60 * 1000);
sleep(broadcast_interval).get();
// check for operator errors...
for (Token token : _bootstrap_tokens)
{
InetAddress existing = _token_metadata.getEndpoint(token);
if (existing != null)
{
for (auto token : _bootstrap_tokens) {
auto existing = _token_metadata.get_endpoint(token);
if (existing) {
#if 0
long nanoDelay = delay * 1000000L;
if (Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > (System.nanoTime() - nanoDelay))
throw new UnsupportedOperationException("Cannot replace a live node... ");
current.add(existing);
}
else
{
throw new UnsupportedOperationException("Cannot replace token " + token + " which does not exist!");
}
}
}
else
{
try
{
Thread.sleep(RING_DELAY);
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}
}
setMode(Mode.JOINING, "Replacing a node with token(s): " + _bootstrap_tokens, true);
}
bootstrap(_bootstrap_tokens);
assert !_is_bootstrap_mode; // bootstrap will block until finished
#endif
} else {
throw std::runtime_error(sprint("Cannot replace token %s which does not exist!", token));
}
}
} else {
sleep(std::chrono::milliseconds(RING_DELAY)).get();
}
std::stringstream ss;
ss << _bootstrap_tokens;
set_mode(mode::JOINING, sprint("Replacing a node with token(s): %s", ss.str()), true);
}
bootstrap(_bootstrap_tokens).get();
// FIXME: _is_bootstrap_mode is set to fasle in BootStrapper::bootstrap
// assert(!_is_bootstrap_mode); // bootstrap will block until finished
} else {
// FIXME: DatabaseDescriptor.getNumTokens()
size_t num_tokens = 3;
_bootstrap_tokens = boot_strapper::get_random_tokens(_token_metadata, num_tokens);
logger.info("Generated random tokens. tokens are {}", _bootstrap_tokens);
#if 0
_bootstrap_tokens = SystemKeyspace.getSavedTokens();
if (_bootstrap_tokens.isEmpty())
@@ -285,9 +290,7 @@ future<> storage_service::join_token_ring(int delay) {
}
#endif
}
return f.then([this] {
return set_tokens(_bootstrap_tokens);
set_tokens(_bootstrap_tokens).get();
#if 0
// if we don't have system_traces keyspace at this point, then create it manually
if (Schema.instance.getKSMetaData(TraceKeyspace.NAME) == null)
@@ -352,18 +355,18 @@ future<> storage_service::bootstrap(std::unordered_set<token> tokens) {
gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(tokens));
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.bootstrapping(tokens));
sleep_time = std::chrono::milliseconds(RING_DELAY);
// setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true);
set_mode(mode::JOINING, sprint("sleeping %s ms for pending range setup", RING_DELAY), true);
} else {
// Dont set any state for the node which is bootstrapping the existing token...
_token_metadata.update_normal_tokens(tokens, get_broadcast_address());
// SystemKeyspace.removeEndpoint(DatabaseDescriptor.getReplaceAddress());
}
return sleep(sleep_time).then([tokens = std::move(tokens)] {
return sleep(sleep_time).then([this, tokens = std::move(tokens)] {
auto& gossiper = gms::get_local_gossiper();
if (!gossiper.seen_any_seed()) {
throw std::runtime_error("Unable to contact any seeds!");
}
// setMode(Mode.JOINING, "Starting to bootstrap...", true);
this->set_mode(mode::JOINING, "Starting to bootstrap...", true);
// new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, _token_metadata).bootstrap(); // handles token update
logger.info("Bootstrap completed! for the tokens {}", tokens);
return make_ready_future<>();
@@ -812,7 +815,7 @@ future<> storage_service::set_tokens(std::unordered_set<token> tokens) {
auto& gossiper = gms::get_local_gossiper();
gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(local_tokens));
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.normal(local_tokens));
//setMode(Mode.NORMAL, false);
set_mode(mode::NORMAL, false);
replicate_to_all_cores();
});
}
@@ -1026,5 +1029,65 @@ void storage_service::remove_endpoint(inet_address endpoint) {
});
}
std::unordered_set<token> storage_service::prepare_replacement_info() {
return {};
#if 0
logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress());
if (!MessagingService.instance().isListening())
MessagingService.instance().listen(FBUtilities.getLocalAddress());
// make magic happen
Gossiper.instance.doShadowRound();
UUID hostId = null;
// now that we've gossiped at least once, we should be able to find the node we're replacing
if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
try
{
if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(get_application_state_value(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
return tokens;
}
catch (IOException e)
{
throw new RuntimeException(e);
}
#endif
}
static const std::map<storage_service::mode, sstring> mode_names = {
{storage_service::mode::STARTING, "STARTING"},
{storage_service::mode::NORMAL, "NORMAL"},
{storage_service::mode::JOINING, "JOINING"},
{storage_service::mode::LEAVING, "LEAVING"},
{storage_service::mode::DECOMMISSIONED, "DECOMMISSIONED"},
{storage_service::mode::MOVING, "MOVING"},
{storage_service::mode::DRAINING, "DRAINING"},
{storage_service::mode::DRAINED, "DRAINED"},
};
std::ostream& operator<<(std::ostream& os, const storage_service::mode& m) {
os << mode_names.at(m);
return os;
}
void storage_service::set_mode(mode m, bool log) {
set_mode(m, "", log);
}
void storage_service::set_mode(mode m, sstring msg, bool log) {
_operation_mode = m;
if (log) {
logger.info("{}: {}", m, msg);
} else {
logger.debug("{}: {}", m, msg);
}
}
} // namespace service

View File

@@ -34,6 +34,8 @@
#include "db/system_keyspace.hh"
#include "core/semaphore.hh"
#include "utils/fb_utilities.hh"
#include "database.hh"
#include <seastar/core/distributed.hh>
namespace service {
@@ -57,7 +59,11 @@ class storage_service : public gms::i_endpoint_state_change_subscriber
/* JMX notification serial number counter */
private final AtomicLong notificationSerialNumber = new AtomicLong();
#endif
distributed<database>& _db;
public:
storage_service(distributed<database>& db)
: _db(db) {
}
static int RING_DELAY; // delay after which we assume ring has stablized
// Needed by distributed<>
@@ -139,13 +145,15 @@ private:
bool _joined = false;
public:
enum class mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED };
private:
mode _operation_mode = mode::STARTING;
friend std::ostream& operator<<(std::ostream& os, const mode& mode);
#if 0
/* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */
private double traceProbability = 0.0;
private static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }
private Mode operationMode = Mode.STARTING;
/* Used for tracking drain progress */
private volatile int totalCFs, remainingCFs;
@@ -325,37 +333,9 @@ public:
throw new IllegalStateException("No configured daemon");
daemon.deactivate();
}
public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException
{
logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress());
if (!MessagingService.instance().isListening())
MessagingService.instance().listen(FBUtilities.getLocalAddress());
// make magic happen
Gossiper.instance.doShadowRound();
UUID hostId = null;
// now that we've gossiped at least once, we should be able to find the node we're replacing
if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
try
{
if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(get_application_state_value(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
return tokens;
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
#endif
public:
std::unordered_set<token> prepare_replacement_info();
future<> check_for_endpoint_collision();
#if 0
@@ -455,23 +435,12 @@ public:
{
DatabaseDescriptor.setIncrementalBackupsEnabled(value);
}
private void setMode(Mode m, boolean log)
{
setMode(m, null, log);
}
private void setMode(Mode m, String msg, boolean log)
{
operationMode = m;
String logMsg = msg == null ? m.toString() : String.format("%s: %s", m, msg);
if (log)
logger.info(logMsg);
else
logger.debug(logMsg);
}
#endif
private:
void set_mode(mode m, bool log);
void set_mode(mode m, sstring msg, bool log);
public:
future<> bootstrap(std::unordered_set<token> tokens);
bool is_bootstrap_mode() {
@@ -3310,8 +3279,8 @@ inline future<std::map<dht::token, gms::inet_address>> get_token_to_endpoint() {
});
}
inline future<> init_storage_service() {
return service::get_storage_service().start().then([] {
inline future<> init_storage_service(distributed<database>& db) {
return service::get_storage_service().start(std::ref(db)).then([] {
print("Start Storage service ...\n");
});
}

View File

@@ -192,11 +192,11 @@ public:
}
};
future<> init_once() {
future<> init_once(distributed<database>& db) {
static bool done = false;
if (!done) {
done = true;
return init_storage_service().then([] {
return init_storage_service(db).then([] {
return init_ms_fd_gossiper("127.0.0.1", db::config::seed_provider_type());
});
} else {
@@ -206,9 +206,9 @@ future<> init_once() {
future<::shared_ptr<cql_test_env>> make_env_for_test() {
return locator::i_endpoint_snitch::create_snitch("SimpleSnitch").then([] {
return init_once().then([] {
return seastar::async([] {
auto db = ::make_shared<distributed<database>>();
auto db = ::make_shared<distributed<database>>();
return init_once(*db).then([db] {
return seastar::async([db] {
auto cfg = make_lw_shared<db::config>();
cfg->data_file_directories() = {"."};
db->start(std::move(*cfg)).get();

View File

@@ -11,15 +11,16 @@
namespace bpo = boost::program_options;
int main(int ac, char ** av) {
distributed<database> db;
app_template app;
app.add_options()
("seed", bpo::value<std::vector<std::string>>(), "IP address of seed node")
("listen-address", bpo::value<std::string>()->default_value("0.0.0.0"), "IP address to listen");
return app.run(ac, av, [&app] {
return app.run(ac, av, [&db, &app] {
auto config = app.configuration();
logging::logger_registry().set_logger_level("gossip", logging::log_level::trace);
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
service::init_storage_service().then([listen, config] {
service::init_storage_service(db).then([listen, config] {
return net::get_messaging_service().start(listen);
}).then([config] {
auto& server = net::get_local_messaging_service();