Compare commits
23 Commits
debug_form
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c575bbf1e8 | ||
|
|
7fdd650009 | ||
|
|
552a2d0995 | ||
|
|
73de865ca3 | ||
|
|
f988ec18cb | ||
|
|
cd1679934c | ||
|
|
d52fbf7ada | ||
|
|
141aa2d696 | ||
|
|
c670183be8 | ||
|
|
e639dcda0b | ||
|
|
503a6e2d7e | ||
|
|
0f02c0d6fa | ||
|
|
4fead4baae | ||
|
|
ffd58ca1f0 | ||
|
|
f6fd3bbea0 | ||
|
|
148217bed6 | ||
|
|
2b472fe7fd | ||
|
|
ae12c712ce | ||
|
|
dd446aa442 | ||
|
|
dea79b09a9 | ||
|
|
3d04fd1d13 | ||
|
|
f5438e0587 | ||
|
|
f6ab576ed9 |
@@ -42,7 +42,14 @@ void everywhere_replication_strategy::validate_options(const gms::feature_servic
|
||||
|
||||
sstring everywhere_replication_strategy::sanity_check_read_replicas(const effective_replication_map& erm, const host_id_vector_replica_set& read_replicas) const {
|
||||
const auto replication_factor = erm.get_replication_factor();
|
||||
if (read_replicas.size() > replication_factor) {
|
||||
if (const auto& topo_info = erm.get_token_metadata().get_topology_change_info(); topo_info && topo_info->read_new) {
|
||||
if (read_replicas.size() > replication_factor + 1) {
|
||||
return seastar::format(
|
||||
"everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, "
|
||||
"cannot be higher than replication factor {} + 1 during the 'read from new replicas' stage of a topology change",
|
||||
read_replicas.size(), replication_factor);
|
||||
}
|
||||
} else if (read_replicas.size() > replication_factor) {
|
||||
return seastar::format("everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, cannot be higher than replication factor {}", read_replicas.size(), replication_factor);
|
||||
}
|
||||
return {};
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:34a0955d2c5a88e18ddab0f1df085e10a17e14129c3e21de91e4f27ef949b6c4
|
||||
size 6502668
|
||||
oid sha256:d424ce6cc7f65338c34dd35881d23f5ad3425651d66e47dc2c3a20dc798848d4
|
||||
size 6598648
|
||||
|
||||
@@ -87,6 +87,11 @@ target_include_directories(wasmtime_bindings
|
||||
target_link_libraries(wasmtime_bindings
|
||||
INTERFACE Rust::rust_combined)
|
||||
if (Scylla_USE_PRECOMPILED_HEADER_USE)
|
||||
# The PCH from scylla-precompiled-header is compiled with Seastar's compile
|
||||
# flags, including sanitizer flags in Debug/Sanitize modes. Any target reusing
|
||||
# this PCH must have matching compile options, otherwise the compiler rejects
|
||||
# the PCH due to flag mismatch (e.g., -fsanitize=address).
|
||||
target_link_libraries(wasmtime_bindings PRIVATE Seastar::seastar)
|
||||
target_precompile_headers(wasmtime_bindings REUSE_FROM scylla-precompiled-header)
|
||||
endif()
|
||||
|
||||
@@ -108,5 +113,6 @@ target_include_directories(inc
|
||||
target_link_libraries(inc
|
||||
INTERFACE Rust::rust_combined)
|
||||
if (Scylla_USE_PRECOMPILED_HEADER_USE)
|
||||
target_link_libraries(inc PRIVATE Seastar::seastar)
|
||||
target_precompile_headers(inc REUSE_FROM scylla-precompiled-header)
|
||||
endif()
|
||||
|
||||
2
test.py
2
test.py
@@ -181,7 +181,7 @@ def parse_cmd_line() -> argparse.Namespace:
|
||||
help="Run only tests for given build mode(s)")
|
||||
parser.add_argument('--repeat', action="store", default="1", type=int,
|
||||
help="number of times to repeat test execution")
|
||||
parser.add_argument('--timeout', action="store", default="24000", type=int,
|
||||
parser.add_argument('--timeout', action="store", default="3600", type=int,
|
||||
help="timeout value for single test execution")
|
||||
parser.add_argument('--session-timeout', action="store", default="24000", type=int,
|
||||
help="timeout value for test.py/pytest session execution")
|
||||
|
||||
@@ -511,8 +511,7 @@ class AuditBackendComposite(AuditBackend):
|
||||
return rows_dict
|
||||
|
||||
|
||||
@pytest.mark.single_node
|
||||
class TestCQLAudit(AuditTester):
|
||||
class CQLAuditTester(AuditTester):
|
||||
"""
|
||||
Make sure CQL statements are audited
|
||||
"""
|
||||
@@ -1763,7 +1762,7 @@ class TestCQLAudit(AuditTester):
|
||||
|
||||
async def test_audit_table_noauth(manager: ManagerClient):
|
||||
"""Table backend, no auth, single node — groups all tests that share this config."""
|
||||
t = TestCQLAudit(manager)
|
||||
t = CQLAuditTester(manager)
|
||||
await t.test_using_non_existent_keyspace(AuditBackendTable)
|
||||
await t.test_audit_keyspace(AuditBackendTable)
|
||||
await t.test_audit_keyspace_extra_parameter(AuditBackendTable)
|
||||
@@ -1787,7 +1786,7 @@ async def test_audit_table_noauth(manager: ManagerClient):
|
||||
|
||||
async def test_audit_table_auth(manager: ManagerClient):
|
||||
"""Table backend, auth enabled, single node."""
|
||||
t = TestCQLAudit(manager)
|
||||
t = CQLAuditTester(manager)
|
||||
await t.test_user_password_masking(AuditBackendTable)
|
||||
await t.test_negative_audit_records_auth()
|
||||
await t.test_negative_audit_records_admin()
|
||||
@@ -1803,7 +1802,7 @@ async def test_audit_table_auth(manager: ManagerClient):
|
||||
|
||||
async def test_audit_table_auth_multinode(manager: ManagerClient):
|
||||
"""Table backend, auth enabled, multi-node (rf=3)."""
|
||||
t = TestCQLAudit(manager)
|
||||
t = CQLAuditTester(manager)
|
||||
await t.test_negative_audit_records_ddl()
|
||||
|
||||
|
||||
@@ -1811,49 +1810,49 @@ async def test_audit_table_auth_multinode(manager: ManagerClient):
|
||||
|
||||
async def test_audit_type_none_standalone(manager: ManagerClient):
|
||||
"""audit=None — verify no auditing occurs."""
|
||||
await TestCQLAudit(manager).test_audit_type_none()
|
||||
await CQLAuditTester(manager).test_audit_type_none()
|
||||
|
||||
|
||||
async def test_audit_type_invalid_standalone(manager: ManagerClient):
|
||||
"""audit=invalid — server should fail to start."""
|
||||
await TestCQLAudit(manager).test_audit_type_invalid()
|
||||
await CQLAuditTester(manager).test_audit_type_invalid()
|
||||
|
||||
|
||||
async def test_composite_audit_type_invalid_standalone(manager: ManagerClient):
|
||||
"""audit=table,syslog,invalid — server should fail to start."""
|
||||
await TestCQLAudit(manager).test_composite_audit_type_invalid()
|
||||
await CQLAuditTester(manager).test_composite_audit_type_invalid()
|
||||
|
||||
|
||||
async def test_audit_empty_settings_standalone(manager: ManagerClient):
|
||||
"""audit=none — verify no auditing occurs."""
|
||||
await TestCQLAudit(manager).test_audit_empty_settings()
|
||||
await CQLAuditTester(manager).test_audit_empty_settings()
|
||||
|
||||
|
||||
async def test_composite_audit_empty_settings_standalone(manager: ManagerClient):
|
||||
"""audit=table,syslog,none — verify no auditing occurs."""
|
||||
await TestCQLAudit(manager).test_composite_audit_empty_settings()
|
||||
await CQLAuditTester(manager).test_composite_audit_empty_settings()
|
||||
|
||||
|
||||
async def test_audit_categories_invalid_standalone(manager: ManagerClient):
|
||||
"""Invalid audit_categories — server should fail to start."""
|
||||
await TestCQLAudit(manager).test_audit_categories_invalid()
|
||||
await CQLAuditTester(manager).test_audit_categories_invalid()
|
||||
|
||||
|
||||
async def test_insert_failure_standalone(manager: ManagerClient):
|
||||
"""7-node topology, audit=table, no auth — standalone due to unique topology."""
|
||||
await TestCQLAudit(manager).test_insert_failure_doesnt_report_success()
|
||||
await CQLAuditTester(manager).test_insert_failure_doesnt_report_success()
|
||||
|
||||
|
||||
async def test_service_level_statements_standalone(manager: ManagerClient):
|
||||
"""audit=table, auth, cmdline=--smp 1 — standalone due to special cmdline."""
|
||||
await TestCQLAudit(manager).test_service_level_statements()
|
||||
await CQLAuditTester(manager).test_service_level_statements()
|
||||
|
||||
|
||||
# AuditBackendSyslog, no auth, rf=1
|
||||
|
||||
async def test_audit_syslog_noauth(manager: ManagerClient):
|
||||
"""Syslog backend, no auth, single node."""
|
||||
t = TestCQLAudit(manager)
|
||||
t = CQLAuditTester(manager)
|
||||
Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
|
||||
await t.test_using_non_existent_keyspace(Syslog)
|
||||
await t.test_audit_keyspace(Syslog)
|
||||
@@ -1870,7 +1869,7 @@ async def test_audit_syslog_noauth(manager: ManagerClient):
|
||||
|
||||
async def test_audit_syslog_auth(manager: ManagerClient):
|
||||
"""Syslog backend, auth enabled, single node."""
|
||||
t = TestCQLAudit(manager)
|
||||
t = CQLAuditTester(manager)
|
||||
Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
|
||||
await t.test_user_password_masking(Syslog)
|
||||
await t.test_role_password_masking(Syslog)
|
||||
@@ -1881,7 +1880,7 @@ async def test_audit_syslog_auth(manager: ManagerClient):
|
||||
|
||||
async def test_audit_composite_noauth(manager: ManagerClient):
|
||||
"""Composite backend (table+syslog), no auth, single node."""
|
||||
t = TestCQLAudit(manager)
|
||||
t = CQLAuditTester(manager)
|
||||
Composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_path)
|
||||
await t.test_using_non_existent_keyspace(Composite)
|
||||
await t.test_audit_keyspace(Composite)
|
||||
@@ -1898,7 +1897,7 @@ async def test_audit_composite_noauth(manager: ManagerClient):
|
||||
|
||||
async def test_audit_composite_auth(manager: ManagerClient):
|
||||
"""Composite backend (table+syslog), auth enabled, single node."""
|
||||
t = TestCQLAudit(manager)
|
||||
t = CQLAuditTester(manager)
|
||||
Composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_path)
|
||||
await t.test_user_password_masking(Composite)
|
||||
await t.test_role_password_masking(Composite)
|
||||
@@ -1910,29 +1909,29 @@ _composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_
|
||||
|
||||
|
||||
@pytest.mark.parametrize("helper_class,config_changer", [
|
||||
pytest.param(AuditBackendTable, TestCQLAudit.AuditSighupConfigChanger, id="table-sighup"),
|
||||
pytest.param(AuditBackendTable, TestCQLAudit.AuditCqlConfigChanger, id="table-cql"),
|
||||
pytest.param(_syslog, TestCQLAudit.AuditSighupConfigChanger, id="syslog-sighup"),
|
||||
pytest.param(_syslog, TestCQLAudit.AuditCqlConfigChanger, id="syslog-cql"),
|
||||
pytest.param(_composite, TestCQLAudit.AuditSighupConfigChanger, id="composite-sighup"),
|
||||
pytest.param(_composite, TestCQLAudit.AuditCqlConfigChanger, id="composite-cql"),
|
||||
pytest.param(AuditBackendTable, CQLAuditTester.AuditSighupConfigChanger, id="table-sighup"),
|
||||
pytest.param(AuditBackendTable, CQLAuditTester.AuditCqlConfigChanger, id="table-cql"),
|
||||
pytest.param(_syslog, CQLAuditTester.AuditSighupConfigChanger, id="syslog-sighup"),
|
||||
pytest.param(_syslog, CQLAuditTester.AuditCqlConfigChanger, id="syslog-cql"),
|
||||
pytest.param(_composite, CQLAuditTester.AuditSighupConfigChanger, id="composite-sighup"),
|
||||
pytest.param(_composite, CQLAuditTester.AuditCqlConfigChanger, id="composite-cql"),
|
||||
])
|
||||
async def test_config_no_liveupdate(manager: ManagerClient, helper_class, config_changer):
|
||||
"""Non-live audit config params (audit, audit_unix_socket_path, audit_syslog_write_buffer_size) must be unmodifiable."""
|
||||
await TestCQLAudit(manager).test_config_no_liveupdate(helper_class, config_changer)
|
||||
await CQLAuditTester(manager).test_config_no_liveupdate(helper_class, config_changer)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("helper_class,config_changer", [
|
||||
pytest.param(AuditBackendTable, TestCQLAudit.AuditSighupConfigChanger, id="table-sighup"),
|
||||
pytest.param(AuditBackendTable, TestCQLAudit.AuditCqlConfigChanger, id="table-cql"),
|
||||
pytest.param(_syslog, TestCQLAudit.AuditSighupConfigChanger, id="syslog-sighup"),
|
||||
pytest.param(_syslog, TestCQLAudit.AuditCqlConfigChanger, id="syslog-cql"),
|
||||
pytest.param(_composite, TestCQLAudit.AuditSighupConfigChanger, id="composite-sighup"),
|
||||
pytest.param(_composite, TestCQLAudit.AuditCqlConfigChanger, id="composite-cql"),
|
||||
pytest.param(AuditBackendTable, CQLAuditTester.AuditSighupConfigChanger, id="table-sighup"),
|
||||
pytest.param(AuditBackendTable, CQLAuditTester.AuditCqlConfigChanger, id="table-cql"),
|
||||
pytest.param(_syslog, CQLAuditTester.AuditSighupConfigChanger, id="syslog-sighup"),
|
||||
pytest.param(_syslog, CQLAuditTester.AuditCqlConfigChanger, id="syslog-cql"),
|
||||
pytest.param(_composite, CQLAuditTester.AuditSighupConfigChanger, id="composite-sighup"),
|
||||
pytest.param(_composite, CQLAuditTester.AuditCqlConfigChanger, id="composite-cql"),
|
||||
])
|
||||
async def test_config_liveupdate(manager: ManagerClient, helper_class, config_changer):
|
||||
"""Live-updatable audit config params (categories, keyspaces, tables) must be modifiable at runtime."""
|
||||
await TestCQLAudit(manager).test_config_liveupdate(helper_class, config_changer)
|
||||
await CQLAuditTester(manager).test_config_liveupdate(helper_class, config_changer)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("helper_class", [
|
||||
@@ -1942,4 +1941,4 @@ async def test_config_liveupdate(manager: ManagerClient, helper_class, config_ch
|
||||
])
|
||||
async def test_parallel_syslog_audit(manager: ManagerClient, helper_class):
|
||||
"""Cluster must not fail when multiple queries are audited in parallel."""
|
||||
await TestCQLAudit(manager).test_parallel_syslog_audit(helper_class)
|
||||
await CQLAuditTester(manager).test_parallel_syslog_audit(helper_class)
|
||||
|
||||
@@ -17,9 +17,9 @@ from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import ScyllaMetricsClient, TCPRESTClient, inject_error
|
||||
from test.pylib.tablets import get_tablet_replicas
|
||||
from test.pylib.scylla_cluster import ReplaceConfig
|
||||
from test.pylib.util import wait_for
|
||||
from test.pylib.util import gather_safely, wait_for
|
||||
|
||||
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace
|
||||
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, keyspace_has_tablets, new_test_keyspace, new_test_table
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -51,28 +51,42 @@ async def await_sync_point(client: TCPRESTClient, server_ip: IPAddress, sync_poi
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_cl_any_to_dead_node_generates_hints(manager: ManagerClient):
|
||||
node_count = 2
|
||||
servers = await manager.servers_add(node_count)
|
||||
cmdline = ["--logger-log-level", "hints_manager=trace"]
|
||||
servers = await manager.servers_add(node_count, cmdline=cmdline)
|
||||
|
||||
async def wait_for_hints_written(min_hint_count: int, timeout: int):
|
||||
async def aux():
|
||||
hints_written = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
|
||||
if hints_written >= min_hint_count:
|
||||
return True
|
||||
return None
|
||||
assert await wait_for(aux, time.time() + timeout)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
|
||||
table = f"{ks}.t"
|
||||
await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
|
||||
uses_tablets = await keyspace_has_tablets(manager, ks)
|
||||
# If the keyspace uses tablets, let's explicitly require the table to use multiple tablets.
|
||||
# Otherwise, it could happen that all mutations would target servers[0] only, which would
|
||||
# ultimately lead to a test failure here. We rely on the assumption that mutations will be
|
||||
# distributed more or less uniformly!
|
||||
extra_opts = "WITH tablets = {'min_tablet_count': 16}" if uses_tablets else ""
|
||||
async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int", extra_opts) as table:
|
||||
await manager.server_stop_gracefully(servers[1].server_id)
|
||||
|
||||
await manager.server_stop_gracefully(servers[1].server_id)
|
||||
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
|
||||
|
||||
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
|
||||
stmt = cql.prepare(f"INSERT INTO {table} (pk, v) VALUES (?, ?)")
|
||||
stmt.consistency_level = ConsistencyLevel.ANY
|
||||
|
||||
# Some of the inserts will be targeted to the dead node.
|
||||
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
|
||||
for i in range(100):
|
||||
await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES ({i}, {i+1})", consistency_level=ConsistencyLevel.ANY))
|
||||
# Some of the inserts will be targeted to the dead node.
|
||||
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
|
||||
await gather_safely(*[cql.run_async(stmt, (i, i + 1)) for i in range(100)])
|
||||
|
||||
# Verify hints are written
|
||||
hints_after = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
|
||||
assert hints_after > hints_before
|
||||
# Verify hints are written
|
||||
await wait_for_hints_written(hints_before + 1, timeout=60)
|
||||
|
||||
# For dropping the keyspace
|
||||
await manager.server_start(servers[1].server_id)
|
||||
# For dropping the keyspace
|
||||
await manager.server_start(servers[1].server_id)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_limited_concurrency_of_writes(manager: ManagerClient):
|
||||
|
||||
@@ -151,7 +151,7 @@ async def trigger_tablet_merge(manager, servers, logs):
|
||||
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
|
||||
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
|
||||
|
||||
async def preapre_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
|
||||
async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
|
||||
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline)
|
||||
repaired_keys = set(range(0, nr_keys))
|
||||
unrepaired_keys = set()
|
||||
@@ -164,7 +164,7 @@ async def preapre_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdlin
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient):
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
|
||||
|
||||
await insert_keys(cql, ks, 0, 100)
|
||||
|
||||
@@ -274,7 +274,7 @@ async def test_tablet_incremental_repair_error(manager: ManagerClient):
|
||||
|
||||
async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str):
|
||||
nr_keys = 100
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline=['--logger-log-level', 'compaction=debug'])
|
||||
token = -1
|
||||
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
@@ -335,7 +335,7 @@ async def test_tablet_incremental_repair_and_major(manager: ManagerClient):
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
|
||||
nr_keys = 100
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
|
||||
|
||||
# Disable autocompaction
|
||||
for server in servers:
|
||||
@@ -381,7 +381,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
|
||||
|
||||
async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_split, do_merge):
|
||||
nr_keys = 100
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||
@@ -442,7 +442,7 @@ async def test_tablet_incremental_repair_with_merge(manager: ManagerClient):
|
||||
async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(manager: ManagerClient):
|
||||
nr_keys = 100
|
||||
cmdline = ["--hinted-handoff-enabled", "0"]
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
|
||||
|
||||
await manager.server_stop_gracefully(servers[1].server_id)
|
||||
|
||||
@@ -466,7 +466,7 @@ async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(ma
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager):
|
||||
nr_keys = 100
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||
@@ -507,7 +507,7 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_merge(manager):
|
||||
nr_keys = 100
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||
@@ -541,7 +541,7 @@ async def do_test_tablet_incremental_repair_merge_error(manager, error):
|
||||
nr_keys = 100
|
||||
# Make sure no data commit log replay after force server stop
|
||||
cmdline = ['--enable-commitlog', '0']
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||
@@ -587,7 +587,7 @@ async def test_tablet_incremental_repair_merge_error_in_merge_completion_fiber(m
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
|
||||
token = -1
|
||||
|
||||
sstables_repaired_at = 0
|
||||
@@ -632,7 +632,7 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
|
||||
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
|
||||
servers, _, _, ks, _, _, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
|
||||
time1 = 0
|
||||
time2 = 0
|
||||
|
||||
@@ -820,7 +820,7 @@ async def test_repair_sigsegv_with_diff_shard_count(manager: ManagerClient, use_
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manager: ManagerClient):
|
||||
cmdline = ['--logger-log-level', 'repair=debug']
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await preapre_cluster_for_incremental_repair(manager, cmdline=cmdline)
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await prepare_cluster_for_incremental_repair(manager, cmdline=cmdline)
|
||||
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
|
||||
@@ -20,6 +20,7 @@ from cassandra.query import SimpleStatement
|
||||
from test.pylib.async_cql import _wrap_future
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables, TextType, Column
|
||||
from test.pylib.rest_client import read_barrier
|
||||
from test.pylib.util import unique_name
|
||||
from test.cluster.conftest import cluster_con
|
||||
|
||||
@@ -403,6 +404,7 @@ async def test_arbiter_dc_rf_rack_valid_keyspaces(manager: ManagerClient):
|
||||
for task in [*valid_keyspaces, *invalid_keyspaces]:
|
||||
_ = tg.create_task(task)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager: ManagerClient):
|
||||
"""
|
||||
This test verifies that starting a Scylla node fails when there's an RF-rack-invalid keyspace.
|
||||
@@ -464,22 +466,50 @@ async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager:
|
||||
for rfs, tablets in valid_keyspaces:
|
||||
_ = tg.create_task(create_keyspace(rfs, tablets))
|
||||
|
||||
await manager.server_stop_gracefully(s1.server_id)
|
||||
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
|
||||
|
||||
# Precondition: s1 has rf_rack_valid_keyspaces set to false.
|
||||
# Postcondition: s1 still has rf_rack_valid_keyspaces set to false.
|
||||
async def try_fail(rfs: List[int], dc: str, rf: int, rack_count: int):
|
||||
running_servers = await manager.running_servers()
|
||||
should_start = s1.server_id not in [server.server_id for server in running_servers]
|
||||
if should_start:
|
||||
await manager.server_start(s1.server_id)
|
||||
|
||||
ks = await create_keyspace(rfs, True)
|
||||
# We need to wait for the new schema to propagate.
|
||||
# Otherwise, it's not clear when the mutation
|
||||
# corresponding to the created keyspace will
|
||||
# arrive at server 1.
|
||||
# It could happen only after the node performs
|
||||
# the check upon start-up, effectively leading
|
||||
# to a successful start-up, which we don't want.
|
||||
# For more context, see issue: SCYLLADB-1137.
|
||||
await read_barrier(manager.api, s1.ip_addr)
|
||||
|
||||
await manager.server_stop_gracefully(s1.server_id)
|
||||
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
|
||||
|
||||
err = f"The keyspace '{ks}' is required to be RF-rack-valid. " \
|
||||
f"That condition is violated for DC '{dc}': RF={rf} vs. rack count={rack_count}."
|
||||
_ = await manager.server_start(s1.server_id, expected_error=err)
|
||||
await manager.server_start(s1.server_id, expected_error=err)
|
||||
await cql.run_async(f"DROP KEYSPACE {ks}")
|
||||
|
||||
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "false")
|
||||
|
||||
# Test RF-rack-invalid keyspaces.
|
||||
await try_fail([2, 0], "dc1", 2, 3)
|
||||
await try_fail([3, 2], "dc2", 2, 1)
|
||||
await try_fail([4, 1], "dc1", 4, 3)
|
||||
|
||||
_ = await manager.server_start(s1.server_id)
|
||||
# We need to perform a read barrier on the node to make
|
||||
# sure that it processes the last DROP KEYSPACE.
|
||||
# Otherwise, the node could think the RF-rack-invalid
|
||||
# keyspace still exists.
|
||||
await manager.server_start(s1.server_id)
|
||||
await read_barrier(manager.api, s1.ip_addr)
|
||||
await manager.server_stop_gracefully(s1.server_id)
|
||||
|
||||
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
|
||||
await manager.server_start(s1.server_id)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces_but_not_enforced(manager: ManagerClient):
|
||||
|
||||
@@ -23,10 +23,25 @@ from test.cluster.object_store.conftest import format_tuples
|
||||
from test.cluster.object_store.test_backup import topo, take_snapshot, do_test_streaming_scopes
|
||||
from test.cluster.util import new_test_keyspace
|
||||
from test.pylib.rest_client import read_barrier
|
||||
from test.pylib.util import unique_name
|
||||
from test.pylib.util import unique_name, wait_for
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def wait_for_upload_dir_empty(upload_dir, timeout=30):
|
||||
'''
|
||||
Wait until the upload directory is empty with a timeout.
|
||||
SSTable unlinking is asynchronous and in rare situations, it can happen
|
||||
that not all sstables are deleted from the upload dir immediately after refresh is done.
|
||||
'''
|
||||
deadline = time.time() + timeout
|
||||
async def check_empty():
|
||||
files = os.listdir(upload_dir)
|
||||
if not files:
|
||||
return True
|
||||
return None
|
||||
await wait_for(check_empty, deadline, period=0.5)
|
||||
|
||||
class SSTablesOnLocalStorage:
|
||||
def __init__(self):
|
||||
self.tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
|
||||
@@ -153,7 +168,8 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
|
||||
|
||||
for s in servers:
|
||||
cf_dir = dirs[s.server_id]["cf_dir"]
|
||||
files = os.listdir(os.path.join(cf_dir, 'upload'))
|
||||
assert files == [], f'Upload dir not empty on server {s.server_id}: {files}'
|
||||
upload_dir = os.path.join(cf_dir, 'upload')
|
||||
assert os.path.exists(upload_dir)
|
||||
await wait_for_upload_dir_empty(upload_dir)
|
||||
|
||||
shutil.rmtree(tmpbackup)
|
||||
|
||||
@@ -196,7 +196,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
|
||||
tombstone_mark = datetime.now(timezone.utc)
|
||||
|
||||
# test #2: the tombstones are not cleaned up when one node is down
|
||||
with pytest.raises(AssertionError, match="Deadline exceeded"):
|
||||
with pytest.raises(AssertionError, match="timed out"):
|
||||
# waiting for shorter time (5s normally enough for a successful case, we expect the timeout here)
|
||||
await verify_tombstone_gc(tombstone_mark, timeout=5)
|
||||
|
||||
@@ -249,7 +249,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
# test #4a: the tombstones are not cleaned up after both live nodes join the new group0
|
||||
with pytest.raises(AssertionError, match="Deadline exceeded"):
|
||||
with pytest.raises(AssertionError, match="timed out"):
|
||||
await verify_tombstone_gc(tombstone_mark, timeout=5)
|
||||
|
||||
await manager.remove_node(servers[0].server_id, down_server.server_id)
|
||||
|
||||
@@ -165,7 +165,7 @@ async def wait_for_cdc_generations_publishing(cql: Session, hosts: list[Host], d
|
||||
unpublished_generations = topo_res[0].unpublished_cdc_generations
|
||||
return unpublished_generations is None or len(unpublished_generations) == 0 or None
|
||||
|
||||
await wait_for(all_generations_published, deadline=deadline, period=1.0)
|
||||
await wait_for(all_generations_published, deadline=deadline)
|
||||
|
||||
|
||||
async def check_system_topology_and_cdc_generations_v3_consistency(manager: ManagerClient, live_hosts: list[Host], cqls: Optional[list[Session]] = None, ignored_hosts: list[Host] = []):
|
||||
@@ -470,6 +470,17 @@ async def new_materialized_view(manager: ManagerClient, table, select, pk, where
|
||||
await manager.get_cql().run_async(f"DROP MATERIALIZED VIEW {mv}")
|
||||
|
||||
|
||||
async def keyspace_has_tablets(manager: ManagerClient, keyspace: str) -> bool:
    """
    Checks whether the given keyspace uses tablets.
    Adapted from its counterpart in the cqlpy test: cqlpy/util.py::keyspace_has_tablets.

    Returns True only when system_schema.scylla_keyspaces has a row for
    `keyspace` and that row carries a non-None `initial_tablets` value.
    Returns False when there is no such row, or when the row has no
    `initial_tablets` attribute at all.
    """
    cql = manager.get_cql()
    rows_iter = await cql.run_async(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name='{keyspace}'")
    # Materialize the result so it can be both length-checked and indexed.
    rows = list(rows_iter)
    # getattr with a default tolerates result rows that lack the column.
    return len(rows) > 0 and getattr(rows[0], "initial_tablets", None) is not None
|
||||
|
||||
|
||||
async def get_raft_log_size(cql, host) -> int:
    """
    Return the number of entries in system.raft as seen by `host`.

    Runs a COUNT over the "index" column on the given host and returns the
    first column of the first result row.
    """
    query = "select count(\"index\") from system.raft"
    return (await cql.run_async(query, host=host))[0][0]
|
||||
|
||||
@@ -271,10 +271,21 @@ future<std::tuple<tests::proc::process_fixture, int>> tests::proc::start_docker_
|
||||
// arbitrary timeout of 120s for the server to make some output. Very generous.
|
||||
// but since we (maybe) run docker, and might need to pull image, this can take
|
||||
// some time if we're unlucky.
|
||||
co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
|
||||
} catch (in_use&) {
|
||||
retry = true;
|
||||
p = std::current_exception();
|
||||
auto [f1, f2] = co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
|
||||
for (auto* f : {&f1, &f2}) {
|
||||
if (f->failed()) {
|
||||
try {
|
||||
f->get();
|
||||
} catch (in_use&) {
|
||||
retry = true;
|
||||
p = std::current_exception();
|
||||
} catch (...) {
|
||||
if (!p) {
|
||||
p = std::current_exception();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
p = std::current_exception();
|
||||
}
|
||||
|
||||
@@ -56,15 +56,25 @@ def unique_name(unique_name_prefix = 'test_'):
|
||||
async def wait_for(
|
||||
pred: Callable[[], Awaitable[Optional[T]]],
|
||||
deadline: float,
|
||||
period: float = 1,
|
||||
period: float = 0.1,
|
||||
before_retry: Optional[Callable[[], Any]] = None,
|
||||
backoff_factor: float = 1,
|
||||
max_period: float = None) -> T:
|
||||
backoff_factor: float = 1.5,
|
||||
max_period: float = 1.0,
|
||||
label: Optional[str] = None) -> T:
|
||||
tag = label or getattr(pred, '__name__', 'unlabeled')
|
||||
start = time.time()
|
||||
retries = 0
|
||||
while True:
|
||||
assert(time.time() < deadline), "Deadline exceeded, failing test."
|
||||
elapsed = time.time() - start
|
||||
assert time.time() < deadline, \
|
||||
f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
|
||||
res = await pred()
|
||||
if res is not None:
|
||||
if retries > 0:
|
||||
logger.debug(f"wait_for({tag}) completed "
|
||||
f"in {elapsed:.2f}s ({retries} retries)")
|
||||
return res
|
||||
retries += 1
|
||||
await asyncio.sleep(period)
|
||||
period *= backoff_factor
|
||||
if max_period is not None:
|
||||
@@ -273,14 +283,14 @@ async def wait_for_view_v1(cql: Session, name: str, node_count: int, timeout: in
|
||||
done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
|
||||
return done[0][0] == node_count or None
|
||||
deadline = time.time() + timeout
|
||||
await wait_for(view_is_built, deadline)
|
||||
await wait_for(view_is_built, deadline, label=f"view_v1_{name}")
|
||||
|
||||
async def wait_for_view(cql: Session, name: str, node_count: int, timeout: int = 120):
    """
    Wait until the view `name` is reported as built by `node_count` nodes.

    Polls system.view_build_status_v2 through wait_for() until the number of
    rows with status 'SUCCESS' for the view equals `node_count`; the deadline
    is `timeout` seconds from the moment of the call.
    """
    async def view_is_built():
        done = await cql.run_async(f"SELECT COUNT(*) FROM system.view_build_status_v2 WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
        # wait_for() keeps polling while the predicate returns None.
        return done[0][0] == node_count or None

    deadline = time.time() + timeout
    # Single labelled wait; the label identifies this wait in wait_for()'s timeout/debug messages.
    await wait_for(view_is_built, deadline, label=f"view_{name}")
|
||||
|
||||
|
||||
async def wait_for_first_completed(coros: list[Coroutine], timeout: int|None = None):
|
||||
|
||||
Reference in New Issue
Block a user