Compare commits

..

3 Commits

Author SHA1 Message Date
Piotr Smaron
d4c28690e1 db: fail reads and writes with local consistency level to a DC with RF=0
When read or write operations are performed on a DC with RF=0 with LOCAL_QUORUM
or LOCAL_ONE consistency level, Cassandra throws `Unavailable` exception.
Scylla allowed such read operations and failed write operations with a cryptic:
"broken promise" error. This occurred because the initial availability
check passed (quorum of 0 requires 0 replicas), but execution failed
later when no replicas existed to process the mutation.

This patch adds an explicit RF=0 validation for LOCAL_ONE and LOCAL_QUORUM that
throws before attempting operation execution.

The change also requires `test_query_dc_with_rf_0_does_not_crash_db` to be
upgraded. This testcase was asserting somewhat similar scenario, but wasn't
taking into account the whole matrix of combinations:
- scenarios: successful vs unsuccessful operation outcome
- local consistency levels: LOCAL_QUORUM & LOCAL_ONE
- operations: SELECT (read) & INSERT (write)

and so it's been extended to cover both the pre-existing and the current issues
and the whole matrix of combinations.

Fixes: scylladb/scylladb#27893
2026-01-22 12:49:45 +01:00
Piotr Smaron
9475659ae8 db: consistency_level: split local_quorum_for()
The core of `local_quorum_for()` has been extracted to
`get_replication_factor_for_dc()`, which is going to be used later,
while `local_quorum_for()` itself has been recreated using the extracted
part.
2026-01-22 12:49:23 +01:00
Piotr Smaron
0b3ee197b6 db: consistency_level: fix nrs -> nts abbreviation
`network_topology_strategy` was abbreviated as `nrs` instead of `nts`. It
looks like someone incorrectly assumed it stood for "network replication
strategy", hence `nrs`.
2026-01-22 12:48:37 +01:00
39 changed files with 257 additions and 405 deletions

View File

@@ -1318,7 +1318,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, prometheus_port(this, "prometheus_port", value_status::Used, 9180, "Prometheus port, set to zero to disable.")
, prometheus_address(this, "prometheus_address", value_status::Used, {/* listen_address */}, "Prometheus listening address, defaulting to listen_address if not explicitly set.")
, prometheus_prefix(this, "prometheus_prefix", value_status::Used, "scylla", "Set the prefix of the exported Prometheus metrics. Changing this will break Scylla's dashboard compatibility, do not change unless you know what you are doing.")
, prometheus_allow_protobuf(this, "prometheus_allow_protobuf", value_status::Used, true, "Enable Prometheus protobuf with native histogram. Set to false to force text exposition format.")
, prometheus_allow_protobuf(this, "prometheus_allow_protobuf", value_status::Used, false, "If set allows the experimental Prometheus protobuf with native histogram")
, abort_on_lsa_bad_alloc(this, "abort_on_lsa_bad_alloc", value_status::Used, false, "Abort when allocation in LSA region fails.")
, murmur3_partitioner_ignore_msb_bits(this, "murmur3_partitioner_ignore_msb_bits", value_status::Used, default_murmur3_partitioner_ignore_msb_bits, "Number of most significant token bits to ignore in murmur3 partitioner; increase for very large clusters.")
, unspooled_dirty_soft_limit(this, "unspooled_dirty_soft_limit", value_status::Used, 0.6, "Soft limit of unspooled dirty memory expressed as a portion of the hard limit.")

View File

@@ -31,19 +31,23 @@ size_t quorum_for(const locator::effective_replication_map& erm) {
return replication_factor ? (replication_factor / 2) + 1 : 0;
}
size_t local_quorum_for(const locator::effective_replication_map& erm, const sstring& dc) {
static size_t get_replication_factor_for_dc(const locator::effective_replication_map& erm, const sstring& dc) {
using namespace locator;
const auto& rs = erm.get_replication_strategy();
if (rs.get_type() == replication_strategy_type::network_topology) {
const network_topology_strategy* nrs =
const network_topology_strategy* nts =
static_cast<const network_topology_strategy*>(&rs);
size_t replication_factor = nrs->get_replication_factor(dc);
return replication_factor ? (replication_factor / 2) + 1 : 0;
return nts->get_replication_factor(dc);
}
return quorum_for(erm);
return erm.get_replication_factor();
}
size_t local_quorum_for(const locator::effective_replication_map& erm, const sstring& dc) {
auto rf = get_replication_factor_for_dc(erm, dc);
return rf ? (rf / 2) + 1 : 0;
}
size_t block_for_local_serial(const locator::effective_replication_map& erm) {
@@ -188,18 +192,30 @@ void assure_sufficient_live_nodes(
return pending <= live ? live - pending : 0;
};
auto make_rf_zero_error_msg = [cl] (const sstring& local_dc) {
return format("Cannot achieve consistency level {} in datacenter '{}' with replication factor 0. "
"Ensure the keyspace is replicated to this datacenter or use a non-local consistency level.", cl, local_dc);
};
const auto& topo = erm.get_topology();
const sstring& local_dc = topo.get_datacenter();
switch (cl) {
case consistency_level::ANY:
// local hint is acceptable, and local node is always live
break;
case consistency_level::LOCAL_ONE:
if (size_t local_rf = get_replication_factor_for_dc(erm, local_dc); local_rf == 0) {
throw exceptions::unavailable_exception(make_rf_zero_error_msg(local_dc), cl, 1, 0);
}
if (topo.count_local_endpoints(live_endpoints) < topo.count_local_endpoints(pending_endpoints) + 1) {
throw exceptions::unavailable_exception(cl, 1, 0);
}
break;
case consistency_level::LOCAL_QUORUM: {
if (size_t local_rf = get_replication_factor_for_dc(erm, local_dc); local_rf == 0) {
throw exceptions::unavailable_exception(make_rf_zero_error_msg(local_dc), cl, need, 0);
}
size_t local_live = topo.count_local_endpoints(live_endpoints);
size_t pending = topo.count_local_endpoints(pending_endpoints);
if (local_live < need + pending) {

View File

@@ -7261,9 +7261,6 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
});
}
// Refresh is triggered after table creation, need to make sure we see the new tablets.
co_await _group0->group0_server().read_barrier(&_group0_as);
using table_ids_t = std::unordered_set<table_id>;
const auto table_ids = co_await std::invoke([this] () -> future<table_ids_t> {
table_ids_t ids;

View File

@@ -119,8 +119,7 @@ sstring get_application_state_gently(const gms::application_state_map& epmap, gm
}
} // namespace
class topology_coordinator : public endpoint_lifecycle_subscriber
, public migration_listener::empty_listener {
class topology_coordinator : public endpoint_lifecycle_subscriber {
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
gms::gossiper& _gossiper;
netw::messaging_service& _messaging;
@@ -2030,7 +2029,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
// Compute the average tablet size of existing replicas
uint64_t tablet_size_sum = 0;
size_t replica_count = 0;
bool incomplete = false;
const locator::range_based_tablet_id rb_tid {gid.table, trange};
auto tsi = get_migration_streaming_info(get_token_metadata().get_topology(), tinfo, trinfo);
for (auto& r : tsi.read_from) {
@@ -2038,15 +2036,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
if (tablet_size_opt) {
tablet_size_sum += *tablet_size_opt;
replica_count++;
} else {
incomplete = true;
}
}
if (!incomplete) {
if (replica_count) {
new_load_stats = make_lw_shared<locator::load_stats>(*old_load_stats);
auto size = replica_count ? tablet_size_sum / replica_count : 0;
new_load_stats->tablet_stats.at(pending->host).tablet_sizes[gid.table][trange] = size;
new_load_stats->tablet_stats.at(pending->host).tablet_sizes[gid.table][trange] = tablet_size_sum / replica_count;
}
}
break;
@@ -2301,30 +2296,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
});
}
virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override {
// New tablets were allocated, we need per-tablet stats for them for tablet balancer to make progress.
trigger_load_stats_refresh();
}
virtual void on_create_view(const sstring& ks_name, const sstring& view_name) override {
trigger_load_stats_refresh();
}
virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed) override {
// Tablet hints may have changed. Wake up so that load balancer re-evaluates tablet distribution.
_topo_sm.event.broadcast();
}
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
// Tablet distribution has changed. Wake up the load balancer.
_topo_sm.event.broadcast();
}
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override {
// Tablet distribution has changed. Wake up the load balancer.
_topo_sm.event.broadcast();
}
future<> cancel_all_requests(group0_guard guard, std::unordered_set<raft::server_id> dead_nodes) {
utils::chunked_vector<canonical_mutation> muts;
std::vector<raft::server_id> reject_join;
@@ -3649,27 +3620,21 @@ public:
, _tablet_allocator(tablet_allocator)
, _vb_coordinator(std::make_unique<db::view::view_building_coordinator>(_db, _raft, _group0, _sys_ks, _gossiper, _messaging, _vb_sm, _topo_sm, _term, _as))
, _cdc_gens(cdc_gens)
, _tablet_load_stats_refresh([this] {
return with_scheduling_group(_db.get_gossip_scheduling_group(), [this] {
return refresh_tablet_load_stats();
});
})
, _tablet_load_stats_refresh([this] { return refresh_tablet_load_stats(); })
, _ring_delay(ring_delay)
, _group0_holder(_group0.hold_group0_gate())
, _voter_handler(group0, topo_sm._topology, gossiper, feature_service)
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
, _async_gate("topology_coordinator")
{
_db.get_notifier().register_listener(this);
}
{}
// Returns true if the upgrade was done, returns false if upgrade was interrupted.
future<bool> maybe_run_upgrade();
future<> run();
future<> stop();
virtual void on_up(const gms::inet_address& endpoint, locator::host_id hid) override { _topo_sm.event.broadcast(); };
virtual void on_down(const gms::inet_address& endpoint, locator::host_id hid) override { _topo_sm.event.broadcast(); };
virtual void on_up(const gms::inet_address& endpoint, locator::host_id hid) { _topo_sm.event.broadcast(); };
virtual void on_down(const gms::inet_address& endpoint, locator::host_id hid) { _topo_sm.event.broadcast(); };
private:
tablet_ops_metrics _tablet_ops_metrics;
@@ -3719,6 +3684,10 @@ future<std::optional<group0_guard>> topology_coordinator::maybe_migrate_system_t
future<bool> topology_coordinator::maybe_start_tablet_migration(group0_guard guard) {
rtlogger.debug("Evaluating tablet balance");
if (utils::get_local_injector().enter("tablet_load_stats_refresh_before_rebalancing")) {
co_await _tablet_load_stats_refresh.trigger();
}
auto tm = get_token_metadata_ptr();
auto plan = co_await _tablet_allocator.balance_tablets(tm, &_topo_sm._topology, &_sys_ks, {}, get_dead_nodes());
if (plan.empty()) {
@@ -3870,7 +3839,6 @@ future<> topology_coordinator::refresh_tablet_load_stats() {
rtlogger.debug("raft topology: Refreshed table load stats for all DC(s).");
_tablet_allocator.set_load_stats(make_lw_shared<const locator::load_stats>(std::move(stats)));
_topo_sm.event.broadcast(); // wake up load balancer.
}
future<> topology_coordinator::start_tablet_load_stats_refresher() {
@@ -3879,6 +3847,7 @@ future<> topology_coordinator::start_tablet_load_stats_refresher() {
bool sleep = true;
try {
co_await _tablet_load_stats_refresh.trigger();
_topo_sm.event.broadcast(); // wake up load balancer.
} catch (raft::request_aborted&) {
rtlogger.debug("raft topology: Tablet load stats refresher aborted");
sleep = false;
@@ -4206,21 +4175,10 @@ future<> topology_coordinator::run() {
auto group0_voter_refresher = group0_voter_refresher_fiber();
auto vb_coordinator_fiber = run_view_building_coordinator();
std::optional<future<>> event_wait;
while (!_as.abort_requested()) {
bool sleep = false;
try {
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_processing_backlog", utils::wait_for_message(5min));
if (!_topo_sm._topology.tstate && utils::get_local_injector().enter("tablet_load_stats_refresh_before_rebalancing")) {
co_await _tablet_load_stats_refresh.trigger();
}
if (!event_wait) {
event_wait = _topo_sm.event.wait();
}
auto guard = co_await cleanup_group0_config_if_needed(co_await start_operation());
if (_rollback) {
@@ -4232,14 +4190,9 @@ future<> topology_coordinator::run() {
bool had_work = co_await handle_topology_transition(std::move(guard));
if (!had_work) {
co_await utils::get_local_injector().inject("wait-before-topology-coordinator-goes-to-sleep", utils::wait_for_message(30s));
// Nothing to work on. Wait for topology change event.
rtlogger.debug("topology coordinator fiber has nothing to do. Sleeping.");
_as.check();
auto f = std::move(*event_wait);
event_wait.reset();
co_await std::move(f);
co_await await_event();
rtlogger.debug("topology coordinator fiber got an event");
}
co_await utils::get_local_injector().inject("wait-after-topology-coordinator-gets-event", utils::wait_for_message(30s));
@@ -4256,10 +4209,6 @@ future<> topology_coordinator::run() {
co_await coroutine::maybe_yield();
}
if (event_wait && event_wait->available()) {
event_wait->ignore_ready_future();
}
co_await _async_gate.close();
co_await std::move(tablet_load_stats_refresher);
co_await _tablet_load_stats_refresh.join();
@@ -4272,8 +4221,6 @@ future<> topology_coordinator::run() {
}
future<> topology_coordinator::stop() {
co_await _db.get_notifier().unregister_listener(this);
// if topology_coordinator::run() is aborted either because we are not a
// leader anymore, or we are shutting down as a leader, we have to handle
// futures in _tablets in case any of them failed, before these failures

View File

@@ -119,7 +119,8 @@ async def test_multi_column_lwt_during_migration(manager: ManagerClient):
}
servers = await manager.servers_add(6, config=cfg)
await manager.disable_tablet_balancing()
for server in servers:
await manager.api.disable_tablet_balancing(server.ip_addr)
rf_max = len(servers) - 1
rf = random.randint(2, rf_max)

View File

@@ -32,7 +32,7 @@ async def pin_the_only_tablet(manager, keyspace_name, table_or_view_name, server
# We need to send load-balancing commands to one of the nodes and they
# will be propagated to all of them. Since we already know of
# target_server, let's just use that.
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(server.ip_addr)
tablet_token = 0 # Doesn't matter since there is one tablet
source_replicas = await get_tablet_replicas(manager, server, keyspace_name, table_or_view_name, tablet_token)
# We assume RF=1 so get_tablet_replicas() returns just one replica
@@ -297,7 +297,7 @@ async def test_mv_tablet_split(manager: ManagerClient):
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
@@ -330,7 +330,7 @@ async def test_mv_tablet_split(manager: ManagerClient):
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await s1_log.wait_for(f"Detected tablet split for table {ks}.tv", from_mark=s1_mark)
await check()

View File

@@ -56,7 +56,7 @@ async def test_tablet_mv_replica_pairing_during_replace(manager: ManagerClient):
# Disable migrations concurrent with replace since we don't handle nodes going down during migration yet.
# See https://github.com/scylladb/scylladb/issues/16527
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
base_replicas = await get_tablet_replicas(manager, servers[0], ks, "test", 0)
logger.info(f'{ks}.test replicas: {base_replicas}')

View File

@@ -82,7 +82,9 @@ async def test_mv_retried_writes_reach_all_replicas(manager: ManagerClient) -> N
await wait_for_view(cql, 'mv_cf_view', node_count)
# Disable tablet balancing so that the slow node doesn't get tablets moved away from it.
await manager.disable_tablet_balancing()
for s in servers:
await manager.api.disable_tablet_balancing(s.ip_addr)
await manager.api.disable_tablet_balancing(server.ip_addr)
# Make sure that the slow node has a base table tablet and no view tablets, so that the
# view updates from it are remote. (using shard 0 and token 0 when moving tablets as they don't make a difference here)

View File

@@ -92,7 +92,7 @@ async def test_start_scylla_with_view_building_disabled(manager: ManagerClient):
async def test_view_building_with_tablet_move(manager: ManagerClient, build_mode: str):
servers = [await manager.server_add()]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
table = 'test'

View File

@@ -105,7 +105,7 @@ async def test_mv_update_on_pending_replica(manager: ManagerClient, intranode):
cmd = ['--smp', '2']
servers = [await manager.server_add(config=cfg, cmdline=cmd)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
@@ -400,7 +400,7 @@ async def test_mv_write_during_migration(manager: ManagerClient, migration_type:
servers = await manager.servers_add(3, cmdline=cmdline)
cql = manager.get_cql()
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
tablets_enabled = migration_type.startswith("tablets")
if tablets_enabled:

View File

@@ -782,7 +782,7 @@ async def test_restore_with_streaming_scopes(build_mode: str, manager: ManagerCl
servers, host_ids = await create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, object_storage)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
ks = 'ks'
@@ -953,7 +953,7 @@ async def test_restore_primary_replica_same_rack_scope_rack(manager: ManagerClie
servers, host_ids = await create_cluster(topology, False, manager, logger, object_storage)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger)
@@ -1005,7 +1005,7 @@ async def test_restore_primary_replica_different_rack_scope_dc(manager: ManagerC
servers, host_ids = await create_cluster(topology, True, manager, logger, object_storage)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger)
@@ -1049,7 +1049,7 @@ async def test_restore_primary_replica_same_dc_scope_dc(manager: ManagerClient,
servers, host_ids = await create_cluster(topology, False, manager, logger, object_storage)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger)
@@ -1101,7 +1101,7 @@ async def test_restore_primary_replica_different_dc_scope_all(manager: ManagerCl
servers, host_ids = await create_cluster(topology, False, manager, logger, object_storage)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger)

View File

@@ -391,8 +391,6 @@ async def init_tablet_transfer(manager: ManagerClient,
LOGGER.info("Cannot perform a tablet migration because rack='%s' has only %i node(s)", target_rack, len(viable_targets))
return
await manager.disable_tablet_balancing()
await manager.cql.run_async(
"CREATE KEYSPACE test"
" WITH replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 3 } AND"
@@ -403,6 +401,8 @@ async def init_tablet_transfer(manager: ManagerClient,
*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k})") for k in range(256)]
)
await asyncio.gather(*[manager.api.disable_tablet_balancing(node_ip=s.ip_addr) for s in servers])
replicas = await get_all_tablet_replicas(
manager=manager,
server=servers[0],

View File

@@ -196,9 +196,9 @@ async def prepare_migration_test(manager: ManagerClient):
s = await manager.server_add(cmdline=extra_scylla_cmdline_options)
servers.append(s)
host_ids.append(await manager.get_host_id(s.server_id))
await manager.api.disable_tablet_balancing(s.ip_addr)
await make_server()
await manager.disable_tablet_balancing()
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
@@ -360,7 +360,7 @@ async def test_repair_task_info_is_none_when_no_running_repair(manager: ManagerC
await asyncio.gather(repair_task(), wait_and_check_none())
async def prepare_split(manager: ManagerClient, server: ServerInfo, keyspace: str, table: str, keys: list[int]):
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(server.ip_addr)
cql = manager.get_cql()
insert = cql.prepare(f"INSERT INTO {keyspace}.{table}(pk, c) VALUES(?, ?)")
@@ -371,7 +371,7 @@ async def prepare_split(manager: ManagerClient, server: ServerInfo, keyspace: st
await manager.api.flush_keyspace(server.ip_addr, keyspace)
async def prepare_merge(manager: ManagerClient, server: ServerInfo, keyspace: str, table: str, keys: list[int]):
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(server.ip_addr)
cql = manager.get_cql()
await asyncio.gather(*[cql.run_async(f"DELETE FROM {keyspace}.{table} WHERE pk={k};") for k in keys])
@@ -382,7 +382,7 @@ async def enable_tablet_balancing_and_wait(manager: ManagerClient, server: Serve
s1_log = await manager.server_open_log(server.server_id)
s1_mark = await s1_log.mark()
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(server.ip_addr)
await s1_log.wait_for(message, from_mark=s1_mark)
@@ -400,7 +400,7 @@ async def test_tablet_resize_task(manager: ManagerClient):
'tablet_load_stats_refresh_interval_in_seconds': 1
})]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
table1 = "test1"
@@ -421,7 +421,7 @@ async def test_tablet_resize_task(manager: ManagerClient):
injection = "tablet_split_finalization_postpone"
await enable_injection(manager, servers, injection)
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
async def wait_and_check_status(server, type, keyspace, table):
task = (await wait_tasks_created(tm, server, module_name, 1, type, keyspace, table))[0]
@@ -441,7 +441,7 @@ async def test_tablet_resize_list(manager: ManagerClient):
'tablet_load_stats_refresh_interval_in_seconds': 1
})]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
table1 = "test1"
@@ -464,7 +464,7 @@ async def test_tablet_resize_list(manager: ManagerClient):
await enable_injection(manager, servers, injection)
await manager.api.enable_injection(servers[0].ip_addr, compaction_injection, one_shot=True)
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", keyspace, table1))[0]
task1 = (await wait_tasks_created(tm, servers[1], module_name, 1, "split", keyspace, table1))[0]
@@ -501,7 +501,7 @@ async def test_tablet_resize_revoked(manager: ManagerClient):
'tablet_load_stats_refresh_interval_in_seconds': 1
})]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
table1 = "test1"
@@ -515,7 +515,7 @@ async def test_tablet_resize_revoked(manager: ManagerClient):
injection = "tablet_split_finalization_postpone"
await enable_injection(manager, servers, injection)
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", keyspace, table1))[0]
log = await manager.server_open_log(servers[0].server_id)

View File

@@ -1087,8 +1087,6 @@ async def test_alternator_invalid_shard_for_lwt(manager: ManagerClient):
server = await manager.server_add(config=config, cmdline=cmdline)
alternator = get_alternator(server.ip_addr)
await manager.disable_tablet_balancing()
logger.info("Creating alternator test table")
table = alternator.create_table(TableName=unique_table_name(),
Tags=[{'Key': 'system:initial_tablets', 'Value': '1'}],

View File

@@ -328,7 +328,7 @@ async def test_cdc_colocation(manager: ManagerClient):
test_data[i] = i * 10
await cql.run_async(f"INSERT INTO {ks}.test(pk, v) VALUES({i}, {i * 10})")
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
# create map that maps each stream_id to a list of all partitions it contains
rows = await cql.run_async(f"SELECT \"cdc$stream_id\" as sid, pk FROM {ks}.test_scylla_cdc_log")

View File

@@ -2,7 +2,6 @@
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import aiohttp
import logging
import pytest
import requests
@@ -40,86 +39,3 @@ async def test_non_liveupdatable_config(manager):
await manager.server_update_config(server.server_id, liveupdatable_param, True)
await wait_for_config(manager, server, liveupdatable_param, True)
await wait_for_config(manager, server, not_liveupdatable_param, True)
# Default Prometheus metrics port
PROMETHEUS_PORT = 9180
# Accept header for requesting Prometheus protobuf format with native histograms
PROMETHEUS_PROTOBUF_ACCEPT_HEADER = 'application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited'
@pytest.mark.asyncio
async def test_prometheus_allow_protobuf_default(manager):
"""
Test that prometheus_allow_protobuf is enabled by default,
while ensuring the configuration can be changed if needed.
"""
logging.info("Starting server with default configuration")
server = await manager.server_add()
logging.info("Verify prometheus_allow_protobuf defaults to true")
await wait_for_config(manager, server, "prometheus_allow_protobuf", True)
logging.info("Test that the configuration can be explicitly disabled")
server2 = await manager.server_add(config={'prometheus_allow_protobuf': False})
await wait_for_config(manager, server2, "prometheus_allow_protobuf", False)
logging.info("Test that the configuration can be explicitly enabled")
server3 = await manager.server_add(config={'prometheus_allow_protobuf': True})
await wait_for_config(manager, server3, "prometheus_allow_protobuf", True)
@pytest.mark.asyncio
async def test_prometheus_protobuf_native_histogram(manager):
"""
Test that when prometheus_allow_protobuf is enabled, the server actually
returns metrics in protobuf format with native histogram support when requested.
"""
logging.info("Starting server with prometheus_allow_protobuf enabled")
server = await manager.server_add(config={'prometheus_allow_protobuf': True})
metrics_url = f"http://{server.ip_addr}:{PROMETHEUS_PORT}/metrics"
logging.info(f"Requesting metrics in protobuf format from {metrics_url}")
# Request metrics with Accept header for protobuf format
headers = {
'Accept': PROMETHEUS_PROTOBUF_ACCEPT_HEADER
}
async with aiohttp.ClientSession() as session:
async with session.get(metrics_url, headers=headers) as resp:
assert resp.status == 200, f"Expected status 200, got {resp.status}"
# Check that we got protobuf content type in response
content_type = resp.headers.get('Content-Type', '')
logging.info(f"Response Content-Type: {content_type}")
# When protobuf is supported and requested, we should get protobuf back
assert 'application/vnd.google.protobuf' in content_type, \
f"Expected protobuf content type, got: {content_type}"
# Read the response body
body = await resp.read()
# Verify we got non-empty protobuf data
assert len(body) > 0, "Expected non-empty protobuf response"
logging.info(f"Successfully received protobuf response with {len(body)} bytes")
logging.info("Test that disabling prometheus_allow_protobuf prevents protobuf responses")
server2 = await manager.server_add(config={'prometheus_allow_protobuf': False})
metrics_url2 = f"http://{server2.ip_addr}:{PROMETHEUS_PORT}/metrics"
async with aiohttp.ClientSession() as session:
async with session.get(metrics_url2, headers=headers) as resp:
assert resp.status == 200, "Fail reading metrics from {metrics_url2}"
content_type = resp.headers.get('Content-Type', '')
logging.info(f"Response Content-Type (protobuf disabled): {content_type}")
# When protobuf is disabled, we should get text format even if requested
# The server should return text/plain or not include protobuf in content-type
assert 'application/vnd.google.protobuf' not in content_type, \
f"Expected text format when protobuf disabled, got: {content_type}"
logging.info("Confirmed that protobuf is not returned when disabled")

View File

@@ -40,7 +40,7 @@ async def test_counter_updates_during_tablet_migration(manager: ManagerClient, m
servers = await manager.servers_add(node_count, cmdline=cmdline)
cql = manager.get_cql()
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets={'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.counters (pk int PRIMARY KEY, c counter)")

View File

@@ -52,7 +52,7 @@ async def test_file_streaming_respects_encryption(manager: ManagerClient, workdi
cmdline = ['--smp=1']
servers = []
servers.append(await manager.server_add(config=cfg, cmdline=cmdline))
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.cql
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

View File

@@ -84,7 +84,7 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
# This should be done before adding the last two servers,
# otherwise it can break the version == fence_version condition
# which the test relies on.
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[2].ip_addr)
logger.info('Creating new tables')
random_tables = RandomTables(request.node.name, manager, unique_name(), 3)
@@ -146,7 +146,7 @@ async def test_fence_hints(request, manager: ManagerClient):
# This should be done before adding the last two servers,
# otherwise it can break the version == fence_version condition
# which the test relies on.
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(s0.ip_addr)
[s1, s2] = await manager.servers_add(2, property_file=[
{"dc": "dc1", "rack": "r2"},
@@ -416,7 +416,7 @@ async def test_fenced_out_on_tablet_migration_while_handling_paxos_verb(manager:
host_ids = await asyncio.gather(*[manager.get_host_id(s.server_id) for s in servers])
logger.info("Disable tablet balancing")
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Create a test keyspace")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:

View File

@@ -374,7 +374,8 @@ async def test_hint_to_pending(manager: ManagerClient):
{"dc": "dc1", "rack": "r1"},
])
cql = await manager.get_cql_exclusive(servers[0])
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
await manager.api.disable_tablet_balancing(servers[1].ip_addr)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
table = f"{ks}.t"

View File

@@ -13,7 +13,8 @@ import pytest
from cassandra.policies import WhiteListRoundRobinPolicy
from test.cqlpy import nodetool
from cassandra import ConsistencyLevel
from cassandra import ConsistencyLevel, Unavailable
from cassandra.cluster import NoHostAvailable
from cassandra.protocol import InvalidRequest, ConfigurationException
from cassandra.query import SimpleStatement
from test.pylib.async_cql import _wrap_future
@@ -113,14 +114,19 @@ async def test_putget_2dc_with_rf(
@pytest.mark.asyncio
async def test_query_dc_with_rf_0_does_not_crash_db(request: pytest.FixtureRequest, manager: ManagerClient):
"""Test querying dc with CL=LOCAL_QUORUM when RF=0 for this dc, does not crash the node and returns None
Covers https://github.com/scylladb/scylla/issues/8354"""
async def test_read_or_write_to_dc_with_rf_0_fails(request: pytest.FixtureRequest, manager: ManagerClient):
"""
Verifies that operations using local consistency levels (LOCAL_QUORUM, LOCAL_ONE) fail
with a clear, actionable error message when the datacenter has replication factor 0.
Covers:
- https://github.com/scylladb/scylla/issues/8354 - Operations should not crash the DB
- https://github.com/scylladb/scylladb/issues/27893 - Should give a clear error about RF=0
"""
servers = []
ks = "test_ks"
table_name = "test_table_name"
expected = ["k1", "value1"]
dc_replication = {'dc2': 0}
dc_replication = {'dc1': 1, 'dc2': 0}
columns = [Column("name", TextType), Column("value", TextType)]
for i in [1, 2]:
@@ -136,18 +142,40 @@ async def test_query_dc_with_rf_0_does_not_crash_db(request: pytest.FixtureReque
random_tables = RandomTables(request.node.name, manager, ks, 1, dc_replication)
await random_tables.add_table(ncolumns=2, columns=columns, pks=1, name=table_name)
dc1_connection.execute(
f"INSERT INTO {ks}.{table_name} ({columns[0].name}, {columns[1].name}) VALUES ('{expected[0]}', '{expected[1]}');")
select_query = SimpleStatement(f"SELECT * from {ks}.{table_name};",
consistency_level=ConsistencyLevel.LOCAL_QUORUM)
nodetool.flush(dc1_connection, "{ks}.{table_name}")
first_node_results = list(dc1_connection.execute(select_query).one())
second_node_result = dc2_connection.execute(select_query).one()
assert first_node_results == expected, \
f"Expected {expected} from {select_query.query_string}, but got {first_node_results}"
assert second_node_result is None, \
f"Expected no results from {select_query.query_string}, but got {second_node_result}"
# sanity check: operations from dc1 (RF > 0) should succeed with all local consistency levels
local_cls = [ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.LOCAL_ONE]
for i, cl in enumerate(local_cls):
dc1_connection.execute(SimpleStatement(
f"INSERT INTO {ks}.{table_name} ({columns[0].name}, {columns[1].name}) VALUES ('k{i}', 'value{i}')", consistency_level=cl))
nodetool.flush(dc1_connection, f"{ks}.{table_name}")
select_query = SimpleStatement(f"SELECT * FROM {ks}.{table_name} WHERE {columns[0].name} = 'k{i}'", consistency_level=cl)
# We assert the SELECT's actual results (not merely that it doesn't throw) because a broken read
# could silently return nothing; checking the row proves the read path is *really* working
result = list(dc1_connection.execute(select_query).one())
expected_row = [f"k{i}", f"value{i}"]
assert result == expected_row, \
f"Expected {expected_row} with CL={cl} from dc1, but got {result}"
def assert_operation_fails_with_rf0_error(cl: ConsistencyLevel, operation: str) -> None:
with pytest.raises((Unavailable, NoHostAvailable)) as exc_info:
dc2_connection.execute(SimpleStatement(operation, consistency_level=cl))
error_msg = str(exc_info.value)
assert "Cannot achieve consistency level LOCAL_" in error_msg and "use a non-local consistency level" in error_msg, \
f"Expected error indicating RF=0 and datacenter with CL={cl}, but received: {exc_info.value}"
# SELECT & INSERT from dc2 (with RF=0) using local CLs should fail with a clear error message
# indicating the replication factor is 0 and suggesting to use a non-local CL
for i, cl in enumerate(local_cls):
assert_operation_fails_with_rf0_error(cl,
f"SELECT * FROM {ks}.{table_name}")
assert_operation_fails_with_rf0_error(cl,
f"INSERT INTO {ks}.{table_name} ({columns[0].name}, {columns[1].name}) VALUES ('k_fail_{i}', 'value_fail_{i}')")
@pytest.mark.asyncio
async def test_create_and_alter_keyspace_with_altering_rf_and_racks(manager: ManagerClient):

View File

@@ -176,7 +176,7 @@ async def test_raft_recovery_user_data(manager: ManagerClient, remove_dead_nodes
# Disable load balancer on the topology coordinator node so that an ongoing tablet migration doesn't fail one of the
# check_system_topology_and_cdc_generations_v3_consistency calls below. A tablet migration can suddenly make
# version or fence_version inconsistent among nodes.
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(live_servers[0].ip_addr)
cql, hosts = await manager.get_ready_cql(live_servers + new_servers)

View File

@@ -60,7 +60,7 @@ async def test_refresh_with_streaming_scopes(manager: ManagerClient, topology_rf
cql = manager.get_cql()
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
ks = 'ks'
cf = 'cf'
@@ -141,7 +141,7 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
cql = manager.get_cql()
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
ks = 'ks'
cf = 'cf'

View File

@@ -41,7 +41,7 @@ async def test_resurrection_while_file_streaming(manager: ManagerClient):
servers = [await manager.server_add(config=cfg)]
servers.append(await manager.server_add(config=cfg))
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)

View File

@@ -24,7 +24,7 @@ async def test_partitioned_sstable_set(manager: ManagerClient, mode):
cmdline = ['--smp=1']
server = await manager.server_add(config=cfg, cmdline=cmdline)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(server.ip_addr)
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)

View File

@@ -151,7 +151,8 @@ async def test_reshape_with_tablets(manager: ManagerClient):
async def test_tablet_rf_change(manager: ManagerClient, direction):
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
servers = await manager.servers_add(2, config=cfg, auto_rack_dc="dc1")
await manager.disable_tablet_balancing()
for s in servers:
await manager.api.disable_tablet_balancing(s.ip_addr)
cql = manager.get_cql()
res = await cql.run_async("SELECT data_center FROM system.local")
@@ -252,7 +253,7 @@ async def test_tablets_api_consistency(manager: ManagerClient, endpoint):
servers += await manager.servers_add(2, property_file={'dc': f'dc1', 'rack': 'rack1'})
servers += await manager.servers_add(2, property_file={'dc': f'dc1', 'rack': 'rack2'})
servers += await manager.servers_add(2, property_file={'dc': f'dc1', 'rack': 'rack3'})
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
hosts = { await manager.get_host_id(s.server_id): s.ip_addr for s in servers }
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
@@ -520,7 +521,6 @@ async def test_saved_readers_tablet_migration(manager: ManagerClient, build_mode
cfg['error_injections_at_startup'] = [{'name': 'querier-cache-ttl-seconds', 'value': 999999999}]
servers = await manager.servers_add(2, config=cfg)
await manager.disable_tablet_balancing()
cql = manager.get_cql()
@@ -595,7 +595,7 @@ async def test_read_of_pending_replica_during_migration(manager: ManagerClient,
]
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
@@ -667,7 +667,7 @@ async def test_explicit_tablet_movement_during_decommission(manager: ManagerClie
#
# Load balancing being enabled or disabled is a cluster-global property; we can use any node to toggle it.
logger.info("Disabling load balancing")
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Populating tablet")
# Create a table with just one partition and RF=1, so we have exactly one tablet.
@@ -831,7 +831,7 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
'--logger-log-level', 'view_building_worker=debug',
]
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Create table, populate it and flush the table to disk")
cql = manager.get_cql()
@@ -896,7 +896,7 @@ async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
'--logger-log-level', 'view_building_worker=debug',
]
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Create the test table, populate few rows and flush to disk")
cql = manager.get_cql()
@@ -985,7 +985,7 @@ async def test_orphaned_sstables_on_startup(manager: ManagerClient):
'--logger-log-level', 'raft_topology=debug',
]
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Create the test table, populate few rows and flush to disk")
cql = manager.get_cql()
@@ -998,7 +998,7 @@ async def test_orphaned_sstables_on_startup(manager: ManagerClient):
logger.info("Start Node 2")
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[1].ip_addr)
node1_workdir = await manager.server_get_workdir(servers[1].server_id)
node1_table_dir = glob.glob(os.path.join(node1_workdir, "data", ks, "test-*"))[0]
s1_host_id = await manager.get_host_id(servers[1].server_id)
@@ -1214,7 +1214,7 @@ async def test_drop_keyspace_while_split(manager: ManagerClient):
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
# create a table so that it has at least 2 tablets (and storage groups) per shard
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
@@ -1230,7 +1230,7 @@ async def test_drop_keyspace_while_split(manager: ManagerClient):
await manager.api.enable_injection(servers[0].ip_addr, 'split_storage_groups_wait', one_shot=False)
# enable the load balancer which should emit a tablet split
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
# wait for compaction groups to be created and split to begin
await s0_log.wait_for('split_storage_groups_wait: wait')
@@ -1260,7 +1260,7 @@ async def test_drop_with_tablet_migration_cleanup(manager: ManagerClient):
cql = manager.get_cql()
# We don't want the load balancer to migrate tablets during the test
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(server.ip_addr)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
# Create the table, insert data and flush
@@ -1306,7 +1306,7 @@ async def test_drop_with_tablet_migration_cleanup(manager: ManagerClient):
async def test_two_tablets_concurrent_repair_and_migration(manager: ManagerClient):
injection = "repair_shard_repair_task_impl_do_repair_ranges"
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
all_replicas = await get_all_tablet_replicas(manager, servers[0], ks, "test")
all_replicas.sort(key=lambda x: x.last_token)
@@ -1384,7 +1384,7 @@ async def test_tablet_split_finalization_with_migrations(manager: ManagerClient)
s0_host_id = await manager.get_host_id(servers[0].server_id)
for cf in ["test", "blocker"]:
logger.info(f"Move all tablets of test.{cf} from Node 2 to Node 1")
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
s1_replicas = await get_all_tablet_replicas(manager, servers[1], "test", cf)
migration_tasks = [
manager.api.move_tablet(servers[0].ip_addr, "test", cf,
@@ -1395,7 +1395,7 @@ async def test_tablet_split_finalization_with_migrations(manager: ManagerClient)
await asyncio.gather(*migration_tasks)
logger.info("Re-enable tablet balancing; it should be blocked by pending split finalization")
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
mark, _ = await log.wait_for("Setting tablet balancing to true")
logger.info("Unblock resize finalisation and verify that the finalisation is preferred over migrations")
@@ -1421,7 +1421,7 @@ async def test_two_tablets_concurrent_repair_and_migration_repair_writer_level(m
await cql.run_async(f"CREATE TABLE {ks}.test2 (pk int PRIMARY KEY, c int) WITH tombstone_gc = {{'mode':'repair'}};")
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
async def insert_with_down(down_server):
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k + 1});") for k in range(10)])
@@ -1461,13 +1461,12 @@ async def check_tablet_rebuild_with_repair(manager: ManagerClient, fail: bool):
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
servers.append(s)
host_ids.append(await manager.get_host_id(s.server_id))
await manager.api.disable_tablet_balancing(s.ip_addr)
await make_server("r1")
await make_server("r1")
await make_server("r2")
await manager.disable_tablet_balancing()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
@@ -1551,7 +1550,6 @@ async def test_moving_replica_to_replica(manager: ManagerClient):
# For convenience when moving tablets.
cmdline = ["--smp=1"]
s1, s2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")
await manager.disable_tablet_balancing()
host_id1 = await manager.get_host_id(s1.server_id)
host_id2 = await manager.get_host_id(s2.server_id)
@@ -1590,7 +1588,6 @@ async def test_moving_replica_within_single_rack(manager: ManagerClient):
# For convenience when moving tablets.
cmdline = ["--smp=1"]
s1, s2 = await manager.servers_add(2, cmdline=cmdline, property_file={"dc": "dc1", "rack": "r1"})
await manager.disable_tablet_balancing()
host_id1 = await manager.get_host_id(s1.server_id)
host_id2 = await manager.get_host_id(s2.server_id)
@@ -1637,39 +1634,4 @@ async def test_disabling_balancing_preempts_balancer(manager: ManagerClient):
await log.wait_for('Initiating tablet', from_mark=mark)
# Should preempt balancing
await manager.disable_tablet_balancing()
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_table_creation_wakes_up_balancer(manager: ManagerClient):
"""
Reproduces both https://github.com/scylladb/scylladb/issues/25163 and https://github.com/scylladb/scylladb/issues/27958
Scenario:
1. Start a cluster
2. Block the topology coordinator right before it goes to sleep
3. Create a table, which should wake up the coordinator
4. Verify that the coordinator didn't go to sleep
"""
server = await manager.server_add()
log = await manager.server_open_log(server.server_id)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 8}") as ks:
# Block coordinator right before going to sleep
# We use node bootstrap as an operation which is going to be trapped on exit, but it's arbitrary.
mark = await log.mark()
await manager.api.enable_injection(server.ip_addr, 'wait-before-topology-coordinator-goes-to-sleep', one_shot=True)
await manager.server_add()
await log.wait_for('wait-before-topology-coordinator-goes-to-sleep: wait', from_mark=mark)
# Create a table, which should prevent the coordinator from sleeping
await manager.api.enable_injection(server.ip_addr, 'wait-after-topology-coordinator-gets-event', one_shot=True)
mark = await log.mark()
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
# Verify it didn't go to sleep. If it did, the wait for wakeup would be 30s on average,
# up to stats refresh period, which is 60s. So use a small timeout.
await manager.api.message_injection(server.ip_addr, 'wait-before-topology-coordinator-goes-to-sleep')
await log.wait_for('wait-after-topology-coordinator-gets-event: wait', from_mark=mark, timeout=5)
await manager.api.disable_tablet_balancing(coord_srv.ip_addr)

View File

@@ -224,7 +224,7 @@ async def get_two_servers_to_move_tablet(manager: ManagerClient):
cmdline = ['--enable-file-stream', 'false']
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
@@ -330,7 +330,7 @@ async def test_streaming_is_guarded_by_topology_guard(manager: ManagerClient):
]
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
@@ -405,7 +405,7 @@ async def test_table_dropped_during_streaming(manager: ManagerClient):
logger.info("Bootstrapping cluster")
servers = [await manager.server_add()]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
@@ -475,7 +475,7 @@ async def test_tablet_cleanup(manager: ManagerClient):
logger.info("Start first node")
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Populate table")
cql = manager.get_cql()
@@ -549,7 +549,7 @@ async def test_tablet_cleanup_failure(manager: ManagerClient):
# Disable load balancing, so that after the move_tablet API the load balancer does
# not attempt to migrate the tablet back to the first node.
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
n_tablets = 1
@@ -631,7 +631,7 @@ async def test_tablet_split(manager: ManagerClient, injection_error: str):
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
@@ -671,7 +671,7 @@ async def test_tablet_split(manager: ManagerClient, injection_error: str):
await s1_log.wait_for(f"{injection_error}: waiting", from_mark=s1_mark)
# Now there's a split and migration need, so they'll potentially run concurrently.
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await check()
time.sleep(5) # Give load balancer some time to do work
@@ -700,7 +700,7 @@ async def test_correctness_of_tablet_split_finalization_after_restart(manager: M
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
servers.append(await manager.server_add(config={
'error_injections_at_startup': ['delay_split_compaction']
@@ -736,7 +736,7 @@ async def test_correctness_of_tablet_split_finalization_after_restart(manager: M
s1_mark = await s1_log.mark()
await manager.api.enable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone", one_shot=False)
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await s1_log.wait_for('Finalizing resize decision for table', from_mark=s1_mark)
@@ -744,7 +744,7 @@ async def test_correctness_of_tablet_split_finalization_after_restart(manager: M
await manager.api.disable_injection(servers[0].ip_addr, "tablet_load_stats_refresh_before_rebalancing")
await manager.server_update_config(servers[0].server_id, 'tablet_load_stats_refresh_interval_in_seconds', 60)
time.sleep(1)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
await manager.server_stop_gracefully(servers[1].server_id, timeout=120)
await manager.server_start(servers[1].server_id)
@@ -752,7 +752,7 @@ async def test_correctness_of_tablet_split_finalization_after_restart(manager: M
await manager.servers_see_each_other(servers)
await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone")
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
@@ -769,7 +769,7 @@ async def test_concurrent_tablet_migration_and_major(manager: ManagerClient, inj
cmdline = []
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
@@ -826,7 +826,7 @@ async def test_concurrent_table_drop_and_major(manager: ManagerClient):
cmdline = ['--logger-log-level', 'compaction_manager=debug',]
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
@@ -913,7 +913,7 @@ async def test_tablet_count_metric_per_shard(manager: ManagerClient):
servers = await manager.servers_add(2, cmdline=cmdline)
# And given disabled load balancing
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
# When two tables are created
cql = manager.get_cql()
@@ -1002,7 +1002,7 @@ async def test_tablet_load_and_stream(manager: ManagerClient, primary_replica_on
]
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
@@ -1063,14 +1063,14 @@ async def test_tablet_load_and_stream(manager: ManagerClient, primary_replica_on
rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
assert len(rows) == 0
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Adding new server")
servers.append(await manager.server_add(cmdline=cmdline))
# Trigger concurrent migration and load-and-stream
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await manager.api.load_new_sstables(servers[0].ip_addr, ks2, "test", primary_replica_only)
@@ -1131,7 +1131,7 @@ async def test_storage_service_api_uneven_ownership_keyspace_and_table_params_us
async def test_tablet_storage_freeing(manager: ManagerClient):
logger.info("Start first node")
servers = [await manager.server_add()]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
@@ -1173,7 +1173,7 @@ async def test_tablet_storage_freeing(manager: ManagerClient):
async def test_schema_change_during_cleanup(manager: ManagerClient):
logger.info("Start first node")
servers = [await manager.server_add()]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
@@ -1223,7 +1223,7 @@ async def test_tombstone_gc_correctness_during_tablet_split(manager: ManagerClie
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
@@ -1265,7 +1265,7 @@ async def test_tombstone_gc_correctness_during_tablet_split(manager: ManagerClie
await manager.api.enable_injection(servers[0].ip_addr, "split_sstable_rewrite", one_shot=False)
logger.info("Enable balancing so split will be emitted")
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
logger.info("Waits for split of sstable containing expired tombstones")
await s1_log.wait_for(f"split_sstable_rewrite: waiting", from_mark=s1_mark)
@@ -1421,7 +1421,8 @@ async def test_decommission_rack_after_adding_new_rack(manager: ManagerClient):
initial_servers = await create_cluster(manager, 1, initial_num_racks, nodes_per_rack, config)
async with create_and_populate_table(manager, rf=rf) as ctx:
logger.debug("Temporarily disable tablet load balancing")
await manager.disable_tablet_balancing()
node1 = sorted(initial_servers.values(), key=lambda s: s.server_id)[0]
await manager.api.disable_tablet_balancing(node1.ip_addr)
logger.info("Add a new rack")
new_rack = f"rack{num_racks}"
@@ -1437,7 +1438,7 @@ async def test_decommission_rack_after_adding_new_rack(manager: ManagerClient):
verify_replicas_per_server("Before decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)
logger.debug("Reenable tablet load balancing")
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(node1.ip_addr)
live_servers: dict[ServerNum, ServerInfo] = dict()
dead_servers: dict[ServerNum, ServerInfo] = dict()
@@ -1505,7 +1506,6 @@ async def test_tablet_cleanup_vs_snapshot_race(manager: ManagerClient):
cmdline = ['--smp=1']
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
n_tablets = 1
@@ -1548,7 +1548,7 @@ async def test_drop_table_and_truncate_after_migration(manager: ManagerClient, o
cfg = { 'auto_snapshot': True }
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND TABLETS = {{'initial': 4}}")
@@ -1672,7 +1672,7 @@ async def test_split_correctness_on_tablet_count_change(manager: ManagerClient):
cql = manager.get_cql()
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(server.ip_addr)
initial_tablets = 2
@@ -1696,7 +1696,7 @@ async def test_split_correctness_on_tablet_count_change(manager: ManagerClient):
log_mark = await log.mark()
await manager.api.enable_injection(server.ip_addr, "splitting_mutation_writer_switch_wait", one_shot=True)
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(server.ip_addr)
await log.wait_for('Emitting resize decision of type split', from_mark=log_mark)
await log.wait_for('splitting_mutation_writer_switch_wait: waiting', from_mark=log_mark)
@@ -1738,7 +1738,7 @@ async def test_tablet_load_and_stream_and_split_synchronization(manager: Manager
}, cmdline=cmdline)]
server = servers[0]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
@@ -1786,14 +1786,14 @@ async def test_tablet_load_and_stream_and_split_synchronization(manager: Manager
rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
assert len(rows) == 0
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
await manager.api.enable_injection(servers[0].ip_addr, "tablet_resize_finalization_post_barrier", one_shot=True)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {initial_tablets * 2}}}")
@@ -1887,7 +1887,7 @@ async def test_update_load_stats_after_migration(manager: ManagerClient):
# Disable load balancing to avoid the balancer moving the tablet from a node with less to a node with more
# available disk space. Otherwise, the move_tablet API can fail (if the tablet is already in transition) or
# be a no-op (in case the tablet has already been migrated)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1']}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 1}};")
@@ -1936,7 +1936,7 @@ async def test_timed_out_reader_after_cleanup(manager: ManagerClient):
]
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
@@ -2009,7 +2009,7 @@ async def test_split_and_incremental_repair_synchronization(manager: ManagerClie
cql = manager.get_cql()
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
initial_tablets = 2
@@ -2049,7 +2049,7 @@ async def test_split_and_incremental_repair_synchronization(manager: ManagerClie
token = 'all'
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
if repair_before_split:
await generate_repair_work()
@@ -2100,7 +2100,7 @@ async def test_split_and_intranode_synchronization(manager: ManagerClient):
cql = manager.get_cql()
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
initial_tablets = 1
@@ -2128,7 +2128,7 @@ async def test_split_and_intranode_synchronization(manager: ManagerClient):
dst_shard = 1
await manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token)
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(server.ip_addr)
await manager.api.enable_injection(server.ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
await manager.api.enable_injection(server.ip_addr, "split_sstable_force_stop_exception", one_shot=False)

View File

@@ -32,13 +32,19 @@ async def disable_injection_on(manager, error_name, servers):
errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
await asyncio.gather(*errs)
async def disable_tablet_balancing(manager, servers):
    """Turn off background tablet load-balancing on every server in `servers`.

    Issues the REST call to each server concurrently via asyncio.gather.
    """
    requests = (manager.api.disable_tablet_balancing(srv.ip_addr) for srv in servers)
    await asyncio.gather(*requests)
async def enable_tablet_balancing(manager, servers):
    """Re-enable background tablet load-balancing on every server in `servers`.

    Issues the REST call to each server concurrently via asyncio.gather.
    """
    requests = (manager.api.enable_tablet_balancing(srv.ip_addr) for srv in servers)
    await asyncio.gather(*requests)
@asynccontextmanager
async def no_tablet_balancing(manager):
await manager.disable_tablet_balancing()
async def no_tablet_balancing(manager, servers):
await disable_tablet_balancing(manager, servers)
try:
yield
finally:
await manager.enable_tablet_balancing()
await enable_tablet_balancing(manager, servers)
async def wait_for_tablet_stage(manager, server, keyspace_name, table_name, token, stage):
async def tablet_is_in_stage():
@@ -107,7 +113,7 @@ async def test_move_tablet(manager: ManagerClient, move_table: str):
'--logger-log-level', 'raft_topology=debug',
]
servers = await manager.servers_add(2, config=cfg, cmdline=cmdline)
await manager.disable_tablet_balancing()
await asyncio.gather(*[manager.api.disable_tablet_balancing(s.ip_addr) for s in servers])
cql = manager.get_cql()
@@ -188,7 +194,7 @@ async def test_tablet_split_and_merge(manager: ManagerClient, with_merge: bool):
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
@@ -230,7 +236,7 @@ async def test_tablet_split_and_merge(manager: ManagerClient, with_merge: bool):
s1_mark = await s1_log.mark()
# Now there's a split and migration need, so they'll potentially run concurrently.
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await check()
await asyncio.sleep(2) # Give load balancer some time to do work
@@ -240,7 +246,7 @@ async def test_tablet_split_and_merge(manager: ManagerClient, with_merge: bool):
await check()
async with no_tablet_balancing(manager):
async with no_tablet_balancing(manager, servers):
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count > 1
other_tablet_count = await get_tablet_count(manager, servers[0], ks, 'tv')
@@ -267,7 +273,7 @@ async def test_tablet_split_and_merge(manager: ManagerClient, with_merge: bool):
keys = range(total_keys - 1, total_keys)
# To avoid a race between major compaction and tablet migration
async with no_tablet_balancing(manager):
async with no_tablet_balancing(manager, servers):
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.keyspace_compaction(server.ip_addr, ks)
@@ -287,7 +293,7 @@ async def test_tablet_split_and_merge(manager: ManagerClient, with_merge: bool):
await s1_log.wait_for(f"Detected tablet merge for table {ks}.test", from_mark=s1_mark, timeout=60)
await s1_log.wait_for(f"Detected tablet merge for table {ks}.tv", from_mark=s1_mark, timeout=60)
async with no_tablet_balancing(manager):
async with no_tablet_balancing(manager, servers):
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count < old_tablet_count
other_tablet_count = await get_tablet_count(manager, servers[0], ks, 'tv')
@@ -310,7 +316,7 @@ async def test_create_colocated_table_while_base_is_migrating(manager: ManagerCl
'--logger-log-level', 'raft_topology=debug',
]
servers = await manager.servers_add(2, config=cfg, cmdline=cmdline)
await manager.disable_tablet_balancing()
await asyncio.gather(*[manager.api.disable_tablet_balancing(s.ip_addr) for s in servers])
cql = manager.get_cql()
@@ -397,7 +403,7 @@ async def test_repair_colocated_base_and_view(manager: ManagerClient):
'--logger-log-level', 'repair=debug',
]
servers = await manager.servers_add(2, config=cfg, cmdline=cmdline, auto_rack_dc="dc1")
await manager.disable_tablet_balancing()
await asyncio.gather(*[manager.api.disable_tablet_balancing(s.ip_addr) for s in servers])
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:

View File

@@ -35,7 +35,7 @@ async def test_intranode_migration(manager: ManagerClient):
]
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
@@ -70,7 +70,7 @@ async def test_crash_during_intranode_migration(manager: ManagerClient):
]
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
@@ -138,7 +138,7 @@ async def test_cross_shard_migration(manager: ManagerClient):
servers = await manager.servers_add(2, cmdline=cmdline)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:

View File

@@ -56,7 +56,7 @@ async def test_lwt(manager: ManagerClient):
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
quorum = len(hosts) // 2 + 1
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
# We use capital letters to check that the proper quotes are used in paxos store queries.
ks = unique_name() + '_Test'
@@ -131,7 +131,7 @@ async def test_lwt_during_migration(manager: ManagerClient):
host_ids = await asyncio.gather(*[manager.get_host_id(s.server_id) for s in servers])
logger.info("Disable tablet balancing")
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
logger.info("Create a table")
@@ -245,7 +245,7 @@ async def test_lwt_state_is_preserved_on_tablet_migration(manager: ManagerClient
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logger.info("Disable tablet balancing")
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Create a keyspace")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
@@ -354,7 +354,7 @@ async def test_lwt_state_is_preserved_on_tablet_rebuild(manager: ManagerClient):
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logger.info("Disable tablet balancing")
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Create a keyspace")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
@@ -650,7 +650,7 @@ async def test_lwt_coordinator_shard(manager: ManagerClient):
cql = manager.get_cql()
logger.info("Disable tablet balancing")
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Create a keyspace")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:

View File

@@ -48,7 +48,7 @@ async def test_tablet_merge_simple(manager: ManagerClient):
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
@@ -89,7 +89,7 @@ async def test_tablet_merge_simple(manager: ManagerClient):
s1_mark = await s1_log.mark()
# Now there's a split and migration need, so they'll potentially run concurrently.
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await check()
time.sleep(2) # Give load balancer some time to do work
@@ -119,12 +119,12 @@ async def test_tablet_merge_simple(manager: ManagerClient):
keys = range(total_keys - 1, total_keys)
# To avoid a race between major compaction and tablet migration
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.keyspace_compaction(server.ip_addr, ks)
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
# Waits for balancer to co-locate sibling tablets
@@ -214,7 +214,7 @@ async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager:
for cycle in range(2):
logger.info("Running split-merge cycle #{}".format(cycle))
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Inserting data")
# Initial average table size of (400k + metadata_overhead). Enough to trigger a few splits.
@@ -248,7 +248,7 @@ async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager:
logger.info("Enabling balancing")
# Now there's a split and migration need, so they'll potentially run concurrently.
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
topology_ops_task = asyncio.create_task(perform_topology_ops())
@@ -283,13 +283,13 @@ async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager:
await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
# To avoid a race between major compaction and tablet migration
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Flushing keyspace and performing major")
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.keyspace_compaction(server.ip_addr, ks)
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
logger.info("Waiting for merge decision")
await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
@@ -457,7 +457,7 @@ async def test_migration_running_concurrently_to_merge_completion_handling(manag
cfg = {'force_capacity_based_balancing': True}
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
@@ -479,7 +479,7 @@ async def test_migration_running_concurrently_to_merge_completion_handling(manag
await manager.api.enable_injection(servers[0].ip_addr, "merge_completion_fiber", one_shot=True)
await manager.api.enable_injection(servers[0].ip_addr, "replica_merge_completion_wait", one_shot=True)
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
s1_host_id = await manager.get_host_id(servers[1].server_id)
@@ -490,7 +490,7 @@ async def test_migration_running_concurrently_to_merge_completion_handling(manag
await wait_for(finished_merging, time.time() + 120)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
await manager.api.enable_injection(servers[0].ip_addr, "take_storage_snapshot", one_shot=True)
await s0_log.wait_for(f"merge_completion_fiber: waiting", from_mark=s0_mark)
@@ -540,7 +540,7 @@ async def test_missing_data(manager: ManagerClient):
cql = manager.get_cql()
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(server.ip_addr)
inital_tablets = 32
@@ -560,7 +560,7 @@ async def test_missing_data(manager: ManagerClient):
expected_tablet_count = inital_tablets // 2
await cql.run_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': {expected_tablet_count}}}")
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(server.ip_addr)
# wait for merge to complete
actual_tablet_count = 0
@@ -608,7 +608,7 @@ async def test_merge_with_drop(manager: ManagerClient):
cql = manager.get_cql()
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(server.ip_addr)
initial_tablets = 32
@@ -633,7 +633,7 @@ async def test_merge_with_drop(manager: ManagerClient):
s0_log = await manager.server_open_log(server.server_id)
s0_mark = await s0_log.mark()
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(server.ip_addr)
# wait for merge to complete
actual_tablet_count = 0

View File

@@ -37,13 +37,12 @@ async def test_tablet_transition_sanity(manager: ManagerClient, action):
host = await manager.get_host_id(s.server_id)
hosts_by_rack[rack].append(host)
host_ids.append(host)
await manager.api.disable_tablet_balancing(s.ip_addr)
await make_server("r1")
await make_server("r1")
await make_server("r2")
await manager.disable_tablet_balancing()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
@@ -125,13 +124,11 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
servers.append(s)
host_ids.append(await manager.get_host_id(s.server_id))
await manager.api.disable_tablet_balancing(s.ip_addr)
await make_server("r1")
await make_server("r2")
await make_server("r3")
await manager.disable_tablet_balancing()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
@@ -275,13 +272,13 @@ async def test_tablet_back_and_forth_migration(manager: ManagerClient):
s = await manager.server_add(config=cfg)
servers.append(s)
host_ids.append(await manager.get_host_id(s.server_id))
await manager.api.disable_tablet_balancing(s.ip_addr)
async def assert_rows(num):
res = await cql.run_async(f"SELECT * FROM {ks}.test")
assert len(res) == num
await make_server()
await manager.disable_tablet_balancing()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
@@ -321,7 +318,7 @@ async def test_staging_backlog_is_preserved_with_file_based_streaming(manager: M
'error_injections_at_startup': ['view_update_generator_consume_staging_sstable']}
servers = [await manager.server_add(config=cfg)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
@@ -429,7 +426,7 @@ async def test_restart_leaving_replica_during_cleanup(manager: ManagerClient, mi
cfg = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
servers = await manager.servers_add(2, config=cfg)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
@@ -470,7 +467,7 @@ async def test_restart_leaving_replica_during_cleanup(manager: ManagerClient, mi
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, injection) for s in servers])
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
# Trigger tablet merge to reproduce #23481
table_id = await manager.get_table_id(ks, 'test')
@@ -490,7 +487,7 @@ async def test_restart_leaving_replica_during_cleanup(manager: ManagerClient, mi
# Workaround for https://github.com/scylladb/scylladb/issues/21779. We don't want the keyspace drop at the end
# of new_test_keyspace to fail because of concurrent tablet migrations.
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@@ -503,7 +500,7 @@ async def test_restart_in_cleanup_stage_after_cleanup(manager: ManagerClient):
cfg = {'tablet_load_stats_refresh_interval_in_seconds': 1}
servers = await manager.servers_add(2, config=cfg)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:

View File

@@ -112,7 +112,7 @@ async def test_replace(manager: ManagerClient):
# Disable migrations concurrent with replace since we don't handle nodes going down during migration yet.
# See https://github.com/scylladb/scylladb/issues/16527
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
finish_writes = await start_writes(cql, ks3, "test2")
@@ -195,7 +195,7 @@ async def test_removenode(manager: ManagerClient):
# Disable migrations concurrent with removenode since we don't handle nodes going down during migration yet.
# See https://github.com/scylladb/scylladb/issues/16527
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info('Removing a node')
await manager.server_stop(servers[0].server_id)
@@ -244,7 +244,7 @@ async def test_removenode_with_ignored_node(manager: ManagerClient):
# Disable migrations concurrent with removenode since we don't handle nodes going down during migration yet.
# See https://github.com/scylladb/scylladb/issues/16527
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info('Removing a node with another node down')
await manager.server_stop(servers[0].server_id) # removed

View File

@@ -35,7 +35,7 @@ async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bo
# Background migrations concurrent with node shutdown can cause background (after CL is met)
# replica-side writes to fail due to barriers fencing out the node which is shutting down.
# This trips check_node_log_for_failed_mutations().
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info(f"Restarting node {servers[0]}")
await manager.server_stop_gracefully(servers[0].server_id)

View File

@@ -42,7 +42,7 @@ async def test_topology_ops_encrypted(request, manager: ManagerClient, tablets_e
# Background migrations concurrent with node shutdown can cause background (after CL is met)
# replica-side writes to fail due to barriers fencing out the node which is shutting down.
# This trips check_node_log_for_failed_mutations().
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info(f"Restarting node {servers[0]}")
await manager.server_stop_gracefully(servers[0].server_id)

View File

@@ -19,7 +19,7 @@ from cassandra.query import SimpleStatement
from cassandra.protocol import InvalidRequest
from test.cluster.util import new_test_keyspace
from test.cluster.test_view_building_coordinator import mark_all_servers, pause_view_building_tasks, \
unpause_view_building_tasks, wait_for_some_view_build_tasks_to_get_stuck
unpause_view_building_tasks, wait_for_some_view_build_tasks_to_get_stuck, disable_tablet_load_balancing_on_all_servers
logger = logging.getLogger(__name__)
@@ -626,7 +626,7 @@ async def test_view_build_status_marked_started_on_node_added_during_building(ma
'--logger-log-level', 'view_building_worker=debug',
])
cql, hosts = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await disable_tablet_load_balancing_on_all_servers(manager)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await create_table(cql, ks)

View File

@@ -78,6 +78,10 @@ async def wait_for_message_on_any_server(manager: ManagerClient, message: str, m
async def wait_for_some_view_build_tasks_to_get_stuck(manager: ManagerClient, marks: list[int]):
return await wait_for_message_on_any_server(manager, "do_build_range: paused, waiting for message", marks)
async def disable_tablet_load_balancing_on_all_servers(manager: ManagerClient):
    """Disable background tablet load-balancing on every currently running server.

    Queries the manager for the live server list, then fans the REST call
    out to all of them concurrently.
    """
    running = await manager.running_servers()
    await asyncio.gather(*[manager.api.disable_tablet_balancing(srv.ip_addr) for srv in running])
async def populate_base_table(cql: Session, ks: str, tbl: str):
for i in range(ROW_COUNT):
await cql.run_async(f"INSERT INTO {ks}.{tbl} (key, c, v) VALUES ({i // ROWS_PER_PARTITION}, {i % ROWS_PER_PARTITION}, '{i}')")
@@ -153,7 +157,7 @@ async def test_build_two_views(manager: ManagerClient):
{"dc": "dc1", "rack": "r3"},
])
cql, _ = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await disable_tablet_load_balancing_on_all_servers(manager)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets = {{'enabled': true}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
@@ -183,7 +187,7 @@ async def test_add_view_while_build_in_progress(manager: ManagerClient):
{"dc": "dc1", "rack": "r3"},
])
cql, _ = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await disable_tablet_load_balancing_on_all_servers(manager)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets = {{'enabled': true}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
@@ -213,7 +217,7 @@ async def test_remove_some_view_while_build_in_progress(manager: ManagerClient):
node_count = 3
servers = await manager.servers_add(node_count, cmdline=cmdline_loggers)
cql, _ = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await disable_tablet_load_balancing_on_all_servers(manager)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'enabled': true}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
@@ -242,7 +246,7 @@ async def test_abort_building_by_remove_view(manager: ManagerClient):
node_count = 3
servers = await manager.servers_add(node_count, cmdline=cmdline_loggers)
cql, _ = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await disable_tablet_load_balancing_on_all_servers(manager)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'enabled': true}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
@@ -270,7 +274,7 @@ async def test_alter_base_schema_while_build_in_progress(manager: ManagerClient,
node_count = 3
servers = await manager.servers_add(node_count, cmdline=cmdline_loggers)
cql, _ = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await disable_tablet_load_balancing_on_all_servers(manager)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'enabled': true}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
@@ -320,7 +324,7 @@ async def test_change_rf_while_build_in_progress(manager: ManagerClient, change:
servers = await manager.servers_add(node_count, config={"enable_tablets": "true"}, cmdline=cmdline_loggers,
property_file=property_file)
cql, _ = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await disable_tablet_load_balancing_on_all_servers(manager)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': {old_rf}}} AND tablets = {{'enabled': true}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
@@ -358,7 +362,7 @@ async def test_node_operation_during_view_building(manager: ManagerClient, opera
property_file=property_file)
cql, _ = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await disable_tablet_load_balancing_on_all_servers(manager)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets = {{'enabled': true}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
@@ -404,7 +408,7 @@ async def test_leader_change_while_building(manager: ManagerClient):
])
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
cql, _ = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await disable_tablet_load_balancing_on_all_servers(manager)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets = {{'enabled': true}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
@@ -438,7 +442,7 @@ async def test_truncate_while_building(manager: ManagerClient):
{"dc": "dc1", "rack": "r3"},
])
cql, _ = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await disable_tablet_load_balancing_on_all_servers(manager)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets = {{'enabled': true}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
@@ -470,7 +474,7 @@ async def test_scylla_views_builds_in_progress(manager: ManagerClient, view_acti
{"dc": "dc1", "rack": "r3"},
])
cql, hosts = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await disable_tablet_load_balancing_on_all_servers(manager)
async def check_scylla_views_builds_in_progress(expect_zero_rows: bool):
async def check():
@@ -511,7 +515,7 @@ async def test_scylla_views_builds_in_progress(manager: ManagerClient, view_acti
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_view_building_while_tablet_streaming_fail(manager: ManagerClient):
servers = [await manager.server_add(cmdline=cmdline_loggers)]
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
@@ -608,7 +612,7 @@ async def test_concurrent_tablet_migrations(manager: ManagerClient):
for property_file in rack_property_files:
servers.append(await manager.server_add(property_file=property_file, cmdline=cmdline_loggers))
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
await pause_view_building_tasks(manager)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1} AND tablets = {'initial': 2}") as ks:
@@ -628,7 +632,7 @@ async def test_concurrent_tablet_migrations(manager: ManagerClient):
servers.append(await manager.server_add(property_file=property_file, cmdline=cmdline_loggers))
assert len(await get_nodes_which_are_tablet_replicas()) == 3
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
# The effect of unpausing the balancer should be that all replicas are distributed evenly between nodes
# (1 base + 1 view tablet for each node).
@@ -680,7 +684,7 @@ async def test_file_streaming(manager: ManagerClient):
{"dc": "dc1", "rack": "r2"},
])
cql, hosts = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}} AND tablets = {{'initial': 1}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v int, PRIMARY KEY (key))")
@@ -727,7 +731,7 @@ async def test_file_streaming(manager: ManagerClient):
await manager.server_start(servers[1].server_id)
# Add node3
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
new_server = await manager.server_add(cmdline=cmdline_loggers + ['--logger-log-level', 'view_update_generator=trace'], property_file={"dc": "dc1", "rack": "r1"})
s0_host_id = await manager.get_host_id(servers[0].server_id)
@@ -815,7 +819,7 @@ async def test_staging_sstables_with_tablet_merge(manager: ManagerClient):
'tablet_load_stats_refresh_interval_in_seconds': 1,
})
cql, hosts = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}} AND tablets = {{'enabled': true}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v int, PRIMARY KEY (key)) WITH tablets = {{'min_tablet_count': 2}}")
@@ -864,7 +868,7 @@ async def test_staging_sstables_with_tablet_merge(manager: ManagerClient):
await manager.server_start(servers[1].server_id)
# Add node3
await manager.disable_tablet_balancing()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
new_server = await manager.server_add(cmdline=cmdline_loggers + ['--logger-log-level', 'view_update_generator=trace'], property_file={"dc": "dc1", "rack": "r1"}, config={
'tablet_load_stats_refresh_interval_in_seconds': 1,
})
@@ -894,7 +898,7 @@ async def test_staging_sstables_with_tablet_merge(manager: ManagerClient):
if new_tablet_count == expected_tablet_count:
return True
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
assert await get_tablet_count(manager, servers[0], ks, "tab") == 2
await cql.run_async(f"ALTER TABLE {ks}.tab WITH tablets = {{'min_tablet_count': 1}}")
await wait_for(lambda: tablet_count_is(1), time.time() + 60)
@@ -918,7 +922,7 @@ async def test_tablet_migration_during_view_building(manager: ManagerClient):
node_count = 1
server = new_server = await manager.server_add(cmdline=cmdline_loggers, property_file={"dc": "dc1", "rack": "r1"})
cql, _ = await manager.get_ready_cql([server])
await manager.disable_tablet_balancing()
await disable_tablet_load_balancing_on_all_servers(manager)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'enabled': true}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c)) WITH tablets = {{'min_tablet_count': 1}}")
@@ -953,7 +957,7 @@ async def test_tablet_merge_during_view_building(manager: ManagerClient):
{"dc": "dc1", "rack": "r3"},
])
cql, _ = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
await disable_tablet_load_balancing_on_all_servers(manager)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets = {{'enabled': true}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c)) WITH tablets = {{'min_tablet_count': 2}}")
@@ -972,7 +976,7 @@ async def test_tablet_merge_during_view_building(manager: ManagerClient):
if new_tablet_count == expected_tablet_count:
return True
await manager.enable_tablet_balancing()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
assert await get_tablet_count(manager, servers[0], ks, "tab") == 2
await cql.run_async(f"ALTER TABLE {ks}.tab WITH tablets = {{'min_tablet_count': 1}}")
await wait_for(lambda: tablet_count_is(1), time.time() + 60)

View File

@@ -336,29 +336,6 @@ class ManagerClient:
logger.debug("ManagerClient stopping gracefully %s", server_id)
await self.client.put_json(f"/cluster/server/{server_id}/stop_gracefully", timeout=timeout)
async def disable_tablet_balancing(self):
"""
Disables background tablet load-balancing.
If there are already active migrations, it waits for them to finish before returning.
Doesn't block migrations on behalf of node operations like decommission, removenode or replace.
:return:
"""
servers = await self.running_servers()
if not servers:
raise Exception("No running servers")
# Any server will do, it's a group0 operation
await self.api.disable_tablet_balancing(servers[0].ip_addr)
async def enable_tablet_balancing(self):
"""
Enables background tablet load-balancing.
"""
servers = await self.running_servers()
if not servers:
raise Exception("No running servers")
# Any server will do, it's a group0 operation
await self.api.enable_tablet_balancing(servers[0].ip_addr)
async def server_start(self,
server_id: ServerNum,
expected_error: str | None = None,

View File

@@ -284,7 +284,7 @@ async def test_autotoogle_reject_incoming_migrations(manager: ManagerClient, vol
'tablet_load_stats_refresh_interval_in_seconds': 1,
}
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=global_cmdline, config=cfg) as servers:
await manager.disable_tablet_balancing()
await asyncio.gather(*[manager.api.disable_tablet_balancing(server.ip_addr) for server in servers])
cql, _ = await manager.get_ready_cql(servers)