Merge 'api: repair_async: refuse repairing tablet keyspaces' from Aleksandra Martyniuk
A tablet repair started with the /storage_service/repair_async/ API bypasses the tablet repair scheduler and repairs only the tablets that are owned by the requested node. Because of that, to safely repair the whole keyspace, we need to first disable tablet migrations and then start repair on every node. With the new API - /storage_service/tablets/repair - tailored to tablet repair requirements, no additional preparation is needed before repair. It can be requested on a single node in the cluster and, thanks to the tablet repair scheduler, the whole keyspace will be safely repaired. Both nodetool and Scylla Manager have already started using the new API to repair tablets. Refuse repairing tablet keyspaces with /storage_service/repair_async - 403 Forbidden is returned. repair_async should still be used to repair vnode keyspaces. Fixes: https://github.com/scylladb/scylladb/issues/23008. Breaking change; no backport. Closes scylladb/scylladb#24678 * github.com:scylladb/scylladb: repair: remove unused code api: repair_async: forbid repairing tablet keyspaces
This commit is contained in:
@@ -359,6 +359,9 @@ void set_repair(http_context& ctx, routes& r, sharded<repair_service>& repair, s
|
||||
// if the option is not sane, repair_start() throws immediately, so
|
||||
// convert the exception to an HTTP error
|
||||
throw httpd::bad_param_exception(e.what());
|
||||
} catch (const tablets_unsupported& e) {
|
||||
throw base_exception("Cannot repair tablet keyspace. Use /storage_service/tablets/repair to repair tablet keyspaces.",
|
||||
http::reply::status_type::forbidden);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -531,13 +531,6 @@ tablet_replica tablet_map::get_secondary_replica(tablet_id id) const {
|
||||
return replicas.at((size_t(id)+1) % replicas.size());
|
||||
}
|
||||
|
||||
// Returns the primary replica of the given tablet restricted to replicas
// whose node lives in datacenter `dc`.
// Throws (via std::optional::value()) if no replica in that DC qualifies.
tablet_replica tablet_map::get_primary_replica_within_dc(tablet_id id, const topology& topo, sstring dc) const {
    auto replica_is_in_dc = [&] (const auto& replica) {
        return topo.get_node(replica.host).dc_rack().dc == dc;
    };
    auto primary = maybe_get_primary_replica(id, get_tablet_info(id).replicas, replica_is_in_dc);
    return primary.value();
}
|
||||
|
||||
std::optional<tablet_replica> tablet_map::maybe_get_selected_replica(tablet_id id, const topology& topo, const tablet_task_info& tablet_task_info) const {
|
||||
return maybe_get_primary_replica(id, get_tablet_info(id).replicas, [&] (const auto& tr) {
|
||||
return tablet_task_info.selected_by_filters(tr, topo);
|
||||
|
||||
@@ -501,7 +501,6 @@ public:
|
||||
|
||||
/// Returns the primary replica for the tablet
|
||||
tablet_replica get_primary_replica(tablet_id id) const;
|
||||
tablet_replica get_primary_replica_within_dc(tablet_id id, const topology& topo, sstring dc) const;
|
||||
|
||||
/// Returns the secondary replica for the tablet, which is assumed to be directly following the primary replica in the replicas vector
|
||||
/// \throws std::runtime_error if the tablet has less than 2 replicas.
|
||||
|
||||
240
repair/repair.cc
240
repair/repair.cc
@@ -1193,7 +1193,6 @@ future<int> repair_service::do_repair_start(gms::gossip_address_map& addr_map, s
|
||||
|
||||
size_t nr_tablet_table = 0;
|
||||
size_t nr_vnode_table = 0;
|
||||
bool is_tablet = false;
|
||||
for (auto& table_name : cfs) {
|
||||
auto& t = db.find_column_family(keyspace, table_name);
|
||||
if (t.uses_tablets()) {
|
||||
@@ -1207,53 +1206,7 @@ future<int> repair_service::do_repair_start(gms::gossip_address_map& addr_map, s
|
||||
if (nr_vnode_table != 0) {
|
||||
throw std::invalid_argument("Mixed vnode table and tablet table");
|
||||
}
|
||||
is_tablet = true;
|
||||
}
|
||||
if (is_tablet) {
|
||||
// Reject unsupported options for tablet repair
|
||||
if (!options.start_token.empty()) {
|
||||
throw std::invalid_argument("The startToken option is not supported for tablet repair");
|
||||
}
|
||||
if (!options.end_token.empty()) {
|
||||
throw std::invalid_argument("The endToken option is not supported for tablet repair");
|
||||
}
|
||||
if (options.small_table_optimization) {
|
||||
throw std::invalid_argument("The small_table_optimization option is not supported for tablet repair");
|
||||
}
|
||||
|
||||
// Reject unsupported option combinations.
|
||||
if (!options.data_centers.empty() && !options.hosts.empty()) {
|
||||
throw std::invalid_argument("Cannot combine dataCenters and hosts options.");
|
||||
}
|
||||
if (!options.ignore_nodes.empty() && !options.hosts.empty()) {
|
||||
throw std::invalid_argument("Cannot combine ignore_nodes and hosts options.");
|
||||
}
|
||||
|
||||
std::unordered_set<locator::host_id> hosts;
|
||||
for (const auto& n : options.hosts) {
|
||||
try {
|
||||
auto node = gms::inet_address(n);
|
||||
hosts.insert(_gossiper.local().get_host_id(node));
|
||||
} catch(...) {
|
||||
throw std::invalid_argument(fmt::format("Failed to parse node={} in hosts={} specified by user: {}",
|
||||
n, options.hosts, std::current_exception()));
|
||||
}
|
||||
}
|
||||
std::unordered_set<locator::host_id> ignore_nodes;
|
||||
for (const auto& n : options.ignore_nodes) {
|
||||
try {
|
||||
auto node = gms::inet_address(n);
|
||||
ignore_nodes.insert(_gossiper.local().get_host_id(node));
|
||||
} catch(...) {
|
||||
throw std::invalid_argument(fmt::format("Failed to parse node={} in ignore_nodes={} specified by user: {}",
|
||||
n, options.ignore_nodes, std::current_exception()));
|
||||
}
|
||||
}
|
||||
|
||||
bool primary_replica_only = options.primary_range;
|
||||
auto ranges_parallelism = options.ranges_parallelism == -1 ? std::nullopt : std::optional<int>(options.ranges_parallelism);
|
||||
co_await repair_tablets(id, keyspace, cfs, primary_replica_only, options.ranges, options.data_centers, hosts, ignore_nodes, ranges_parallelism);
|
||||
co_return id.id;
|
||||
throw tablets_unsupported();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2271,197 +2224,6 @@ future<> repair_service::replace_with_repair(std::unordered_map<sstring, locator
|
||||
co_return co_await do_rebuild_replace_with_repair(std::move(ks_erms), std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes), replaced_node);
|
||||
}
|
||||
|
||||
// Collects the host ids of all token owners that belong to any of the
// requested datacenters. Throws std::runtime_error for an unknown DC name.
static std::unordered_set<locator::host_id> get_token_owners_in_dcs(std::vector<sstring> data_centers, locator::effective_replication_map_ptr erm) {
    const auto owners_by_dc = erm->get_token_metadata().get_datacenter_token_owners();
    std::unordered_set<locator::host_id> selected;
    for (const sstring& dc : data_centers) {
        const auto it = owners_by_dc.find(dc);
        if (it == owners_by_dc.end()) {
            throw std::runtime_error(fmt::format("Unknown dc={}", dc));
        }
        // Accumulate every owner of this DC; duplicates are absorbed by the set.
        selected.insert(it->second.begin(), it->second.end());
    }
    return selected;
}
|
||||
|
||||
// Repair all tablets belonging to this node for the given tables.
//
// High-level flow, per table in `table_names`:
//   1. Look up the table; tables dropped in the meantime are skipped.
//      A non-tablet table is an error (std::runtime_error).
//   2. Sync table metadata (group0 read barrier via table_sync_and_check),
//      unless the "repair_tablets_no_sync" error injection is active.
//   3. Wait until the topology is quiescent (no tablet transitions and no
//      ongoing global topology request) before capturing the effective
//      replication map, so the tablet-to-replica mapping is stable.
//   4. Collect the tablets this node should repair (all tablets replicated
//      here, or only those whose primary replica is here when
//      primary_replica_only is set), intersect their token ranges with
//      `ranges_specified` (if any), and filter peer replicas by
//      data_centers / hosts / ignore_nodes.
//   5. Start a single tablet_repair_task_impl covering all collected metas.
future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_name, std::vector<sstring> table_names, bool primary_replica_only, dht::token_range_vector ranges_specified, std::vector<sstring> data_centers, std::unordered_set<locator::host_id> hosts, std::unordered_set<locator::host_id> ignore_nodes, std::optional<int> ranges_parallelism) {
    std::vector<tablet_repair_task_meta> task_metas;
    for (auto& table_name : table_names) {
        lw_shared_ptr<replica::table> t;
        try {
            t = _db.local().find_column_family(keyspace_name, table_name).shared_from_this();
        } catch (replica::no_such_column_family& e) {
            // The table was dropped after the repair request was made; not an error.
            rlogger.debug("repair[{}] Table {}.{} does not exist anymore", rid.uuid(), keyspace_name, table_name);
            continue;
        }
        if (!t->uses_tablets()) {
            throw std::runtime_error(format("repair[{}] Table {}.{} is not a tablet table", rid.uuid(), keyspace_name, table_name));
        }
        table_id tid = t->schema()->id();
        // Invoke group0 read barrier before obtaining erm pointer so that it sees all prior metadata changes
        auto dropped = !utils::get_local_injector().enter("repair_tablets_no_sync") &&
            co_await streaming::table_sync_and_check(_db.local(), _mm, tid);
        if (dropped) {
            rlogger.debug("repair[{}] Table {}.{} does not exist anymore", rid.uuid(), keyspace_name, table_name);
            continue;
        }
        // Obtain an effective replication map only once the topology is quiescent;
        // otherwise drop it and wait, then re-check from the top of the loop.
        locator::effective_replication_map_ptr erm;
        while (true) {
            _repair_module->check_in_shutdown();
            erm = t->get_effective_replication_map();
            auto local_version = erm->get_token_metadata().get_version();
            const locator::tablet_map& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(tid);
            if (!tmap.has_transitions() && co_await container().invoke_on(0, [local_version] (repair_service& rs) {
                // We need to ensure that there is no ongoing global request.
                return local_version == rs._tsm.local()._topology.version && !rs._tsm.local()._topology.is_busy();
            })) {
                break;
            }
            rlogger.info("repair[{}] Topology is busy, waiting for it to quiesce", rid.uuid());
            // Release the map while waiting so it does not pin an old version.
            erm = nullptr;
            co_await container().invoke_on(0, [] (repair_service& rs) {
                return rs._tsm.local().await_not_busy();
            });
            rlogger.info("repair[{}] Topology quiesced", rid.uuid());
        }
        auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(tid);
        // Per-tablet description of a repair unit collected below.
        struct repair_tablet_meta {
            locator::tablet_id id;
            dht::token_range range;
            locator::host_id master_host_id;
            shard_id master_shard_id;       // local shard acting as repair master
            locator::tablet_replica_set replicas;
        };
        std::vector<repair_tablet_meta> metas;
        auto myhostid = erm->get_token_metadata_ptr()->get_my_id();
        auto mydc = erm->get_topology().get_datacenter();
        bool select_primary_ranges_within_dc = false;
        // If the user specified the ranges option, ignore the primary_replica_only option.
        // Since the ranges are requested explicitly.
        if (!ranges_specified.empty()) {
            primary_replica_only = false;
        }
        if (primary_replica_only) {
            // The logic below follows existing vnode table repair.
            // When "primary_range" option is on, neither data_centers nor hosts
            // may be set, except data_centers may contain only local DC (-local)
            if (data_centers.size() == 1 && data_centers[0] == mydc) {
                select_primary_ranges_within_dc = true;
            } else if (data_centers.size() > 0 || hosts.size() > 0) {
                throw std::runtime_error("You need to run primary range repair on all nodes in the cluster.");
            }
        }
        if (!hosts.empty()) {
            // The hosts filter must include this node, since this node drives the repair.
            if (!hosts.contains(myhostid)) {
                throw std::runtime_error("The current host must be part of the repair");
            }
        }
        // Collect the tablets this node is responsible for repairing.
        co_await tmap.for_each_tablet([&] (locator::tablet_id id, const locator::tablet_info& info) -> future<> {
            auto range = tmap.get_token_range(id);
            auto& replicas = info.replicas;

            if (primary_replica_only) {
                // Only take this tablet if its primary replica (global or DC-local) is this node.
                const auto pr = select_primary_ranges_within_dc ? tmap.get_primary_replica_within_dc(id, erm->get_topology(), mydc) : tmap.get_primary_replica(id);
                if (pr.host == myhostid) {
                    metas.push_back(repair_tablet_meta{id, range, myhostid, pr.shard, replicas});
                }
                return make_ready_future<>();
            }

            bool found = false;
            shard_id master_shard_id;
            // Repair all tablets belong to this node
            for (auto& r : replicas) {
                if (r.host == myhostid) {
                    master_shard_id = r.shard;
                    found = true;
                    break;
                }
            }
            if (found) {
                metas.push_back(repair_tablet_meta{id, range, myhostid, master_shard_id, replicas});
            }
            return make_ready_future<>();
        });

        std::unordered_set<locator::host_id> dc_endpoints = get_token_owners_in_dcs(data_centers, erm);
        if (!data_centers.empty() && !dc_endpoints.contains(myhostid)) {
            throw std::runtime_error("The current host must be part of the repair");
        }

        size_t nr = 0;
        for (auto& m : metas) {
            nr++;
            rlogger.debug("repair[{}] Collect {} out of {} tablets: table={}.{} tablet_id={} range={} replicas={} primary_replica_only={}",
                    rid.uuid(), nr, metas.size(), keyspace_name, table_name, m.id, m.range, m.replicas, primary_replica_only);
            std::vector<locator::host_id> nodes;
            auto master_shard_id = m.master_shard_id;
            auto range = m.range;
            dht::token_range_vector intersection_ranges;
            if (!ranges_specified.empty()) {
                for (auto& r : ranges_specified) {
                    // When the management tool, e.g., scylla manager, uses
                    // ranges option to select which ranges to repair for a
                    // tablet table to schedule repair work, it needs to
                    // disable tablet migration to make sure the node and token
                    // range mapping does not change.
                    //
                    // If the migration is not disabled, the ranges provided by
                    // the user might not match exactly with the token range of the
                    // tablets. We do the intersection of the ranges to repair
                    // as a best effort.
                    auto intersection_opt = range.intersection(r, dht::token_comparator());
                    if (intersection_opt) {
                        intersection_ranges.push_back(intersection_opt.value());
                        rlogger.debug("repair[{}] Select tablet table={}.{} tablet_id={} range={} replicas={} primary_replica_only={} ranges_specified={} intersection_ranges={}",
                                rid.uuid(), keyspace_name, table_name, m.id, m.range, m.replicas, primary_replica_only, ranges_specified, intersection_opt.value());
                    }
                    co_await coroutine::maybe_yield();
                }
            } else {
                intersection_ranges.push_back(range);
            }
            if (intersection_ranges.empty()) {
                // No overlap between this tablet's range and the requested ranges.
                rlogger.debug("repair[{}] Skip tablet table={}.{} tablet_id={} range={} replicas={} primary_replica_only={} ranges_specified={}",
                        rid.uuid(), keyspace_name, table_name, m.id, m.range, m.replicas, primary_replica_only, ranges_specified);
                continue;
            }
            // Build the neighbor list: every remote replica that passes the
            // data_centers, hosts and ignore_nodes filters.
            std::vector<shard_id> shards;
            for (auto& r : m.replicas) {
                auto shard = r.shard;
                if (r.host != myhostid) {
                    bool select = data_centers.empty() ? true : dc_endpoints.contains(r.host);

                    if (select && !hosts.empty()) {
                        select = hosts.contains(r.host);
                    }

                    if (select && !ignore_nodes.empty()) {
                        select = !ignore_nodes.contains(r.host);
                    }

                    if (select) {
                        rlogger.debug("repair[{}] Repair get neighbors table={}.{} hostid={} shard={} myhostid={}",
                                rid.uuid(), keyspace_name, table_name, r.host, shard, myhostid);
                        shards.push_back(shard);
                        nodes.push_back(r.host);
                    }
                }
            }
            // One task meta per (tablet, intersected range).
            for (auto& r : intersection_ranges) {
                rlogger.debug("repair[{}] Repair tablet task table={}.{} master_shard_id={} range={} neighbors={} replicas={}",
                        rid.uuid(), keyspace_name, table_name, master_shard_id, r, repair_neighbors(nodes, shards).shard_map, m.replicas);
                task_metas.push_back(tablet_repair_task_meta{keyspace_name, table_name, tid, master_shard_id, r, repair_neighbors(nodes, shards), m.replicas, erm});
                co_await coroutine::maybe_yield();
            }
        }
    }
    // Start one repair task covering all collected tablet ranges.
    auto task = co_await _repair_module->make_and_start_task<repair::tablet_repair_task_impl>({}, rid, keyspace_name, tasks::task_id::create_null_id(), table_names, streaming::stream_reason::repair, std::move(task_metas), ranges_parallelism, service::default_session_id);
}
|
||||
|
||||
// It is called by the repair_tablet rpc verb to repair the given tablet
|
||||
future<gc_clock::time_point> repair_service::repair_tablet(gms::gossip_address_map& addr_map, locator::tablet_metadata_guard& guard, locator::global_tablet_id gid, tasks::task_info global_tablet_repair_task_info, service::frozen_topology_guard topo_guard, std::optional<locator::tablet_replica_set> rebuild_replicas) {
|
||||
auto id = _repair_module->new_repair_uniq_id();
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <stdexcept>
|
||||
#include <unordered_map>
|
||||
#include <exception>
|
||||
#include <fmt/core.h>
|
||||
@@ -27,6 +28,10 @@
|
||||
#include "tasks/types.hh"
|
||||
#include "gms/gossip_address_map.hh"
|
||||
|
||||
// Signals that the requested operation is not available for tablet-based
// keyspaces. The repair_async API handler catches this and converts it
// into an HTTP 403 Forbidden response.
struct tablets_unsupported : std::runtime_error {
    tablets_unsupported()
        : std::runtime_error("tablets are not supported for this operation")
    {}
};
|
||||
|
||||
namespace tasks {
|
||||
namespace repair {
|
||||
class task_manager_module;
|
||||
@@ -91,6 +96,7 @@ constexpr shard_id repair_unspecified_shard = shard_id(-1);
|
||||
// repair_get_status(). The returned future<int> becomes available quickly,
|
||||
// as soon as repair_get_status() can be used - it doesn't wait for the
|
||||
// repair to complete.
|
||||
// repair_start() repairs only vnode keyspaces.
|
||||
future<int> repair_start(seastar::sharded<repair_service>& repair, sharded<gms::gossip_address_map>& am,
|
||||
sstring keyspace, std::unordered_map<sstring, sstring> options);
|
||||
|
||||
|
||||
@@ -178,8 +178,6 @@ private:
|
||||
shared_ptr<node_ops_info> ops_info);
|
||||
|
||||
public:
|
||||
future<> repair_tablets(repair_uniq_id id, sstring keyspace_name, std::vector<sstring> table_names, bool primary_replica_only = true, dht::token_range_vector ranges_specified = {}, std::vector<sstring> dcs = {}, std::unordered_set<locator::host_id> hosts = {}, std::unordered_set<locator::host_id> ignore_nodes = {}, std::optional<int> ranges_parallelism = std::nullopt);
|
||||
|
||||
future<gc_clock::time_point> repair_tablet(gms::gossip_address_map& addr_map, locator::tablet_metadata_guard& guard, locator::global_tablet_id gid, tasks::task_info global_tablet_repair_task_info, service::frozen_topology_guard topo_guard, std::optional<locator::tablet_replica_set> rebuild_replicas);
|
||||
private:
|
||||
|
||||
|
||||
@@ -165,8 +165,9 @@ async def do_batchlog_flush_in_repair(manager, cache_time_in_ms):
|
||||
nr_repairs = 2 * nr_repairs_per_node
|
||||
total_repair_duration = 0
|
||||
|
||||
cfg = { 'tablets_mode_for_new_keyspaces': 'disabled' }
|
||||
cmdline = ["--repair-hints-batchlog-flush-cache-time-in-ms", str(cache_time_in_ms), "--smp", "1", "--logger-log-level", "api=trace"]
|
||||
node1, node2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")
|
||||
node1, node2 = await manager.servers_add(2, config=cfg, cmdline=cmdline, auto_rack_dc="dc1")
|
||||
|
||||
cql = manager.get_cql()
|
||||
cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
|
||||
@@ -216,41 +217,6 @@ async def test_batchlog_flush_in_repair_with_cache(manager):
|
||||
async def test_batchlog_flush_in_repair_without_cache(manager):
|
||||
await do_batchlog_flush_in_repair(manager, 0);
|
||||
|
||||
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_repair_abort(manager):
    """Verify that force_terminate_repair aborts an in-flight tablet repair.

    The repair task is paused via the "repair_tablet_repair_task_impl_run"
    injection, aborted through the API, then released; every task in the
    resulting status tree must end up in the "failed" state.
    """
    cfg = {'tablets_mode_for_new_keyspaces': 'enabled'}
    servers = await manager.servers_add(2, config=cfg, auto_rack_dc="dc1")
    cql = manager.get_cql()

    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
    cql.execute("CREATE TABLE ks.tbl (pk int, ck int, PRIMARY KEY (pk, ck)) WITH tombstone_gc = {'mode': 'repair'}")

    # Keep finished tasks around long enough for the status queries below.
    await manager.api.client.post(f"/task_manager/ttl", params={ "ttl": "100000" },
                                  host=servers[0].ip_addr)

    # Pause the repair task implementation so the abort races cannot be missed.
    await manager.api.enable_injection(servers[0].ip_addr, "repair_tablet_repair_task_impl_run", False, {})

    # Start repair.
    sequence_number = await manager.api.client.post_json(f"/storage_service/repair_async/ks", host=servers[0].ip_addr)

    # Get repair id.
    stats_list = await manager.api.client.get_json("/task_manager/list_module_tasks/repair", host=servers[0].ip_addr)
    ids = [stats["task_id"] for stats in stats_list if stats["sequence_number"] == sequence_number]
    assert len(ids) == 1
    id = ids[0]

    # Abort repair.
    await manager.api.client.post("/storage_service/force_terminate_repair", host=servers[0].ip_addr)

    # Release the paused repair task, then remove the injection.
    await manager.api.message_injection(servers[0].ip_addr, "repair_tablet_repair_task_impl_run")
    await manager.api.disable_injection(servers[0].ip_addr, "repair_tablet_repair_task_impl_run")

    # Check if repair was aborted: wait for completion and require that the
    # whole task tree reports "failed".
    await manager.api.client.get_json(f"/task_manager/wait_task/{id}", host=servers[0].ip_addr)
    statuses = await manager.api.client.get_json(f"/task_manager/task_status_recursive/{id}", host=servers[0].ip_addr)
    assert all([status["state"] == "failed" for status in statuses])
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_keyspace_drop_during_data_sync_repair(manager):
|
||||
|
||||
@@ -1088,46 +1088,6 @@ async def test_tablet_split_finalization_with_migrations(manager: ManagerClient)
|
||||
logger.info("Waiting for migrations to complete")
|
||||
await log.wait_for("Tablet load balancer did not make any plan", from_mark=migration_mark)
|
||||
|
||||
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_split_finalization_with_repair(manager: ManagerClient):
    """Verify that a repair started during tablet split finalization waits
    for the topology to quiesce instead of running concurrently with it.

    Split finalization is held open with an injection; a repair is then
    started and must log "Topology is busy, waiting for it to quiesce"
    before the injection is released.
    """
    injection = "handle_tablet_resize_finalization_wait"
    cfg = {
        'enable_tablets': True,
        'error_injections_at_startup': [
            injection,
            "repair_tablets_no_sync",
            'short_tablet_stats_refresh_interval',
        ]
    }
    servers = await manager.servers_add(2, config=cfg)

    cql = manager.get_cql()
    await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
    await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int) WITH compaction = {'class': 'NullCompactionStrategy'};")
    await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k%3});") for k in range(64)])
    # Split decisions are based on sstable sizes, so flush the data to disk.
    await manager.api.keyspace_flush(servers[0].ip_addr, "test", "test")

    logs = [await manager.server_open_log(s.server_id) for s in servers]
    marks = [await log.mark() for log in logs]

    logger.info("Trigger split in table")
    await cql.run_async("ALTER TABLE test.test WITH tablets = {'min_tablet_count': 8};")

    logger.info("Wait for tablets to split")
    # Only one node performs finalization; wait for whichever logs it first.
    done, pending = await asyncio.wait([asyncio.create_task(log.wait_for('handle_tablet_resize_finalization: waiting', from_mark=mark)) for log, mark in zip(logs, marks)], return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()

    async def repair():
        await manager.api.client.post(f"/storage_service/repair_async/test", host=servers[0].ip_addr)

    async def check_repair_waits():
        # Repair must block on the busy topology before we unblock finalization.
        await logs[0].wait_for("Topology is busy, waiting for it to quiesce", from_mark=marks[0])
        await manager.api.message_injection(servers[0].ip_addr, injection)

    await asyncio.gather(repair(), check_repair_waits())
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_two_tablets_concurrent_repair_and_migration_repair_writer_level(manager: ManagerClient):
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
from typing import Any
|
||||
from cassandra.query import SimpleStatement, ConsistencyLevel
|
||||
from cassandra.query import ConsistencyLevel
|
||||
|
||||
from test.pylib.internal_types import HostID, ServerInfo, ServerNum
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
@@ -43,27 +43,6 @@ async def disable_injection_on(manager, error_name, servers):
|
||||
errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
|
||||
await asyncio.gather(*errs)
|
||||
|
||||
async def repair_on_node(manager: ManagerClient, server: ServerInfo, servers: list[ServerInfo], keyspace, table = "test", ranges: str = ''):
    """Run a repair of keyspace.table on `server`, after verifying that
    every node in `servers` sees every other node as alive."""
    node = server.ip_addr
    await manager.servers_see_each_other(servers)
    live_nodes_wanted = sorted(s.ip_addr for s in servers)
    live_nodes = sorted(await manager.api.get_alive_endpoints(node))
    # Repairing with dead nodes present would fail; require full liveness first.
    assert live_nodes == live_nodes_wanted
    logger.info(f"Repair table on node {node} live_nodes={live_nodes} live_nodes_wanted={live_nodes_wanted}")
    await manager.api.repair(node, keyspace, table, ranges)
|
||||
|
||||
async def load_repair_history(cql, hosts):
    """Read system.repair_history from each host, log every entry,
    and return the combined list of rows."""
    collected = []
    for host in hosts:
        logging.info(f'Query hosts={host}')
        collected.extend(await cql.run_async("SELECT * from system.repair_history", host=host))
    for row in collected:
        logging.info(f"Got repair_history_entry={row}")
    return collected
|
||||
|
||||
async def safe_server_stop_gracefully(manager, server_id, timeout: float = 60, reconnect: bool = False):
|
||||
# Explicitly close the driver to avoid reconnections if scylla fails to update gossiper state on shutdown.
|
||||
# It's a problem until https://github.com/scylladb/scylladb/issues/15356 is fixed.
|
||||
@@ -489,244 +468,6 @@ async def test_table_dropped_during_streaming(manager: ManagerClient):
|
||||
replica = await get_tablet_replica(manager, servers[0], ks, 'test2', tablet_token)
|
||||
assert replica == (s1_host_id, 0)
|
||||
|
||||
@pytest.mark.repair
@pytest.mark.asyncio
async def test_tablet_repair(manager: ManagerClient):
    """Repair a tablet keyspace repeatedly while tablets are being shuffled.

    Exercises serialization of repair with tablet migration: the
    "tablet_allocator_shuffle" injection keeps tablets moving, and each
    cycle starts a repair while migrations are in flight. Finally reads
    all keys back at CL=ALL and checks they hold the last written value.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'repair=trace',
        '--task-ttl-in-seconds', '3600', # Make sure the test passes with non-zero task_ttl.
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, property_file=[
        {"dc": "dc1", "rack": "r1"},
        {"dc": "dc1", "rack": "r1"},
        {"dc": "dc1", "rack": "r2"}
    ])

    # Keep the load balancer shuffling tablets for the duration of the test.
    await inject_error_on(manager, "tablet_allocator_shuffle", servers)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', "
                                 "'replication_factor': 2} AND tablets = {'initial': 32}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        logger.info("Populating table")

        keys = range(256)

        stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
        stmt.consistency_level = ConsistencyLevel.ONE

        # Repair runs concurrently with tablet shuffling which exercises issues with serialization
        # of repair and tablet migration.
        #
        # We do it 30 times because it's been experimentally shown to be enough to trigger the issue with high probability.
        # Lack of proper synchronization would manifest as repair failure with the following cause:
        #
        #   failed_because=std::runtime_error (multishard_writer: No shards for token 7505809055260144771 of test.test)
        #
        # ...which indicates that repair tried to stream data to a node which is no longer a tablet replica.
        repair_cycles = 30
        for i in range(repair_cycles):
            # Write concurrently with repair to increase the chance of repair having some discrepancy to resolve and send writes.
            inserts_future = asyncio.gather(*[cql.run_async(stmt, [k, i]) for k in keys])

            # Disable in the background so that repair is started with migrations in progress.
            # We need to disable balancing so that repair which blocks on migrations eventually gets unblocked.
            # Otherwise, shuffling would keep the topology busy forever.
            disable_balancing_future = asyncio.create_task(manager.api.disable_tablet_balancing(servers[0].ip_addr))

            await repair_on_node(manager, servers[0], servers, ks)

            await inserts_future
            await disable_balancing_future
            # Re-enable balancing so the shuffle injection keeps migrations flowing in the next cycle.
            await manager.api.enable_tablet_balancing(servers[0].ip_addr)

        # Read back at CL=ALL: every key must be present on all replicas and
        # carry the value written in the last cycle.
        key_count = len(keys)
        stmt = cql.prepare(f"SELECT * FROM {ks}.test;")
        stmt.consistency_level = ConsistencyLevel.ALL
        rows = await cql.run_async(stmt)
        assert len(rows) == key_count
        for r in rows:
            assert r.c == repair_cycles - 1
|
||||
|
||||
# Reproducer for race between split and repair: https://github.com/scylladb/scylladb/issues/19378
# Verifies repair will not complete with sstables that still require split, causing split
# execution to fail.
@pytest.mark.repair
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_concurrent_tablet_repair_and_split(manager: ManagerClient):
    """Run a repair between split preparation and split execution and check
    that the split still completes and no rows are lost (CL=ALL read)."""
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'raft_topology=debug',
        # Tiny tablet size so the inserted data triggers a split decision.
        '--target-tablet-size-in-bytes', '1024',
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, config={
        'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
    }, property_file=[
        {"dc": "dc1", "rack": "r1"},
        {"dc": "dc1", "rack": "r1"},
        {"dc": "dc1", "rack": "r2"}
    ])

    # Keep the balancer off while populating; it is enabled later to drive the split.
    await manager.api.disable_tablet_balancing(servers[0].ip_addr)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', "
                                 "'replication_factor': 2} AND tablets = {'initial': 32}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        logger.info("Populating table")

        keys = range(5000) # Enough keys to trigger repair digest mismatch with a high chance.
        stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
        stmt.consistency_level = ConsistencyLevel.ONE

        await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)

        s0_log = await manager.server_open_log(servers[0].server_id)
        s0_mark = await s0_log.mark()

        await asyncio.gather(*[cql.run_async(stmt, [k, -1]) for k in keys])

        # split decision is sstable size based, so data must be flushed first
        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)

        # Postpone split finalization so repair can be wedged between
        # split prepare and split execute.
        await manager.api.enable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone", False)
        await manager.api.enable_tablet_balancing(servers[0].ip_addr)

        logger.info("Waiting for split prepare...")
        await s0_log.wait_for('Setting split ready sequence number to', from_mark=s0_mark)
        s0_mark = await s0_log.mark()
        logger.info("Waited for split prepare")

        # Balancer is re-enabled later for split execution
        await asyncio.create_task(manager.api.disable_tablet_balancing(servers[0].ip_addr))

        # Write concurrently with repair to increase the chance of repair having some discrepancy to resolve and send writes.
        inserts_future = asyncio.gather(*[cql.run_async(stmt, [k, 1]) for k in keys])

        await repair_on_node(manager, servers[0], servers, ks)

        await inserts_future

        logger.info("Waiting for split execute...")
        await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone")
        await manager.api.enable_tablet_balancing(servers[0].ip_addr)
        await s0_log.wait_for('Detected tablet split for table', from_mark=s0_mark)
        await inject_error_one_shot_on(manager, "tablet_split_finalization_postpone", servers)
        logger.info("Waited for split execute...")

        # Post-split sanity check: all keys are readable at CL=ALL.
        key_count = len(keys)
        stmt = cql.prepare(f"SELECT * FROM {ks}.test;")
        stmt.consistency_level = ConsistencyLevel.ALL
        rows = await cql.run_async(stmt)
        assert len(rows) == key_count
|
||||
|
||||
@pytest.mark.repair
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_missing_data_repair(manager: ManagerClient):
|
||||
logger.info("Bootstrapping cluster")
|
||||
cmdline = [
|
||||
'--hinted-handoff-enabled', 'false',
|
||||
]
|
||||
servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="dc1")
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', "
|
||||
"'replication_factor': 3} AND tablets = {'initial': 32}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
keys_list = [range(0, 100), range(100, 200), range(200, 300)]
|
||||
keys_for_server = dict([(s.server_id, keys_list[idx]) for idx, s in enumerate(servers)])
|
||||
keys = range(0, 300)
|
||||
|
||||
async def insert_with_down(down_server):
|
||||
logger.info(f"Stopped server {down_server.server_id}")
|
||||
logger.info(f"Insert into server {down_server.server_id}")
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});")
|
||||
for k in keys_for_server[down_server.server_id]])
|
||||
|
||||
cql = await safe_rolling_restart(manager, servers, with_down=insert_with_down)
|
||||
|
||||
await repair_on_node(manager, servers[0], servers, ks)
|
||||
|
||||
async def check_with_down(down_node):
|
||||
logger.info("Checking table")
|
||||
query = SimpleStatement(f"SELECT * FROM {ks}.test;", consistency_level=ConsistencyLevel.ONE)
|
||||
rows = await cql.run_async(query)
|
||||
assert len(rows) == len(keys)
|
||||
for r in rows:
|
||||
assert r.c == r.pk
|
||||
|
||||
cql = await safe_rolling_restart(manager, servers, with_down=check_with_down)
|
||||
|
||||
|
||||
@pytest.mark.repair
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_repair_history(manager: ManagerClient):
|
||||
logger.info("Bootstrapping cluster")
|
||||
servers = await manager.servers_add(3, auto_rack_dc="dc1")
|
||||
|
||||
rf = 3
|
||||
tablets = 8
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': {tablets}}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tombstone_gc = {{'mode':'repair'}};")
|
||||
|
||||
logger.info("Populating table")
|
||||
|
||||
keys = range(256)
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
logging.info(f'Got hosts={hosts}');
|
||||
|
||||
await repair_on_node(manager, servers[0], servers, ks)
|
||||
|
||||
all_rows = await load_repair_history(cql, hosts)
|
||||
assert len(all_rows) == rf * tablets
|
||||
|
||||
@pytest.mark.repair
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_repair_ranges_selection(manager: ManagerClient):
|
||||
logger.info("Bootstrapping cluster")
|
||||
servers = await manager.servers_add(2, auto_rack_dc="dc1")
|
||||
|
||||
rf = 2
|
||||
tablets = 4
|
||||
nr_ranges = 0;
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': {tablets}}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tombstone_gc = {{'mode':'repair'}};")
|
||||
|
||||
logger.info("Populating table")
|
||||
|
||||
keys = range(256)
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
logging.info(f'Got hosts={hosts}');
|
||||
|
||||
await repair_on_node(manager, servers[0], servers, ks, ranges='-4611686018427387905:-1,4611686018427387903:9223372036854775807')
|
||||
nr_ranges = nr_ranges + 2
|
||||
await repair_on_node(manager, servers[0], servers, ks, ranges='-2000:-1000,1000:2000')
|
||||
nr_ranges = nr_ranges + 2
|
||||
await repair_on_node(manager, servers[0], servers, ks, ranges='3000:-3000')
|
||||
# The wrap around range (3000, -3000] will produce the following intersection range
|
||||
# range=(minimum token,-4611686018427387905] ranges_specified={(3000,+inf), (-inf, -3000]} intersection_ranges=(minimum token,-4611686018427387905]
|
||||
# range=(-4611686018427387905,-1] ranges_specified={(3000,+inf), (-inf, -3000]} intersection_ranges=(-4611686018427387905,-3000]
|
||||
# range=(-1,4611686018427387903] ranges_specified={(3000,+inf), (-inf, -3000]} intersection_ranges=(3000,4611686018427387903]
|
||||
# range=(4611686018427387903,9223372036854775807] ranges_specified={(3000,+inf), (-inf, -3000]} intersection_ranges=(4611686018427387903,9223372036854775807]
|
||||
nr_ranges = nr_ranges + 4
|
||||
|
||||
all_rows = await load_repair_history(cql, hosts)
|
||||
assert len(all_rows) == rf * nr_ranges;
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_cleanup(manager: ManagerClient):
|
||||
cmdline = ['--smp=2', '--commitlog-sync=batch']
|
||||
|
||||
@@ -441,14 +441,26 @@ class ScyllaRESTAPIClient:
|
||||
|
||||
async def repair(self, node_ip: str, keyspace: str, table: str, ranges: str = '') -> None:
|
||||
"""Repair the given table and wait for it to complete"""
|
||||
if ranges:
|
||||
params = {"columnFamilies": table, "ranges": ranges}
|
||||
vnode_keyspaces = await self.client.get_json(f"/storage_service/keyspaces", host=node_ip, params={"replication": "vnodes"})
|
||||
if keyspace in vnode_keyspaces:
|
||||
if ranges:
|
||||
params = {"columnFamilies": table, "ranges": ranges}
|
||||
else:
|
||||
params = {"columnFamilies": table}
|
||||
sequence_number = await self.client.post_json(f"/storage_service/repair_async/{keyspace}", host=node_ip, params=params)
|
||||
status = await self.client.get_json(f"/storage_service/repair_status", host=node_ip, params={"id": str(sequence_number)})
|
||||
if status != 'SUCCESSFUL':
|
||||
raise Exception(f"Repair id {sequence_number} on node {node_ip} for table {keyspace}.{table} failed: status={status}")
|
||||
else:
|
||||
params = {"columnFamilies": table}
|
||||
sequence_number = await self.client.post_json(f"/storage_service/repair_async/{keyspace}", host=node_ip, params=params)
|
||||
status = await self.client.get_json(f"/storage_service/repair_status", host=node_ip, params={"id": str(sequence_number)})
|
||||
if status != 'SUCCESSFUL':
|
||||
raise Exception(f"Repair id {sequence_number} on node {node_ip} for table {keyspace}.{table} failed: status={status}")
|
||||
if ranges:
|
||||
raise ValueError(f"Ranges parameter is not supported for tablet keyspaces")
|
||||
params={
|
||||
"ks": keyspace,
|
||||
"table": table,
|
||||
"tokens": "all",
|
||||
"await_completion": "true",
|
||||
}
|
||||
await self.client.post_json(f"/storage_service/tablets/repair", host=node_ip, params=params)
|
||||
|
||||
def __get_autocompaction_url(self, keyspace: str, table: Optional[str] = None) -> str:
|
||||
"""Return autocompaction url for the given keyspace/table"""
|
||||
|
||||
@@ -14,105 +14,105 @@ def run_repair_and_wait(rest_api, keyspace):
|
||||
resp.raise_for_status()
|
||||
return sequence_number
|
||||
|
||||
def test_repair_task_progress_finished_task(cql, this_dc, rest_api):
|
||||
def test_repair_task_progress_finished_task(cql, this_dc, rest_api, test_keyspace_vnodes):
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
with set_tmp_task_ttl(rest_api, long_time):
|
||||
# Insert some data.
|
||||
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
|
||||
schema = 'p int, v text, primary key (p)'
|
||||
with new_test_table(cql, keyspace, schema) as t0:
|
||||
stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [0, 'hello'])
|
||||
cql.execute(stmt, [1, 'world'])
|
||||
with new_test_table(cql, keyspace, schema) as t1:
|
||||
stmt = cql.prepare(f"INSERT INTO {t1} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [2, 'hello'])
|
||||
cql.execute(stmt, [3, 'world'])
|
||||
keyspace = test_keyspace_vnodes
|
||||
schema = 'p int, v text, primary key (p)'
|
||||
with new_test_table(cql, keyspace, schema) as t0:
|
||||
stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [0, 'hello'])
|
||||
cql.execute(stmt, [1, 'world'])
|
||||
with new_test_table(cql, keyspace, schema) as t1:
|
||||
stmt = cql.prepare(f"INSERT INTO {t1} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [2, 'hello'])
|
||||
cql.execute(stmt, [3, 'world'])
|
||||
|
||||
sequence_number = run_repair_and_wait(rest_api, keyspace)
|
||||
sequence_number = run_repair_and_wait(rest_api, keyspace)
|
||||
|
||||
# Get all repairs.
|
||||
ids = [task["task_id"] for task in list_tasks(rest_api, "repair") if task["sequence_number"] == sequence_number]
|
||||
assert len(ids) == 1, "Wrong number of internal repair tasks"
|
||||
status_tree = get_task_status_recursively(rest_api, ids[0])
|
||||
status = status_tree[0]
|
||||
assert status["progress_completed"] == status["progress_total"], "Incorrect task progress"
|
||||
|
||||
assert "children_ids" in status, "Shard tasks weren't created"
|
||||
children = [s for s in status_tree if s["parent_id"] == status["id"]]
|
||||
assert all([child["progress_completed"] == child["progress_total"] for child in children]), "Some shard tasks have incorrect progress"
|
||||
|
||||
assert sum([child["progress_total"] for child in children]) == status["progress_total"], "Total progress of parent is not equal to children total progress sum"
|
||||
assert sum([child["progress_completed"] for child in children]) == status["progress_completed"], "Completed progress of parent is not equal to children completed progress sum"
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
|
||||
def test_repair_task_tree(cql, this_dc, rest_api, test_keyspace_vnodes):
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
with set_tmp_task_ttl(rest_api, long_time):
|
||||
keyspace = test_keyspace_vnodes
|
||||
schema = 'p int, v text, primary key (p)'
|
||||
with new_test_table(cql, keyspace, schema) as t0:
|
||||
stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [0, 'hello'])
|
||||
cql.execute(stmt, [1, 'world'])
|
||||
with new_test_table(cql, keyspace, schema) as t1:
|
||||
stmt = cql.prepare(f"INSERT INTO {t1} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [2, 'hello'])
|
||||
cql.execute(stmt, [3, 'world'])
|
||||
|
||||
run_repair_and_wait(rest_api, keyspace)
|
||||
|
||||
ids = [task["task_id"] for task in list_tasks(rest_api, module_name)]
|
||||
assert ids, "repair task was not created"
|
||||
assert len(ids) == 1, "incorrect number of non-internal tasks"
|
||||
|
||||
status_tree = get_task_status_recursively(rest_api, ids[0])
|
||||
status = status_tree[0]
|
||||
assert status["state"] == "done", f"tasks with id {status['id']} failed"
|
||||
|
||||
repair_tree_depth = 1
|
||||
check_child_parent_relationship(rest_api, status_tree, status, False)
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
|
||||
def test_repair_task_progress(cql, this_dc, rest_api, test_keyspace_vnodes):
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
with set_tmp_task_ttl(rest_api, long_time):
|
||||
# Insert some data.
|
||||
keyspace = test_keyspace_vnodes
|
||||
schema = 'p int, v text, primary key (p)'
|
||||
with new_test_table(cql, keyspace, schema) as t0:
|
||||
stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [0, 'hello'])
|
||||
cql.execute(stmt, [1, 'world'])
|
||||
with new_test_table(cql, keyspace, schema) as t1:
|
||||
stmt = cql.prepare(f"INSERT INTO {t1} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [2, 'hello'])
|
||||
cql.execute(stmt, [3, 'world'])
|
||||
|
||||
injection = "repair_shard_repair_task_impl_do_repair_ranges"
|
||||
with scylla_inject_error(rest_api, injection, True):
|
||||
resp = rest_api.send("POST", f"storage_service/repair_async/{keyspace}")
|
||||
resp.raise_for_status()
|
||||
sequence_number = resp.json()
|
||||
|
||||
# Get all repairs.
|
||||
ids = [task["task_id"] for task in list_tasks(rest_api, "repair") if task["sequence_number"] == sequence_number]
|
||||
assert len(ids) == 1, "Wrong number of internal repair tasks"
|
||||
status_tree = get_task_status_recursively(rest_api, ids[0])
|
||||
status = status_tree[0]
|
||||
assert status["progress_completed"] == status["progress_total"], "Incorrect task progress"
|
||||
statuses = []
|
||||
while not statuses or "children_ids" not in statuses[0]:
|
||||
statuses = [get_task_status(rest_api, task["task_id"]) for task in list_tasks(rest_api, "repair") if task["sequence_number"] == sequence_number]
|
||||
assert len(statuses) == 1, "Wrong number of internal repair tasks"
|
||||
status = statuses[0]
|
||||
|
||||
assert "children_ids" in status, "Shard tasks weren't created"
|
||||
children = [s for s in status_tree if s["parent_id"] == status["id"]]
|
||||
assert all([child["progress_completed"] == child["progress_total"] for child in children]), "Some shard tasks have incorrect progress"
|
||||
for child_ident in status["children_ids"]:
|
||||
# Check if task state is correct.
|
||||
child_status = get_task_status(rest_api, child_ident["task_id"])
|
||||
assert child_status["state"] == "running", "Incorrect task progress"
|
||||
assert child_status["progress_completed"] * 2 <= child_status["progress_total"], "Incorrect task progress"
|
||||
|
||||
assert sum([child["progress_total"] for child in children]) == status["progress_total"], "Total progress of parent is not equal to children total progress sum"
|
||||
assert sum([child["progress_completed"] for child in children]) == status["progress_completed"], "Completed progress of parent is not equal to children completed progress sum"
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
|
||||
def test_repair_task_tree(cql, this_dc, rest_api):
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
with set_tmp_task_ttl(rest_api, long_time):
|
||||
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
|
||||
schema = 'p int, v text, primary key (p)'
|
||||
with new_test_table(cql, keyspace, schema) as t0:
|
||||
stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [0, 'hello'])
|
||||
cql.execute(stmt, [1, 'world'])
|
||||
with new_test_table(cql, keyspace, schema) as t1:
|
||||
stmt = cql.prepare(f"INSERT INTO {t1} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [2, 'hello'])
|
||||
cql.execute(stmt, [3, 'world'])
|
||||
|
||||
run_repair_and_wait(rest_api, keyspace)
|
||||
|
||||
ids = [task["task_id"] for task in list_tasks(rest_api, module_name)]
|
||||
assert ids, "repair task was not created"
|
||||
assert len(ids) == 1, "incorrect number of non-internal tasks"
|
||||
|
||||
status_tree = get_task_status_recursively(rest_api, ids[0])
|
||||
status = status_tree[0]
|
||||
assert status["state"] == "done", f"tasks with id {status['id']} failed"
|
||||
|
||||
repair_tree_depth = 1
|
||||
check_child_parent_relationship(rest_api, status_tree, status, False)
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
|
||||
def test_repair_task_progress(cql, this_dc, rest_api):
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
with set_tmp_task_ttl(rest_api, long_time):
|
||||
# Insert some data.
|
||||
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
|
||||
schema = 'p int, v text, primary key (p)'
|
||||
with new_test_table(cql, keyspace, schema) as t0:
|
||||
stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [0, 'hello'])
|
||||
cql.execute(stmt, [1, 'world'])
|
||||
with new_test_table(cql, keyspace, schema) as t1:
|
||||
stmt = cql.prepare(f"INSERT INTO {t1} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [2, 'hello'])
|
||||
cql.execute(stmt, [3, 'world'])
|
||||
|
||||
injection = "repair_shard_repair_task_impl_do_repair_ranges"
|
||||
with scylla_inject_error(rest_api, injection, True):
|
||||
resp = rest_api.send("POST", f"storage_service/repair_async/{keyspace}")
|
||||
resp.raise_for_status()
|
||||
sequence_number = resp.json()
|
||||
|
||||
# Get all repairs.
|
||||
statuses = []
|
||||
while not statuses or "children_ids" not in statuses[0]:
|
||||
statuses = [get_task_status(rest_api, task["task_id"]) for task in list_tasks(rest_api, "repair") if task["sequence_number"] == sequence_number]
|
||||
assert len(statuses) == 1, "Wrong number of internal repair tasks"
|
||||
status = statuses[0]
|
||||
|
||||
for child_ident in status["children_ids"]:
|
||||
# Check if task state is correct.
|
||||
child_status = get_task_status(rest_api, child_ident["task_id"])
|
||||
assert child_status["state"] == "running", "Incorrect task progress"
|
||||
assert child_status["progress_completed"] * 2 <= child_status["progress_total"], "Incorrect task progress"
|
||||
|
||||
resp = rest_api.send("POST", f"v2/error_injection/injection/{injection}/message")
|
||||
resp.raise_for_status()
|
||||
|
||||
child_status = wait_for_task(rest_api, status["id"])
|
||||
status_tree = get_task_status_recursively(rest_api, status["id"])
|
||||
for child_status in get_children(status_tree, status["id"]):
|
||||
assert child_status["progress_completed"] == child_status["progress_total"], "Incorrect task progress"
|
||||
resp = rest_api.send("POST", f"v2/error_injection/injection/{injection}/message")
|
||||
resp.raise_for_status()
|
||||
|
||||
child_status = wait_for_task(rest_api, status["id"])
|
||||
status_tree = get_task_status_recursively(rest_api, status["id"])
|
||||
for child_status in get_children(status_tree, status["id"]):
|
||||
assert child_status["progress_completed"] == child_status["progress_total"], "Incorrect task progress"
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
|
||||
Reference in New Issue
Block a user