boot_strapper: Enable range_streamer code in bootstrap

Add code to actually stream data from other nodes during bootstrap.

I tested with the following:

   1) stat a node 1

   2) insert data into node 1

   3) start node 2

I can see from the logger that data is streamed correctly from node 1
to node 2.
This commit is contained in:
Asias He
2015-10-12 16:15:54 +08:00
parent 0d1e5c3961
commit cf9d9e2ced
3 changed files with 28 additions and 30 deletions

View File

@@ -38,40 +38,36 @@
#include "dht/boot_strapper.hh"
#include "service/storage_service.hh"
#include "dht/range_streamer.hh"
#include "gms/failure_detector.hh"
#include "log.hh"
logging::logger logger("boot_strapper");
namespace dht {
future<> boot_strapper::bootstrap() {
// FIXME: Stream data from other nodes
service::get_local_storage_service().finish_bootstrapping();
return make_ready_future<>();
#if 0
if (logger.isDebugEnabled())
logger.debug("Beginning bootstrap process");
logger.debug("Beginning bootstrap process: sorted_tokens={}", _token_metadata.sorted_tokens());
RangeStreamer streamer = new RangeStreamer(tokenMetadata, tokens, address, "Bootstrap");
streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
{
AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy();
streamer.addRanges(keyspaceName, strategy.getPendingAddressRanges(tokenMetadata, tokens, address));
range_streamer streamer(_db, _token_metadata, _tokens, _address, "Bootstrap");
streamer.add_source_filter(std::make_unique<range_streamer::failure_detector_source_filter>(gms::get_local_failure_detector()));
for (const auto& keyspace_name : _db.local().get_non_system_keyspaces()) {
auto& ks = _db.local().find_keyspace(keyspace_name);
auto& strategy = ks.get_replication_strategy();
std::vector<range<token>> ranges = strategy.get_pending_address_ranges(_token_metadata, _tokens, _address);
logger.debug("Will stream keyspace={}, ranges={}", keyspace_name, ranges);
streamer.add_ranges(keyspace_name, ranges);
}
try
{
streamer.fetchAsync().get();
StorageService.instance.finishBootstrapping();
}
catch (InterruptedException e)
{
throw new RuntimeException("Interrupted while waiting on boostrap to complete. Bootstrap will have to be restarted.");
}
catch (ExecutionException e)
{
throw new RuntimeException("Error during boostrap: " + e.getCause().getMessage(), e.getCause());
}
#endif
return streamer.fetch_async().then_wrapped([this] (auto&& f) {
try {
auto state = f.get0();
} catch (...) {
throw std::runtime_error(sprint("Error during boostrap: %s", std::current_exception()));
}
service::get_local_storage_service().finish_bootstrapping();
return make_ready_future<>();
});
}
std::unordered_set<token> boot_strapper::get_bootstrap_tokens(token_metadata metadata, database& db) {

View File

@@ -49,14 +49,16 @@ class boot_strapper {
using inet_address = gms::inet_address;
using token_metadata = locator::token_metadata;
using token = dht::token;
distributed<database>& _db;
/* endpoint that needs to be bootstrapped */
inet_address _address;
/* token of the node being bootstrapped. */
std::unordered_set<token> _tokens;
token_metadata _token_metadata;
public:
boot_strapper(inet_address addr, std::unordered_set<token> tokens, token_metadata tmd)
: _address(addr)
boot_strapper(distributed<database>& db, inet_address addr, std::unordered_set<token> tokens, token_metadata tmd)
: _db(db)
, _address(addr)
, _tokens(tokens)
, _token_metadata(tmd) {
}

View File

@@ -389,7 +389,7 @@ void storage_service::bootstrap(std::unordered_set<token> tokens) {
throw std::runtime_error("Unable to contact any seeds!");
}
set_mode(mode::JOINING, "Starting to bootstrap...", true);
dht::boot_strapper bs(get_broadcast_address(), tokens, _token_metadata);
dht::boot_strapper bs(_db, get_broadcast_address(), tokens, _token_metadata);
bs.bootstrap().get(); // handles token update
logger.info("Bootstrap completed! for the tokens {}", tokens);
}