Compare commits
3 Commits
copilot/en
...
copilot/cl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d4c28690e1 | ||
|
|
9475659ae8 | ||
|
|
0b3ee197b6 |
@@ -1318,7 +1318,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, prometheus_port(this, "prometheus_port", value_status::Used, 9180, "Prometheus port, set to zero to disable.")
|
||||
, prometheus_address(this, "prometheus_address", value_status::Used, {/* listen_address */}, "Prometheus listening address, defaulting to listen_address if not explicitly set.")
|
||||
, prometheus_prefix(this, "prometheus_prefix", value_status::Used, "scylla", "Set the prefix of the exported Prometheus metrics. Changing this will break Scylla's dashboard compatibility, do not change unless you know what you are doing.")
|
||||
, prometheus_allow_protobuf(this, "prometheus_allow_protobuf", value_status::Used, true, "Enable Prometheus protobuf with native histogram. Set to false to force text exposition format.")
|
||||
, prometheus_allow_protobuf(this, "prometheus_allow_protobuf", value_status::Used, false, "If set allows the experimental Prometheus protobuf with native histogram")
|
||||
, abort_on_lsa_bad_alloc(this, "abort_on_lsa_bad_alloc", value_status::Used, false, "Abort when allocation in LSA region fails.")
|
||||
, murmur3_partitioner_ignore_msb_bits(this, "murmur3_partitioner_ignore_msb_bits", value_status::Used, default_murmur3_partitioner_ignore_msb_bits, "Number of most significant token bits to ignore in murmur3 partitioner; increase for very large clusters.")
|
||||
, unspooled_dirty_soft_limit(this, "unspooled_dirty_soft_limit", value_status::Used, 0.6, "Soft limit of unspooled dirty memory expressed as a portion of the hard limit.")
|
||||
|
||||
@@ -31,19 +31,23 @@ size_t quorum_for(const locator::effective_replication_map& erm) {
|
||||
return replication_factor ? (replication_factor / 2) + 1 : 0;
|
||||
}
|
||||
|
||||
size_t local_quorum_for(const locator::effective_replication_map& erm, const sstring& dc) {
|
||||
static size_t get_replication_factor_for_dc(const locator::effective_replication_map& erm, const sstring& dc) {
|
||||
using namespace locator;
|
||||
|
||||
const auto& rs = erm.get_replication_strategy();
|
||||
|
||||
if (rs.get_type() == replication_strategy_type::network_topology) {
|
||||
const network_topology_strategy* nrs =
|
||||
const network_topology_strategy* nts =
|
||||
static_cast<const network_topology_strategy*>(&rs);
|
||||
size_t replication_factor = nrs->get_replication_factor(dc);
|
||||
return replication_factor ? (replication_factor / 2) + 1 : 0;
|
||||
return nts->get_replication_factor(dc);
|
||||
}
|
||||
|
||||
return quorum_for(erm);
|
||||
return erm.get_replication_factor();
|
||||
}
|
||||
|
||||
size_t local_quorum_for(const locator::effective_replication_map& erm, const sstring& dc) {
|
||||
auto rf = get_replication_factor_for_dc(erm, dc);
|
||||
return rf ? (rf / 2) + 1 : 0;
|
||||
}
|
||||
|
||||
size_t block_for_local_serial(const locator::effective_replication_map& erm) {
|
||||
@@ -188,18 +192,30 @@ void assure_sufficient_live_nodes(
|
||||
return pending <= live ? live - pending : 0;
|
||||
};
|
||||
|
||||
auto make_rf_zero_error_msg = [cl] (const sstring& local_dc) {
|
||||
return format("Cannot achieve consistency level {} in datacenter '{}' with replication factor 0. "
|
||||
"Ensure the keyspace is replicated to this datacenter or use a non-local consistency level.", cl, local_dc);
|
||||
};
|
||||
|
||||
const auto& topo = erm.get_topology();
|
||||
const sstring& local_dc = topo.get_datacenter();
|
||||
|
||||
switch (cl) {
|
||||
case consistency_level::ANY:
|
||||
// local hint is acceptable, and local node is always live
|
||||
break;
|
||||
case consistency_level::LOCAL_ONE:
|
||||
if (size_t local_rf = get_replication_factor_for_dc(erm, local_dc); local_rf == 0) {
|
||||
throw exceptions::unavailable_exception(make_rf_zero_error_msg(local_dc), cl, 1, 0);
|
||||
}
|
||||
if (topo.count_local_endpoints(live_endpoints) < topo.count_local_endpoints(pending_endpoints) + 1) {
|
||||
throw exceptions::unavailable_exception(cl, 1, 0);
|
||||
}
|
||||
break;
|
||||
case consistency_level::LOCAL_QUORUM: {
|
||||
if (size_t local_rf = get_replication_factor_for_dc(erm, local_dc); local_rf == 0) {
|
||||
throw exceptions::unavailable_exception(make_rf_zero_error_msg(local_dc), cl, need, 0);
|
||||
}
|
||||
size_t local_live = topo.count_local_endpoints(live_endpoints);
|
||||
size_t pending = topo.count_local_endpoints(pending_endpoints);
|
||||
if (local_live < need + pending) {
|
||||
|
||||
@@ -7261,9 +7261,6 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
|
||||
});
|
||||
}
|
||||
|
||||
// Refresh is triggered after table creation, need to make sure we see the new tablets.
|
||||
co_await _group0->group0_server().read_barrier(&_group0_as);
|
||||
|
||||
using table_ids_t = std::unordered_set<table_id>;
|
||||
const auto table_ids = co_await std::invoke([this] () -> future<table_ids_t> {
|
||||
table_ids_t ids;
|
||||
|
||||
@@ -119,8 +119,7 @@ sstring get_application_state_gently(const gms::application_state_map& epmap, gm
|
||||
}
|
||||
} // namespace
|
||||
|
||||
class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
, public migration_listener::empty_listener {
|
||||
class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
|
||||
gms::gossiper& _gossiper;
|
||||
netw::messaging_service& _messaging;
|
||||
@@ -2030,7 +2029,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
// Compute the average tablet size of existing replicas
|
||||
uint64_t tablet_size_sum = 0;
|
||||
size_t replica_count = 0;
|
||||
bool incomplete = false;
|
||||
const locator::range_based_tablet_id rb_tid {gid.table, trange};
|
||||
auto tsi = get_migration_streaming_info(get_token_metadata().get_topology(), tinfo, trinfo);
|
||||
for (auto& r : tsi.read_from) {
|
||||
@@ -2038,15 +2036,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
if (tablet_size_opt) {
|
||||
tablet_size_sum += *tablet_size_opt;
|
||||
replica_count++;
|
||||
} else {
|
||||
incomplete = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!incomplete) {
|
||||
if (replica_count) {
|
||||
new_load_stats = make_lw_shared<locator::load_stats>(*old_load_stats);
|
||||
auto size = replica_count ? tablet_size_sum / replica_count : 0;
|
||||
new_load_stats->tablet_stats.at(pending->host).tablet_sizes[gid.table][trange] = size;
|
||||
new_load_stats->tablet_stats.at(pending->host).tablet_sizes[gid.table][trange] = tablet_size_sum / replica_count;
|
||||
}
|
||||
}
|
||||
break;
|
||||
@@ -2301,30 +2296,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
});
|
||||
}
|
||||
|
||||
virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override {
|
||||
// New tablets were allocated, we need per-tablet stats for them for tablet balancer to make progress.
|
||||
trigger_load_stats_refresh();
|
||||
}
|
||||
|
||||
virtual void on_create_view(const sstring& ks_name, const sstring& view_name) override {
|
||||
trigger_load_stats_refresh();
|
||||
}
|
||||
|
||||
virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed) override {
|
||||
// Tablet hints may have changed. Wake up so that load balancer re-evaluates tablet distribution.
|
||||
_topo_sm.event.broadcast();
|
||||
}
|
||||
|
||||
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
|
||||
// Tablet distribution has changed. Wake up the load balancer.
|
||||
_topo_sm.event.broadcast();
|
||||
}
|
||||
|
||||
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override {
|
||||
// Tablet distribution has changed. Wake up the load balancer.
|
||||
_topo_sm.event.broadcast();
|
||||
}
|
||||
|
||||
future<> cancel_all_requests(group0_guard guard, std::unordered_set<raft::server_id> dead_nodes) {
|
||||
utils::chunked_vector<canonical_mutation> muts;
|
||||
std::vector<raft::server_id> reject_join;
|
||||
@@ -3649,27 +3620,21 @@ public:
|
||||
, _tablet_allocator(tablet_allocator)
|
||||
, _vb_coordinator(std::make_unique<db::view::view_building_coordinator>(_db, _raft, _group0, _sys_ks, _gossiper, _messaging, _vb_sm, _topo_sm, _term, _as))
|
||||
, _cdc_gens(cdc_gens)
|
||||
, _tablet_load_stats_refresh([this] {
|
||||
return with_scheduling_group(_db.get_gossip_scheduling_group(), [this] {
|
||||
return refresh_tablet_load_stats();
|
||||
});
|
||||
})
|
||||
, _tablet_load_stats_refresh([this] { return refresh_tablet_load_stats(); })
|
||||
, _ring_delay(ring_delay)
|
||||
, _group0_holder(_group0.hold_group0_gate())
|
||||
, _voter_handler(group0, topo_sm._topology, gossiper, feature_service)
|
||||
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
|
||||
, _async_gate("topology_coordinator")
|
||||
{
|
||||
_db.get_notifier().register_listener(this);
|
||||
}
|
||||
{}
|
||||
|
||||
// Returns true if the upgrade was done, returns false if upgrade was interrupted.
|
||||
future<bool> maybe_run_upgrade();
|
||||
future<> run();
|
||||
future<> stop();
|
||||
|
||||
virtual void on_up(const gms::inet_address& endpoint, locator::host_id hid) override { _topo_sm.event.broadcast(); };
|
||||
virtual void on_down(const gms::inet_address& endpoint, locator::host_id hid) override { _topo_sm.event.broadcast(); };
|
||||
virtual void on_up(const gms::inet_address& endpoint, locator::host_id hid) { _topo_sm.event.broadcast(); };
|
||||
virtual void on_down(const gms::inet_address& endpoint, locator::host_id hid) { _topo_sm.event.broadcast(); };
|
||||
|
||||
private:
|
||||
tablet_ops_metrics _tablet_ops_metrics;
|
||||
@@ -3719,6 +3684,10 @@ future<std::optional<group0_guard>> topology_coordinator::maybe_migrate_system_t
|
||||
future<bool> topology_coordinator::maybe_start_tablet_migration(group0_guard guard) {
|
||||
rtlogger.debug("Evaluating tablet balance");
|
||||
|
||||
if (utils::get_local_injector().enter("tablet_load_stats_refresh_before_rebalancing")) {
|
||||
co_await _tablet_load_stats_refresh.trigger();
|
||||
}
|
||||
|
||||
auto tm = get_token_metadata_ptr();
|
||||
auto plan = co_await _tablet_allocator.balance_tablets(tm, &_topo_sm._topology, &_sys_ks, {}, get_dead_nodes());
|
||||
if (plan.empty()) {
|
||||
@@ -3870,7 +3839,6 @@ future<> topology_coordinator::refresh_tablet_load_stats() {
|
||||
rtlogger.debug("raft topology: Refreshed table load stats for all DC(s).");
|
||||
|
||||
_tablet_allocator.set_load_stats(make_lw_shared<const locator::load_stats>(std::move(stats)));
|
||||
_topo_sm.event.broadcast(); // wake up load balancer.
|
||||
}
|
||||
|
||||
future<> topology_coordinator::start_tablet_load_stats_refresher() {
|
||||
@@ -3879,6 +3847,7 @@ future<> topology_coordinator::start_tablet_load_stats_refresher() {
|
||||
bool sleep = true;
|
||||
try {
|
||||
co_await _tablet_load_stats_refresh.trigger();
|
||||
_topo_sm.event.broadcast(); // wake up load balancer.
|
||||
} catch (raft::request_aborted&) {
|
||||
rtlogger.debug("raft topology: Tablet load stats refresher aborted");
|
||||
sleep = false;
|
||||
@@ -4206,21 +4175,10 @@ future<> topology_coordinator::run() {
|
||||
auto group0_voter_refresher = group0_voter_refresher_fiber();
|
||||
auto vb_coordinator_fiber = run_view_building_coordinator();
|
||||
|
||||
std::optional<future<>> event_wait;
|
||||
|
||||
while (!_as.abort_requested()) {
|
||||
bool sleep = false;
|
||||
try {
|
||||
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_processing_backlog", utils::wait_for_message(5min));
|
||||
|
||||
if (!_topo_sm._topology.tstate && utils::get_local_injector().enter("tablet_load_stats_refresh_before_rebalancing")) {
|
||||
co_await _tablet_load_stats_refresh.trigger();
|
||||
}
|
||||
|
||||
if (!event_wait) {
|
||||
event_wait = _topo_sm.event.wait();
|
||||
}
|
||||
|
||||
auto guard = co_await cleanup_group0_config_if_needed(co_await start_operation());
|
||||
|
||||
if (_rollback) {
|
||||
@@ -4232,14 +4190,9 @@ future<> topology_coordinator::run() {
|
||||
bool had_work = co_await handle_topology_transition(std::move(guard));
|
||||
|
||||
if (!had_work) {
|
||||
co_await utils::get_local_injector().inject("wait-before-topology-coordinator-goes-to-sleep", utils::wait_for_message(30s));
|
||||
|
||||
// Nothing to work on. Wait for topology change event.
|
||||
rtlogger.debug("topology coordinator fiber has nothing to do. Sleeping.");
|
||||
_as.check();
|
||||
auto f = std::move(*event_wait);
|
||||
event_wait.reset();
|
||||
co_await std::move(f);
|
||||
co_await await_event();
|
||||
rtlogger.debug("topology coordinator fiber got an event");
|
||||
}
|
||||
co_await utils::get_local_injector().inject("wait-after-topology-coordinator-gets-event", utils::wait_for_message(30s));
|
||||
@@ -4256,10 +4209,6 @@ future<> topology_coordinator::run() {
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
if (event_wait && event_wait->available()) {
|
||||
event_wait->ignore_ready_future();
|
||||
}
|
||||
|
||||
co_await _async_gate.close();
|
||||
co_await std::move(tablet_load_stats_refresher);
|
||||
co_await _tablet_load_stats_refresh.join();
|
||||
@@ -4272,8 +4221,6 @@ future<> topology_coordinator::run() {
|
||||
}
|
||||
|
||||
future<> topology_coordinator::stop() {
|
||||
co_await _db.get_notifier().unregister_listener(this);
|
||||
|
||||
// if topology_coordinator::run() is aborted either because we are not a
|
||||
// leader anymore, or we are shutting down as a leader, we have to handle
|
||||
// futures in _tablets in case any of them failed, before these failures
|
||||
|
||||
@@ -119,7 +119,8 @@ async def test_multi_column_lwt_during_migration(manager: ManagerClient):
|
||||
}
|
||||
|
||||
servers = await manager.servers_add(6, config=cfg)
|
||||
await manager.disable_tablet_balancing()
|
||||
for server in servers:
|
||||
await manager.api.disable_tablet_balancing(server.ip_addr)
|
||||
|
||||
rf_max = len(servers) - 1
|
||||
rf = random.randint(2, rf_max)
|
||||
|
||||
@@ -32,7 +32,7 @@ async def pin_the_only_tablet(manager, keyspace_name, table_or_view_name, server
|
||||
# We need to send load-balancing commands to one of the nodes and they
|
||||
# will be propagated to all of them. Since we already know of
|
||||
# target_server, let's just use that.
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(server.ip_addr)
|
||||
tablet_token = 0 # Doesn't matter since there is one tablet
|
||||
source_replicas = await get_tablet_replicas(manager, server, keyspace_name, table_or_view_name, tablet_token)
|
||||
# We assume RF=1 so get_tablet_replicas() returns just one replica
|
||||
@@ -297,7 +297,7 @@ async def test_mv_tablet_split(manager: ManagerClient):
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
}, cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -330,7 +330,7 @@ async def test_mv_tablet_split(manager: ManagerClient):
|
||||
|
||||
s1_log = await manager.server_open_log(servers[0].server_id)
|
||||
s1_mark = await s1_log.mark()
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
await s1_log.wait_for(f"Detected tablet split for table {ks}.tv", from_mark=s1_mark)
|
||||
await check()
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ async def test_tablet_mv_replica_pairing_during_replace(manager: ManagerClient):
|
||||
|
||||
# Disable migrations concurrent with replace since we don't handle nodes going down during migration yet.
|
||||
# See https://github.com/scylladb/scylladb/issues/16527
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
base_replicas = await get_tablet_replicas(manager, servers[0], ks, "test", 0)
|
||||
logger.info(f'{ks}.test replicas: {base_replicas}')
|
||||
|
||||
@@ -82,7 +82,9 @@ async def test_mv_retried_writes_reach_all_replicas(manager: ManagerClient) -> N
|
||||
await wait_for_view(cql, 'mv_cf_view', node_count)
|
||||
|
||||
# Disable tablet balancing so that the slow node doesn't get tablets moved away from it.
|
||||
await manager.disable_tablet_balancing()
|
||||
for s in servers:
|
||||
await manager.api.disable_tablet_balancing(s.ip_addr)
|
||||
await manager.api.disable_tablet_balancing(server.ip_addr)
|
||||
|
||||
# Make sure that the slow node has a base table tablet and no view tablets, so that the
|
||||
# view updates from it are remote. (using shard 0 and token 0 when moving tablets as they don't make a difference here)
|
||||
|
||||
@@ -92,7 +92,7 @@ async def test_start_scylla_with_view_building_disabled(manager: ManagerClient):
|
||||
async def test_view_building_with_tablet_move(manager: ManagerClient, build_mode: str):
|
||||
servers = [await manager.server_add()]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
table = 'test'
|
||||
|
||||
|
||||
@@ -105,7 +105,7 @@ async def test_mv_update_on_pending_replica(manager: ManagerClient, intranode):
|
||||
cmd = ['--smp', '2']
|
||||
servers = [await manager.server_add(config=cfg, cmdline=cmd)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -400,7 +400,7 @@ async def test_mv_write_during_migration(manager: ManagerClient, migration_type:
|
||||
|
||||
servers = await manager.servers_add(3, cmdline=cmdline)
|
||||
cql = manager.get_cql()
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
tablets_enabled = migration_type.startswith("tablets")
|
||||
if tablets_enabled:
|
||||
|
||||
@@ -782,7 +782,7 @@ async def test_restore_with_streaming_scopes(build_mode: str, manager: ManagerCl
|
||||
|
||||
servers, host_ids = await create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, object_storage)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
|
||||
ks = 'ks'
|
||||
@@ -953,7 +953,7 @@ async def test_restore_primary_replica_same_rack_scope_rack(manager: ManagerClie
|
||||
|
||||
servers, host_ids = await create_cluster(topology, False, manager, logger, object_storage)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
|
||||
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger)
|
||||
@@ -1005,7 +1005,7 @@ async def test_restore_primary_replica_different_rack_scope_dc(manager: ManagerC
|
||||
|
||||
servers, host_ids = await create_cluster(topology, True, manager, logger, object_storage)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
|
||||
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger)
|
||||
@@ -1049,7 +1049,7 @@ async def test_restore_primary_replica_same_dc_scope_dc(manager: ManagerClient,
|
||||
|
||||
servers, host_ids = await create_cluster(topology, False, manager, logger, object_storage)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
|
||||
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger)
|
||||
@@ -1101,7 +1101,7 @@ async def test_restore_primary_replica_different_dc_scope_all(manager: ManagerCl
|
||||
|
||||
servers, host_ids = await create_cluster(topology, False, manager, logger, object_storage)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
|
||||
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger)
|
||||
|
||||
@@ -391,8 +391,6 @@ async def init_tablet_transfer(manager: ManagerClient,
|
||||
LOGGER.info("Cannot perform a tablet migration because rack='%s' has only %i node(s)", target_rack, len(viable_targets))
|
||||
return
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
|
||||
await manager.cql.run_async(
|
||||
"CREATE KEYSPACE test"
|
||||
" WITH replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 3 } AND"
|
||||
@@ -403,6 +401,8 @@ async def init_tablet_transfer(manager: ManagerClient,
|
||||
*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k})") for k in range(256)]
|
||||
)
|
||||
|
||||
await asyncio.gather(*[manager.api.disable_tablet_balancing(node_ip=s.ip_addr) for s in servers])
|
||||
|
||||
replicas = await get_all_tablet_replicas(
|
||||
manager=manager,
|
||||
server=servers[0],
|
||||
|
||||
@@ -196,9 +196,9 @@ async def prepare_migration_test(manager: ManagerClient):
|
||||
s = await manager.server_add(cmdline=extra_scylla_cmdline_options)
|
||||
servers.append(s)
|
||||
host_ids.append(await manager.get_host_id(s.server_id))
|
||||
await manager.api.disable_tablet_balancing(s.ip_addr)
|
||||
|
||||
await make_server()
|
||||
await manager.disable_tablet_balancing()
|
||||
cql = manager.get_cql()
|
||||
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}")
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
||||
@@ -360,7 +360,7 @@ async def test_repair_task_info_is_none_when_no_running_repair(manager: ManagerC
|
||||
await asyncio.gather(repair_task(), wait_and_check_none())
|
||||
|
||||
async def prepare_split(manager: ManagerClient, server: ServerInfo, keyspace: str, table: str, keys: list[int]):
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(server.ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
insert = cql.prepare(f"INSERT INTO {keyspace}.{table}(pk, c) VALUES(?, ?)")
|
||||
@@ -371,7 +371,7 @@ async def prepare_split(manager: ManagerClient, server: ServerInfo, keyspace: st
|
||||
await manager.api.flush_keyspace(server.ip_addr, keyspace)
|
||||
|
||||
async def prepare_merge(manager: ManagerClient, server: ServerInfo, keyspace: str, table: str, keys: list[int]):
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(server.ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
await asyncio.gather(*[cql.run_async(f"DELETE FROM {keyspace}.{table} WHERE pk={k};") for k in keys])
|
||||
@@ -382,7 +382,7 @@ async def enable_tablet_balancing_and_wait(manager: ManagerClient, server: Serve
|
||||
s1_log = await manager.server_open_log(server.server_id)
|
||||
s1_mark = await s1_log.mark()
|
||||
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(server.ip_addr)
|
||||
|
||||
await s1_log.wait_for(message, from_mark=s1_mark)
|
||||
|
||||
@@ -400,7 +400,7 @@ async def test_tablet_resize_task(manager: ManagerClient):
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
})]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
table1 = "test1"
|
||||
@@ -421,7 +421,7 @@ async def test_tablet_resize_task(manager: ManagerClient):
|
||||
|
||||
injection = "tablet_split_finalization_postpone"
|
||||
await enable_injection(manager, servers, injection)
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
async def wait_and_check_status(server, type, keyspace, table):
|
||||
task = (await wait_tasks_created(tm, server, module_name, 1, type, keyspace, table))[0]
|
||||
@@ -441,7 +441,7 @@ async def test_tablet_resize_list(manager: ManagerClient):
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
})]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
table1 = "test1"
|
||||
@@ -464,7 +464,7 @@ async def test_tablet_resize_list(manager: ManagerClient):
|
||||
await enable_injection(manager, servers, injection)
|
||||
await manager.api.enable_injection(servers[0].ip_addr, compaction_injection, one_shot=True)
|
||||
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", keyspace, table1))[0]
|
||||
task1 = (await wait_tasks_created(tm, servers[1], module_name, 1, "split", keyspace, table1))[0]
|
||||
|
||||
@@ -501,7 +501,7 @@ async def test_tablet_resize_revoked(manager: ManagerClient):
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
})]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
table1 = "test1"
|
||||
@@ -515,7 +515,7 @@ async def test_tablet_resize_revoked(manager: ManagerClient):
|
||||
injection = "tablet_split_finalization_postpone"
|
||||
await enable_injection(manager, servers, injection)
|
||||
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", keyspace, table1))[0]
|
||||
|
||||
log = await manager.server_open_log(servers[0].server_id)
|
||||
|
||||
@@ -1087,8 +1087,6 @@ async def test_alternator_invalid_shard_for_lwt(manager: ManagerClient):
|
||||
server = await manager.server_add(config=config, cmdline=cmdline)
|
||||
alternator = get_alternator(server.ip_addr)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
|
||||
logger.info("Creating alternator test table")
|
||||
table = alternator.create_table(TableName=unique_table_name(),
|
||||
Tags=[{'Key': 'system:initial_tablets', 'Value': '1'}],
|
||||
|
||||
@@ -328,7 +328,7 @@ async def test_cdc_colocation(manager: ManagerClient):
|
||||
test_data[i] = i * 10
|
||||
await cql.run_async(f"INSERT INTO {ks}.test(pk, v) VALUES({i}, {i * 10})")
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
# create map that maps each stream_id to a list of all partitions it contains
|
||||
rows = await cql.run_async(f"SELECT \"cdc$stream_id\" as sid, pk FROM {ks}.test_scylla_cdc_log")
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
|
||||
import aiohttp
|
||||
import logging
|
||||
import pytest
|
||||
import requests
|
||||
@@ -40,86 +39,3 @@ async def test_non_liveupdatable_config(manager):
|
||||
await manager.server_update_config(server.server_id, liveupdatable_param, True)
|
||||
await wait_for_config(manager, server, liveupdatable_param, True)
|
||||
await wait_for_config(manager, server, not_liveupdatable_param, True)
|
||||
|
||||
|
||||
# Default Prometheus metrics port
|
||||
PROMETHEUS_PORT = 9180
|
||||
|
||||
# Accept header for requesting Prometheus protobuf format with native histograms
|
||||
PROMETHEUS_PROTOBUF_ACCEPT_HEADER = 'application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prometheus_allow_protobuf_default(manager):
|
||||
"""
|
||||
Test that prometheus_allow_protobuf is enabled by default,
|
||||
while ensuring the configuration can be changed if needed.
|
||||
"""
|
||||
logging.info("Starting server with default configuration")
|
||||
server = await manager.server_add()
|
||||
|
||||
logging.info("Verify prometheus_allow_protobuf defaults to true")
|
||||
await wait_for_config(manager, server, "prometheus_allow_protobuf", True)
|
||||
|
||||
logging.info("Test that the configuration can be explicitly disabled")
|
||||
server2 = await manager.server_add(config={'prometheus_allow_protobuf': False})
|
||||
await wait_for_config(manager, server2, "prometheus_allow_protobuf", False)
|
||||
|
||||
logging.info("Test that the configuration can be explicitly enabled")
|
||||
server3 = await manager.server_add(config={'prometheus_allow_protobuf': True})
|
||||
await wait_for_config(manager, server3, "prometheus_allow_protobuf", True)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prometheus_protobuf_native_histogram(manager):
|
||||
"""
|
||||
Test that when prometheus_allow_protobuf is enabled, the server actually
|
||||
returns metrics in protobuf format with native histogram support when requested.
|
||||
"""
|
||||
logging.info("Starting server with prometheus_allow_protobuf enabled")
|
||||
server = await manager.server_add(config={'prometheus_allow_protobuf': True})
|
||||
|
||||
metrics_url = f"http://{server.ip_addr}:{PROMETHEUS_PORT}/metrics"
|
||||
|
||||
logging.info(f"Requesting metrics in protobuf format from {metrics_url}")
|
||||
|
||||
# Request metrics with Accept header for protobuf format
|
||||
headers = {
|
||||
'Accept': PROMETHEUS_PROTOBUF_ACCEPT_HEADER
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(metrics_url, headers=headers) as resp:
|
||||
assert resp.status == 200, f"Expected status 200, got {resp.status}"
|
||||
|
||||
# Check that we got protobuf content type in response
|
||||
content_type = resp.headers.get('Content-Type', '')
|
||||
logging.info(f"Response Content-Type: {content_type}")
|
||||
|
||||
# When protobuf is supported and requested, we should get protobuf back
|
||||
assert 'application/vnd.google.protobuf' in content_type, \
|
||||
f"Expected protobuf content type, got: {content_type}"
|
||||
|
||||
# Read the response body
|
||||
body = await resp.read()
|
||||
|
||||
# Verify we got non-empty protobuf data
|
||||
assert len(body) > 0, "Expected non-empty protobuf response"
|
||||
|
||||
logging.info(f"Successfully received protobuf response with {len(body)} bytes")
|
||||
|
||||
logging.info("Test that disabling prometheus_allow_protobuf prevents protobuf responses")
|
||||
server2 = await manager.server_add(config={'prometheus_allow_protobuf': False})
|
||||
metrics_url2 = f"http://{server2.ip_addr}:{PROMETHEUS_PORT}/metrics"
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(metrics_url2, headers=headers) as resp:
|
||||
assert resp.status == 200, "Fail reading metrics from {metrics_url2}"
|
||||
|
||||
content_type = resp.headers.get('Content-Type', '')
|
||||
logging.info(f"Response Content-Type (protobuf disabled): {content_type}")
|
||||
|
||||
# When protobuf is disabled, we should get text format even if requested
|
||||
# The server should return text/plain or not include protobuf in content-type
|
||||
assert 'application/vnd.google.protobuf' not in content_type, \
|
||||
f"Expected text format when protobuf disabled, got: {content_type}"
|
||||
|
||||
logging.info("Confirmed that protobuf is not returned when disabled")
|
||||
|
||||
@@ -40,7 +40,7 @@ async def test_counter_updates_during_tablet_migration(manager: ManagerClient, m
|
||||
|
||||
servers = await manager.servers_add(node_count, cmdline=cmdline)
|
||||
cql = manager.get_cql()
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets={'initial': 1}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.counters (pk int PRIMARY KEY, c counter)")
|
||||
|
||||
@@ -52,7 +52,7 @@ async def test_file_streaming_respects_encryption(manager: ManagerClient, workdi
|
||||
cmdline = ['--smp=1']
|
||||
servers = []
|
||||
servers.append(await manager.server_add(config=cfg, cmdline=cmdline))
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.cql
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
@@ -84,7 +84,7 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
|
||||
# This should be done before adding the last two servers,
|
||||
# otherwise it can break the version == fence_version condition
|
||||
# which the test relies on.
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[2].ip_addr)
|
||||
|
||||
logger.info('Creating new tables')
|
||||
random_tables = RandomTables(request.node.name, manager, unique_name(), 3)
|
||||
@@ -146,7 +146,7 @@ async def test_fence_hints(request, manager: ManagerClient):
|
||||
# This should be done before adding the last two servers,
|
||||
# otherwise it can break the version == fence_version condition
|
||||
# which the test relies on.
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(s0.ip_addr)
|
||||
|
||||
[s1, s2] = await manager.servers_add(2, property_file=[
|
||||
{"dc": "dc1", "rack": "r2"},
|
||||
@@ -416,7 +416,7 @@ async def test_fenced_out_on_tablet_migration_while_handling_paxos_verb(manager:
|
||||
host_ids = await asyncio.gather(*[manager.get_host_id(s.server_id) for s in servers])
|
||||
|
||||
logger.info("Disable tablet balancing")
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Create a test keyspace")
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
|
||||
|
||||
@@ -374,7 +374,8 @@ async def test_hint_to_pending(manager: ManagerClient):
|
||||
{"dc": "dc1", "rack": "r1"},
|
||||
])
|
||||
cql = await manager.get_cql_exclusive(servers[0])
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
await manager.api.disable_tablet_balancing(servers[1].ip_addr)
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
table = f"{ks}.t"
|
||||
|
||||
@@ -13,7 +13,8 @@ import pytest
|
||||
from cassandra.policies import WhiteListRoundRobinPolicy
|
||||
|
||||
from test.cqlpy import nodetool
|
||||
from cassandra import ConsistencyLevel
|
||||
from cassandra import ConsistencyLevel, Unavailable
|
||||
from cassandra.cluster import NoHostAvailable
|
||||
from cassandra.protocol import InvalidRequest, ConfigurationException
|
||||
from cassandra.query import SimpleStatement
|
||||
from test.pylib.async_cql import _wrap_future
|
||||
@@ -113,14 +114,19 @@ async def test_putget_2dc_with_rf(
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_query_dc_with_rf_0_does_not_crash_db(request: pytest.FixtureRequest, manager: ManagerClient):
|
||||
"""Test querying dc with CL=LOCAL_QUORUM when RF=0 for this dc, does not crash the node and returns None
|
||||
Covers https://github.com/scylladb/scylla/issues/8354"""
|
||||
async def test_read_or_write_to_dc_with_rf_0_fails(request: pytest.FixtureRequest, manager: ManagerClient):
|
||||
"""
|
||||
Verifies that operations using local consistency levels (LOCAL_QUORUM, LOCAL_ONE) fail
|
||||
with a clear, actionable error message when the datacenter has replication factor 0.
|
||||
|
||||
Covers:
|
||||
- https://github.com/scylladb/scylla/issues/8354 - Operations should not crash the DB
|
||||
- https://github.com/scylladb/scylladb/issues/27893 - Should give a clear error about RF=0
|
||||
"""
|
||||
servers = []
|
||||
ks = "test_ks"
|
||||
table_name = "test_table_name"
|
||||
expected = ["k1", "value1"]
|
||||
dc_replication = {'dc2': 0}
|
||||
dc_replication = {'dc1': 1, 'dc2': 0}
|
||||
columns = [Column("name", TextType), Column("value", TextType)]
|
||||
|
||||
for i in [1, 2]:
|
||||
@@ -136,18 +142,40 @@ async def test_query_dc_with_rf_0_does_not_crash_db(request: pytest.FixtureReque
|
||||
|
||||
random_tables = RandomTables(request.node.name, manager, ks, 1, dc_replication)
|
||||
await random_tables.add_table(ncolumns=2, columns=columns, pks=1, name=table_name)
|
||||
dc1_connection.execute(
|
||||
f"INSERT INTO {ks}.{table_name} ({columns[0].name}, {columns[1].name}) VALUES ('{expected[0]}', '{expected[1]}');")
|
||||
select_query = SimpleStatement(f"SELECT * from {ks}.{table_name};",
|
||||
consistency_level=ConsistencyLevel.LOCAL_QUORUM)
|
||||
nodetool.flush(dc1_connection, "{ks}.{table_name}")
|
||||
first_node_results = list(dc1_connection.execute(select_query).one())
|
||||
second_node_result = dc2_connection.execute(select_query).one()
|
||||
|
||||
assert first_node_results == expected, \
|
||||
f"Expected {expected} from {select_query.query_string}, but got {first_node_results}"
|
||||
assert second_node_result is None, \
|
||||
f"Expected no results from {select_query.query_string}, but got {second_node_result}"
|
||||
# sanity check: operations from dc1 (RF > 0) should succeed with all local consistency levels
|
||||
local_cls = [ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.LOCAL_ONE]
|
||||
|
||||
for i, cl in enumerate(local_cls):
|
||||
dc1_connection.execute(SimpleStatement(
|
||||
f"INSERT INTO {ks}.{table_name} ({columns[0].name}, {columns[1].name}) VALUES ('k{i}', 'value{i}')", consistency_level=cl))
|
||||
|
||||
nodetool.flush(dc1_connection, f"{ks}.{table_name}")
|
||||
|
||||
select_query = SimpleStatement(f"SELECT * FROM {ks}.{table_name} WHERE {columns[0].name} = 'k{i}'", consistency_level=cl)
|
||||
# asserting SELECT's results may seem excessive, but it could happen that it'd fail and return nothing and not throw,
|
||||
# in which case it'd silently return nothing, which we don't want, and hence are asserting it's *really* working
|
||||
result = list(dc1_connection.execute(select_query).one())
|
||||
expected_row = [f"k{i}", f"value{i}"]
|
||||
assert result == expected_row, \
|
||||
f"Expected {expected_row} with CL={cl} from dc1, but got {result}"
|
||||
|
||||
def assert_operation_fails_with_rf0_error(cl: ConsistencyLevel, operation: str) -> None:
|
||||
with pytest.raises((Unavailable, NoHostAvailable)) as exc_info:
|
||||
dc2_connection.execute(SimpleStatement(operation, consistency_level=cl))
|
||||
|
||||
error_msg = str(exc_info.value)
|
||||
assert "Cannot achieve consistency level LOCAL_" in error_msg and "use a non-local consistency level" in error_msg, \
|
||||
f"Expected error indicating RF=0 and datacenter with CL={cl}, but received: {exc_info.value}"
|
||||
|
||||
# SELECT & INSERT from dc2 (with RF=0) using local CLs should fail with a clear error message
|
||||
# indicating the replication factor is 0 and suggesting to use a non-local CL
|
||||
for i, cl in enumerate(local_cls):
|
||||
assert_operation_fails_with_rf0_error(cl,
|
||||
f"SELECT * FROM {ks}.{table_name}")
|
||||
|
||||
assert_operation_fails_with_rf0_error(cl,
|
||||
f"INSERT INTO {ks}.{table_name} ({columns[0].name}, {columns[1].name}) VALUES ('k_fail_{i}', 'value_fail_{i}')")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_and_alter_keyspace_with_altering_rf_and_racks(manager: ManagerClient):
|
||||
|
||||
@@ -176,7 +176,7 @@ async def test_raft_recovery_user_data(manager: ManagerClient, remove_dead_nodes
|
||||
# Disable load balancer on the topology coordinator node so that an ongoing tablet migration doesn't fail one of the
|
||||
# check_system_topology_and_cdc_generations_v3_consistency calls below. A tablet migration can suddenly make
|
||||
# version or fence_version inconsistent among nodes.
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(live_servers[0].ip_addr)
|
||||
|
||||
cql, hosts = await manager.get_ready_cql(live_servers + new_servers)
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ async def test_refresh_with_streaming_scopes(manager: ManagerClient, topology_rf
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
ks = 'ks'
|
||||
cf = 'cf'
|
||||
@@ -141,7 +141,7 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
ks = 'ks'
|
||||
cf = 'cf'
|
||||
|
||||
@@ -41,7 +41,7 @@ async def test_resurrection_while_file_streaming(manager: ManagerClient):
|
||||
servers = [await manager.server_add(config=cfg)]
|
||||
servers.append(await manager.server_add(config=cfg))
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
s0_host_id = await manager.get_host_id(servers[0].server_id)
|
||||
s1_host_id = await manager.get_host_id(servers[1].server_id)
|
||||
|
||||
@@ -24,7 +24,7 @@ async def test_partitioned_sstable_set(manager: ManagerClient, mode):
|
||||
|
||||
cmdline = ['--smp=1']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmdline)
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(server.ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)
|
||||
|
||||
@@ -151,7 +151,8 @@ async def test_reshape_with_tablets(manager: ManagerClient):
|
||||
async def test_tablet_rf_change(manager: ManagerClient, direction):
|
||||
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
|
||||
servers = await manager.servers_add(2, config=cfg, auto_rack_dc="dc1")
|
||||
await manager.disable_tablet_balancing()
|
||||
for s in servers:
|
||||
await manager.api.disable_tablet_balancing(s.ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
res = await cql.run_async("SELECT data_center FROM system.local")
|
||||
@@ -252,7 +253,7 @@ async def test_tablets_api_consistency(manager: ManagerClient, endpoint):
|
||||
servers += await manager.servers_add(2, property_file={'dc': f'dc1', 'rack': 'rack1'})
|
||||
servers += await manager.servers_add(2, property_file={'dc': f'dc1', 'rack': 'rack2'})
|
||||
servers += await manager.servers_add(2, property_file={'dc': f'dc1', 'rack': 'rack3'})
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
hosts = { await manager.get_host_id(s.server_id): s.ip_addr for s in servers }
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
|
||||
@@ -520,7 +521,6 @@ async def test_saved_readers_tablet_migration(manager: ManagerClient, build_mode
|
||||
cfg['error_injections_at_startup'] = [{'name': 'querier-cache-ttl-seconds', 'value': 999999999}]
|
||||
|
||||
servers = await manager.servers_add(2, config=cfg)
|
||||
await manager.disable_tablet_balancing()
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
@@ -595,7 +595,7 @@ async def test_read_of_pending_replica_during_migration(manager: ManagerClient,
|
||||
]
|
||||
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
|
||||
@@ -667,7 +667,7 @@ async def test_explicit_tablet_movement_during_decommission(manager: ManagerClie
|
||||
#
|
||||
# Load balancing being enabled or disabled is a cluster-global property; we can use any node to toggle it.
|
||||
logger.info("Disabling load balancing")
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Populating tablet")
|
||||
# Create a table with just one partition and RF=1, so we have exactly one tablet.
|
||||
@@ -831,7 +831,7 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
|
||||
'--logger-log-level', 'view_building_worker=debug',
|
||||
]
|
||||
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Create table, populate it and flush the table to disk")
|
||||
cql = manager.get_cql()
|
||||
@@ -896,7 +896,7 @@ async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
|
||||
'--logger-log-level', 'view_building_worker=debug',
|
||||
]
|
||||
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Create the test table, populate few rows and flush to disk")
|
||||
cql = manager.get_cql()
|
||||
@@ -985,7 +985,7 @@ async def test_orphaned_sstables_on_startup(manager: ManagerClient):
|
||||
'--logger-log-level', 'raft_topology=debug',
|
||||
]
|
||||
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Create the test table, populate few rows and flush to disk")
|
||||
cql = manager.get_cql()
|
||||
@@ -998,7 +998,7 @@ async def test_orphaned_sstables_on_startup(manager: ManagerClient):
|
||||
|
||||
logger.info("Start Node 2")
|
||||
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[1].ip_addr)
|
||||
node1_workdir = await manager.server_get_workdir(servers[1].server_id)
|
||||
node1_table_dir = glob.glob(os.path.join(node1_workdir, "data", ks, "test-*"))[0]
|
||||
s1_host_id = await manager.get_host_id(servers[1].server_id)
|
||||
@@ -1214,7 +1214,7 @@ async def test_drop_keyspace_while_split(manager: ManagerClient):
|
||||
cql = manager.get_cql()
|
||||
await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
# create a table so that it has at least 2 tablets (and storage groups) per shard
|
||||
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
|
||||
@@ -1230,7 +1230,7 @@ async def test_drop_keyspace_while_split(manager: ManagerClient):
|
||||
await manager.api.enable_injection(servers[0].ip_addr, 'split_storage_groups_wait', one_shot=False)
|
||||
|
||||
# enable the load balancer which should emmit a tablet split
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
# wait for compaction groups to be created and split to begin
|
||||
await s0_log.wait_for('split_storage_groups_wait: wait')
|
||||
@@ -1260,7 +1260,7 @@ async def test_drop_with_tablet_migration_cleanup(manager: ManagerClient):
|
||||
cql = manager.get_cql()
|
||||
|
||||
# We don't want the load balancer to migrate tablets during the test
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(server.ip_addr)
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
|
||||
# Create the table, insert data and flush
|
||||
@@ -1306,7 +1306,7 @@ async def test_drop_with_tablet_migration_cleanup(manager: ManagerClient):
|
||||
async def test_two_tablets_concurrent_repair_and_migration(manager: ManagerClient):
|
||||
injection = "repair_shard_repair_task_impl_do_repair_ranges"
|
||||
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
all_replicas = await get_all_tablet_replicas(manager, servers[0], ks, "test")
|
||||
all_replicas.sort(key=lambda x: x.last_token)
|
||||
@@ -1384,7 +1384,7 @@ async def test_tablet_split_finalization_with_migrations(manager: ManagerClient)
|
||||
s0_host_id = await manager.get_host_id(servers[0].server_id)
|
||||
for cf in ["test", "blocker"]:
|
||||
logger.info(f"Move all tablets of test.{cf} from Node 2 to Node 1")
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
s1_replicas = await get_all_tablet_replicas(manager, servers[1], "test", cf)
|
||||
migration_tasks = [
|
||||
manager.api.move_tablet(servers[0].ip_addr, "test", cf,
|
||||
@@ -1395,7 +1395,7 @@ async def test_tablet_split_finalization_with_migrations(manager: ManagerClient)
|
||||
await asyncio.gather(*migration_tasks)
|
||||
|
||||
logger.info("Re-enable tablet balancing; it should be blocked by pending split finalization")
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
mark, _ = await log.wait_for("Setting tablet balancing to true")
|
||||
|
||||
logger.info("Unblock resize finalisation and verify that the finalisation is preferred over migrations")
|
||||
@@ -1421,7 +1421,7 @@ async def test_two_tablets_concurrent_repair_and_migration_repair_writer_level(m
|
||||
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test2 (pk int PRIMARY KEY, c int) WITH tombstone_gc = {{'mode':'repair'}};")
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
async def insert_with_down(down_server):
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k + 1});") for k in range(10)])
|
||||
@@ -1461,13 +1461,12 @@ async def check_tablet_rebuild_with_repair(manager: ManagerClient, fail: bool):
|
||||
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
|
||||
servers.append(s)
|
||||
host_ids.append(await manager.get_host_id(s.server_id))
|
||||
await manager.api.disable_tablet_balancing(s.ip_addr)
|
||||
|
||||
await make_server("r1")
|
||||
await make_server("r1")
|
||||
await make_server("r2")
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -1551,7 +1550,6 @@ async def test_moving_replica_to_replica(manager: ManagerClient):
|
||||
# For convenience when moving tablets.
|
||||
cmdline = ["--smp=1"]
|
||||
s1, s2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")
|
||||
await manager.disable_tablet_balancing()
|
||||
|
||||
host_id1 = await manager.get_host_id(s1.server_id)
|
||||
host_id2 = await manager.get_host_id(s2.server_id)
|
||||
@@ -1590,7 +1588,6 @@ async def test_moving_replica_within_single_rack(manager: ManagerClient):
|
||||
# For convenience when moving tablets.
|
||||
cmdline = ["--smp=1"]
|
||||
s1, s2 = await manager.servers_add(2, cmdline=cmdline, property_file={"dc": "dc1", "rack": "r1"})
|
||||
await manager.disable_tablet_balancing()
|
||||
|
||||
host_id1 = await manager.get_host_id(s1.server_id)
|
||||
host_id2 = await manager.get_host_id(s2.server_id)
|
||||
@@ -1637,39 +1634,4 @@ async def test_disabling_balancing_preempts_balancer(manager: ManagerClient):
|
||||
await log.wait_for('Initiating tablet', from_mark=mark)
|
||||
|
||||
# Should preempt balancing
|
||||
await manager.disable_tablet_balancing()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_table_creation_wakes_up_balancer(manager: ManagerClient):
|
||||
"""
|
||||
Reproduces both https://github.com/scylladb/scylladb/issues/25163 and https://github.com/scylladb/scylladb/issues/27958
|
||||
|
||||
Scenario:
|
||||
1. Start a cluster
|
||||
2. Block the topology coordinator right before it goes to sleep
|
||||
3. Create a table, which should wake up the coordinator
|
||||
4. Verify that the coordinator didn't go to sleep
|
||||
"""
|
||||
server = await manager.server_add()
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
cql = manager.get_cql()
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 8}") as ks:
|
||||
# Block coordinator right before going to sleep
|
||||
# We use node bootstrap as an operation which is going to be trapped on exit, but it's arbitrary.
|
||||
mark = await log.mark()
|
||||
await manager.api.enable_injection(server.ip_addr, 'wait-before-topology-coordinator-goes-to-sleep', one_shot=True)
|
||||
await manager.server_add()
|
||||
await log.wait_for('wait-before-topology-coordinator-goes-to-sleep: wait', from_mark=mark)
|
||||
|
||||
# Create a table, which should prevent the coordinator from sleeping
|
||||
await manager.api.enable_injection(server.ip_addr, 'wait-after-topology-coordinator-gets-event', one_shot=True)
|
||||
mark = await log.mark()
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
# Verify it didn't go to sleep. If it did, the wait for wakeup would be 30s on average,
|
||||
# up to stats refresh period, which is 60s. So use a small timeout.
|
||||
await manager.api.message_injection(server.ip_addr, 'wait-before-topology-coordinator-goes-to-sleep')
|
||||
await log.wait_for('wait-after-topology-coordinator-gets-event: wait', from_mark=mark, timeout=5)
|
||||
await manager.api.disable_tablet_balancing(coord_srv.ip_addr)
|
||||
|
||||
@@ -224,7 +224,7 @@ async def get_two_servers_to_move_tablet(manager: ManagerClient):
|
||||
cmdline = ['--enable-file-stream', 'false']
|
||||
servers = [await manager.server_add(cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
|
||||
@@ -330,7 +330,7 @@ async def test_streaming_is_guarded_by_topology_guard(manager: ManagerClient):
|
||||
]
|
||||
servers = [await manager.server_add(cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -405,7 +405,7 @@ async def test_table_dropped_during_streaming(manager: ManagerClient):
|
||||
logger.info("Bootstrapping cluster")
|
||||
servers = [await manager.server_add()]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -475,7 +475,7 @@ async def test_tablet_cleanup(manager: ManagerClient):
|
||||
|
||||
logger.info("Start first node")
|
||||
servers = [await manager.server_add(cmdline=cmdline)]
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Populate table")
|
||||
cql = manager.get_cql()
|
||||
@@ -549,7 +549,7 @@ async def test_tablet_cleanup_failure(manager: ManagerClient):
|
||||
|
||||
# Disable load balancing, so that after the move_tablet API the load balancer does
|
||||
# not attempt to migrate the tablet back to the first node.
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
n_tablets = 1
|
||||
@@ -631,7 +631,7 @@ async def test_tablet_split(manager: ManagerClient, injection_error: str):
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
}, cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -671,7 +671,7 @@ async def test_tablet_split(manager: ManagerClient, injection_error: str):
|
||||
await s1_log.wait_for(f"{injection_error}: waiting", from_mark=s1_mark)
|
||||
|
||||
# Now there's a split and migration need, so they'll potentially run concurrently.
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
await check()
|
||||
time.sleep(5) # Give load balancer some time to do work
|
||||
@@ -700,7 +700,7 @@ async def test_correctness_of_tablet_split_finalization_after_restart(manager: M
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
}, cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
servers.append(await manager.server_add(config={
|
||||
'error_injections_at_startup': ['delay_split_compaction']
|
||||
@@ -736,7 +736,7 @@ async def test_correctness_of_tablet_split_finalization_after_restart(manager: M
|
||||
s1_mark = await s1_log.mark()
|
||||
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone", one_shot=False)
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
await s1_log.wait_for('Finalizing resize decision for table', from_mark=s1_mark)
|
||||
|
||||
@@ -744,7 +744,7 @@ async def test_correctness_of_tablet_split_finalization_after_restart(manager: M
|
||||
await manager.api.disable_injection(servers[0].ip_addr, "tablet_load_stats_refresh_before_rebalancing")
|
||||
await manager.server_update_config(servers[0].server_id, 'tablet_load_stats_refresh_interval_in_seconds', 60)
|
||||
time.sleep(1)
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
await manager.server_stop_gracefully(servers[1].server_id, timeout=120)
|
||||
await manager.server_start(servers[1].server_id)
|
||||
@@ -752,7 +752,7 @@ async def test_correctness_of_tablet_split_finalization_after_restart(manager: M
|
||||
await manager.servers_see_each_other(servers)
|
||||
|
||||
await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone")
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
|
||||
|
||||
@@ -769,7 +769,7 @@ async def test_concurrent_tablet_migration_and_major(manager: ManagerClient, inj
|
||||
cmdline = []
|
||||
servers = [await manager.server_add(cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -826,7 +826,7 @@ async def test_concurrent_table_drop_and_major(manager: ManagerClient):
|
||||
cmdline = ['--logger-log-level', 'compaction_manager=debug',]
|
||||
servers = [await manager.server_add(cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -913,7 +913,7 @@ async def test_tablet_count_metric_per_shard(manager: ManagerClient):
|
||||
servers = await manager.servers_add(2, cmdline=cmdline)
|
||||
|
||||
# And given disabled load balancing
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
# When two tables are created
|
||||
cql = manager.get_cql()
|
||||
@@ -1002,7 +1002,7 @@ async def test_tablet_load_and_stream(manager: ManagerClient, primary_replica_on
|
||||
]
|
||||
servers = [await manager.server_add(cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
@@ -1063,14 +1063,14 @@ async def test_tablet_load_and_stream(manager: ManagerClient, primary_replica_on
|
||||
rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
|
||||
assert len(rows) == 0
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Adding new server")
|
||||
servers.append(await manager.server_add(cmdline=cmdline))
|
||||
|
||||
# Trigger concurrent migration and load-and-stream
|
||||
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
await manager.api.load_new_sstables(servers[0].ip_addr, ks2, "test", primary_replica_only)
|
||||
|
||||
@@ -1131,7 +1131,7 @@ async def test_storage_service_api_uneven_ownership_keyspace_and_table_params_us
|
||||
async def test_tablet_storage_freeing(manager: ManagerClient):
|
||||
logger.info("Start first node")
|
||||
servers = [await manager.server_add()]
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
@@ -1173,7 +1173,7 @@ async def test_tablet_storage_freeing(manager: ManagerClient):
|
||||
async def test_schema_change_during_cleanup(manager: ManagerClient):
|
||||
logger.info("Start first node")
|
||||
servers = [await manager.server_add()]
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
@@ -1223,7 +1223,7 @@ async def test_tombstone_gc_correctness_during_tablet_split(manager: ManagerClie
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
}, cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -1265,7 +1265,7 @@ async def test_tombstone_gc_correctness_during_tablet_split(manager: ManagerClie
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "split_sstable_rewrite", one_shot=False)
|
||||
|
||||
logger.info("Enable balancing so split will be emitted")
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Waits for split of sstable containing expired tombstones")
|
||||
await s1_log.wait_for(f"split_sstable_rewrite: waiting", from_mark=s1_mark)
|
||||
@@ -1421,7 +1421,8 @@ async def test_decommission_rack_after_adding_new_rack(manager: ManagerClient):
|
||||
initial_servers = await create_cluster(manager, 1, initial_num_racks, nodes_per_rack, config)
|
||||
async with create_and_populate_table(manager, rf=rf) as ctx:
|
||||
logger.debug("Temporarily disable tablet load balancing")
|
||||
await manager.disable_tablet_balancing()
|
||||
node1 = sorted(initial_servers.values(), key=lambda s: s.server_id)[0]
|
||||
await manager.api.disable_tablet_balancing(node1.ip_addr)
|
||||
|
||||
logger.info("Add a new rack")
|
||||
new_rack = f"rack{num_racks}"
|
||||
@@ -1437,7 +1438,7 @@ async def test_decommission_rack_after_adding_new_rack(manager: ManagerClient):
|
||||
verify_replicas_per_server("Before decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)
|
||||
|
||||
logger.debug("Reenable tablet load balancing")
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(node1.ip_addr)
|
||||
|
||||
live_servers: dict[ServerNum, ServerInfo] = dict()
|
||||
dead_servers: dict[ServerNum, ServerInfo] = dict()
|
||||
@@ -1505,7 +1506,6 @@ async def test_tablet_cleanup_vs_snapshot_race(manager: ManagerClient):
|
||||
cmdline = ['--smp=1']
|
||||
|
||||
servers = [await manager.server_add(cmdline=cmdline)]
|
||||
await manager.disable_tablet_balancing()
|
||||
|
||||
cql = manager.get_cql()
|
||||
n_tablets = 1
|
||||
@@ -1548,7 +1548,7 @@ async def test_drop_table_and_truncate_after_migration(manager: ManagerClient, o
|
||||
cfg = { 'auto_snapshot': True }
|
||||
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND TABLETS = {{'initial': 4}}")
|
||||
@@ -1672,7 +1672,7 @@ async def test_split_correctness_on_tablet_count_change(manager: ManagerClient):
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(server.ip_addr)
|
||||
|
||||
initial_tablets = 2
|
||||
|
||||
@@ -1696,7 +1696,7 @@ async def test_split_correctness_on_tablet_count_change(manager: ManagerClient):
|
||||
log_mark = await log.mark()
|
||||
|
||||
await manager.api.enable_injection(server.ip_addr, "splitting_mutation_writer_switch_wait", one_shot=True)
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(server.ip_addr)
|
||||
|
||||
await log.wait_for('Emitting resize decision of type split', from_mark=log_mark)
|
||||
await log.wait_for('splitting_mutation_writer_switch_wait: waiting', from_mark=log_mark)
|
||||
@@ -1738,7 +1738,7 @@ async def test_tablet_load_and_stream_and_split_synchronization(manager: Manager
|
||||
}, cmdline=cmdline)]
|
||||
server = servers[0]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
@@ -1786,14 +1786,14 @@ async def test_tablet_load_and_stream_and_split_synchronization(manager: Manager
|
||||
rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
|
||||
assert len(rows) == 0
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "tablet_resize_finalization_post_barrier", one_shot=True)
|
||||
|
||||
s1_log = await manager.server_open_log(servers[0].server_id)
|
||||
s1_mark = await s1_log.mark()
|
||||
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {initial_tablets * 2}}}")
|
||||
|
||||
@@ -1887,7 +1887,7 @@ async def test_update_load_stats_after_migration(manager: ManagerClient):
|
||||
# Disable load balancing to avoid the balancer moving the tablet from a node with less to a node with more
|
||||
# available disk space. Otherwise, the move_tablet API can fail (if the tablet is already in transisiton) or
|
||||
# be a no-op (in case the tablet has already been migrated)
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1']}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 1}};")
|
||||
@@ -1936,7 +1936,7 @@ async def test_timed_out_reader_after_cleanup(manager: ManagerClient):
|
||||
]
|
||||
servers = [await manager.server_add(cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -2009,7 +2009,7 @@ async def test_split_and_incremental_repair_synchronization(manager: ManagerClie
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
initial_tablets = 2
|
||||
|
||||
@@ -2049,7 +2049,7 @@ async def test_split_and_incremental_repair_synchronization(manager: ManagerClie
|
||||
|
||||
token = 'all'
|
||||
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
if repair_before_split:
|
||||
await generate_repair_work()
|
||||
@@ -2100,7 +2100,7 @@ async def test_split_and_intranode_synchronization(manager: ManagerClient):
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
initial_tablets = 1
|
||||
|
||||
@@ -2128,7 +2128,7 @@ async def test_split_and_intranode_synchronization(manager: ManagerClient):
|
||||
dst_shard = 1
|
||||
await manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token)
|
||||
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(server.ip_addr)
|
||||
|
||||
await manager.api.enable_injection(server.ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
|
||||
await manager.api.enable_injection(server.ip_addr, "split_sstable_force_stop_exception", one_shot=False)
|
||||
|
||||
@@ -32,13 +32,19 @@ async def disable_injection_on(manager, error_name, servers):
|
||||
errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
|
||||
await asyncio.gather(*errs)
|
||||
|
||||
async def disable_tablet_balancing(manager, servers):
|
||||
await asyncio.gather(*[manager.api.disable_tablet_balancing(s.ip_addr) for s in servers])
|
||||
|
||||
async def enable_tablet_balancing(manager, servers):
|
||||
await asyncio.gather(*[manager.api.enable_tablet_balancing(s.ip_addr) for s in servers])
|
||||
|
||||
@asynccontextmanager
|
||||
async def no_tablet_balancing(manager):
|
||||
await manager.disable_tablet_balancing()
|
||||
async def no_tablet_balancing(manager, servers):
|
||||
await disable_tablet_balancing(manager, servers)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
await manager.enable_tablet_balancing()
|
||||
await enable_tablet_balancing(manager, servers)
|
||||
|
||||
async def wait_for_tablet_stage(manager, server, keyspace_name, table_name, token, stage):
|
||||
async def tablet_is_in_stage():
|
||||
@@ -107,7 +113,7 @@ async def test_move_tablet(manager: ManagerClient, move_table: str):
|
||||
'--logger-log-level', 'raft_topology=debug',
|
||||
]
|
||||
servers = await manager.servers_add(2, config=cfg, cmdline=cmdline)
|
||||
await manager.disable_tablet_balancing()
|
||||
await asyncio.gather(*[manager.api.disable_tablet_balancing(s.ip_addr) for s in servers])
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
@@ -188,7 +194,7 @@ async def test_tablet_split_and_merge(manager: ManagerClient, with_merge: bool):
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
}, cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -230,7 +236,7 @@ async def test_tablet_split_and_merge(manager: ManagerClient, with_merge: bool):
|
||||
s1_mark = await s1_log.mark()
|
||||
|
||||
# Now there's a split and migration need, so they'll potentially run concurrently.
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
await check()
|
||||
await asyncio.sleep(2) # Give load balancer some time to do work
|
||||
@@ -240,7 +246,7 @@ async def test_tablet_split_and_merge(manager: ManagerClient, with_merge: bool):
|
||||
|
||||
await check()
|
||||
|
||||
async with no_tablet_balancing(manager):
|
||||
async with no_tablet_balancing(manager, servers):
|
||||
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
|
||||
assert tablet_count > 1
|
||||
other_tablet_count = await get_tablet_count(manager, servers[0], ks, 'tv')
|
||||
@@ -267,7 +273,7 @@ async def test_tablet_split_and_merge(manager: ManagerClient, with_merge: bool):
|
||||
keys = range(total_keys - 1, total_keys)
|
||||
|
||||
# To avoid race of major with migration
|
||||
async with no_tablet_balancing(manager):
|
||||
async with no_tablet_balancing(manager, servers):
|
||||
for server in servers:
|
||||
await manager.api.flush_keyspace(server.ip_addr, ks)
|
||||
await manager.api.keyspace_compaction(server.ip_addr, ks)
|
||||
@@ -287,7 +293,7 @@ async def test_tablet_split_and_merge(manager: ManagerClient, with_merge: bool):
|
||||
await s1_log.wait_for(f"Detected tablet merge for table {ks}.test", from_mark=s1_mark, timeout=60)
|
||||
await s1_log.wait_for(f"Detected tablet merge for table {ks}.tv", from_mark=s1_mark, timeout=60)
|
||||
|
||||
async with no_tablet_balancing(manager):
|
||||
async with no_tablet_balancing(manager, servers):
|
||||
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
|
||||
assert tablet_count < old_tablet_count
|
||||
other_tablet_count = await get_tablet_count(manager, servers[0], ks, 'tv')
|
||||
@@ -310,7 +316,7 @@ async def test_create_colocated_table_while_base_is_migrating(manager: ManagerCl
|
||||
'--logger-log-level', 'raft_topology=debug',
|
||||
]
|
||||
servers = await manager.servers_add(2, config=cfg, cmdline=cmdline)
|
||||
await manager.disable_tablet_balancing()
|
||||
await asyncio.gather(*[manager.api.disable_tablet_balancing(s.ip_addr) for s in servers])
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
@@ -397,7 +403,7 @@ async def test_repair_colocated_base_and_view(manager: ManagerClient):
|
||||
'--logger-log-level', 'repair=debug',
|
||||
]
|
||||
servers = await manager.servers_add(2, config=cfg, cmdline=cmdline, auto_rack_dc="dc1")
|
||||
await manager.disable_tablet_balancing()
|
||||
await asyncio.gather(*[manager.api.disable_tablet_balancing(s.ip_addr) for s in servers])
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
|
||||
|
||||
@@ -35,7 +35,7 @@ async def test_intranode_migration(manager: ManagerClient):
|
||||
]
|
||||
servers = [await manager.server_add(cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -70,7 +70,7 @@ async def test_crash_during_intranode_migration(manager: ManagerClient):
|
||||
]
|
||||
servers = [await manager.server_add(cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
@@ -138,7 +138,7 @@ async def test_cross_shard_migration(manager: ManagerClient):
|
||||
|
||||
servers = await manager.servers_add(2, cmdline=cmdline)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
|
||||
|
||||
@@ -56,7 +56,7 @@ async def test_lwt(manager: ManagerClient):
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
quorum = len(hosts) // 2 + 1
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
# We use capital letters to check that the proper quotes are used in paxos store queries.
|
||||
ks = unique_name() + '_Test'
|
||||
@@ -131,7 +131,7 @@ async def test_lwt_during_migration(manager: ManagerClient):
|
||||
host_ids = await asyncio.gather(*[manager.get_host_id(s.server_id) for s in servers])
|
||||
|
||||
logger.info("Disable tablet balancing")
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
|
||||
logger.info("Create a table")
|
||||
@@ -245,7 +245,7 @@ async def test_lwt_state_is_preserved_on_tablet_migration(manager: ManagerClient
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logger.info("Disable tablet balancing")
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Create a keyspace")
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -354,7 +354,7 @@ async def test_lwt_state_is_preserved_on_tablet_rebuild(manager: ManagerClient):
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logger.info("Disable tablet balancing")
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Create a keyspace")
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -650,7 +650,7 @@ async def test_lwt_coordinator_shard(manager: ManagerClient):
|
||||
cql = manager.get_cql()
|
||||
|
||||
logger.info("Disable tablet balancing")
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Create a keyspace")
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
|
||||
@@ -48,7 +48,7 @@ async def test_tablet_merge_simple(manager: ManagerClient):
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
}, cmdline=cmdline)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -89,7 +89,7 @@ async def test_tablet_merge_simple(manager: ManagerClient):
|
||||
s1_mark = await s1_log.mark()
|
||||
|
||||
# Now there's a split and migration need, so they'll potentially run concurrently.
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
await check()
|
||||
time.sleep(2) # Give load balancer some time to do work
|
||||
@@ -119,12 +119,12 @@ async def test_tablet_merge_simple(manager: ManagerClient):
|
||||
keys = range(total_keys - 1, total_keys)
|
||||
|
||||
# To avoid race of major with migration
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
for server in servers:
|
||||
await manager.api.flush_keyspace(server.ip_addr, ks)
|
||||
await manager.api.keyspace_compaction(server.ip_addr, ks)
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
|
||||
# Waits for balancer to co-locate sibling tablets
|
||||
@@ -214,7 +214,7 @@ async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager:
|
||||
for cycle in range(2):
|
||||
logger.info("Running split-merge cycle #{}".format(cycle))
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Inserting data")
|
||||
# Initial average table size of (400k + metadata_overhead). Enough to trigger a few splits.
|
||||
@@ -248,7 +248,7 @@ async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager:
|
||||
|
||||
logger.info("Enabling balancing")
|
||||
# Now there's a split and migration need, so they'll potentially run concurrently.
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
topology_ops_task = asyncio.create_task(perform_topology_ops())
|
||||
|
||||
@@ -283,13 +283,13 @@ async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager:
|
||||
await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
|
||||
|
||||
# To avoid race of major with migration
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Flushing keyspace and performing major")
|
||||
for server in servers:
|
||||
await manager.api.flush_keyspace(server.ip_addr, ks)
|
||||
await manager.api.keyspace_compaction(server.ip_addr, ks)
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Waiting for merge decision")
|
||||
await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
|
||||
@@ -457,7 +457,7 @@ async def test_migration_running_concurrently_to_merge_completion_handling(manag
|
||||
cfg = {'force_capacity_based_balancing': True}
|
||||
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
@@ -479,7 +479,7 @@ async def test_migration_running_concurrently_to_merge_completion_handling(manag
|
||||
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "merge_completion_fiber", one_shot=True)
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "replica_merge_completion_wait", one_shot=True)
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
|
||||
s1_host_id = await manager.get_host_id(servers[1].server_id)
|
||||
@@ -490,7 +490,7 @@ async def test_migration_running_concurrently_to_merge_completion_handling(manag
|
||||
|
||||
await wait_for(finished_merging, time.time() + 120)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "take_storage_snapshot", one_shot=True)
|
||||
|
||||
await s0_log.wait_for(f"merge_completion_fiber: waiting", from_mark=s0_mark)
|
||||
@@ -540,7 +540,7 @@ async def test_missing_data(manager: ManagerClient):
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(server.ip_addr)
|
||||
|
||||
inital_tablets = 32
|
||||
|
||||
@@ -560,7 +560,7 @@ async def test_missing_data(manager: ManagerClient):
|
||||
expected_tablet_count = inital_tablets // 2
|
||||
await cql.run_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': {expected_tablet_count}}}")
|
||||
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(server.ip_addr)
|
||||
|
||||
# wait for merge to complete
|
||||
actual_tablet_count = 0
|
||||
@@ -608,7 +608,7 @@ async def test_merge_with_drop(manager: ManagerClient):
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(server.ip_addr)
|
||||
|
||||
initial_tablets = 32
|
||||
|
||||
@@ -633,7 +633,7 @@ async def test_merge_with_drop(manager: ManagerClient):
|
||||
s0_log = await manager.server_open_log(server.server_id)
|
||||
s0_mark = await s0_log.mark()
|
||||
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(server.ip_addr)
|
||||
|
||||
# wait for merge to complete
|
||||
actual_tablet_count = 0
|
||||
|
||||
@@ -37,13 +37,12 @@ async def test_tablet_transition_sanity(manager: ManagerClient, action):
|
||||
host = await manager.get_host_id(s.server_id)
|
||||
hosts_by_rack[rack].append(host)
|
||||
host_ids.append(host)
|
||||
await manager.api.disable_tablet_balancing(s.ip_addr)
|
||||
|
||||
await make_server("r1")
|
||||
await make_server("r1")
|
||||
await make_server("r2")
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -125,13 +124,11 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail
|
||||
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
|
||||
servers.append(s)
|
||||
host_ids.append(await manager.get_host_id(s.server_id))
|
||||
await manager.api.disable_tablet_balancing(s.ip_addr)
|
||||
|
||||
await make_server("r1")
|
||||
await make_server("r2")
|
||||
await make_server("r3")
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -275,13 +272,13 @@ async def test_tablet_back_and_forth_migration(manager: ManagerClient):
|
||||
s = await manager.server_add(config=cfg)
|
||||
servers.append(s)
|
||||
host_ids.append(await manager.get_host_id(s.server_id))
|
||||
await manager.api.disable_tablet_balancing(s.ip_addr)
|
||||
|
||||
async def assert_rows(num):
|
||||
res = await cql.run_async(f"SELECT * FROM {ks}.test")
|
||||
assert len(res) == num
|
||||
|
||||
await make_server()
|
||||
await manager.disable_tablet_balancing()
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
||||
@@ -321,7 +318,7 @@ async def test_staging_backlog_is_preserved_with_file_based_streaming(manager: M
|
||||
'error_injections_at_startup': ['view_update_generator_consume_staging_sstable']}
|
||||
servers = [await manager.server_add(config=cfg)]
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
|
||||
@@ -429,7 +426,7 @@ async def test_restart_leaving_replica_during_cleanup(manager: ManagerClient, mi
|
||||
cfg = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
|
||||
servers = await manager.servers_add(2, config=cfg)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
|
||||
@@ -470,7 +467,7 @@ async def test_restart_leaving_replica_during_cleanup(manager: ManagerClient, mi
|
||||
|
||||
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, injection) for s in servers])
|
||||
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
# Trigger tablet merge to reproduce #23481
|
||||
table_id = await manager.get_table_id(ks, 'test')
|
||||
@@ -490,7 +487,7 @@ async def test_restart_leaving_replica_during_cleanup(manager: ManagerClient, mi
|
||||
|
||||
# Workaround for https://github.com/scylladb/scylladb/issues/21779. We don't want the keyspace drop at the end
|
||||
# of new_test_keyspace to fail because of concurrent tablet migrations.
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
@@ -503,7 +500,7 @@ async def test_restart_in_cleanup_stage_after_cleanup(manager: ManagerClient):
|
||||
cfg = {'tablet_load_stats_refresh_interval_in_seconds': 1}
|
||||
servers = await manager.servers_add(2, config=cfg)
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
|
||||
|
||||
@@ -112,7 +112,7 @@ async def test_replace(manager: ManagerClient):
|
||||
|
||||
# Disable migrations concurrent with replace since we don't handle nodes going down during migration yet.
|
||||
# See https://github.com/scylladb/scylladb/issues/16527
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
finish_writes = await start_writes(cql, ks3, "test2")
|
||||
|
||||
@@ -195,7 +195,7 @@ async def test_removenode(manager: ManagerClient):
|
||||
|
||||
# Disable migrations concurrent with removenode since we don't handle nodes going down during migration yet.
|
||||
# See https://github.com/scylladb/scylladb/issues/16527
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info('Removing a node')
|
||||
await manager.server_stop(servers[0].server_id)
|
||||
@@ -244,7 +244,7 @@ async def test_removenode_with_ignored_node(manager: ManagerClient):
|
||||
|
||||
# Disable migrations concurrent with removenode since we don't handle nodes going down during migration yet.
|
||||
# See https://github.com/scylladb/scylladb/issues/16527
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info('Removing a node with another node down')
|
||||
await manager.server_stop(servers[0].server_id) # removed
|
||||
|
||||
@@ -35,7 +35,7 @@ async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bo
|
||||
# Background migrations concurrent with node shutdown can cause background (after CL is met)
|
||||
# replica-side writes to fail due to barriers fencing out the node which is shutting down.
|
||||
# This trips check_node_log_for_failed_mutations().
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info(f"Restarting node {servers[0]}")
|
||||
await manager.server_stop_gracefully(servers[0].server_id)
|
||||
|
||||
@@ -42,7 +42,7 @@ async def test_topology_ops_encrypted(request, manager: ManagerClient, tablets_e
|
||||
# Background migrations concurrent with node shutdown can cause background (after CL is met)
|
||||
# replica-side writes to fail due to barriers fencing out the node which is shutting down.
|
||||
# This trips check_node_log_for_failed_mutations().
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info(f"Restarting node {servers[0]}")
|
||||
await manager.server_stop_gracefully(servers[0].server_id)
|
||||
|
||||
@@ -19,7 +19,7 @@ from cassandra.query import SimpleStatement
|
||||
from cassandra.protocol import InvalidRequest
|
||||
from test.cluster.util import new_test_keyspace
|
||||
from test.cluster.test_view_building_coordinator import mark_all_servers, pause_view_building_tasks, \
|
||||
unpause_view_building_tasks, wait_for_some_view_build_tasks_to_get_stuck
|
||||
unpause_view_building_tasks, wait_for_some_view_build_tasks_to_get_stuck, disable_tablet_load_balancing_on_all_servers
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -626,7 +626,7 @@ async def test_view_build_status_marked_started_on_node_added_during_building(ma
|
||||
'--logger-log-level', 'view_building_worker=debug',
|
||||
])
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
|
||||
await create_table(cql, ks)
|
||||
|
||||
@@ -78,6 +78,10 @@ async def wait_for_message_on_any_server(manager: ManagerClient, message: str, m
|
||||
async def wait_for_some_view_build_tasks_to_get_stuck(manager: ManagerClient, marks: list[int]):
|
||||
return await wait_for_message_on_any_server(manager, "do_build_range: paused, waiting for message", marks)
|
||||
|
||||
async def disable_tablet_load_balancing_on_all_servers(manager: ManagerClient):
|
||||
servers = await manager.running_servers()
|
||||
await asyncio.gather(*(manager.api.disable_tablet_balancing(s.ip_addr) for s in servers))
|
||||
|
||||
async def populate_base_table(cql: Session, ks: str, tbl: str):
|
||||
for i in range(ROW_COUNT):
|
||||
await cql.run_async(f"INSERT INTO {ks}.{tbl} (key, c, v) VALUES ({i // ROWS_PER_PARTITION}, {i % ROWS_PER_PARTITION}, '{i}')")
|
||||
@@ -153,7 +157,7 @@ async def test_build_two_views(manager: ManagerClient):
|
||||
{"dc": "dc1", "rack": "r3"},
|
||||
])
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets = {{'enabled': true}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
|
||||
@@ -183,7 +187,7 @@ async def test_add_view_while_build_in_progress(manager: ManagerClient):
|
||||
{"dc": "dc1", "rack": "r3"},
|
||||
])
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets = {{'enabled': true}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
|
||||
@@ -213,7 +217,7 @@ async def test_remove_some_view_while_build_in_progress(manager: ManagerClient):
|
||||
node_count = 3
|
||||
servers = await manager.servers_add(node_count, cmdline=cmdline_loggers)
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'enabled': true}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
|
||||
@@ -242,7 +246,7 @@ async def test_abort_building_by_remove_view(manager: ManagerClient):
|
||||
node_count = 3
|
||||
servers = await manager.servers_add(node_count, cmdline=cmdline_loggers)
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'enabled': true}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
|
||||
@@ -270,7 +274,7 @@ async def test_alter_base_schema_while_build_in_progress(manager: ManagerClient,
|
||||
node_count = 3
|
||||
servers = await manager.servers_add(node_count, cmdline=cmdline_loggers)
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'enabled': true}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
|
||||
@@ -320,7 +324,7 @@ async def test_change_rf_while_build_in_progress(manager: ManagerClient, change:
|
||||
servers = await manager.servers_add(node_count, config={"enable_tablets": "true"}, cmdline=cmdline_loggers,
|
||||
property_file=property_file)
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': {old_rf}}} AND tablets = {{'enabled': true}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
|
||||
@@ -358,7 +362,7 @@ async def test_node_operation_during_view_building(manager: ManagerClient, opera
|
||||
property_file=property_file)
|
||||
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets = {{'enabled': true}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
|
||||
@@ -404,7 +408,7 @@ async def test_leader_change_while_building(manager: ManagerClient):
|
||||
])
|
||||
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets = {{'enabled': true}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
|
||||
@@ -438,7 +442,7 @@ async def test_truncate_while_building(manager: ManagerClient):
|
||||
{"dc": "dc1", "rack": "r3"},
|
||||
])
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets = {{'enabled': true}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
|
||||
@@ -470,7 +474,7 @@ async def test_scylla_views_builds_in_progress(manager: ManagerClient, view_acti
|
||||
{"dc": "dc1", "rack": "r3"},
|
||||
])
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
async def check_scylla_views_builds_in_progress(expect_zero_rows: bool):
|
||||
async def check():
|
||||
@@ -511,7 +515,7 @@ async def test_scylla_views_builds_in_progress(manager: ManagerClient, view_acti
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_view_building_while_tablet_streaming_fail(manager: ManagerClient):
|
||||
servers = [await manager.server_add(cmdline=cmdline_loggers)]
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
|
||||
@@ -608,7 +612,7 @@ async def test_concurrent_tablet_migrations(manager: ManagerClient):
|
||||
for property_file in rack_property_files:
|
||||
servers.append(await manager.server_add(property_file=property_file, cmdline=cmdline_loggers))
|
||||
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
await pause_view_building_tasks(manager)
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1} AND tablets = {'initial': 2}") as ks:
|
||||
@@ -628,7 +632,7 @@ async def test_concurrent_tablet_migrations(manager: ManagerClient):
|
||||
servers.append(await manager.server_add(property_file=property_file, cmdline=cmdline_loggers))
|
||||
assert len(await get_nodes_which_are_tablet_replicas()) == 3
|
||||
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
# The effect of unpausing the balancer should be that all replicas are distributed evenly between nodes
|
||||
# (1 base + 1 view tablet for each node).
|
||||
@@ -680,7 +684,7 @@ async def test_file_streaming(manager: ManagerClient):
|
||||
{"dc": "dc1", "rack": "r2"},
|
||||
])
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}} AND tablets = {{'initial': 1}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v int, PRIMARY KEY (key))")
|
||||
@@ -727,7 +731,7 @@ async def test_file_streaming(manager: ManagerClient):
|
||||
await manager.server_start(servers[1].server_id)
|
||||
|
||||
# Add node3
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
new_server = await manager.server_add(cmdline=cmdline_loggers + ['--logger-log-level', 'view_update_generator=trace'], property_file={"dc": "dc1", "rack": "r1"})
|
||||
|
||||
s0_host_id = await manager.get_host_id(servers[0].server_id)
|
||||
@@ -815,7 +819,7 @@ async def test_staging_sstables_with_tablet_merge(manager: ManagerClient):
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1,
|
||||
})
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}} AND tablets = {{'enabled': true}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v int, PRIMARY KEY (key)) WITH tablets = {{'min_tablet_count': 2}}")
|
||||
@@ -864,7 +868,7 @@ async def test_staging_sstables_with_tablet_merge(manager: ManagerClient):
|
||||
await manager.server_start(servers[1].server_id)
|
||||
|
||||
# Add node3
|
||||
await manager.disable_tablet_balancing()
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
new_server = await manager.server_add(cmdline=cmdline_loggers + ['--logger-log-level', 'view_update_generator=trace'], property_file={"dc": "dc1", "rack": "r1"}, config={
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1,
|
||||
})
|
||||
@@ -894,7 +898,7 @@ async def test_staging_sstables_with_tablet_merge(manager: ManagerClient):
|
||||
if new_tablet_count == expected_tablet_count:
|
||||
return True
|
||||
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
assert await get_tablet_count(manager, servers[0], ks, "tab") == 2
|
||||
await cql.run_async(f"ALTER TABLE {ks}.tab WITH tablets = {{'min_tablet_count': 1}}")
|
||||
await wait_for(lambda: tablet_count_is(1), time.time() + 60)
|
||||
@@ -918,7 +922,7 @@ async def test_tablet_migration_during_view_building(manager: ManagerClient):
|
||||
node_count = 1
|
||||
server = new_server = await manager.server_add(cmdline=cmdline_loggers, property_file={"dc": "dc1", "rack": "r1"})
|
||||
cql, _ = await manager.get_ready_cql([server])
|
||||
await manager.disable_tablet_balancing()
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'enabled': true}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c)) WITH tablets = {{'min_tablet_count': 1}}")
|
||||
@@ -953,7 +957,7 @@ async def test_tablet_merge_during_view_building(manager: ManagerClient):
|
||||
{"dc": "dc1", "rack": "r3"},
|
||||
])
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
await manager.disable_tablet_balancing()
|
||||
await disable_tablet_load_balancing_on_all_servers(manager)
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets = {{'enabled': true}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c)) WITH tablets = {{'min_tablet_count': 2}}")
|
||||
@@ -972,7 +976,7 @@ async def test_tablet_merge_during_view_building(manager: ManagerClient):
|
||||
if new_tablet_count == expected_tablet_count:
|
||||
return True
|
||||
|
||||
await manager.enable_tablet_balancing()
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
assert await get_tablet_count(manager, servers[0], ks, "tab") == 2
|
||||
await cql.run_async(f"ALTER TABLE {ks}.tab WITH tablets = {{'min_tablet_count': 1}}")
|
||||
await wait_for(lambda: tablet_count_is(1), time.time() + 60)
|
||||
|
||||
@@ -336,29 +336,6 @@ class ManagerClient:
|
||||
logger.debug("ManagerClient stopping gracefully %s", server_id)
|
||||
await self.client.put_json(f"/cluster/server/{server_id}/stop_gracefully", timeout=timeout)
|
||||
|
||||
async def disable_tablet_balancing(self):
|
||||
"""
|
||||
Disables background tablet load-balancing.
|
||||
If there are already active migrations, it waits for them to finish before returning.
|
||||
Doesn't block migrations on behalf of node operations like decommission, removenode or replace.
|
||||
:return:
|
||||
"""
|
||||
servers = await self.running_servers()
|
||||
if not servers:
|
||||
raise Exception("No running servers")
|
||||
# Any server will do, it's a group0 operation
|
||||
await self.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
async def enable_tablet_balancing(self):
|
||||
"""
|
||||
Enables background tablet load-balancing.
|
||||
"""
|
||||
servers = await self.running_servers()
|
||||
if not servers:
|
||||
raise Exception("No running servers")
|
||||
# Any server will do, it's a group0 operation
|
||||
await self.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
async def server_start(self,
|
||||
server_id: ServerNum,
|
||||
expected_error: str | None = None,
|
||||
|
||||
@@ -284,7 +284,7 @@ async def test_autotoogle_reject_incoming_migrations(manager: ManagerClient, vol
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1,
|
||||
}
|
||||
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=global_cmdline, config=cfg) as servers:
|
||||
await manager.disable_tablet_balancing()
|
||||
await asyncio.gather(*[manager.api.disable_tablet_balancing(server.ip_addr) for server in servers])
|
||||
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user