From 1038e375afcaaba97861a72a8ac31e49ac1b5507 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 19 Dec 2019 17:07:19 +0800 Subject: [PATCH 01/18] repair: Propagate exception in tracker::run In sync_data_using_repair, we depend on the future returned by tracker::run to tell whether the repair succeeded or failed. --- repair/repair.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/repair/repair.cc b/repair/repair.cc index a8fbd48889..c3c24aa0d2 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -333,6 +333,7 @@ future<> tracker::run(int id, std::function<future<> ()> func) { }).handle_exception([this, id] (std::exception_ptr ep) { rlogger.info("repair id {} failed: {}", id, ep); done(id, false); + return make_exception_future(std::move(ep)); }); }); } From 198cad61790d04ce98132c28ccc23f5d378a1a60 Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 19 Aug 2019 14:27:28 +0800 Subject: [PATCH 02/18] repair: Introduce sync_data_using_repair It is used to sync data for node operations like bootstrap, decommission and so on. Unlike a plain repair operation, the caller of sync_data_using_repair() can pass a repair_neighbors object to specify the pre-calculated neighbors for a range. If a mandatory neighbor is not available, the repair will fail so that the upper layer can fail the node operation. --- repair/repair.cc | 70 +++++++++++++++++++++++++++++++++++++++++++++++- repair/repair.hh | 16 +++++++++++ 2 files changed, 85 insertions(+), 1 deletion(-) diff --git a/repair/repair.cc b/repair/repair.cc index c3c24aa0d2..037be59e85 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -774,6 +774,12 @@ void repair_info::check_in_abort() { } } +repair_neighbors repair_info::get_repair_neighbors(const dht::token_range& range) { + return neighbors.empty() ? + repair_neighbors(get_neighbors(db.local(), keyspace, range, data_centers, hosts)) : + neighbors[range]; +} + // Repair a single cf in a single local range. // Comparable to RepairJob in Origin.
static future<> repair_cf_range(repair_info& ri, @@ -984,9 +990,22 @@ static future<> repair_cf_range(repair_info& ri, // Comparable to RepairSession in Origin static future<> repair_range(repair_info& ri, const dht::token_range& range) { auto id = utils::UUID_gen::get_time_UUID(); - return do_with(get_neighbors(ri.db.local(), ri.keyspace, range, ri.data_centers, ri.hosts), [&ri, range, id] (std::vector& neighbors) { + repair_neighbors neighbors = ri.get_repair_neighbors(range); + return do_with(std::move(neighbors.all), std::move(neighbors.mandatory), [&ri, range, id] (auto& neighbors, auto& mandatory_neighbors) { auto live_neighbors = boost::copy_range>(neighbors | boost::adaptors::filtered([] (const gms::inet_address& node) { return gms::get_local_gossiper().is_alive(node); })); + for (auto& node : mandatory_neighbors) { + auto it = std::find(live_neighbors.begin(), live_neighbors.end(), node); + if (it == live_neighbors.end()) { + ri.nr_failed_ranges++; + auto status = format("failed: mandatory neighbor={} is not alive", node); + rlogger.error("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}", + ri.ranges_index, ri.ranges.size(), ri.id, ri.shard, ri.keyspace, ri.cfs, range, neighbors, live_neighbors, status); + ri.abort(); + return make_exception_future<>(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}", + node, ri.keyspace, mandatory_neighbors))); + } + } if (live_neighbors.size() != neighbors.size()) { ri.nr_failed_ranges++; auto status = live_neighbors.empty() ? "skipped" : "partial"; @@ -1496,3 +1515,52 @@ future<> repair_abort_all(seastar::sharded& db) { repair_tracker().abort_all_repairs(); }); } + +future<> sync_data_using_repair(seastar::sharded& db, + sstring keyspace, + dht::token_range_vector ranges, + std::unordered_map neighbors) { + if (ranges.empty()) { + return make_ready_future<>(); + } + return smp::submit_to(0, [&db, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors)] () mutable { + int id = repair_tracker().next_repair_command(); + rlogger.info("repair id {} to sync data for keyspace={}, status=started", id, keyspace); + return repair_tracker().run(id, [id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors)] () mutable { + auto cfs = list_column_families(db.local(), keyspace); + std::vector> repair_results; + repair_results.reserve(smp::count); + for (auto shard : boost::irange(unsigned(0), smp::count)) { + auto f = db.invoke_on(shard, [keyspace, cfs, id, ranges, neighbors] (database& localdb) mutable { + auto data_centers = std::vector(); + auto hosts = std::vector(); + auto ri = make_lw_shared(service::get_local_storage_service().db(), + std::move(keyspace), std::move(ranges), std::move(cfs), + id, std::move(data_centers), std::move(hosts)); + ri->neighbors = std::move(neighbors); + return repair_ranges(ri); + }); + repair_results.push_back(std::move(f)); + } + return when_all(repair_results.begin(), repair_results.end()).then([id, keyspace] (std::vector> results) mutable { + std::vector errors; + for (unsigned shard = 0; shard < results.size(); shard++) { + auto& f = results[shard]; + if (f.failed()) { + auto ep = f.get_exception(); + errors.push_back(format("shard {}: {}", shard, ep)); + } + } + if (!errors.empty()) { + return make_exception_future<>(std::runtime_error(format("{}", errors))); + } + return make_ready_future<>(); + }); + }).then([id, keyspace] { + 
rlogger.info("repair id {} to sync data for keyspace={}, status=succeeded", id, keyspace); + }).handle_exception([id, keyspace] (std::exception_ptr ep) { + rlogger.info("repair id {} to sync data for keyspace={}, status=failed: {}", id, keyspace, ep); + return make_exception_future<>(ep); + }); + }); +} diff --git a/repair/repair.hh b/repair/repair.hh index 12f0a3075f..cfd9a59db1 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -149,6 +149,20 @@ public: sstring get_stats(); }; +class repair_neighbors { +public: + std::vector all; + std::vector mandatory; + repair_neighbors() = default; + explicit repair_neighbors(std::vector a) + : all(std::move(a)) { + } + repair_neighbors(std::vector a, std::vector m) + : all(std::move(a)) + , mandatory(std::move(m)) { + } +}; + class repair_info { public: seastar::sharded& db; @@ -160,6 +174,7 @@ public: shard_id shard; std::vector data_centers; std::vector hosts; + std::unordered_map neighbors; size_t nr_failed_ranges = 0; bool aborted = false; // Map of peer -> @@ -198,6 +213,7 @@ public: const std::vector& neighbors_out); void abort(); void check_in_abort(); + repair_neighbors get_repair_neighbors(const dht::token_range& range); void update_statistics(const repair_stats& stats) { _stats.add(stats); } From 9c67389cc8d77c6eb60234c96cc5ffbaae2771a6 Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 19 Aug 2019 14:27:28 +0800 Subject: [PATCH 03/18] repair: Add bootstrap_with_repair It is used to bootstrap a node using repair instead of using stream_plan. --- repair/repair.cc | 146 +++++++++++++++++++++++++++++++++++++++++++++++ repair/repair.hh | 3 + 2 files changed, 149 insertions(+) diff --git a/repair/repair.cc b/repair/repair.cc index 037be59e85..054ccfe2d2 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -35,6 +35,7 @@ #include "sstables/sstables.hh" #include "database.hh" #include "hashers.hh" +#include "locator/network_topology_strategy.hh" #include #include @@ -1564,3 +1565,148 @@ future<> sync_data_using_repair(seastar::sharded& db, }); }); } + +future<> bootstrap_with_repair(seastar::sharded& db, locator::token_metadata tm, std::unordered_set bootstrap_tokens) { + using inet_address = gms::inet_address; + return seastar::async([&db, tm = std::move(tm), tokens = std::move(bootstrap_tokens)] () mutable { + auto keyspaces = db.local().get_non_system_keyspaces(); + rlogger.info("bootstrap_with_repair: started with keyspaces={}", keyspaces); + auto myip = utils::fb_utilities::get_broadcast_address(); + for (auto& keyspace_name : keyspaces) { + if (!db.local().has_keyspace(keyspace_name)) { + rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name); + continue; + } + auto& ks = db.local().find_keyspace(keyspace_name); + auto& strat = ks.get_replication_strategy(); + dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tm, tokens, myip); + + //Active ranges + auto metadata_clone = tm.clone_only_token_map(); + auto range_addresses = strat.get_range_addresses(metadata_clone); + + //Pending ranges + metadata_clone.update_normal_tokens(tokens, myip); + auto pending_range_addresses = strat.get_range_addresses(metadata_clone); + + //Collects the source that will have its range moved to the new node + std::unordered_map range_sources; + + rlogger.info("bootstrap_with_repair: started with keyspace={}, nr_ranges={}", keyspace_name, desired_ranges.size()); + for (auto& desired_range : desired_ranges) { + for (auto& x : range_addresses) { + const range& src_range = x.first; + 
seastar::thread::maybe_yield(); + if (src_range.contains(desired_range, dht::tri_compare)) { + std::vector old_endpoints(x.second.begin(), x.second.end()); + auto it = pending_range_addresses.find(desired_range); + if (it == pending_range_addresses.end()) { + throw std::runtime_error(format("Can not find desired_range = {} in pending_range_addresses", desired_range)); + } + + std::unordered_set new_endpoints(it->second.begin(), it->second.end()); + rlogger.debug("bootstrap_with_repair: keyspace={}, range={}, old_endpoints={}, new_endpoints={}", + keyspace_name, desired_range, old_endpoints, new_endpoints); + // Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. + // So we need to be careful to only be strict when endpoints == RF + // That is we can only have RF >= old_endpoints.size(). + // 1) If RF > old_endpoints.size(), it means no node is + // going to lose the ownership of the range, so there + // is no need to choose a mandatory neighbor to sync + // data from. + // 2) If RF = old_endpoints.size(), it means one node is + // going to lose the ownership of the range, we need to + // choose it as the mandatory neighbor to sync data + // from. + std::vector mandatory_neighbors; + // All neighbors + std::vector neighbors; + auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); + auto local_dc = get_local_dc(); + auto get_node_losing_the_ranges = [&] (const std::vector& old_nodes, const std::unordered_set& new_nodes) { + // Remove the new nodes from the old nodes list, so + // that it contains only the node that will lose + // the ownership of the range. + auto nodes = boost::copy_range>(old_nodes | + boost::adaptors::filtered([&] (const gms::inet_address& node) { return !new_nodes.count(node); })); + if (nodes.size() != 1) { + throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, expected 1 node losing range but found more nodes={}", + keyspace_name, desired_range, nodes)); + } + return nodes; + }; + auto get_rf_in_local_dc = [&] () { + size_t rf_in_local_dc = strat.get_replication_factor(); + if (strat.get_type() == locator::replication_strategy_type::network_topology) { + auto nts = dynamic_cast(&strat); + if (!nts) { + throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, failed to cast to network_topology_strategy", + keyspace_name, desired_range)); + } + rf_in_local_dc = nts->get_replication_factor(local_dc); + } + return rf_in_local_dc; + }; + auto get_old_endpoints_in_local_dc = [&] () { + return boost::copy_range>(old_endpoints | + boost::adaptors::filtered([&] (const gms::inet_address& node) { + return snitch_ptr->get_datacenter(node) == local_dc; + }) + ); + }; + auto old_endpoints_in_local_dc = get_old_endpoints_in_local_dc(); + auto rf_in_local_dc = get_rf_in_local_dc(); + if (old_endpoints.size() == strat.get_replication_factor()) { + // For example, with RF = 3 and 3 nodes n1, n2, n3 + // in the cluster, n4 is bootstrapped, old_replicas + // = {n1, n2, n3}, new_replicas = {n1, n2, n4}, n3 + // is losing the range. Choose the bootstrapping + // node n4 to run repair to sync with the node + // losing the range n3 + mandatory_neighbors = get_node_losing_the_ranges(old_endpoints, new_endpoints); + neighbors = mandatory_neighbors; + } else if (old_endpoints.size() < strat.get_replication_factor()) { + if (old_endpoints_in_local_dc.size() == rf_in_local_dc) { + // Local DC has enough replica nodes. 
+ mandatory_neighbors = get_node_losing_the_ranges(old_endpoints_in_local_dc, new_endpoints); + neighbors = mandatory_neighbors; + } else if (old_endpoints_in_local_dc.size() == 0) { + // Local DC has zero replica node. + // Reject the operation + throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, no existing node in local dc", + keyspace_name, desired_range)); + } else if (old_endpoints_in_local_dc.size() < rf_in_local_dc) { + // Local DC does not have enough replica nodes. + // For example, with RF = 3, 2 nodes n1, n2 in the + // cluster, n3 is bootstrapped, old_replicas={n1, n2}, + // new_replicas={n1, n2, n3}. No node is losing + // ranges. The bootstrapping node n3 has to sync + // with n1 and n2, otherwise n3 might get the old + // replica and make the total old replica be 2, + // which makes the quorum read incorrect. + // Choose the bootstrapping node n3 to reun repair + // to sync with n1 and n2. + size_t nr_nodes_to_sync_with = std::min(rf_in_local_dc / 2 + 1, old_endpoints_in_local_dc.size()); + neighbors = old_endpoints_in_local_dc; + neighbors.resize(nr_nodes_to_sync_with); + } else { + throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, wrong number of old_endpoints_in_local_dc={}, rf_in_local_dc={}", + keyspace_name, desired_range, old_endpoints_in_local_dc.size(), rf_in_local_dc)); + } + } else { + throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, wrong number of old_endpoints={}, rf={}", + keyspace_name, desired_range, old_endpoints, strat.get_replication_factor())); + } + rlogger.debug("bootstrap_with_repair: keyspace={}, range={}, neighbors={}, mandatory_neighbors={}", + keyspace_name, desired_range, neighbors, mandatory_neighbors); + range_sources[desired_range] = repair_neighbors(std::move(neighbors), std::move(mandatory_neighbors)); + } + } + } + auto nr_ranges = desired_ranges.size(); + sync_data_using_repair(db, keyspace_name, std::move(desired_ranges), std::move(range_sources)).get(); + rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges); + } + rlogger.info("bootstrap_with_repair: finished with keyspaces={}", keyspaces); + }); +} diff --git a/repair/repair.hh b/repair/repair.hh index cfd9a59db1..609637ab17 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -46,6 +46,9 @@ public: repair_stopped_exception() : repair_exception("Repair stopped") { } }; +// The tokens are the tokens assigned to the bootstrap node. +future<> bootstrap_with_repair(seastar::sharded& db, locator::token_metadata tm, std::unordered_set bootstrap_tokens); + // NOTE: repair_start() can be run on any node, but starts a node-global // operation. // repair_start() starts the requested repair on this node. It returns an From 569c126a849b9ce62c4ee739bec13cfff49c840f Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 19 Aug 2019 14:27:28 +0800 Subject: [PATCH 04/18] repair: Add do_decommission_removenode_with_repair It will be used by decommission and removenode operation shortly. 
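As a sketch (not part of this patch), the two wrappers added later in this series differ only in which address they pass as the leaving node:

    decommission: do_decommission_removenode_with_repair(db, tm, utils::fb_utilities::get_broadcast_address());
    removenode:   do_decommission_removenode_with_repair(db, tm, leaving_node);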
--- repair/repair.cc | 152 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) diff --git a/repair/repair.cc b/repair/repair.cc index 054ccfe2d2..550c2bbd33 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1710,3 +1710,155 @@ future<> bootstrap_with_repair(seastar::sharded& db, locator::token_me rlogger.info("bootstrap_with_repair: finished with keyspaces={}", keyspaces); }); } + +future<> do_decommission_removenode_with_repair(seastar::sharded& db, locator::token_metadata tm, gms::inet_address leaving_node) { + using inet_address = gms::inet_address; + return seastar::async([&db, tm = std::move(tm), leaving_node = std::move(leaving_node)] () mutable { + auto myip = utils::fb_utilities::get_broadcast_address(); + auto keyspaces = db.local().get_non_system_keyspaces(); + bool is_removenode = myip != leaving_node; + auto op = is_removenode ? "removenode_with_repair" : "decommission_with_repair"; + rlogger.info("{}: started with keyspaces={}, leaving_node={}", op, keyspaces, leaving_node); + for (auto& keyspace_name : keyspaces) { + if (!db.local().has_keyspace(keyspace_name)) { + rlogger.info("{}: keyspace={} does not exist any more, ignoring it", op, keyspace_name); + continue; + } + auto& ks = db.local().find_keyspace(keyspace_name); + auto& strat = ks.get_replication_strategy(); + // First get all ranges the leaving node is responsible for + dht::token_range_vector ranges = strat.get_ranges(leaving_node); + rlogger.info("{}: started with keyspace={}, leaving_node={}, nr_ranges={}", op, keyspace_name, leaving_node, ranges.size()); + size_t nr_ranges_total = ranges.size(); + size_t nr_ranges_skipped = 0; + std::unordered_map> current_replica_endpoints; + // Find (for each range) all nodes that store replicas for these ranges as well + for (auto& r : ranges) { + auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); + auto eps = strat.calculate_natural_endpoints(end_token, tm); + current_replica_endpoints.emplace(r, std::move(eps)); + } + auto temp = tm.clone_after_all_left(); + // leaving_node might or might not be 'leaving'. If it was not leaving (that is, removenode + // command was used), it is still present in temp and must be removed. + if (temp.is_member(leaving_node)) { + temp.remove_endpoint(leaving_node); + } + std::unordered_map range_sources; + dht::token_range_vector ranges_for_removenode; + auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); + auto local_dc = get_local_dc(); + for (auto&r : ranges) { + seastar::thread::maybe_yield(); + auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); + const std::vector new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp); + const std::vector& current_eps = current_replica_endpoints[r]; + std::unordered_set neighbors_set(new_eps.begin(), new_eps.end()); + bool skip_this_range = false; + auto new_owner = neighbors_set; + for (const auto& node : current_eps) { + new_owner.erase(node); + } + if (new_eps.size() == 0) { + throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, zero replica after the removal", + op, keyspace_name, r, current_eps, new_eps)); + } + if (new_owner.size() == 1) { + // For example, with RF = 3 and 4 nodes n1, n2, n3, n4 in + // the cluster, n3 is removed, old_replicas = {n1, n2, n3}, + // new_replicas = {n1, n2, n4}. 
+ if (is_removenode) { + // For removenode operation, use new owner node to + // repair the data in case there is new owner node + // available. Nodes that are not the new owner of a + // range will ignore the range. There can be at most + // one new owner for each of the ranges. Note: The new + // owner is the node that does not own the range before + // but owns the range after the remove operation. It + // does not have to be the primary replica of the + // range. + // Choose the new owner node n4 to run repair to sync + // with all replicas. + skip_this_range = *new_owner.begin() != myip; + neighbors_set.insert(current_eps.begin(), current_eps.end()); + } else { + // For decommission operation, the decommission node + // will repair all the ranges the leaving node is + // responsible for. + // Choose the decommission node n3 to run repair to + // sync with the new owner node n4. + for (auto& node : new_owner) { + if (snitch_ptr->get_datacenter(node) == local_dc) { + neighbors_set = std::unordered_set{node}; + break; + } + throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, can not find new node in local dc={}", + op, keyspace_name, r, current_eps, new_eps, local_dc)); + } + } + } else if (new_owner.size() == 0) { + // For example, with RF = 3 and 3 nodes n1, n2, n3 in the + // cluster, n3 is removed, old_replicas = {n1, n2, n3}, + // new_replicas = {n1, n2}. + if (is_removenode) { + // For removenode operation, use the primary replica + // node to repair the data in case there is no new + // owner node available. + // Choose the primary replica node n1 to run repair to + // sync with all replicas. + skip_this_range = new_eps.front() != myip; + neighbors_set.insert(current_eps.begin(), current_eps.end()); + } else { + // For decommission operation, the decommission node + // will repair all the ranges the leaving node is + // responsible for. We are losing data on n3, we have + // to sync with at least one of {n1, n2}, otherwise we + // might lose the only new replica on n3. + // Choose the decommission node n3 to run repair to + // sync with one of the replica nodes, e.g., n1, in the + // local DC. 
+ for (auto& node : new_eps) { + if (snitch_ptr->get_datacenter(node) == local_dc) { + neighbors_set = std::unordered_set{node}; + break; + } + throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, can not find new node in local dc={}", + op, keyspace_name, r, current_eps, new_eps, local_dc)); + } + } + } else { + throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, wrong nubmer of new owner node={}", + op, keyspace_name, r, current_eps, new_eps, new_owner)); + } + neighbors_set.erase(myip); + neighbors_set.erase(leaving_node); + auto neighbors = boost::copy_range>(neighbors_set | + boost::adaptors::filtered([&local_dc, &snitch_ptr] (const gms::inet_address& node) { + return snitch_ptr->get_datacenter(node) == local_dc; + }) + ); + + if (skip_this_range) { + nr_ranges_skipped++; + rlogger.debug("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}, skipped", + op, keyspace_name, r, current_eps, new_eps, neighbors); + } else { + rlogger.debug("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}", + op, keyspace_name, r, current_eps, new_eps, neighbors); + range_sources[r] = repair_neighbors(std::move(neighbors)); + if (is_removenode) { + ranges_for_removenode.push_back(r); + } + } + } + if (is_removenode) { + ranges.swap(ranges_for_removenode); + } + auto nr_ranges_synced = ranges.size(); + sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources)).get(); + rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}", + op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped); + } + rlogger.info("{}: finished with keyspaces={}, leaving_node={}", op, keyspaces, leaving_node); + }); +} From e9a9fde1f7e410306adf2b09539f49741ecfce56 Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 19 Aug 2019 14:27:28 +0800 Subject: [PATCH 05/18] repair: Add decommission_with_repair It is used to decommission a node using repair instead of using stream_plan. --- repair/repair.cc | 4 ++++ repair/repair.hh | 1 + 2 files changed, 5 insertions(+) diff --git a/repair/repair.cc b/repair/repair.cc index 550c2bbd33..e842dedb95 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1862,3 +1862,7 @@ future<> do_decommission_removenode_with_repair(seastar::sharded& db, rlogger.info("{}: finished with keyspaces={}, leaving_node={}", op, keyspaces, leaving_node); }); } + +future<> decommission_with_repair(seastar::sharded& db, locator::token_metadata tm) { + return do_decommission_removenode_with_repair(db, std::move(tm), utils::fb_utilities::get_broadcast_address()); +} diff --git a/repair/repair.hh b/repair/repair.hh index 609637ab17..0a0df53a31 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -48,6 +48,7 @@ public: // The tokens are the tokens assigned to the bootstrap node. future<> bootstrap_with_repair(seastar::sharded& db, locator::token_metadata tm, std::unordered_set bootstrap_tokens); +future<> decommission_with_repair(seastar::sharded& db, locator::token_metadata tm); // NOTE: repair_start() can be run on any node, but starts a node-global // operation. 
From b18e078ca2f0a3691c447c0fb687469b6c2c1b57 Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 19 Aug 2019 14:27:28 +0800 Subject: [PATCH 06/18] repair: Add removenode_with_repair It is used to remove a dead node from a cluster using repair instead of using stream_plan. --- repair/repair.cc | 4 ++++ repair/repair.hh | 1 + 2 files changed, 5 insertions(+) diff --git a/repair/repair.cc b/repair/repair.cc index e842dedb95..dafd716a0a 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1866,3 +1866,7 @@ future<> do_decommission_removenode_with_repair(seastar::sharded& db, future<> decommission_with_repair(seastar::sharded& db, locator::token_metadata tm) { return do_decommission_removenode_with_repair(db, std::move(tm), utils::fb_utilities::get_broadcast_address()); } + +future<> removenode_with_repair(seastar::sharded& db, locator::token_metadata tm, gms::inet_address leaving_node) { + return do_decommission_removenode_with_repair(db, std::move(tm), std::move(leaving_node)); +} diff --git a/repair/repair.hh b/repair/repair.hh index 0a0df53a31..1edba26140 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -49,6 +49,7 @@ public: // The tokens are the tokens assigned to the bootstrap node. future<> bootstrap_with_repair(seastar::sharded& db, locator::token_metadata tm, std::unordered_set bootstrap_tokens); future<> decommission_with_repair(seastar::sharded& db, locator::token_metadata tm); +future<> removenode_with_repair(seastar::sharded& db, locator::token_metadata tm, gms::inet_address leaving_node); // NOTE: repair_start() can be run on any node, but starts a node-global // operation. From b488ab7d11c6586bcd4e48a4c81dde5d76c88326 Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 19 Aug 2019 14:27:28 +0800 Subject: [PATCH 07/18] repair: Add do_rebuild_replace_with_repair The rebuild and replace operations are similar because the token ring does not change for both of them. Add a common helper to do rebuild and replace with repair. It will be used by rebuild and replace operation shortly. 
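For illustration only (the actual wrappers are added in the next two patches), the helper is expected to be invoked roughly as:

    rebuild: do_rebuild_replace_with_repair(db, tm, "rebuild_with_repair", source_dc.empty() ? get_local_dc() : source_dc);
    replace: do_rebuild_replace_with_repair(db, tm, "replace_with_repair", get_local_dc());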
--- repair/repair.cc | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/repair/repair.cc b/repair/repair.cc index dafd716a0a..b75ee2b217 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1870,3 +1870,48 @@ future<> decommission_with_repair(seastar::sharded& db, locator::token future<> removenode_with_repair(seastar::sharded& db, locator::token_metadata tm, gms::inet_address leaving_node) { return do_decommission_removenode_with_repair(db, std::move(tm), std::move(leaving_node)); } + +future<> do_rebuild_replace_with_repair(seastar::sharded& db, locator::token_metadata tm, sstring op, sstring source_dc) { + return seastar::async([&db, tm = std::move(tm), source_dc = std::move(source_dc), op = std::move(op)] () mutable { + auto keyspaces = db.local().get_non_system_keyspaces(); + rlogger.info("{}: started with keyspaces={}, source_dc={}", op, keyspaces, source_dc); + auto myip = utils::fb_utilities::get_broadcast_address(); + for (auto& keyspace_name : keyspaces) { + if (!db.local().has_keyspace(keyspace_name)) { + rlogger.info("{}: keyspace={} does not exist any more, ignoring it", op, keyspace_name); + continue; + } + auto& ks = db.local().find_keyspace(keyspace_name); + auto& strat = ks.get_replication_strategy(); + dht::token_range_vector ranges = strat.get_ranges(myip); + std::unordered_map range_sources; + rlogger.info("{}: started with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, ranges.size()); + for (auto it = ranges.begin(); it != ranges.end();) { + auto& r = *it; + seastar::thread::maybe_yield(); + auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); + auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); + auto neighbors = boost::copy_range>(strat.calculate_natural_endpoints(end_token, tm) | + boost::adaptors::filtered([myip, &source_dc, &snitch_ptr] (const gms::inet_address& node) { + if (node == myip) { + return false; + } + return source_dc.empty() ? true : snitch_ptr->get_datacenter(node) == source_dc; + }) + ); + rlogger.debug("{}: keyspace={}, range={}, neighbors={}", op, keyspace_name, r, neighbors); + if (!neighbors.empty()) { + range_sources[r] = repair_neighbors(std::move(neighbors)); + ++it; + } else { + // Skip the range with zero neighbors + it = ranges.erase(it); + } + } + auto nr_ranges = ranges.size(); + sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources)).get(); + rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges); + } + rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, keyspaces, source_dc); + }); +} From 960ce7ab5432981381502b10faa72c79d92cd7a3 Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 19 Aug 2019 14:27:28 +0800 Subject: [PATCH 08/18] repair: Add rebuild_with_repair It is used to rebuild a node using repair instead of using stream_plan. 
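For reference, and assuming the usual nodetool interface (which this patch does not change), the operation is still triggered the same way:

    nodetool rebuild              # no source DC given; rebuild_with_repair falls back to the local DC
    nodetool rebuild <source-dc>  # limit the synchronization sources to the given DC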
--- repair/repair.cc | 8 ++++++++ repair/repair.hh | 1 + 2 files changed, 9 insertions(+) diff --git a/repair/repair.cc b/repair/repair.cc index b75ee2b217..d02c364000 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1915,3 +1915,11 @@ future<> do_rebuild_replace_with_repair(seastar::sharded& db, locator: rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, keyspaces, source_dc); }); } + +future<> rebuild_with_repair(seastar::sharded& db, locator::token_metadata tm, sstring source_dc) { + auto op = sstring("rebuild_with_repair"); + if (source_dc.empty()) { + source_dc = get_local_dc(); + } + return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc)); +} diff --git a/repair/repair.hh b/repair/repair.hh index 1edba26140..d496ced8d9 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -50,6 +50,7 @@ public: future<> bootstrap_with_repair(seastar::sharded& db, locator::token_metadata tm, std::unordered_set bootstrap_tokens); future<> decommission_with_repair(seastar::sharded& db, locator::token_metadata tm); future<> removenode_with_repair(seastar::sharded& db, locator::token_metadata tm, gms::inet_address leaving_node); +future<> rebuild_with_repair(seastar::sharded& db, locator::token_metadata tm, sstring source_dc); // NOTE: repair_start() can be run on any node, but starts a node-global // operation. From 1672f64adda0ff8c5c037aa609e857a75866c3d4 Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 19 Aug 2019 14:27:28 +0800 Subject: [PATCH 09/18] repair: Add replace_with_repair It is used to replace a dead node using repair instead of using stream_plan. --- repair/repair.cc | 6 ++++++ repair/repair.hh | 1 + 2 files changed, 7 insertions(+) diff --git a/repair/repair.cc b/repair/repair.cc index d02c364000..f7d3f84705 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1923,3 +1923,9 @@ future<> rebuild_with_repair(seastar::sharded& db, locator::token_meta } return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc)); } + +future<> replace_with_repair(seastar::sharded& db, locator::token_metadata tm) { + auto op = sstring("replace_with_repair"); + auto source_dc = get_local_dc(); + return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc)); +} diff --git a/repair/repair.hh b/repair/repair.hh index d496ced8d9..17d06bee0a 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -51,6 +51,7 @@ future<> bootstrap_with_repair(seastar::sharded& db, locator::token_me future<> decommission_with_repair(seastar::sharded& db, locator::token_metadata tm); future<> removenode_with_repair(seastar::sharded& db, locator::token_metadata tm, gms::inet_address leaving_node); future<> rebuild_with_repair(seastar::sharded& db, locator::token_metadata tm, sstring source_dc); +future<> replace_with_repair(seastar::sharded& db, locator::token_metadata tm); // NOTE: repair_start() can be run on any node, but starts a node-global // operation. From cb4045e11d7bf7c531da7bd93a733e3715623869 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 27 Nov 2019 10:57:04 +0800 Subject: [PATCH 10/18] config: Add enable_repair_based_node_ops An option to enable the repair based node operations. 
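For example, a hypothetical operator-facing snippet (not part of this patch) that turns the feature off and falls back to streaming based node operations:

    # scylla.yaml
    enable_repair_based_node_ops: false

The option defaults to true and is live-updatable.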
--- db/config.cc | 1 + db/config.hh | 1 + 2 files changed, 2 insertions(+) diff --git a/db/config.cc b/db/config.cc index 958abf46c3..f7425fa961 100644 --- a/db/config.cc +++ b/db/config.cc @@ -681,6 +681,7 @@ db::config::config(std::shared_ptr exts) , replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.") , replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.") , override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster") + , enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to enable repair based node operations instead of streaming based ones") , ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.") , shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.") , fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.") diff --git a/db/config.hh b/db/config.hh index 451a779bfe..0a6ab4e819 100644 --- a/db/config.hh +++ b/db/config.hh @@ -270,6 +270,7 @@ public: named_value replace_address; named_value replace_address_first_boot; named_value override_decommission; + named_value enable_repair_based_node_ops; named_value ring_delay_ms; named_value shadow_round_ms; named_value fd_max_interval_ms; From a4c614914a99148d4fab5facf96bbe8ac5a25493 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 27 Nov 2019 10:58:49 +0800 Subject: [PATCH 11/18] storage_service: Add is_repair_based_node_ops_enabled helper It is used to check if repair based node operations are enabled or not.
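A minimal sketch of the intended call-site pattern (later patches in this series follow it):

    if (ss.is_repair_based_node_ops_enabled()) {
        // repair based path, e.g. rebuild_with_repair(...)
    } else {
        // existing streaming based path
    }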
--- service/storage_service.cc | 4 ++++ service/storage_service.hh | 1 + 2 files changed, 5 insertions(+) diff --git a/service/storage_service.cc b/service/storage_service.cc index 25b5f1d065..28b357958d 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3480,5 +3480,9 @@ future storage_service::is_cleanup_allowed(sstring keyspace) { }); } +bool storage_service::is_repair_based_node_ops_enabled() { + return _db.local().get_config().enable_repair_based_node_ops(); +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 9a2e32207f..47e018b626 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -909,6 +909,7 @@ private: void notify_cql_change(inet_address endpoint, bool ready); public: future is_cleanup_allowed(sstring keyspace); + bool is_repair_based_node_ops_enabled(); }; future<> init_storage_service(sharded& abort_sources, distributed& db, sharded& gossiper, sharded& auth_service, From 3b64b4bb1736537680de70f1d4a51a909246bde5 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 27 Nov 2019 14:38:23 +0800 Subject: [PATCH 12/18] storage_service: Use the same tokens as previous bootstrap With repair based node operations, we can resume previous failed bootstrap. In order to do that, we need the bootstrap node uses the same tokens as previous bootstrap. Currently, we always use new tokens when we bootstrap, because we need to stream all the ranges anyway. It does not matter if we use the same tokens or not. --- service/storage_service.cc | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 28b357958d..123dfcc42f 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -524,7 +524,8 @@ void storage_service::join_token_ring(int delay) { slogger.info("This node will not auto bootstrap because it is configured to be a seed node."); } if (should_bootstrap()) { - if (db::system_keyspace::bootstrap_in_progress()) { + bool resume_bootstrap = db::system_keyspace::bootstrap_in_progress(); + if (resume_bootstrap) { slogger.warn("Detected previous bootstrap failure; retrying"); } else { db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::IN_PROGRESS).get(); @@ -577,7 +578,18 @@ void storage_service::join_token_ring(int delay) { throw std::runtime_error("This node is already a member of the token ring; bootstrap aborted. 
(If replacing a dead node, remove the old one from the ring first.)"); } set_mode(mode::JOINING, "getting bootstrap token", true); - _bootstrap_tokens = boot_strapper::get_bootstrap_tokens(_token_metadata, _db.local()); + if (resume_bootstrap) { + _bootstrap_tokens = db::system_keyspace::get_saved_tokens().get0(); + if (!_bootstrap_tokens.empty()) { + slogger.info("Using previously saved tokens = {}", _bootstrap_tokens); + } else { + _bootstrap_tokens = boot_strapper::get_bootstrap_tokens(_token_metadata, _db.local()); + slogger.info("Using newly generated tokens = {}", _bootstrap_tokens); + } + } else { + _bootstrap_tokens = boot_strapper::get_bootstrap_tokens(_token_metadata, _db.local()); + slogger.info("Using newly generated tokens = {}", _bootstrap_tokens); + } } else { auto replace_addr = db().local().get_replace_address(); if (replace_addr && *replace_addr != get_broadcast_address()) { From cf0601735e7ede024960a3f246c59959c57002e2 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 27 Nov 2019 11:00:24 +0800 Subject: [PATCH 13/18] storage_service: Enable node repair based ops for rebuild - Rebuild operation It is used to get all the data this node owns form other nodes. It pulls data from only one of the replicas which might not be the latest copy. Fixes: #3003 Fixes: #4208 Tests: update_cluster_layout_tests.py + replace_address_test.py + manual test --- service/storage_service.cc | 39 +++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 123dfcc42f..cc18a5e2c6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -80,6 +80,7 @@ #include "database.hh" #include #include "cdc/generation.hh" +#include "repair/repair.hh" using token = dht::token; using UUID = utils::UUID; @@ -2591,24 +2592,28 @@ future<> storage_service::drain() { future<> storage_service::rebuild(sstring source_dc) { return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) { slogger.info("rebuild from dc: {}", source_dc == "" ? 
"(any dc)" : source_dc); - auto streamer = make_lw_shared(ss._db, ss._token_metadata, ss._abort_source, - ss.get_broadcast_address(), "Rebuild", streaming::stream_reason::rebuild); - streamer->add_source_filter(std::make_unique(ss._gossiper.get_unreachable_members())); - if (source_dc != "") { - streamer->add_source_filter(std::make_unique(source_dc)); - } - auto keyspaces = make_lw_shared>(ss._db.local().get_non_system_keyspaces()); - return do_for_each(*keyspaces, [keyspaces, streamer, &ss] (sstring& keyspace_name) { - return streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name)); - }).then([streamer] { - return streamer->stream_async().then([streamer] { - slogger.info("Streaming for rebuild successful"); - }).handle_exception([] (auto ep) { - // This is used exclusively through JMX, so log the full trace but only throw a simple RTE - slogger.warn("Error while rebuilding node: {}", std::current_exception()); - return make_exception_future<>(std::move(ep)); + if (ss.is_repair_based_node_ops_enabled()) { + return rebuild_with_repair(ss._db, ss._token_metadata, std::move(source_dc)); + } else { + auto streamer = make_lw_shared(ss._db, ss._token_metadata, ss._abort_source, + ss.get_broadcast_address(), "Rebuild", streaming::stream_reason::rebuild); + streamer->add_source_filter(std::make_unique(ss._gossiper.get_unreachable_members())); + if (source_dc != "") { + streamer->add_source_filter(std::make_unique(source_dc)); + } + auto keyspaces = make_lw_shared>(ss._db.local().get_non_system_keyspaces()); + return do_for_each(*keyspaces, [keyspaces, streamer, &ss] (sstring& keyspace_name) { + return streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name)); + }).then([streamer] { + return streamer->stream_async().then([streamer] { + slogger.info("Streaming for rebuild successful"); + }).handle_exception([] (auto ep) { + // This is used exclusively through JMX, so log the full trace but only throw a simple RTE + slogger.warn("Error while rebuilding node: {}", std::current_exception()); + return make_exception_future<>(std::move(ep)); + }); }); - }); + } }); } From f4b4192c913e983e5332e1f45c93b540e7a0a3e2 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 27 Nov 2019 11:07:25 +0800 Subject: [PATCH 14/18] storage_service: Enable node repair based ops for removenode - Removenode operation It is used to remove a dead node out of the cluster. Existing nodes pulls data from other existing nodes for the new ranges it own. It pulls from one of the replicas which might not be the latest copy. 
Fixes: #3003 Fixes: #4208 Tests: update_cluster_layout_tests.py + replace_address_test.py + manual test --- service/storage_service.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/service/storage_service.cc b/service/storage_service.cc index cc18a5e2c6..4cdffe9537 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2739,6 +2739,11 @@ void storage_service::unbootstrap() { } future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) { + if (is_repair_based_node_ops_enabled()) { + return removenode_with_repair(_db, _token_metadata, endpoint).finally([this, notify_endpoint] () { + return send_replication_notification(notify_endpoint); + }); + } return seastar::async([this, endpoint, notify_endpoint] { auto streamer = make_lw_shared(_db, get_token_metadata(), _abort_source, get_broadcast_address(), "Restore_replica_count", streaming::stream_reason::removenode); auto my_address = get_broadcast_address(); From a38916121c54f7c3322863b1e82b4f50eecd0ea5 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 27 Nov 2019 14:46:58 +0800 Subject: [PATCH 15/18] storage_service: Enable node repair based ops for replace - Replace operation It is used to replace a dead node. The token ring does not change. It pulls data from only one of the replicas which might not be the latest copy. Fixes: #3003 Fixes: #4208 Tests: update_cluster_layout_tests.py + replace_address_test.py + manual test --- service/storage_service.cc | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 4cdffe9537..81a32bfe95 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -953,9 +953,13 @@ void storage_service::bootstrap() { _gossiper.check_seen_seeds(); set_mode(mode::JOINING, "Starting to bootstrap...", true); - dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata); - // Does the actual streaming of newly replicated token ranges. - bs.bootstrap().get(); + if (is_repair_based_node_ops_enabled() && db().local().is_replacing()) { + replace_with_repair(_db, _token_metadata).get(); + } else { + dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata); + // Does the actual streaming of newly replicated token ranges. + bs.bootstrap().get(); + } slogger.info("Bootstrap completed! for the tokens {}", _bootstrap_tokens); } From 62f056c022b44dfb48cac898ddb40fe3814b0f49 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 27 Nov 2019 11:04:13 +0800 Subject: [PATCH 16/18] storage_service: Enable node repair based ops for decommission MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Decommission operation It is used to remove a live node from the cluster. The token ring changes. It does not suffer from the “not the latest replica” issue. The leaving node pushes data to existing nodes.
Fixes: #3003 Fixes: #4208 Tests: update_cluster_layout_tests.py + replace_address_test.py + manual test --- service/storage_service.cc | 65 ++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 81a32bfe95..3a73e630fd 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2705,40 +2705,45 @@ std::unordered_multimap storage_service::get_cha // Runs inside seastar::async context void storage_service::unbootstrap() { - std::unordered_map> ranges_to_stream; - - auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); - for (const auto& keyspace_name : non_system_keyspaces) { - auto ranges_mm = get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address()); - if (slogger.is_enabled(logging::log_level::debug)) { - std::vector> ranges; - for (auto& x : ranges_mm) { - ranges.push_back(x.first); - } - slogger.debug("Ranges needing transfer for keyspace={} are [{}]", keyspace_name, ranges); - } - ranges_to_stream.emplace(keyspace_name, std::move(ranges_mm)); - } - - set_mode(mode::LEAVING, "replaying batch log and streaming data to other nodes", true); - - auto stream_success = stream_ranges(ranges_to_stream); - // Wait for batch log to complete before streaming hints. - slogger.debug("waiting for batch log processing."); - // Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint. db::get_local_batchlog_manager().do_batch_log_replay().get(); + if (is_repair_based_node_ops_enabled()) { + decommission_with_repair(_db, _token_metadata).get(); + } else { + std::unordered_map> ranges_to_stream; - set_mode(mode::LEAVING, "streaming hints to other nodes", true); + auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); + for (const auto& keyspace_name : non_system_keyspaces) { + auto ranges_mm = get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address()); + if (slogger.is_enabled(logging::log_level::debug)) { + std::vector> ranges; + for (auto& x : ranges_mm) { + ranges.push_back(x.first); + } + slogger.debug("Ranges needing transfer for keyspace={} are [{}]", keyspace_name, ranges); + } + ranges_to_stream.emplace(keyspace_name, std::move(ranges_mm)); + } - // wait for the transfer runnables to signal the latch. - slogger.debug("waiting for stream acks."); - try { - stream_success.get(); - } catch (...) { - slogger.warn("unbootstrap fails to stream : {}", std::current_exception()); - throw; + set_mode(mode::LEAVING, "replaying batch log and streaming data to other nodes", true); + + auto stream_success = stream_ranges(ranges_to_stream); + // Wait for batch log to complete before streaming hints. + slogger.debug("waiting for batch log processing."); + // Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint. + db::get_local_batchlog_manager().do_batch_log_replay().get(); + + set_mode(mode::LEAVING, "streaming hints to other nodes", true); + + // wait for the transfer runnables to signal the latch. + slogger.debug("waiting for stream acks."); + try { + stream_success.get(); + } catch (...) 
{ + slogger.warn("unbootstrap fails to stream : {}", std::current_exception()); + throw; + } + slogger.debug("stream acks all received."); } - slogger.debug("stream acks all received."); leave_ring(); } From ac90c1c18464d870d6e10e674aa08b18d60344e2 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 27 Nov 2019 14:53:13 +0800 Subject: [PATCH 17/18] storage_service: Enable node repair based ops for bootstrap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Bootstrap operation It is used to add a new node into the cluster. The token ring changes. Do not suffer from the "not the latest replica” issue. New node pulls data from existing nodes that are losing the token range. Suffer from failed streaming. We split the ranges in 10 groups and we stream one group at a time. Restream the group if failed, causing unnecessary data transmission on wire. Bootstrap is not resumable. Failure after 99.99% of data is streamed. If we restart the node again, we need to stream all the data again even if the node already has 99.99% of the data. Fixes: #3003 Fixes: #4208 Tests: update_cluster_layout_tests.py + replace_address_test.py + manual test --- service/storage_service.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 3a73e630fd..f0f6285333 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -953,8 +953,12 @@ void storage_service::bootstrap() { _gossiper.check_seen_seeds(); set_mode(mode::JOINING, "Starting to bootstrap...", true); - if (is_repair_based_node_ops_enabled() & db().local().is_replacing()) { - replace_with_repair(_db, _token_metadata).get(); + if (is_repair_based_node_ops_enabled()) { + if (db().local().is_replacing()) { + replace_with_repair(_db, _token_metadata).get(); + } else { + bootstrap_with_repair(_db, _token_metadata, _bootstrap_tokens).get(); + } } else { dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata); // Does the actual streaming of newly replicated token ranges. From aaa1f3ce7bfe986712aacae80ef270a4f33f4794 Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 3 Jan 2020 10:12:05 +0800 Subject: [PATCH 18/18] docs: Add doc for repair_based_node_ops This patch adds a doc for the repair based node operations. --- docs/repair_based_node_ops.md | 114 ++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 docs/repair_based_node_ops.md diff --git a/docs/repair_based_node_ops.md b/docs/repair_based_node_ops.md new file mode 100644 index 0000000000..8bf3e96f2b --- /dev/null +++ b/docs/repair_based_node_ops.md @@ -0,0 +1,114 @@ +# Repair based node operations + +Here below is a simple introduction to the node operations Scylla support and +some of the issues that streaming based node operations have. + +- Replace operation + + It is used to replace a dead node. The token ring does not change. Replacing + node pulls data from only one of the replicas which might not be the latest + copy. + +- Rebuild operation + + It is used to get all the data this node owns form other existing nodes. It + pulls data from only one of the replicas which might not be the latest copy. + +- Removenode operation + + It is used to remove a dead node out of the cluster. Existing nodes pull + data from other existing nodes for the new ranges it owns. It pulls from one + of the replicas which might not be the latest copy. 
+ +- Bootstrap operation + It is used to add a new node into the cluster. The token ring changes. It + does not suffer from the “latest replica” issue. The new node pulls data + from existing nodes that are losing the token ranges. + + It suffers from failed streaming. We split the ranges in 10 groups and we + stream one group at a time. A group is restreamed if it fails, causing + unnecessary data transmission on the wire. + + Bootstrap is not resumable. With a failure after 99.99% of the data is streamed, + if we restart the node again, we need to stream all the data again even if + the node already has 99.99% of the data. + +- Decommission operation + It is used to remove a live node from the cluster. The token ring changes. It + does not suffer from the “latest replica” issue. The leaving node pushes data + to existing nodes. + + It suffers from the same resumability issue as the bootstrap operation. + +To solve all the issues above, we use repair based node operations. The idea +behind repair based node operations is simple: use repair to synchronize data +between replicas instead of using streaming. + +The benefits: + +- The latest copy is guaranteed + +- Resumable in nature + +- No extra data is streamed on the wire + E.g., rebuilding twice will not stream the same data twice + +- Unified code path for all the node operations + +- A free repair operation during bootstrap, replace and so on. + +Selecting the nodes to synchronize with: + +- Replace operation + Synchronize with all replica nodes from the local DC + +- Rebuild operation + Synchronize with all replica nodes from the local DC or from the DC specified by the user + +- Removenode operation + Synchronize with all replica nodes from the local DC + +- Bootstrap operation + 1) If local_dc_replica_nodes = RF, synchronize with 1 node that is losing the + range in the local DC + + 2) If 0 < local_dc_replica_nodes < RF, synchronize with + up to RF/2 + 1 nodes in the local DC + + 3) If local_dc_replica_nodes = 0, reject the bootstrap operation + + (See the pseudocode sketch at the end of this document.) + +- Decommission operation + 1) Synchronize with one node that is gaining the range in the local DC + + For example, with RF = 3 and 4 nodes n1, n2, n3, n4 in the cluster, n3 is + removed, old_replicas = {n1, n2, n3}, new_replicas = {n1, n2, n4}. + + The decommission node will repair all the ranges the leaving node is + responsible for. Choose the decommission node n3 to run repair to synchronize + with the new owner node n4 in the local DC. + + 2) Synchronize with one node when no node is gaining the range in the local DC + + For example, with RF = 3 and 3 nodes n1, n2, n3 in the cluster, n3 is + removed, old_replicas = {n1, n2, n3}, new_replicas = {n1, n2}. + + The decommission node will repair all the ranges the leaving node is + responsible for. The cluster is losing data on n3, so it has to synchronize with + at least one of {n1, n2}, otherwise it might lose the only new replica on n3. + Choose the decommission node n3 to run repair to synchronize with one of the + replica nodes, e.g., n1, in the local DC. + + Note that it is enough to synchronize with one node instead of a quorum of + nodes. For example, with RF = 3, timestamp(B) > timestamp(A), + + n1 n2 n3 n4 + 1) A B B (Before n3 is decommissioned) + 2) A B (After n3 is decommissioned) + 3) A B B (After n4 is bootstrapped) + + Suppose we decommission n3 and n3 decides to synchronize with only n2. We end + up with n1 having A and n2 having B. Then we add a new node n4 to the cluster; since + no node is losing the range, n4 will synchronize with a quorum of nodes, + that is n1 and n2.
As a result, n4 will have B instead of A.
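The bootstrap selection rules above can be summarized with the following simplified
pseudocode (a sketch, not the actual implementation; old_endpoints_in_local_dc,
rf_in_local_dc and node_losing_the_range stand for the values computed in
bootstrap_with_repair):

    if (old_endpoints_in_local_dc.size() == rf_in_local_dc) {
        // 1) Exactly one node loses the range; it is the mandatory neighbor.
        neighbors = {node_losing_the_range};
    } else if (old_endpoints_in_local_dc.empty()) {
        // 3) No replica in the local DC; reject the bootstrap operation.
        throw std::runtime_error("no existing node in local dc");
    } else {
        // 2) Synchronize with up to RF/2 + 1 replicas in the local DC.
        neighbors = old_endpoints_in_local_dc;
        neighbors.resize(std::min(rf_in_local_dc / 2 + 1, neighbors.size()));
    }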