diff --git a/db/config.cc b/db/config.cc
index 958abf46c3..f7425fa961 100644
--- a/db/config.cc
+++ b/db/config.cc
@@ -681,6 +681,7 @@ db::config::config(std::shared_ptr exts)
     , replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
     , replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
     , override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
+    , enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to enable repair based node operations instead of streaming based")
     , ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
     , shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
     , fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
diff --git a/db/config.hh b/db/config.hh
index 451a779bfe..0a6ab4e819 100644
--- a/db/config.hh
+++ b/db/config.hh
@@ -270,6 +270,7 @@ public:
     named_value replace_address;
     named_value replace_address_first_boot;
    named_value override_decommission;
+    named_value enable_repair_based_node_ops;
     named_value ring_delay_ms;
     named_value shadow_round_ms;
     named_value fd_max_interval_ms;
diff --git a/docs/repair_based_node_ops.md b/docs/repair_based_node_ops.md
new file mode 100644
index 0000000000..8bf3e96f2b
--- /dev/null
+++ b/docs/repair_based_node_ops.md
@@ -0,0 +1,114 @@
+# Repair based node operations
+
+Below is a brief introduction to the node operations Scylla supports and
+some of the issues that streaming based node operations have.
+
+- Replace operation
+
+  It is used to replace a dead node. The token ring does not change. The
+  replacing node pulls data from only one of the replicas, which might not
+  hold the latest copy.
+
+- Rebuild operation
+
+  It is used to get all the data this node owns from other existing nodes.
+  It pulls data from only one of the replicas, which might not hold the
+  latest copy.
+
+- Removenode operation
+
+  It is used to remove a dead node from the cluster. Existing nodes pull
+  data from other existing nodes for the new ranges they own. Each pulls
+  from only one of the replicas, which might not hold the latest copy.
+
+- Bootstrap operation
+
+  It is used to add a new node to the cluster. The token ring changes. It
+  does not suffer from the "latest replica" issue. The new node pulls data
+  from the existing nodes that are losing the token ranges.
+
+  It suffers from streaming failures. We split the ranges into 10 groups
+  and stream one group at a time. If a group fails, the whole group is
+  restreamed, causing unnecessary data transmission on the wire.
+
+  Bootstrap is not resumable. If it fails after 99.99% of the data has been
+  streamed, restarting the node streams all the data again, even though the
+  node already has 99.99% of it.
+
+- Decommission operation
+
+  It is used to remove a live node from the cluster. The token ring changes.
+  It does not suffer from the "latest replica" issue. The leaving node pushes
+  data to the existing nodes.
+
+  It suffers from the same resumability issue as the bootstrap operation.
+
+To solve all the issues above, we use repair based node operations. The idea
+behind repair based node operations is simple: use repair to synchronize data
+between replicas instead of using streaming.
+
+The benefits:
+
+- The latest copy is guaranteed
+
+- Resumable in nature
+
+- No extra data is streamed on the wire.
+  E.g., rebuilding twice will not stream the same data twice
+
+- Unified code path for all the node operations
+
+- A free repair during bootstrap, replace and the other operations
+
+Selecting the nodes to synchronize with:
+
+- Replace operation
+  Synchronize with all replica nodes in the local DC
+
+- Rebuild operation
+  Synchronize with all replica nodes in the local DC or in the DC specified
+  by the user
+
+- Removenode operation
+  Synchronize with all replica nodes in the local DC
+
+- Bootstrap operation (the rules below are sketched in code at the end of
+  this document)
+  1) If local_dc_replica_nodes = RF, synchronize with the one node that is
+  losing the range in the local DC
+
+  2) If 0 < local_dc_replica_nodes < RF, synchronize with up to RF/2 + 1
+  nodes in the local DC
+
+  3) If local_dc_replica_nodes = 0, reject the bootstrap operation
+
+- Decommission operation
+  1) Synchronize with one node that is gaining the range in the local DC
+
+  For example, with RF = 3 and 4 nodes n1, n2, n3, n4 in the cluster, n3 is
+  removed, old_replicas = {n1, n2, n3}, new_replicas = {n1, n2, n4}.
+
+  The decommissioning node will repair all the ranges the leaving node is
+  responsible for. Choose the decommissioning node n3 to run repair to
+  synchronize with the new owner node n4 in the local DC.
+
+  2) Synchronize with one node when no node is gaining the range in the
+  local DC
+
+  For example, with RF = 3 and 3 nodes n1, n2, n3 in the cluster, n3 is
+  removed, old_replicas = {n1, n2, n3}, new_replicas = {n1, n2}.
+
+  The decommissioning node will repair all the ranges the leaving node is
+  responsible for. The cluster is losing the replica on n3, so n3 has to
+  synchronize with at least one of {n1, n2}; otherwise we might lose the
+  only latest replica, the one on n3. Choose the decommissioning node n3 to
+  run repair to synchronize with one of the replica nodes, e.g., n1, in the
+  local DC.
+
+  Note that it is enough to synchronize with one node instead of a quorum of
+  nodes. For example, with RF = 3 and timestamp(B) > timestamp(A):
+
+          n1    n2    n3    n4
+      1)  A     B     B           (Before n3 is decommissioned)
+      2)  A     B                 (After n3 is decommissioned)
+      3)  A     B           B     (After n4 is bootstrapped)
+
+  Suppose we decommission n3 and n3 decides to synchronize with only n2. We
+  end up with n1 holding A and n2 holding B. Then we add a new node n4 to
+  the cluster. Since no node is losing the range, n4 syncs with a quorum of
+  nodes, that is n1 and n2. As a result, n4 will have B instead of A.
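+
+The bootstrap rules above can be summarized in code. Note that this is a
+simplified, hypothetical sketch, not the actual Scylla implementation; the
+function name, the parameters and the use of plain strings for nodes are
+invented for this document:
+
+```c++
+#include <algorithm>
+#include <cstddef>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+// Hypothetical sketch. For a single token range:
+// local_dc_replicas  - current replicas of the range in the local DC.
+// rf_in_local_dc     - replication factor configured for the local DC.
+// node_losing_range  - the node losing ownership of the range, if any.
+std::vector<std::string> select_bootstrap_sync_nodes(
+        std::vector<std::string> local_dc_replicas,
+        std::size_t rf_in_local_dc,
+        const std::string& node_losing_range) {
+    if (local_dc_replicas.empty()) {
+        // Rule 3: no replica in the local DC, reject the bootstrap.
+        throw std::runtime_error("no existing replica in local dc");
+    }
+    if (local_dc_replicas.size() == rf_in_local_dc) {
+        // Rule 1: the local DC is fully replicated, so exactly one node
+        // loses the range; it is the mandatory node to sync with.
+        return {node_losing_range};
+    }
+    // Rule 2: fewer replicas than RF; sync with up to RF/2 + 1 of them,
+    // so that a quorum of the new replica set holds the latest data.
+    local_dc_replicas.resize(std::min(rf_in_local_dc / 2 + 1, local_dc_replicas.size()));
+    return local_dc_replicas;
+}
+```
+
+The real implementation (bootstrap_with_repair in repair/repair.cc below)
+additionally records the node losing the range as a mandatory neighbor, so
+that the repair fails instead of silently skipping that node when it is down.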
diff --git a/repair/repair.cc b/repair/repair.cc
index a8fbd48889..f7d3f84705 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -35,6 +35,7 @@
 #include "sstables/sstables.hh"
 #include "database.hh"
 #include "hashers.hh"
+#include "locator/network_topology_strategy.hh"
 #include
 #include
@@ -333,6 +334,7 @@ future<> tracker::run(int id, std::function ()> func) {
     }).handle_exception([this, id] (std::exception_ptr ep) {
         rlogger.info("repair id {} failed: {}", id, ep);
         done(id, false);
+        return make_exception_future(std::move(ep));
     });
     });
 }
@@ -773,6 +775,12 @@ void repair_info::check_in_abort() {
     }
 }
 
+repair_neighbors repair_info::get_repair_neighbors(const dht::token_range& range) {
+    return neighbors.empty() ?
+        repair_neighbors(get_neighbors(db.local(), keyspace, range, data_centers, hosts)) :
+        neighbors[range];
+}
+
 // Repair a single cf in a single local range.
 // Comparable to RepairJob in Origin.
 static future<> repair_cf_range(repair_info& ri,
@@ -983,9 +991,22 @@ static future<> repair_cf_range(repair_info& ri,
 // Comparable to RepairSession in Origin
 static future<> repair_range(repair_info& ri, const dht::token_range& range) {
     auto id = utils::UUID_gen::get_time_UUID();
-    return do_with(get_neighbors(ri.db.local(), ri.keyspace, range, ri.data_centers, ri.hosts), [&ri, range, id] (std::vector& neighbors) {
+    repair_neighbors neighbors = ri.get_repair_neighbors(range);
+    return do_with(std::move(neighbors.all), std::move(neighbors.mandatory), [&ri, range, id] (auto& neighbors, auto& mandatory_neighbors) {
         auto live_neighbors = boost::copy_range>(neighbors | boost::adaptors::filtered([] (const gms::inet_address& node) { return gms::get_local_gossiper().is_alive(node); }));
+        for (auto& node : mandatory_neighbors) {
+            auto it = std::find(live_neighbors.begin(), live_neighbors.end(), node);
+            if (it == live_neighbors.end()) {
+                ri.nr_failed_ranges++;
+                auto status = format("failed: mandatory neighbor={} is not alive", node);
+                rlogger.error("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
+                    ri.ranges_index, ri.ranges.size(), ri.id, ri.shard, ri.keyspace, ri.cfs, range, neighbors, live_neighbors, status);
+                ri.abort();
+                return make_exception_future<>(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}",
+                    node, ri.keyspace, mandatory_neighbors)));
+            }
+        }
         if (live_neighbors.size() != neighbors.size()) {
             ri.nr_failed_ranges++;
             auto status = live_neighbors.empty() ? "skipped" : "partial";
"skipped" : "partial"; @@ -1495,3 +1516,416 @@ future<> repair_abort_all(seastar::sharded& db) { repair_tracker().abort_all_repairs(); }); } + +future<> sync_data_using_repair(seastar::sharded& db, + sstring keyspace, + dht::token_range_vector ranges, + std::unordered_map neighbors) { + if (ranges.empty()) { + return make_ready_future<>(); + } + return smp::submit_to(0, [&db, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors)] () mutable { + int id = repair_tracker().next_repair_command(); + rlogger.info("repair id {} to sync data for keyspace={}, status=started", id, keyspace); + return repair_tracker().run(id, [id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors)] () mutable { + auto cfs = list_column_families(db.local(), keyspace); + std::vector> repair_results; + repair_results.reserve(smp::count); + for (auto shard : boost::irange(unsigned(0), smp::count)) { + auto f = db.invoke_on(shard, [keyspace, cfs, id, ranges, neighbors] (database& localdb) mutable { + auto data_centers = std::vector(); + auto hosts = std::vector(); + auto ri = make_lw_shared(service::get_local_storage_service().db(), + std::move(keyspace), std::move(ranges), std::move(cfs), + id, std::move(data_centers), std::move(hosts)); + ri->neighbors = std::move(neighbors); + return repair_ranges(ri); + }); + repair_results.push_back(std::move(f)); + } + return when_all(repair_results.begin(), repair_results.end()).then([id, keyspace] (std::vector> results) mutable { + std::vector errors; + for (unsigned shard = 0; shard < results.size(); shard++) { + auto& f = results[shard]; + if (f.failed()) { + auto ep = f.get_exception(); + errors.push_back(format("shard {}: {}", shard, ep)); + } + } + if (!errors.empty()) { + return make_exception_future<>(std::runtime_error(format("{}", errors))); + } + return make_ready_future<>(); + }); + }).then([id, keyspace] { + rlogger.info("repair id {} to sync data for keyspace={}, status=succeeded", id, keyspace); + }).handle_exception([id, keyspace] (std::exception_ptr ep) { + rlogger.info("repair id {} to sync data for keyspace={}, status=failed: {}", id, keyspace, ep); + return make_exception_future<>(ep); + }); + }); +} + +future<> bootstrap_with_repair(seastar::sharded& db, locator::token_metadata tm, std::unordered_set bootstrap_tokens) { + using inet_address = gms::inet_address; + return seastar::async([&db, tm = std::move(tm), tokens = std::move(bootstrap_tokens)] () mutable { + auto keyspaces = db.local().get_non_system_keyspaces(); + rlogger.info("bootstrap_with_repair: started with keyspaces={}", keyspaces); + auto myip = utils::fb_utilities::get_broadcast_address(); + for (auto& keyspace_name : keyspaces) { + if (!db.local().has_keyspace(keyspace_name)) { + rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name); + continue; + } + auto& ks = db.local().find_keyspace(keyspace_name); + auto& strat = ks.get_replication_strategy(); + dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tm, tokens, myip); + + //Active ranges + auto metadata_clone = tm.clone_only_token_map(); + auto range_addresses = strat.get_range_addresses(metadata_clone); + + //Pending ranges + metadata_clone.update_normal_tokens(tokens, myip); + auto pending_range_addresses = strat.get_range_addresses(metadata_clone); + + //Collects the source that will have its range moved to the new node + std::unordered_map range_sources; + + rlogger.info("bootstrap_with_repair: started with 
+            for (auto& desired_range : desired_ranges) {
+                for (auto& x : range_addresses) {
+                    const range& src_range = x.first;
+                    seastar::thread::maybe_yield();
+                    if (src_range.contains(desired_range, dht::tri_compare)) {
+                        std::vector old_endpoints(x.second.begin(), x.second.end());
+                        auto it = pending_range_addresses.find(desired_range);
+                        if (it == pending_range_addresses.end()) {
+                            throw std::runtime_error(format("Cannot find desired_range = {} in pending_range_addresses", desired_range));
+                        }
+
+                        std::unordered_set new_endpoints(it->second.begin(), it->second.end());
+                        rlogger.debug("bootstrap_with_repair: keyspace={}, range={}, old_endpoints={}, new_endpoints={}",
+                                keyspace_name, desired_range, old_endpoints, new_endpoints);
+                        // Due to CASSANDRA-5953 we can have a higher RF than we have endpoints,
+                        // so we need to be careful to only be strict when endpoints == RF.
+                        // That is, we can only have RF >= old_endpoints.size().
+                        // 1) If RF > old_endpoints.size(), no node is going to
+                        // lose the ownership of the range, so there is no need
+                        // to choose a mandatory neighbor to sync data from.
+                        // 2) If RF = old_endpoints.size(), one node is going to
+                        // lose the ownership of the range; we need to choose
+                        // it as the mandatory neighbor to sync data from.
+                        std::vector mandatory_neighbors;
+                        // All neighbors
+                        std::vector neighbors;
+                        auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
+                        auto local_dc = get_local_dc();
+                        auto get_node_losing_the_ranges = [&] (const std::vector& old_nodes, const std::unordered_set& new_nodes) {
+                            // Remove the new nodes from the old nodes list, so
+                            // that it contains only the node that will lose
+                            // the ownership of the range.
+                            auto nodes = boost::copy_range>(old_nodes |
+                                    boost::adaptors::filtered([&] (const gms::inet_address& node) { return !new_nodes.count(node); }));
+                            if (nodes.size() != 1) {
+                                throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, expected 1 node losing range but found more nodes={}",
+                                        keyspace_name, desired_range, nodes));
+                            }
+                            return nodes;
+                        };
+                        auto get_rf_in_local_dc = [&] () {
+                            size_t rf_in_local_dc = strat.get_replication_factor();
+                            if (strat.get_type() == locator::replication_strategy_type::network_topology) {
+                                auto nts = dynamic_cast(&strat);
+                                if (!nts) {
+                                    throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, failed to cast to network_topology_strategy",
+                                            keyspace_name, desired_range));
+                                }
+                                rf_in_local_dc = nts->get_replication_factor(local_dc);
+                            }
+                            return rf_in_local_dc;
+                        };
+                        auto get_old_endpoints_in_local_dc = [&] () {
+                            return boost::copy_range>(old_endpoints |
+                                    boost::adaptors::filtered([&] (const gms::inet_address& node) {
+                                        return snitch_ptr->get_datacenter(node) == local_dc;
+                                    })
+                            );
+                        };
+                        auto old_endpoints_in_local_dc = get_old_endpoints_in_local_dc();
+                        auto rf_in_local_dc = get_rf_in_local_dc();
+                        if (old_endpoints.size() == strat.get_replication_factor()) {
+                            // For example, with RF = 3 and 3 nodes n1, n2, n3
+                            // in the cluster, n4 is bootstrapped, old_replicas
+                            // = {n1, n2, n3}, new_replicas = {n1, n2, n4}, n3
+                            // is losing the range.
+                            // Choose the bootstrapping node n4 to run repair
+                            // to sync with the node losing the range, n3.
+                            mandatory_neighbors = get_node_losing_the_ranges(old_endpoints, new_endpoints);
+                            neighbors = mandatory_neighbors;
+                        } else if (old_endpoints.size() < strat.get_replication_factor()) {
+                            if (old_endpoints_in_local_dc.size() == rf_in_local_dc) {
+                                // Local DC has enough replica nodes.
+                                mandatory_neighbors = get_node_losing_the_ranges(old_endpoints_in_local_dc, new_endpoints);
+                                neighbors = mandatory_neighbors;
+                            } else if (old_endpoints_in_local_dc.size() == 0) {
+                                // Local DC has zero replica nodes.
+                                // Reject the operation.
+                                throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, no existing node in local dc",
+                                        keyspace_name, desired_range));
+                            } else if (old_endpoints_in_local_dc.size() < rf_in_local_dc) {
+                                // Local DC does not have enough replica nodes.
+                                // For example, with RF = 3 and 2 nodes n1, n2 in
+                                // the cluster, n3 is bootstrapped, old_replicas =
+                                // {n1, n2}, new_replicas = {n1, n2, n3}. No node
+                                // is losing ranges. The bootstrapping node n3 has
+                                // to sync with n1 and n2; otherwise n3 might get
+                                // the old replica, making two of the three
+                                // replicas stale, which would make quorum reads
+                                // incorrect.
+                                // Choose the bootstrapping node n3 to run repair
+                                // to sync with n1 and n2.
+                                size_t nr_nodes_to_sync_with = std::min(rf_in_local_dc / 2 + 1, old_endpoints_in_local_dc.size());
+                                neighbors = old_endpoints_in_local_dc;
+                                neighbors.resize(nr_nodes_to_sync_with);
+                            } else {
+                                throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, wrong number of old_endpoints_in_local_dc={}, rf_in_local_dc={}",
+                                        keyspace_name, desired_range, old_endpoints_in_local_dc.size(), rf_in_local_dc));
+                            }
+                        } else {
+                            throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, wrong number of old_endpoints={}, rf={}",
+                                    keyspace_name, desired_range, old_endpoints, strat.get_replication_factor()));
+                        }
+                        rlogger.debug("bootstrap_with_repair: keyspace={}, range={}, neighbors={}, mandatory_neighbors={}",
+                                keyspace_name, desired_range, neighbors, mandatory_neighbors);
+                        range_sources[desired_range] = repair_neighbors(std::move(neighbors), std::move(mandatory_neighbors));
+                    }
+                }
+            }
+            auto nr_ranges = desired_ranges.size();
+            sync_data_using_repair(db, keyspace_name, std::move(desired_ranges), std::move(range_sources)).get();
+            rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges);
+        }
+        rlogger.info("bootstrap_with_repair: finished with keyspaces={}", keyspaces);
+    });
+}
+
+future<> do_decommission_removenode_with_repair(seastar::sharded& db, locator::token_metadata tm, gms::inet_address leaving_node) {
+    using inet_address = gms::inet_address;
+    return seastar::async([&db, tm = std::move(tm), leaving_node = std::move(leaving_node)] () mutable {
+        auto myip = utils::fb_utilities::get_broadcast_address();
+        auto keyspaces = db.local().get_non_system_keyspaces();
+        bool is_removenode = myip != leaving_node;
+        auto op = is_removenode ? "removenode_with_repair" : "decommission_with_repair";
"removenode_with_repair" : "decommission_with_repair"; + rlogger.info("{}: started with keyspaces={}, leaving_node={}", op, keyspaces, leaving_node); + for (auto& keyspace_name : keyspaces) { + if (!db.local().has_keyspace(keyspace_name)) { + rlogger.info("{}: keyspace={} does not exist any more, ignoring it", op, keyspace_name); + continue; + } + auto& ks = db.local().find_keyspace(keyspace_name); + auto& strat = ks.get_replication_strategy(); + // First get all ranges the leaving node is responsible for + dht::token_range_vector ranges = strat.get_ranges(leaving_node); + rlogger.info("{}: started with keyspace={}, leaving_node={}, nr_ranges={}", op, keyspace_name, leaving_node, ranges.size()); + size_t nr_ranges_total = ranges.size(); + size_t nr_ranges_skipped = 0; + std::unordered_map> current_replica_endpoints; + // Find (for each range) all nodes that store replicas for these ranges as well + for (auto& r : ranges) { + auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); + auto eps = strat.calculate_natural_endpoints(end_token, tm); + current_replica_endpoints.emplace(r, std::move(eps)); + } + auto temp = tm.clone_after_all_left(); + // leaving_node might or might not be 'leaving'. If it was not leaving (that is, removenode + // command was used), it is still present in temp and must be removed. + if (temp.is_member(leaving_node)) { + temp.remove_endpoint(leaving_node); + } + std::unordered_map range_sources; + dht::token_range_vector ranges_for_removenode; + auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); + auto local_dc = get_local_dc(); + for (auto&r : ranges) { + seastar::thread::maybe_yield(); + auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); + const std::vector new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp); + const std::vector& current_eps = current_replica_endpoints[r]; + std::unordered_set neighbors_set(new_eps.begin(), new_eps.end()); + bool skip_this_range = false; + auto new_owner = neighbors_set; + for (const auto& node : current_eps) { + new_owner.erase(node); + } + if (new_eps.size() == 0) { + throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, zero replica after the removal", + op, keyspace_name, r, current_eps, new_eps)); + } + if (new_owner.size() == 1) { + // For example, with RF = 3 and 4 nodes n1, n2, n3, n4 in + // the cluster, n3 is removed, old_replicas = {n1, n2, n3}, + // new_replicas = {n1, n2, n4}. + if (is_removenode) { + // For removenode operation, use new owner node to + // repair the data in case there is new owner node + // available. Nodes that are not the new owner of a + // range will ignore the range. There can be at most + // one new owner for each of the ranges. Note: The new + // owner is the node that does not own the range before + // but owns the range after the remove operation. It + // does not have to be the primary replica of the + // range. + // Choose the new owner node n4 to run repair to sync + // with all replicas. + skip_this_range = *new_owner.begin() != myip; + neighbors_set.insert(current_eps.begin(), current_eps.end()); + } else { + // For decommission operation, the decommission node + // will repair all the ranges the leaving node is + // responsible for. + // Choose the decommission node n3 to run repair to + // sync with the new owner node n4. 
+                        bool found = false;
+                        for (auto& node : new_owner) {
+                            if (snitch_ptr->get_datacenter(node) == local_dc) {
+                                neighbors_set = std::unordered_set{node};
+                                found = true;
+                                break;
+                            }
+                        }
+                        if (!found) {
+                            throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, cannot find new node in local dc={}",
+                                    op, keyspace_name, r, current_eps, new_eps, local_dc));
+                        }
+                    }
+                } else if (new_owner.size() == 0) {
+                    // For example, with RF = 3 and 3 nodes n1, n2, n3 in the
+                    // cluster, n3 is removed, old_replicas = {n1, n2, n3},
+                    // new_replicas = {n1, n2}.
+                    if (is_removenode) {
+                        // For the removenode operation, use the primary
+                        // replica node to repair the data in case there is no
+                        // new owner node available.
+                        // Choose the primary replica node n1 to run repair to
+                        // sync with all replicas.
+                        skip_this_range = new_eps.front() != myip;
+                        neighbors_set.insert(current_eps.begin(), current_eps.end());
+                    } else {
+                        // For the decommission operation, the decommissioning
+                        // node will repair all the ranges the leaving node is
+                        // responsible for. We are losing data on n3; we have
+                        // to sync with at least one of {n1, n2}, otherwise we
+                        // might lose the only latest replica, on n3.
+                        // Choose the decommissioning node n3 to run repair to
+                        // sync with one of the replica nodes, e.g., n1, in the
+                        // local DC.
+                        bool found = false;
+                        for (auto& node : new_eps) {
+                            if (snitch_ptr->get_datacenter(node) == local_dc) {
+                                neighbors_set = std::unordered_set{node};
+                                found = true;
+                                break;
+                            }
+                        }
+                        if (!found) {
+                            throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, cannot find new node in local dc={}",
+                                    op, keyspace_name, r, current_eps, new_eps, local_dc));
+                        }
+                    }
+                } else {
+                    throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, wrong number of new owner nodes={}",
+                            op, keyspace_name, r, current_eps, new_eps, new_owner));
+                }
+                neighbors_set.erase(myip);
+                neighbors_set.erase(leaving_node);
+                auto neighbors = boost::copy_range>(neighbors_set |
+                        boost::adaptors::filtered([&local_dc, &snitch_ptr] (const gms::inet_address& node) {
+                            return snitch_ptr->get_datacenter(node) == local_dc;
+                        })
+                );
+
+                if (skip_this_range) {
+                    nr_ranges_skipped++;
+                    rlogger.debug("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}, skipped",
+                            op, keyspace_name, r, current_eps, new_eps, neighbors);
+                } else {
+                    rlogger.debug("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}",
+                            op, keyspace_name, r, current_eps, new_eps, neighbors);
+                    range_sources[r] = repair_neighbors(std::move(neighbors));
+                    if (is_removenode) {
+                        ranges_for_removenode.push_back(r);
+                    }
+                }
+            }
+            if (is_removenode) {
+                ranges.swap(ranges_for_removenode);
+            }
+            auto nr_ranges_synced = ranges.size();
+            sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources)).get();
+            rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
+                    op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped);
+        }
+        rlogger.info("{}: finished with keyspaces={}, leaving_node={}", op, keyspaces, leaving_node);
+    });
+}
+
+future<> decommission_with_repair(seastar::sharded& db, locator::token_metadata tm) {
+    return do_decommission_removenode_with_repair(db, std::move(tm), utils::fb_utilities::get_broadcast_address());
+}
+
+future<> removenode_with_repair(seastar::sharded& db, locator::token_metadata tm, gms::inet_address leaving_node) {
+    return do_decommission_removenode_with_repair(db, std::move(tm), std::move(leaving_node));
+}
+
+future<> do_rebuild_replace_with_repair(seastar::sharded& db, locator::token_metadata tm, sstring op, sstring source_dc) {
+    return seastar::async([&db, tm = std::move(tm), source_dc = std::move(source_dc), op = std::move(op)] () mutable {
+        auto keyspaces = db.local().get_non_system_keyspaces();
+        rlogger.info("{}: started with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
+        auto myip = utils::fb_utilities::get_broadcast_address();
+        for (auto& keyspace_name : keyspaces) {
+            if (!db.local().has_keyspace(keyspace_name)) {
+                rlogger.info("{}: keyspace={} does not exist any more, ignoring it", op, keyspace_name);
+                continue;
+            }
+            auto& ks = db.local().find_keyspace(keyspace_name);
+            auto& strat = ks.get_replication_strategy();
+            dht::token_range_vector ranges = strat.get_ranges(myip);
+            std::unordered_map range_sources;
+            rlogger.info("{}: started with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, ranges.size());
+            for (auto it = ranges.begin(); it != ranges.end();) {
+                auto& r = *it;
+                seastar::thread::maybe_yield();
+                auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
+                auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
+                auto neighbors = boost::copy_range>(strat.calculate_natural_endpoints(end_token, tm) |
+                        boost::adaptors::filtered([myip, &source_dc, &snitch_ptr] (const gms::inet_address& node) {
+                            if (node == myip) {
+                                return false;
+                            }
+                            return source_dc.empty() ? true : snitch_ptr->get_datacenter(node) == source_dc;
+                        })
+                );
+                rlogger.debug("{}: keyspace={}, range={}, neighbors={}", op, keyspace_name, r, neighbors);
+                if (!neighbors.empty()) {
+                    range_sources[r] = repair_neighbors(std::move(neighbors));
+                    ++it;
+                } else {
+                    // Skip the range with zero neighbors
+                    it = ranges.erase(it);
+                }
+            }
+            auto nr_ranges = ranges.size();
+            sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources)).get();
+            rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges);
+        }
+        rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
+    });
+}
+
+future<> rebuild_with_repair(seastar::sharded& db, locator::token_metadata tm, sstring source_dc) {
+    auto op = sstring("rebuild_with_repair");
+    if (source_dc.empty()) {
+        source_dc = get_local_dc();
+    }
+    return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc));
+}
+
+future<> replace_with_repair(seastar::sharded& db, locator::token_metadata tm) {
+    auto op = sstring("replace_with_repair");
+    auto source_dc = get_local_dc();
+    return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc));
+}
diff --git a/repair/repair.hh b/repair/repair.hh
index 12f0a3075f..17d06bee0a 100644
--- a/repair/repair.hh
+++ b/repair/repair.hh
@@ -46,6 +46,13 @@ public:
     repair_stopped_exception() : repair_exception("Repair stopped") { }
 };
 
+// The tokens are the tokens assigned to the bootstrap node.
+future<> bootstrap_with_repair(seastar::sharded& db, locator::token_metadata tm, std::unordered_set bootstrap_tokens);
+future<> decommission_with_repair(seastar::sharded& db, locator::token_metadata tm);
+future<> removenode_with_repair(seastar::sharded& db, locator::token_metadata tm, gms::inet_address leaving_node);
+future<> rebuild_with_repair(seastar::sharded& db, locator::token_metadata tm, sstring source_dc);
+future<> replace_with_repair(seastar::sharded& db, locator::token_metadata tm);
+
 // NOTE: repair_start() can be run on any node, but starts a node-global
 // operation.
 // repair_start() starts the requested repair on this node. It returns an
@@ -149,6 +156,20 @@ public:
     sstring get_stats();
 };
 
+class repair_neighbors {
+public:
+    std::vector all;
+    std::vector mandatory;
+    repair_neighbors() = default;
+    explicit repair_neighbors(std::vector a)
+        : all(std::move(a)) {
+    }
+    repair_neighbors(std::vector a, std::vector m)
+        : all(std::move(a))
+        , mandatory(std::move(m)) {
+    }
+};
+
 class repair_info {
 public:
     seastar::sharded& db;
@@ -160,6 +181,7 @@ public:
     shard_id shard;
     std::vector data_centers;
     std::vector hosts;
+    std::unordered_map neighbors;
     size_t nr_failed_ranges = 0;
     bool aborted = false;
     // Map of peer ->
@@ -198,6 +220,7 @@ public:
             const std::vector& neighbors_out);
     void abort();
     void check_in_abort();
+    repair_neighbors get_repair_neighbors(const dht::token_range& range);
     void update_statistics(const repair_stats& stats) {
         _stats.add(stats);
     }
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 130eee88b9..10a9a675ed 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -80,6 +80,7 @@
 #include "database.hh"
 #include
 #include "cdc/generation.hh"
+#include "repair/repair.hh"
 
 using token = dht::token;
 using UUID = utils::UUID;
@@ -524,7 +525,8 @@ void storage_service::join_token_ring(int delay) {
         slogger.info("This node will not auto bootstrap because it is configured to be a seed node.");
     }
     if (should_bootstrap()) {
-        if (db::system_keyspace::bootstrap_in_progress()) {
+        bool resume_bootstrap = db::system_keyspace::bootstrap_in_progress();
+        if (resume_bootstrap) {
             slogger.warn("Detected previous bootstrap failure; retrying");
         } else {
             db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::IN_PROGRESS).get();
@@ -577,7 +579,18 @@ void storage_service::join_token_ring(int delay) {
                 throw std::runtime_error("This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)");
             }
             set_mode(mode::JOINING, "getting bootstrap token", true);
-            _bootstrap_tokens = boot_strapper::get_bootstrap_tokens(_token_metadata, _db.local());
+            if (resume_bootstrap) {
+                _bootstrap_tokens = db::system_keyspace::get_saved_tokens().get0();
+                if (!_bootstrap_tokens.empty()) {
+                    slogger.info("Using previously saved tokens = {}", _bootstrap_tokens);
+                } else {
+                    _bootstrap_tokens = boot_strapper::get_bootstrap_tokens(_token_metadata, _db.local());
+                    slogger.info("Using newly generated tokens = {}", _bootstrap_tokens);
+                }
+            } else {
+                _bootstrap_tokens = boot_strapper::get_bootstrap_tokens(_token_metadata, _db.local());
+                slogger.info("Using newly generated tokens = {}", _bootstrap_tokens);
+            }
         } else {
             auto replace_addr = db().local().get_replace_address();
             if (replace_addr && *replace_addr != get_broadcast_address()) {
@@ -940,9 +953,17 @@ void storage_service::bootstrap() {
     _gossiper.check_seen_seeds();
 
     set_mode(mode::JOINING, "Starting to bootstrap...", true);
-    dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata);
-    // Does the actual streaming of newly replicated token ranges.
-    bs.bootstrap().get();
+    if (is_repair_based_node_ops_enabled()) {
+        if (db().local().is_replacing()) {
+            replace_with_repair(_db, _token_metadata).get();
+        } else {
+            bootstrap_with_repair(_db, _token_metadata, _bootstrap_tokens).get();
+        }
+    } else {
+        dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata);
+        // Does the actual streaming of newly replicated token ranges.
+        bs.bootstrap().get();
+    }
     slogger.info("Bootstrap completed! for the tokens {}", _bootstrap_tokens);
 }
 
@@ -2579,24 +2600,28 @@ future<> storage_service::drain() {
 future<> storage_service::rebuild(sstring source_dc) {
     return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) {
         slogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
"(any dc)" : source_dc); - auto streamer = make_lw_shared(ss._db, ss._token_metadata, ss._abort_source, - ss.get_broadcast_address(), "Rebuild", streaming::stream_reason::rebuild); - streamer->add_source_filter(std::make_unique(ss._gossiper.get_unreachable_members())); - if (source_dc != "") { - streamer->add_source_filter(std::make_unique(source_dc)); - } - auto keyspaces = make_lw_shared>(ss._db.local().get_non_system_keyspaces()); - return do_for_each(*keyspaces, [keyspaces, streamer, &ss] (sstring& keyspace_name) { - return streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name)); - }).then([streamer] { - return streamer->stream_async().then([streamer] { - slogger.info("Streaming for rebuild successful"); - }).handle_exception([] (auto ep) { - // This is used exclusively through JMX, so log the full trace but only throw a simple RTE - slogger.warn("Error while rebuilding node: {}", std::current_exception()); - return make_exception_future<>(std::move(ep)); + if (ss.is_repair_based_node_ops_enabled()) { + return rebuild_with_repair(ss._db, ss._token_metadata, std::move(source_dc)); + } else { + auto streamer = make_lw_shared(ss._db, ss._token_metadata, ss._abort_source, + ss.get_broadcast_address(), "Rebuild", streaming::stream_reason::rebuild); + streamer->add_source_filter(std::make_unique(ss._gossiper.get_unreachable_members())); + if (source_dc != "") { + streamer->add_source_filter(std::make_unique(source_dc)); + } + auto keyspaces = make_lw_shared>(ss._db.local().get_non_system_keyspaces()); + return do_for_each(*keyspaces, [keyspaces, streamer, &ss] (sstring& keyspace_name) { + return streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name)); + }).then([streamer] { + return streamer->stream_async().then([streamer] { + slogger.info("Streaming for rebuild successful"); + }).handle_exception([] (auto ep) { + // This is used exclusively through JMX, so log the full trace but only throw a simple RTE + slogger.warn("Error while rebuilding node: {}", std::current_exception()); + return make_exception_future<>(std::move(ep)); + }); }); - }); + } }); } @@ -2684,44 +2709,54 @@ std::unordered_multimap storage_service::get_cha // Runs inside seastar::async context void storage_service::unbootstrap() { - std::unordered_map> ranges_to_stream; - - auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); - for (const auto& keyspace_name : non_system_keyspaces) { - auto ranges_mm = get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address()); - if (slogger.is_enabled(logging::log_level::debug)) { - std::vector> ranges; - for (auto& x : ranges_mm) { - ranges.push_back(x.first); - } - slogger.debug("Ranges needing transfer for keyspace={} are [{}]", keyspace_name, ranges); - } - ranges_to_stream.emplace(keyspace_name, std::move(ranges_mm)); - } - - set_mode(mode::LEAVING, "replaying batch log and streaming data to other nodes", true); - - auto stream_success = stream_ranges(ranges_to_stream); - // Wait for batch log to complete before streaming hints. - slogger.debug("waiting for batch log processing."); - // Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint. 
-    db::get_local_batchlog_manager().do_batch_log_replay().get();
+    if (is_repair_based_node_ops_enabled()) {
+        decommission_with_repair(_db, _token_metadata).get();
+    } else {
+        std::unordered_map> ranges_to_stream;
-    set_mode(mode::LEAVING, "streaming hints to other nodes", true);
+        auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
+        for (const auto& keyspace_name : non_system_keyspaces) {
+            auto ranges_mm = get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address());
+            if (slogger.is_enabled(logging::log_level::debug)) {
+                std::vector> ranges;
+                for (auto& x : ranges_mm) {
+                    ranges.push_back(x.first);
+                }
+                slogger.debug("Ranges needing transfer for keyspace={} are [{}]", keyspace_name, ranges);
+            }
+            ranges_to_stream.emplace(keyspace_name, std::move(ranges_mm));
+        }
-    // wait for the transfer runnables to signal the latch.
-    slogger.debug("waiting for stream acks.");
-    try {
-        stream_success.get();
-    } catch (...) {
-        slogger.warn("unbootstrap fails to stream : {}", std::current_exception());
-        throw;
+        set_mode(mode::LEAVING, "replaying batch log and streaming data to other nodes", true);
+
+        auto stream_success = stream_ranges(ranges_to_stream);
+        // Wait for batch log to complete before streaming hints.
+        slogger.debug("waiting for batch log processing.");
+        // Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint.
+        db::get_local_batchlog_manager().do_batch_log_replay().get();
+
+        set_mode(mode::LEAVING, "streaming hints to other nodes", true);
+
+        // wait for the transfer runnables to signal the latch.
+        slogger.debug("waiting for stream acks.");
+        try {
+            stream_success.get();
+        } catch (...) {
+            slogger.warn("unbootstrap fails to stream : {}", std::current_exception());
+            throw;
+        }
+        slogger.debug("stream acks all received.");
     }
-    slogger.debug("stream acks all received.");
     leave_ring();
 }
 
 future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
+    if (is_repair_based_node_ops_enabled()) {
+        return removenode_with_repair(_db, _token_metadata, endpoint).finally([this, notify_endpoint] () {
+            return send_replication_notification(notify_endpoint);
+        });
+    }
     return seastar::async([this, endpoint, notify_endpoint] {
         auto streamer = make_lw_shared(_db, get_token_metadata(), _abort_source, get_broadcast_address(), "Restore_replica_count", streaming::stream_reason::removenode);
         auto my_address = get_broadcast_address();
@@ -3492,5 +3527,9 @@ future storage_service::is_cleanup_allowed(sstring keyspace) {
     });
 }
 
+bool storage_service::is_repair_based_node_ops_enabled() {
+    return _db.local().get_config().enable_repair_based_node_ops();
+}
+
 } // namespace service
 
diff --git a/service/storage_service.hh b/service/storage_service.hh
index 489233db04..f86618d2a6 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -923,6 +923,7 @@ private:
     void notify_cql_change(inet_address endpoint, bool ready);
 public:
     future is_cleanup_allowed(sstring keyspace);
+    bool is_repair_based_node_ops_enabled();
 };
 
 future<> init_storage_service(sharded& abort_sources, distributed& db, sharded& gossiper, sharded& auth_service,
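
Usage note: per the config.cc change above, the new enable_repair_based_node_ops option defaults to true and is marked live-updatable. As a minimal usage sketch, assuming the standard scylla.yaml configuration file, the old streaming based node operations can be restored with:

```yaml
# scylla.yaml: fall back to streaming based node operations
enable_repair_based_node_ops: false
```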