From cf9d9e2ceddd280b99d6934eb7babe08538bf9af Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 12 Oct 2015 16:15:54 +0800 Subject: [PATCH] boot_strapper: Enable range_streamer code in bootstrap Add code to actually stream data from other nodes during bootstrap. I tested with the following: 1) stat a node 1 2) insert data into node 1 3) start node 2 I can see from the logger that data is streamed correctly from node 1 to node 2. --- dht/boot_strapper.cc | 50 ++++++++++++++++++-------------------- dht/boot_strapper.hh | 6 +++-- service/storage_service.cc | 2 +- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/dht/boot_strapper.cc b/dht/boot_strapper.cc index 16dd99670a..6eba8c9a7d 100644 --- a/dht/boot_strapper.cc +++ b/dht/boot_strapper.cc @@ -38,40 +38,36 @@ #include "dht/boot_strapper.hh" #include "service/storage_service.hh" +#include "dht/range_streamer.hh" +#include "gms/failure_detector.hh" +#include "log.hh" + +logging::logger logger("boot_strapper"); namespace dht { future<> boot_strapper::bootstrap() { - // FIXME: Stream data from other nodes - service::get_local_storage_service().finish_bootstrapping(); - return make_ready_future<>(); -#if 0 - if (logger.isDebugEnabled()) - logger.debug("Beginning bootstrap process"); + logger.debug("Beginning bootstrap process: sorted_tokens={}", _token_metadata.sorted_tokens()); - RangeStreamer streamer = new RangeStreamer(tokenMetadata, tokens, address, "Bootstrap"); - streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance)); - - for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) - { - AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy(); - streamer.addRanges(keyspaceName, strategy.getPendingAddressRanges(tokenMetadata, tokens, address)); + range_streamer streamer(_db, _token_metadata, _tokens, _address, "Bootstrap"); + streamer.add_source_filter(std::make_unique(gms::get_local_failure_detector())); + for (const auto& keyspace_name : _db.local().get_non_system_keyspaces()) { + auto& ks = _db.local().find_keyspace(keyspace_name); + auto& strategy = ks.get_replication_strategy(); + std::vector> ranges = strategy.get_pending_address_ranges(_token_metadata, _tokens, _address); + logger.debug("Will stream keyspace={}, ranges={}", keyspace_name, ranges); + streamer.add_ranges(keyspace_name, ranges); } - try - { - streamer.fetchAsync().get(); - StorageService.instance.finishBootstrapping(); - } - catch (InterruptedException e) - { - throw new RuntimeException("Interrupted while waiting on boostrap to complete. Bootstrap will have to be restarted."); - } - catch (ExecutionException e) - { - throw new RuntimeException("Error during boostrap: " + e.getCause().getMessage(), e.getCause()); - } -#endif + return streamer.fetch_async().then_wrapped([this] (auto&& f) { + try { + auto state = f.get0(); + } catch (...) { + throw std::runtime_error(sprint("Error during boostrap: %s", std::current_exception())); + } + service::get_local_storage_service().finish_bootstrapping(); + return make_ready_future<>(); + }); } std::unordered_set boot_strapper::get_bootstrap_tokens(token_metadata metadata, database& db) { diff --git a/dht/boot_strapper.hh b/dht/boot_strapper.hh index 79abdd1a26..e3ee79d9aa 100644 --- a/dht/boot_strapper.hh +++ b/dht/boot_strapper.hh @@ -49,14 +49,16 @@ class boot_strapper { using inet_address = gms::inet_address; using token_metadata = locator::token_metadata; using token = dht::token; + distributed& _db; /* endpoint that needs to be bootstrapped */ inet_address _address; /* token of the node being bootstrapped. */ std::unordered_set _tokens; token_metadata _token_metadata; public: - boot_strapper(inet_address addr, std::unordered_set tokens, token_metadata tmd) - : _address(addr) + boot_strapper(distributed& db, inet_address addr, std::unordered_set tokens, token_metadata tmd) + : _db(db) + , _address(addr) , _tokens(tokens) , _token_metadata(tmd) { } diff --git a/service/storage_service.cc b/service/storage_service.cc index 8bdc04daca..072014e6a8 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -389,7 +389,7 @@ void storage_service::bootstrap(std::unordered_set tokens) { throw std::runtime_error("Unable to contact any seeds!"); } set_mode(mode::JOINING, "Starting to bootstrap...", true); - dht::boot_strapper bs(get_broadcast_address(), tokens, _token_metadata); + dht::boot_strapper bs(_db, get_broadcast_address(), tokens, _token_metadata); bs.bootstrap().get(); // handles token update logger.info("Bootstrap completed! for the tokens {}", tokens); }