mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge "Repair based node operation" from Asias
" Here is a simple introduction to the node operations scylla supports and some of the issues. - Replace operation It is used to replace a dead node. The token ring does not change. It pulls data from only one of the replicas which might not be the latest copy. - Rebuild operation It is used to get all the data this node owns form other nodes. It pulls data from only one of the replicas which might not be the latest copy. - Bootstrap operation It is used to add a new node into the cluster. The token ring changes. Do no suffer from the "not the latest replica” issue. New node pulls data from existing nodes that are losing the token range. Suffer from failed streaming. We split the ranges in 10 groups and we stream one group at a time. Restream the group if failed, causing unnecessary data transmission on wire. Bootstrap is not resumable. Failure after 99.99% of data is streamed. If we restart the node again, we need to stream all the data again even if the node already has 99.99% of the data. - Decommission operation It is used to remove a live node form the cluster. Token ring changes. Do not suffer “not the latest replica” issue. The leaving node pushes data to existing nodes. It suffers from resumable issue like bootstrap operation. - Removenode operation It is used to remove a dead node out of the cluster. Existing nodes pulls data from other existing nodes for the new ranges it own. It pulls from one of the replicas which might not be the latest copy. To solve all the issues above. We could use repair based node operation. The idea behind repair based node operations is simple: use repair to sync data between replicas instead of streaming. The benefits: - Latest copy is guaranteed - Resumable in nature - No extra data is streamed on wire E.g., rebuild twice, will not stream the same data twice - Unified code path for all the node operations - Free repair operation during bootstrap, replace operation and so on. Fixes: #3003 Fixes: #4208 Tests: update_cluster_layout_tests.py + replace_address_test.py + manual test " * 'repair_for_node_ops' of https://github.com/asias/scylla: docs: Add doc for repair_based_node_ops storage_service: Enable node repair based ops for bootstrap storage_service: Enable node repair based ops for decommission storage_service: Enable node repair based ops for replace storage_service: Enable node repair based ops for removenode storage_service: Enable node repair based ops for rebuild storage_service: Use the same tokens as previous bootstrap storage_service: Add is_repair_based_node_ops_enabled helper config: Add enable_repair_based_node_ops repair: Add replace_with_repair repair: Add rebuild_with_repair repair: Add do_rebuild_replace_with_repair repair: Add removenode_with_repair repair: Add decommission_with_repair repair: Add do_decommission_removenode_with_repair repair: Add bootstrap_with_repair repair: Introduce sync_data_using_repair repair: Propagate exception in tracker::run
This commit is contained in:
@@ -681,6 +681,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
|
||||
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
|
||||
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
|
||||
, shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
|
||||
, fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
|
||||
|
||||
@@ -270,6 +270,7 @@ public:
|
||||
named_value<sstring> replace_address;
|
||||
named_value<sstring> replace_address_first_boot;
|
||||
named_value<bool> override_decommission;
|
||||
named_value<bool> enable_repair_based_node_ops;
|
||||
named_value<uint32_t> ring_delay_ms;
|
||||
named_value<uint32_t> shadow_round_ms;
|
||||
named_value<uint32_t> fd_max_interval_ms;
|
||||
|
||||
114
docs/repair_based_node_ops.md
Normal file
114
docs/repair_based_node_ops.md
Normal file
@@ -0,0 +1,114 @@
|
||||
# Repair based node operations
|
||||
|
||||
Here below is a simple introduction to the node operations Scylla support and
|
||||
some of the issues that streaming based node operations have.
|
||||
|
||||
- Replace operation
|
||||
|
||||
It is used to replace a dead node. The token ring does not change. Replacing
|
||||
node pulls data from only one of the replicas which might not be the latest
|
||||
copy.
|
||||
|
||||
- Rebuild operation
|
||||
|
||||
It is used to get all the data this node owns form other existing nodes. It
|
||||
pulls data from only one of the replicas which might not be the latest copy.
|
||||
|
||||
- Removenode operation
|
||||
|
||||
It is used to remove a dead node out of the cluster. Existing nodes pull
|
||||
data from other existing nodes for the new ranges it owns. It pulls from one
|
||||
of the replicas which might not be the latest copy.
|
||||
|
||||
- Bootstrap operation
|
||||
|
||||
It is used to add a new node into the cluster. The token ring changes. It
|
||||
does not suffer from the "latest replica” issue. New node pulls data
|
||||
from existing nodes that are losing the token ranges.
|
||||
|
||||
It suffers from failed streaming. We split the ranges in 10 groups and we
|
||||
stream one group at a time. Restream the group if failed, causing
|
||||
unnecessary data transmission on wire.
|
||||
|
||||
Bootstrap is not resumable. With failure after 99.99% of data is streamed,
|
||||
if we restart the node again, we need to stream all the data again even if
|
||||
the node already has 99.99% of the data.
|
||||
|
||||
- Decommission operation
|
||||
|
||||
It is used to remove a live node form the cluster. Token ring changes. It
|
||||
does not suffer from the “latest replica” issue. The leaving node pushes data
|
||||
to existing nodes.
|
||||
|
||||
It suffers from resumable issue like bootstrap operation.
|
||||
|
||||
To solve all the issues above. We use repair based node operations. The idea
|
||||
behind repair based node operations is simple: use repair to synchronize data
|
||||
between replicas instead of using streaming.
|
||||
|
||||
The benefits:
|
||||
|
||||
- Latest copy is guaranteed
|
||||
|
||||
- Resumable in nature
|
||||
|
||||
- No extra data is streamed on wire
|
||||
E.g., rebuild twice, will not stream the same data twice
|
||||
|
||||
- Unified code path for all the node operations
|
||||
|
||||
- Free repair operation during bootstrap, replace operation and so on.
|
||||
|
||||
Select nodes to synchronize with:
|
||||
|
||||
- Replace operation
|
||||
Synchronize with all replica nodes from the local DC
|
||||
|
||||
- Rebuild operation
|
||||
Synchronize with all replica nodes from the local DC or from the DC specified by the user
|
||||
|
||||
- Removenode operation
|
||||
Synchronize with all replica nodes from local DC
|
||||
|
||||
- Bootstrap operation
|
||||
1) If local_dc_replica_nodes = RF, synchronize with 1 node that is losing the
|
||||
range in the local DC
|
||||
|
||||
2) If 0 < local_dc_replica_nodes < RF, synchronize with
|
||||
up to RF/2 + 1 nodes in local DC
|
||||
|
||||
3) If local_dc_replica_nodes = 0, reject the bootstrap operation
|
||||
|
||||
- Decommission operation
|
||||
1) Synchronize with one node that is gaining the range in local DC
|
||||
|
||||
For example, with RF = 3 and 4 nodes n1, n2, n3, n4 in the cluster, n3 is
|
||||
removed, old_replicas = {n1, n2, n3}, new_replicas = {n1, n2, n4}.
|
||||
|
||||
The decommission node will repair all the ranges the leaving node is
|
||||
responsible for. Choose the decommission node n3 to run repair to synchronize
|
||||
with the new owner node n4 in the local DC.
|
||||
|
||||
2) Synchronize with one node when no node is gaining the range in local DC
|
||||
|
||||
For example, with RF = 3 and 3 nodes n1, n2, n3 in the cluster, n3 is
|
||||
removed, old_replicas = {n1, n2, n3}, new_replicas = {n1, n2}.
|
||||
|
||||
The decommission node will repair all the ranges the leaving node is
|
||||
responsible for. The cluster is losing data on n3, it has to synchronize with
|
||||
at least one of {n1, n2}, otherwise it might lose the only new replica on n3.
|
||||
Choose the decommission node n3 to run repair to synchronize with one of the
|
||||
replica nodes, e.g., n1, in the local DC.
|
||||
|
||||
Note, it is enough to synchronize with one node instead of quorum number of
|
||||
nodes. For example, with RF = 3, timestamp(B) > timestamp(A),
|
||||
|
||||
n1 n2 n3 n4
|
||||
1) A B B (Before n3 is decommissioned)
|
||||
2) A B (After n3 is decommissioned)
|
||||
3) A B B (After n4 is bootstrapped)
|
||||
|
||||
Suppose we decommission n3 and n3 decides to synchronize with only n2. We end
|
||||
up with n1 has A, n2 has B. Then we add a new node n4 to the cluster, since
|
||||
no node is losing the range, so n4 will sync quorum number of nodes,
|
||||
that is n1 and n2. As a result, n4 will have B instead of A.
|
||||
436
repair/repair.cc
436
repair/repair.cc
@@ -35,6 +35,7 @@
|
||||
#include "sstables/sstables.hh"
|
||||
#include "database.hh"
|
||||
#include "hashers.hh"
|
||||
#include "locator/network_topology_strategy.hh"
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
@@ -333,6 +334,7 @@ future<> tracker::run(int id, std::function<future<> ()> func) {
|
||||
}).handle_exception([this, id] (std::exception_ptr ep) {
|
||||
rlogger.info("repair id {} failed: {}", id, ep);
|
||||
done(id, false);
|
||||
return make_exception_future(std::move(ep));
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -773,6 +775,12 @@ void repair_info::check_in_abort() {
|
||||
}
|
||||
}
|
||||
|
||||
repair_neighbors repair_info::get_repair_neighbors(const dht::token_range& range) {
|
||||
return neighbors.empty() ?
|
||||
repair_neighbors(get_neighbors(db.local(), keyspace, range, data_centers, hosts)) :
|
||||
neighbors[range];
|
||||
}
|
||||
|
||||
// Repair a single cf in a single local range.
|
||||
// Comparable to RepairJob in Origin.
|
||||
static future<> repair_cf_range(repair_info& ri,
|
||||
@@ -983,9 +991,22 @@ static future<> repair_cf_range(repair_info& ri,
|
||||
// Comparable to RepairSession in Origin
|
||||
static future<> repair_range(repair_info& ri, const dht::token_range& range) {
|
||||
auto id = utils::UUID_gen::get_time_UUID();
|
||||
return do_with(get_neighbors(ri.db.local(), ri.keyspace, range, ri.data_centers, ri.hosts), [&ri, range, id] (std::vector<gms::inet_address>& neighbors) {
|
||||
repair_neighbors neighbors = ri.get_repair_neighbors(range);
|
||||
return do_with(std::move(neighbors.all), std::move(neighbors.mandatory), [&ri, range, id] (auto& neighbors, auto& mandatory_neighbors) {
|
||||
auto live_neighbors = boost::copy_range<std::vector<gms::inet_address>>(neighbors |
|
||||
boost::adaptors::filtered([] (const gms::inet_address& node) { return gms::get_local_gossiper().is_alive(node); }));
|
||||
for (auto& node : mandatory_neighbors) {
|
||||
auto it = std::find(live_neighbors.begin(), live_neighbors.end(), node);
|
||||
if (it == live_neighbors.end()) {
|
||||
ri.nr_failed_ranges++;
|
||||
auto status = format("failed: mandatory neighbor={} is not alive", node);
|
||||
rlogger.error("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
|
||||
ri.ranges_index, ri.ranges.size(), ri.id, ri.shard, ri.keyspace, ri.cfs, range, neighbors, live_neighbors, status);
|
||||
ri.abort();
|
||||
return make_exception_future<>(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}",
|
||||
node, ri.keyspace, mandatory_neighbors)));
|
||||
}
|
||||
}
|
||||
if (live_neighbors.size() != neighbors.size()) {
|
||||
ri.nr_failed_ranges++;
|
||||
auto status = live_neighbors.empty() ? "skipped" : "partial";
|
||||
@@ -1495,3 +1516,416 @@ future<> repair_abort_all(seastar::sharded<database>& db) {
|
||||
repair_tracker().abort_all_repairs();
|
||||
});
|
||||
}
|
||||
|
||||
future<> sync_data_using_repair(seastar::sharded<database>& db,
|
||||
sstring keyspace,
|
||||
dht::token_range_vector ranges,
|
||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors) {
|
||||
if (ranges.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return smp::submit_to(0, [&db, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors)] () mutable {
|
||||
int id = repair_tracker().next_repair_command();
|
||||
rlogger.info("repair id {} to sync data for keyspace={}, status=started", id, keyspace);
|
||||
return repair_tracker().run(id, [id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors)] () mutable {
|
||||
auto cfs = list_column_families(db.local(), keyspace);
|
||||
std::vector<future<>> repair_results;
|
||||
repair_results.reserve(smp::count);
|
||||
for (auto shard : boost::irange(unsigned(0), smp::count)) {
|
||||
auto f = db.invoke_on(shard, [keyspace, cfs, id, ranges, neighbors] (database& localdb) mutable {
|
||||
auto data_centers = std::vector<sstring>();
|
||||
auto hosts = std::vector<sstring>();
|
||||
auto ri = make_lw_shared<repair_info>(service::get_local_storage_service().db(),
|
||||
std::move(keyspace), std::move(ranges), std::move(cfs),
|
||||
id, std::move(data_centers), std::move(hosts));
|
||||
ri->neighbors = std::move(neighbors);
|
||||
return repair_ranges(ri);
|
||||
});
|
||||
repair_results.push_back(std::move(f));
|
||||
}
|
||||
return when_all(repair_results.begin(), repair_results.end()).then([id, keyspace] (std::vector<future<>> results) mutable {
|
||||
std::vector<sstring> errors;
|
||||
for (unsigned shard = 0; shard < results.size(); shard++) {
|
||||
auto& f = results[shard];
|
||||
if (f.failed()) {
|
||||
auto ep = f.get_exception();
|
||||
errors.push_back(format("shard {}: {}", shard, ep));
|
||||
}
|
||||
}
|
||||
if (!errors.empty()) {
|
||||
return make_exception_future<>(std::runtime_error(format("{}", errors)));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then([id, keyspace] {
|
||||
rlogger.info("repair id {} to sync data for keyspace={}, status=succeeded", id, keyspace);
|
||||
}).handle_exception([id, keyspace] (std::exception_ptr ep) {
|
||||
rlogger.info("repair id {} to sync data for keyspace={}, status=failed: {}", id, keyspace, ep);
|
||||
return make_exception_future<>(ep);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> bootstrap_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, std::unordered_set<dht::token> bootstrap_tokens) {
|
||||
using inet_address = gms::inet_address;
|
||||
return seastar::async([&db, tm = std::move(tm), tokens = std::move(bootstrap_tokens)] () mutable {
|
||||
auto keyspaces = db.local().get_non_system_keyspaces();
|
||||
rlogger.info("bootstrap_with_repair: started with keyspaces={}", keyspaces);
|
||||
auto myip = utils::fb_utilities::get_broadcast_address();
|
||||
for (auto& keyspace_name : keyspaces) {
|
||||
if (!db.local().has_keyspace(keyspace_name)) {
|
||||
rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name);
|
||||
continue;
|
||||
}
|
||||
auto& ks = db.local().find_keyspace(keyspace_name);
|
||||
auto& strat = ks.get_replication_strategy();
|
||||
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tm, tokens, myip);
|
||||
|
||||
//Active ranges
|
||||
auto metadata_clone = tm.clone_only_token_map();
|
||||
auto range_addresses = strat.get_range_addresses(metadata_clone);
|
||||
|
||||
//Pending ranges
|
||||
metadata_clone.update_normal_tokens(tokens, myip);
|
||||
auto pending_range_addresses = strat.get_range_addresses(metadata_clone);
|
||||
|
||||
//Collects the source that will have its range moved to the new node
|
||||
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
|
||||
|
||||
rlogger.info("bootstrap_with_repair: started with keyspace={}, nr_ranges={}", keyspace_name, desired_ranges.size());
|
||||
for (auto& desired_range : desired_ranges) {
|
||||
for (auto& x : range_addresses) {
|
||||
const range<dht::token>& src_range = x.first;
|
||||
seastar::thread::maybe_yield();
|
||||
if (src_range.contains(desired_range, dht::tri_compare)) {
|
||||
std::vector<inet_address> old_endpoints(x.second.begin(), x.second.end());
|
||||
auto it = pending_range_addresses.find(desired_range);
|
||||
if (it == pending_range_addresses.end()) {
|
||||
throw std::runtime_error(format("Can not find desired_range = {} in pending_range_addresses", desired_range));
|
||||
}
|
||||
|
||||
std::unordered_set<inet_address> new_endpoints(it->second.begin(), it->second.end());
|
||||
rlogger.debug("bootstrap_with_repair: keyspace={}, range={}, old_endpoints={}, new_endpoints={}",
|
||||
keyspace_name, desired_range, old_endpoints, new_endpoints);
|
||||
// Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
|
||||
// So we need to be careful to only be strict when endpoints == RF
|
||||
// That is we can only have RF >= old_endpoints.size().
|
||||
// 1) If RF > old_endpoints.size(), it means no node is
|
||||
// going to lose the ownership of the range, so there
|
||||
// is no need to choose a mandatory neighbor to sync
|
||||
// data from.
|
||||
// 2) If RF = old_endpoints.size(), it means one node is
|
||||
// going to lose the ownership of the range, we need to
|
||||
// choose it as the mandatory neighbor to sync data
|
||||
// from.
|
||||
std::vector<gms::inet_address> mandatory_neighbors;
|
||||
// All neighbors
|
||||
std::vector<inet_address> neighbors;
|
||||
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
|
||||
auto local_dc = get_local_dc();
|
||||
auto get_node_losing_the_ranges = [&] (const std::vector<gms::inet_address>& old_nodes, const std::unordered_set<gms::inet_address>& new_nodes) {
|
||||
// Remove the new nodes from the old nodes list, so
|
||||
// that it contains only the node that will lose
|
||||
// the ownership of the range.
|
||||
auto nodes = boost::copy_range<std::vector<gms::inet_address>>(old_nodes |
|
||||
boost::adaptors::filtered([&] (const gms::inet_address& node) { return !new_nodes.count(node); }));
|
||||
if (nodes.size() != 1) {
|
||||
throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, expected 1 node losing range but found more nodes={}",
|
||||
keyspace_name, desired_range, nodes));
|
||||
}
|
||||
return nodes;
|
||||
};
|
||||
auto get_rf_in_local_dc = [&] () {
|
||||
size_t rf_in_local_dc = strat.get_replication_factor();
|
||||
if (strat.get_type() == locator::replication_strategy_type::network_topology) {
|
||||
auto nts = dynamic_cast<locator::network_topology_strategy*>(&strat);
|
||||
if (!nts) {
|
||||
throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, failed to cast to network_topology_strategy",
|
||||
keyspace_name, desired_range));
|
||||
}
|
||||
rf_in_local_dc = nts->get_replication_factor(local_dc);
|
||||
}
|
||||
return rf_in_local_dc;
|
||||
};
|
||||
auto get_old_endpoints_in_local_dc = [&] () {
|
||||
return boost::copy_range<std::vector<gms::inet_address>>(old_endpoints |
|
||||
boost::adaptors::filtered([&] (const gms::inet_address& node) {
|
||||
return snitch_ptr->get_datacenter(node) == local_dc;
|
||||
})
|
||||
);
|
||||
};
|
||||
auto old_endpoints_in_local_dc = get_old_endpoints_in_local_dc();
|
||||
auto rf_in_local_dc = get_rf_in_local_dc();
|
||||
if (old_endpoints.size() == strat.get_replication_factor()) {
|
||||
// For example, with RF = 3 and 3 nodes n1, n2, n3
|
||||
// in the cluster, n4 is bootstrapped, old_replicas
|
||||
// = {n1, n2, n3}, new_replicas = {n1, n2, n4}, n3
|
||||
// is losing the range. Choose the bootstrapping
|
||||
// node n4 to run repair to sync with the node
|
||||
// losing the range n3
|
||||
mandatory_neighbors = get_node_losing_the_ranges(old_endpoints, new_endpoints);
|
||||
neighbors = mandatory_neighbors;
|
||||
} else if (old_endpoints.size() < strat.get_replication_factor()) {
|
||||
if (old_endpoints_in_local_dc.size() == rf_in_local_dc) {
|
||||
// Local DC has enough replica nodes.
|
||||
mandatory_neighbors = get_node_losing_the_ranges(old_endpoints_in_local_dc, new_endpoints);
|
||||
neighbors = mandatory_neighbors;
|
||||
} else if (old_endpoints_in_local_dc.size() == 0) {
|
||||
// Local DC has zero replica node.
|
||||
// Reject the operation
|
||||
throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, no existing node in local dc",
|
||||
keyspace_name, desired_range));
|
||||
} else if (old_endpoints_in_local_dc.size() < rf_in_local_dc) {
|
||||
// Local DC does not have enough replica nodes.
|
||||
// For example, with RF = 3, 2 nodes n1, n2 in the
|
||||
// cluster, n3 is bootstrapped, old_replicas={n1, n2},
|
||||
// new_replicas={n1, n2, n3}. No node is losing
|
||||
// ranges. The bootstrapping node n3 has to sync
|
||||
// with n1 and n2, otherwise n3 might get the old
|
||||
// replica and make the total old replica be 2,
|
||||
// which makes the quorum read incorrect.
|
||||
// Choose the bootstrapping node n3 to reun repair
|
||||
// to sync with n1 and n2.
|
||||
size_t nr_nodes_to_sync_with = std::min<size_t>(rf_in_local_dc / 2 + 1, old_endpoints_in_local_dc.size());
|
||||
neighbors = old_endpoints_in_local_dc;
|
||||
neighbors.resize(nr_nodes_to_sync_with);
|
||||
} else {
|
||||
throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, wrong number of old_endpoints_in_local_dc={}, rf_in_local_dc={}",
|
||||
keyspace_name, desired_range, old_endpoints_in_local_dc.size(), rf_in_local_dc));
|
||||
}
|
||||
} else {
|
||||
throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, wrong number of old_endpoints={}, rf={}",
|
||||
keyspace_name, desired_range, old_endpoints, strat.get_replication_factor()));
|
||||
}
|
||||
rlogger.debug("bootstrap_with_repair: keyspace={}, range={}, neighbors={}, mandatory_neighbors={}",
|
||||
keyspace_name, desired_range, neighbors, mandatory_neighbors);
|
||||
range_sources[desired_range] = repair_neighbors(std::move(neighbors), std::move(mandatory_neighbors));
|
||||
}
|
||||
}
|
||||
}
|
||||
auto nr_ranges = desired_ranges.size();
|
||||
sync_data_using_repair(db, keyspace_name, std::move(desired_ranges), std::move(range_sources)).get();
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges);
|
||||
}
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", keyspaces);
|
||||
});
|
||||
}
|
||||
|
||||
future<> do_decommission_removenode_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, gms::inet_address leaving_node) {
|
||||
using inet_address = gms::inet_address;
|
||||
return seastar::async([&db, tm = std::move(tm), leaving_node = std::move(leaving_node)] () mutable {
|
||||
auto myip = utils::fb_utilities::get_broadcast_address();
|
||||
auto keyspaces = db.local().get_non_system_keyspaces();
|
||||
bool is_removenode = myip != leaving_node;
|
||||
auto op = is_removenode ? "removenode_with_repair" : "decommission_with_repair";
|
||||
rlogger.info("{}: started with keyspaces={}, leaving_node={}", op, keyspaces, leaving_node);
|
||||
for (auto& keyspace_name : keyspaces) {
|
||||
if (!db.local().has_keyspace(keyspace_name)) {
|
||||
rlogger.info("{}: keyspace={} does not exist any more, ignoring it", op, keyspace_name);
|
||||
continue;
|
||||
}
|
||||
auto& ks = db.local().find_keyspace(keyspace_name);
|
||||
auto& strat = ks.get_replication_strategy();
|
||||
// First get all ranges the leaving node is responsible for
|
||||
dht::token_range_vector ranges = strat.get_ranges(leaving_node);
|
||||
rlogger.info("{}: started with keyspace={}, leaving_node={}, nr_ranges={}", op, keyspace_name, leaving_node, ranges.size());
|
||||
size_t nr_ranges_total = ranges.size();
|
||||
size_t nr_ranges_skipped = 0;
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>> current_replica_endpoints;
|
||||
// Find (for each range) all nodes that store replicas for these ranges as well
|
||||
for (auto& r : ranges) {
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto eps = strat.calculate_natural_endpoints(end_token, tm);
|
||||
current_replica_endpoints.emplace(r, std::move(eps));
|
||||
}
|
||||
auto temp = tm.clone_after_all_left();
|
||||
// leaving_node might or might not be 'leaving'. If it was not leaving (that is, removenode
|
||||
// command was used), it is still present in temp and must be removed.
|
||||
if (temp.is_member(leaving_node)) {
|
||||
temp.remove_endpoint(leaving_node);
|
||||
}
|
||||
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
|
||||
dht::token_range_vector ranges_for_removenode;
|
||||
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
|
||||
auto local_dc = get_local_dc();
|
||||
for (auto&r : ranges) {
|
||||
seastar::thread::maybe_yield();
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
const std::vector<inet_address> new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp);
|
||||
const std::vector<inet_address>& current_eps = current_replica_endpoints[r];
|
||||
std::unordered_set<inet_address> neighbors_set(new_eps.begin(), new_eps.end());
|
||||
bool skip_this_range = false;
|
||||
auto new_owner = neighbors_set;
|
||||
for (const auto& node : current_eps) {
|
||||
new_owner.erase(node);
|
||||
}
|
||||
if (new_eps.size() == 0) {
|
||||
throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, zero replica after the removal",
|
||||
op, keyspace_name, r, current_eps, new_eps));
|
||||
}
|
||||
if (new_owner.size() == 1) {
|
||||
// For example, with RF = 3 and 4 nodes n1, n2, n3, n4 in
|
||||
// the cluster, n3 is removed, old_replicas = {n1, n2, n3},
|
||||
// new_replicas = {n1, n2, n4}.
|
||||
if (is_removenode) {
|
||||
// For removenode operation, use new owner node to
|
||||
// repair the data in case there is new owner node
|
||||
// available. Nodes that are not the new owner of a
|
||||
// range will ignore the range. There can be at most
|
||||
// one new owner for each of the ranges. Note: The new
|
||||
// owner is the node that does not own the range before
|
||||
// but owns the range after the remove operation. It
|
||||
// does not have to be the primary replica of the
|
||||
// range.
|
||||
// Choose the new owner node n4 to run repair to sync
|
||||
// with all replicas.
|
||||
skip_this_range = *new_owner.begin() != myip;
|
||||
neighbors_set.insert(current_eps.begin(), current_eps.end());
|
||||
} else {
|
||||
// For decommission operation, the decommission node
|
||||
// will repair all the ranges the leaving node is
|
||||
// responsible for.
|
||||
// Choose the decommission node n3 to run repair to
|
||||
// sync with the new owner node n4.
|
||||
for (auto& node : new_owner) {
|
||||
if (snitch_ptr->get_datacenter(node) == local_dc) {
|
||||
neighbors_set = std::unordered_set<inet_address>{node};
|
||||
break;
|
||||
}
|
||||
throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, can not find new node in local dc={}",
|
||||
op, keyspace_name, r, current_eps, new_eps, local_dc));
|
||||
}
|
||||
}
|
||||
} else if (new_owner.size() == 0) {
|
||||
// For example, with RF = 3 and 3 nodes n1, n2, n3 in the
|
||||
// cluster, n3 is removed, old_replicas = {n1, n2, n3},
|
||||
// new_replicas = {n1, n2}.
|
||||
if (is_removenode) {
|
||||
// For removenode operation, use the primary replica
|
||||
// node to repair the data in case there is no new
|
||||
// owner node available.
|
||||
// Choose the primary replica node n1 to run repair to
|
||||
// sync with all replicas.
|
||||
skip_this_range = new_eps.front() != myip;
|
||||
neighbors_set.insert(current_eps.begin(), current_eps.end());
|
||||
} else {
|
||||
// For decommission operation, the decommission node
|
||||
// will repair all the ranges the leaving node is
|
||||
// responsible for. We are losing data on n3, we have
|
||||
// to sync with at least one of {n1, n2}, otherwise we
|
||||
// might lose the only new replica on n3.
|
||||
// Choose the decommission node n3 to run repair to
|
||||
// sync with one of the replica nodes, e.g., n1, in the
|
||||
// local DC.
|
||||
for (auto& node : new_eps) {
|
||||
if (snitch_ptr->get_datacenter(node) == local_dc) {
|
||||
neighbors_set = std::unordered_set<inet_address>{node};
|
||||
break;
|
||||
}
|
||||
throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, can not find new node in local dc={}",
|
||||
op, keyspace_name, r, current_eps, new_eps, local_dc));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, wrong nubmer of new owner node={}",
|
||||
op, keyspace_name, r, current_eps, new_eps, new_owner));
|
||||
}
|
||||
neighbors_set.erase(myip);
|
||||
neighbors_set.erase(leaving_node);
|
||||
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(neighbors_set |
|
||||
boost::adaptors::filtered([&local_dc, &snitch_ptr] (const gms::inet_address& node) {
|
||||
return snitch_ptr->get_datacenter(node) == local_dc;
|
||||
})
|
||||
);
|
||||
|
||||
if (skip_this_range) {
|
||||
nr_ranges_skipped++;
|
||||
rlogger.debug("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}, skipped",
|
||||
op, keyspace_name, r, current_eps, new_eps, neighbors);
|
||||
} else {
|
||||
rlogger.debug("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}",
|
||||
op, keyspace_name, r, current_eps, new_eps, neighbors);
|
||||
range_sources[r] = repair_neighbors(std::move(neighbors));
|
||||
if (is_removenode) {
|
||||
ranges_for_removenode.push_back(r);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (is_removenode) {
|
||||
ranges.swap(ranges_for_removenode);
|
||||
}
|
||||
auto nr_ranges_synced = ranges.size();
|
||||
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources)).get();
|
||||
rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
|
||||
op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped);
|
||||
}
|
||||
rlogger.info("{}: finished with keyspaces={}, leaving_node={}", op, keyspaces, leaving_node);
|
||||
});
|
||||
}
|
||||
|
||||
future<> decommission_with_repair(seastar::sharded<database>& db, locator::token_metadata tm) {
|
||||
return do_decommission_removenode_with_repair(db, std::move(tm), utils::fb_utilities::get_broadcast_address());
|
||||
}
|
||||
|
||||
future<> removenode_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, gms::inet_address leaving_node) {
|
||||
return do_decommission_removenode_with_repair(db, std::move(tm), std::move(leaving_node));
|
||||
}
|
||||
|
||||
future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, sstring op, sstring source_dc) {
|
||||
return seastar::async([&db, tm = std::move(tm), source_dc = std::move(source_dc), op = std::move(op)] () mutable {
|
||||
auto keyspaces = db.local().get_non_system_keyspaces();
|
||||
rlogger.info("{}: started with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
|
||||
auto myip = utils::fb_utilities::get_broadcast_address();
|
||||
for (auto& keyspace_name : keyspaces) {
|
||||
if (!db.local().has_keyspace(keyspace_name)) {
|
||||
rlogger.info("{}: keyspace={} does not exist any more, ignoring it", op, keyspace_name);
|
||||
continue;
|
||||
}
|
||||
auto& ks = db.local().find_keyspace(keyspace_name);
|
||||
auto& strat = ks.get_replication_strategy();
|
||||
dht::token_range_vector ranges = strat.get_ranges(myip);
|
||||
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
|
||||
rlogger.info("{}: started with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, ranges.size());
|
||||
for (auto it = ranges.begin(); it != ranges.end();) {
|
||||
auto& r = *it;
|
||||
seastar::thread::maybe_yield();
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
|
||||
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(strat.calculate_natural_endpoints(end_token, tm) |
|
||||
boost::adaptors::filtered([myip, &source_dc, &snitch_ptr] (const gms::inet_address& node) {
|
||||
if (node == myip) {
|
||||
return false;
|
||||
}
|
||||
return source_dc.empty() ? true : snitch_ptr->get_datacenter(node) == source_dc;
|
||||
})
|
||||
);
|
||||
rlogger.debug("{}: keyspace={}, range={}, neighbors={}", op, keyspace_name, r, neighbors);
|
||||
if (!neighbors.empty()) {
|
||||
range_sources[r] = repair_neighbors(std::move(neighbors));
|
||||
++it;
|
||||
} else {
|
||||
// Skip the range with zero neighbors
|
||||
it = ranges.erase(it);
|
||||
}
|
||||
}
|
||||
auto nr_ranges = ranges.size();
|
||||
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources)).get();
|
||||
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges);
|
||||
}
|
||||
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
|
||||
});
|
||||
}
|
||||
|
||||
future<> rebuild_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, sstring source_dc) {
|
||||
auto op = sstring("rebuild_with_repair");
|
||||
if (source_dc.empty()) {
|
||||
source_dc = get_local_dc();
|
||||
}
|
||||
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc));
|
||||
}
|
||||
|
||||
future<> replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm) {
|
||||
auto op = sstring("replace_with_repair");
|
||||
auto source_dc = get_local_dc();
|
||||
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc));
|
||||
}
|
||||
|
||||
@@ -46,6 +46,13 @@ public:
|
||||
repair_stopped_exception() : repair_exception("Repair stopped") { }
|
||||
};
|
||||
|
||||
// The tokens are the tokens assigned to the bootstrap node.
|
||||
future<> bootstrap_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, std::unordered_set<dht::token> bootstrap_tokens);
|
||||
future<> decommission_with_repair(seastar::sharded<database>& db, locator::token_metadata tm);
|
||||
future<> removenode_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, gms::inet_address leaving_node);
|
||||
future<> rebuild_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, sstring source_dc);
|
||||
future<> replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm);
|
||||
|
||||
// NOTE: repair_start() can be run on any node, but starts a node-global
|
||||
// operation.
|
||||
// repair_start() starts the requested repair on this node. It returns an
|
||||
@@ -149,6 +156,20 @@ public:
|
||||
sstring get_stats();
|
||||
};
|
||||
|
||||
class repair_neighbors {
|
||||
public:
|
||||
std::vector<gms::inet_address> all;
|
||||
std::vector<gms::inet_address> mandatory;
|
||||
repair_neighbors() = default;
|
||||
explicit repair_neighbors(std::vector<gms::inet_address> a)
|
||||
: all(std::move(a)) {
|
||||
}
|
||||
repair_neighbors(std::vector<gms::inet_address> a, std::vector<gms::inet_address> m)
|
||||
: all(std::move(a))
|
||||
, mandatory(std::move(m)) {
|
||||
}
|
||||
};
|
||||
|
||||
class repair_info {
|
||||
public:
|
||||
seastar::sharded<database>& db;
|
||||
@@ -160,6 +181,7 @@ public:
|
||||
shard_id shard;
|
||||
std::vector<sstring> data_centers;
|
||||
std::vector<sstring> hosts;
|
||||
std::unordered_map<dht::token_range, repair_neighbors> neighbors;
|
||||
size_t nr_failed_ranges = 0;
|
||||
bool aborted = false;
|
||||
// Map of peer -> <cf, ranges>
|
||||
@@ -198,6 +220,7 @@ public:
|
||||
const std::vector<gms::inet_address>& neighbors_out);
|
||||
void abort();
|
||||
void check_in_abort();
|
||||
repair_neighbors get_repair_neighbors(const dht::token_range& range);
|
||||
void update_statistics(const repair_stats& stats) {
|
||||
_stats.add(stats);
|
||||
}
|
||||
|
||||
@@ -80,6 +80,7 @@
|
||||
#include "database.hh"
|
||||
#include <seastar/core/metrics.hh>
|
||||
#include "cdc/generation.hh"
|
||||
#include "repair/repair.hh"
|
||||
|
||||
using token = dht::token;
|
||||
using UUID = utils::UUID;
|
||||
@@ -524,7 +525,8 @@ void storage_service::join_token_ring(int delay) {
|
||||
slogger.info("This node will not auto bootstrap because it is configured to be a seed node.");
|
||||
}
|
||||
if (should_bootstrap()) {
|
||||
if (db::system_keyspace::bootstrap_in_progress()) {
|
||||
bool resume_bootstrap = db::system_keyspace::bootstrap_in_progress();
|
||||
if (resume_bootstrap) {
|
||||
slogger.warn("Detected previous bootstrap failure; retrying");
|
||||
} else {
|
||||
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::IN_PROGRESS).get();
|
||||
@@ -577,7 +579,18 @@ void storage_service::join_token_ring(int delay) {
|
||||
throw std::runtime_error("This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)");
|
||||
}
|
||||
set_mode(mode::JOINING, "getting bootstrap token", true);
|
||||
_bootstrap_tokens = boot_strapper::get_bootstrap_tokens(_token_metadata, _db.local());
|
||||
if (resume_bootstrap) {
|
||||
_bootstrap_tokens = db::system_keyspace::get_saved_tokens().get0();
|
||||
if (!_bootstrap_tokens.empty()) {
|
||||
slogger.info("Using previously saved tokens = {}", _bootstrap_tokens);
|
||||
} else {
|
||||
_bootstrap_tokens = boot_strapper::get_bootstrap_tokens(_token_metadata, _db.local());
|
||||
slogger.info("Using newly generated tokens = {}", _bootstrap_tokens);
|
||||
}
|
||||
} else {
|
||||
_bootstrap_tokens = boot_strapper::get_bootstrap_tokens(_token_metadata, _db.local());
|
||||
slogger.info("Using newly generated tokens = {}", _bootstrap_tokens);
|
||||
}
|
||||
} else {
|
||||
auto replace_addr = db().local().get_replace_address();
|
||||
if (replace_addr && *replace_addr != get_broadcast_address()) {
|
||||
@@ -940,9 +953,17 @@ void storage_service::bootstrap() {
|
||||
_gossiper.check_seen_seeds();
|
||||
|
||||
set_mode(mode::JOINING, "Starting to bootstrap...", true);
|
||||
dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata);
|
||||
// Does the actual streaming of newly replicated token ranges.
|
||||
bs.bootstrap().get();
|
||||
if (is_repair_based_node_ops_enabled()) {
|
||||
if (db().local().is_replacing()) {
|
||||
replace_with_repair(_db, _token_metadata).get();
|
||||
} else {
|
||||
bootstrap_with_repair(_db, _token_metadata, _bootstrap_tokens).get();
|
||||
}
|
||||
} else {
|
||||
dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata);
|
||||
// Does the actual streaming of newly replicated token ranges.
|
||||
bs.bootstrap().get();
|
||||
}
|
||||
slogger.info("Bootstrap completed! for the tokens {}", _bootstrap_tokens);
|
||||
}
|
||||
|
||||
@@ -2579,24 +2600,28 @@ future<> storage_service::drain() {
|
||||
future<> storage_service::rebuild(sstring source_dc) {
|
||||
return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) {
|
||||
slogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._token_metadata, ss._abort_source,
|
||||
ss.get_broadcast_address(), "Rebuild", streaming::stream_reason::rebuild);
|
||||
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(ss._gossiper.get_unreachable_members()));
|
||||
if (source_dc != "") {
|
||||
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
|
||||
}
|
||||
auto keyspaces = make_lw_shared<std::vector<sstring>>(ss._db.local().get_non_system_keyspaces());
|
||||
return do_for_each(*keyspaces, [keyspaces, streamer, &ss] (sstring& keyspace_name) {
|
||||
return streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name));
|
||||
}).then([streamer] {
|
||||
return streamer->stream_async().then([streamer] {
|
||||
slogger.info("Streaming for rebuild successful");
|
||||
}).handle_exception([] (auto ep) {
|
||||
// This is used exclusively through JMX, so log the full trace but only throw a simple RTE
|
||||
slogger.warn("Error while rebuilding node: {}", std::current_exception());
|
||||
return make_exception_future<>(std::move(ep));
|
||||
if (ss.is_repair_based_node_ops_enabled()) {
|
||||
return rebuild_with_repair(ss._db, ss._token_metadata, std::move(source_dc));
|
||||
} else {
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._token_metadata, ss._abort_source,
|
||||
ss.get_broadcast_address(), "Rebuild", streaming::stream_reason::rebuild);
|
||||
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(ss._gossiper.get_unreachable_members()));
|
||||
if (source_dc != "") {
|
||||
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
|
||||
}
|
||||
auto keyspaces = make_lw_shared<std::vector<sstring>>(ss._db.local().get_non_system_keyspaces());
|
||||
return do_for_each(*keyspaces, [keyspaces, streamer, &ss] (sstring& keyspace_name) {
|
||||
return streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name));
|
||||
}).then([streamer] {
|
||||
return streamer->stream_async().then([streamer] {
|
||||
slogger.info("Streaming for rebuild successful");
|
||||
}).handle_exception([] (auto ep) {
|
||||
// This is used exclusively through JMX, so log the full trace but only throw a simple RTE
|
||||
slogger.warn("Error while rebuilding node: {}", std::current_exception());
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2684,44 +2709,54 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void storage_service::unbootstrap() {
|
||||
std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream;
|
||||
|
||||
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
auto ranges_mm = get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address());
|
||||
if (slogger.is_enabled(logging::log_level::debug)) {
|
||||
std::vector<range<token>> ranges;
|
||||
for (auto& x : ranges_mm) {
|
||||
ranges.push_back(x.first);
|
||||
}
|
||||
slogger.debug("Ranges needing transfer for keyspace={} are [{}]", keyspace_name, ranges);
|
||||
}
|
||||
ranges_to_stream.emplace(keyspace_name, std::move(ranges_mm));
|
||||
}
|
||||
|
||||
set_mode(mode::LEAVING, "replaying batch log and streaming data to other nodes", true);
|
||||
|
||||
auto stream_success = stream_ranges(ranges_to_stream);
|
||||
// Wait for batch log to complete before streaming hints.
|
||||
slogger.debug("waiting for batch log processing.");
|
||||
// Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint.
|
||||
db::get_local_batchlog_manager().do_batch_log_replay().get();
|
||||
if (is_repair_based_node_ops_enabled()) {
|
||||
decommission_with_repair(_db, _token_metadata).get();
|
||||
} else {
|
||||
std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream;
|
||||
|
||||
set_mode(mode::LEAVING, "streaming hints to other nodes", true);
|
||||
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
auto ranges_mm = get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address());
|
||||
if (slogger.is_enabled(logging::log_level::debug)) {
|
||||
std::vector<range<token>> ranges;
|
||||
for (auto& x : ranges_mm) {
|
||||
ranges.push_back(x.first);
|
||||
}
|
||||
slogger.debug("Ranges needing transfer for keyspace={} are [{}]", keyspace_name, ranges);
|
||||
}
|
||||
ranges_to_stream.emplace(keyspace_name, std::move(ranges_mm));
|
||||
}
|
||||
|
||||
// wait for the transfer runnables to signal the latch.
|
||||
slogger.debug("waiting for stream acks.");
|
||||
try {
|
||||
stream_success.get();
|
||||
} catch (...) {
|
||||
slogger.warn("unbootstrap fails to stream : {}", std::current_exception());
|
||||
throw;
|
||||
set_mode(mode::LEAVING, "replaying batch log and streaming data to other nodes", true);
|
||||
|
||||
auto stream_success = stream_ranges(ranges_to_stream);
|
||||
// Wait for batch log to complete before streaming hints.
|
||||
slogger.debug("waiting for batch log processing.");
|
||||
// Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint.
|
||||
db::get_local_batchlog_manager().do_batch_log_replay().get();
|
||||
|
||||
set_mode(mode::LEAVING, "streaming hints to other nodes", true);
|
||||
|
||||
// wait for the transfer runnables to signal the latch.
|
||||
slogger.debug("waiting for stream acks.");
|
||||
try {
|
||||
stream_success.get();
|
||||
} catch (...) {
|
||||
slogger.warn("unbootstrap fails to stream : {}", std::current_exception());
|
||||
throw;
|
||||
}
|
||||
slogger.debug("stream acks all received.");
|
||||
}
|
||||
slogger.debug("stream acks all received.");
|
||||
leave_ring();
|
||||
}
|
||||
|
||||
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
|
||||
if (is_repair_based_node_ops_enabled()) {
|
||||
return removenode_with_repair(_db, _token_metadata, endpoint).finally([this, notify_endpoint] () {
|
||||
return send_replication_notification(notify_endpoint);
|
||||
});
|
||||
}
|
||||
return seastar::async([this, endpoint, notify_endpoint] {
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, get_token_metadata(), _abort_source, get_broadcast_address(), "Restore_replica_count", streaming::stream_reason::removenode);
|
||||
auto my_address = get_broadcast_address();
|
||||
@@ -3492,5 +3527,9 @@ future<bool> storage_service::is_cleanup_allowed(sstring keyspace) {
|
||||
});
|
||||
}
|
||||
|
||||
bool storage_service::is_repair_based_node_ops_enabled() {
|
||||
return _db.local().get_config().enable_repair_based_node_ops();
|
||||
}
|
||||
|
||||
} // namespace service
|
||||
|
||||
|
||||
@@ -923,6 +923,7 @@ private:
|
||||
void notify_cql_change(inet_address endpoint, bool ready);
|
||||
public:
|
||||
future<bool> is_cleanup_allowed(sstring keyspace);
|
||||
bool is_repair_based_node_ops_enabled();
|
||||
};
|
||||
|
||||
future<> init_storage_service(sharded<abort_source>& abort_sources, distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service,
|
||||
|
||||
Reference in New Issue
Block a user