storage_service: Remove the stream_hints
Our hinted handoff implementation will not use the db::system_keyspace::HINTS system table to store hints. No need to stream them. Acked-by: Vlad Zolotarov <vladz@scylladb.com> Message-Id: <3b9190e250b54321ceb87767f4722c7458d41797.1506391500.git.asias@scylladb.com>
This commit is contained in:
@@ -2528,13 +2528,10 @@ void storage_service::unbootstrap() {
|
||||
|
||||
set_mode(mode::LEAVING, "streaming hints to other nodes", true);
|
||||
|
||||
auto hints_success = stream_hints();
|
||||
|
||||
// wait for the transfer runnables to signal the latch.
|
||||
slogger.debug("waiting for stream acks.");
|
||||
try {
|
||||
stream_success.get();
|
||||
hints_success.get();
|
||||
} catch (...) {
|
||||
slogger.warn("unbootstrap fails to stream : {}", std::current_exception());
|
||||
throw;
|
||||
@@ -2681,52 +2678,6 @@ storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multim
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::stream_hints() {
|
||||
// FIXME: flush hits column family
|
||||
#if 0
|
||||
// StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well)
|
||||
ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS);
|
||||
FBUtilities.waitOnFuture(hintsCF.forceFlush());
|
||||
#endif
|
||||
|
||||
// gather all live nodes in the cluster that aren't also leaving
|
||||
auto candidates = get_local_storage_service().get_token_metadata().clone_after_all_left().get_all_endpoints();
|
||||
auto beg = candidates.begin();
|
||||
auto end = candidates.end();
|
||||
auto remove_fn = [br = get_broadcast_address()] (const inet_address& ep) {
|
||||
return ep == br || !gms::get_local_failure_detector().is_alive(ep);
|
||||
};
|
||||
candidates.erase(std::remove_if(beg, end, remove_fn), end);
|
||||
|
||||
if (candidates.empty()) {
|
||||
slogger.warn("Unable to stream hints since no live endpoints seen");
|
||||
throw std::runtime_error("Unable to stream hints since no live endpoints seen");
|
||||
} else {
|
||||
// stream to the closest peer as chosen by the snitch
|
||||
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
|
||||
|
||||
snitch->sort_by_proximity(get_broadcast_address(), candidates);
|
||||
auto hints_destination_host = candidates.front();
|
||||
|
||||
// stream all hints -- range list will be a singleton of "the entire ring"
|
||||
dht::token_range_vector ranges = {dht::token_range::make_open_ended_both_sides()};
|
||||
slogger.debug("stream_hints: ranges={}", ranges);
|
||||
std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint;
|
||||
ranges_per_endpoint[hints_destination_host] = std::move(ranges);
|
||||
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, get_token_metadata(), get_broadcast_address(), "Hints");
|
||||
auto keyspace = db::system_keyspace::NAME;
|
||||
std::vector<sstring> column_families = { db::system_keyspace::HINTS };
|
||||
streamer->add_tx_ranges(keyspace, std::move(ranges_per_endpoint), column_families);
|
||||
return streamer->stream_async().then([streamer] {
|
||||
slogger.info("stream_hints successful");
|
||||
}).handle_exception([] (auto ep) {
|
||||
slogger.warn("stream_hints failed: {}", ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::start_leaving() {
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
return gossiper.add_local_application_state(application_state::STATUS, value_factory.leaving(get_local_tokens().get0())).then([this] {
|
||||
|
||||
@@ -1747,7 +1747,6 @@ private:
|
||||
future<> start_leaving();
|
||||
void leave_ring();
|
||||
void unbootstrap();
|
||||
future<> stream_hints();
|
||||
|
||||
public:
|
||||
future<> move(sstring new_token) {
|
||||
|
||||
Reference in New Issue
Block a user