mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
test_tablets_lwt: add test_lwt_shutdown
This commit is contained in:
@@ -283,6 +283,8 @@ future<> paxos_state::learn(storage_proxy& sp, paxos_store& paxos_store, schema_
|
||||
tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision);
|
||||
}
|
||||
|
||||
co_await utils::get_local_injector().inject("paxos_state_learn_after_mutate", utils::wait_for_message(5min));
|
||||
|
||||
// We don't need to lock the partition key if there is no gap between loading paxos
|
||||
// state and saving it, and here we're just blindly updating.
|
||||
co_await utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout);
|
||||
|
||||
@@ -7246,7 +7246,7 @@ future<> storage_proxy::abort_batch_writes() {
|
||||
|
||||
future<>
|
||||
storage_proxy::stop() {
|
||||
return make_ready_future<>();
|
||||
co_await utils::get_local_injector().inject("storage_proxy::stop", utils::wait_for_message(5min));
|
||||
}
|
||||
|
||||
locator::token_metadata_ptr storage_proxy::get_token_metadata_ptr() const noexcept {
|
||||
|
||||
@@ -830,3 +830,84 @@ async def test_lwts_for_special_tables(manager: ManagerClient):
|
||||
|
||||
with pytest.raises(InvalidRequest, match=re.escape(f"LWT is not supported on CDC log tables: {ks}.test_scylla_cdc_log")):
|
||||
await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test_scylla_cdc_log WHERE \"cdc$stream_id\"=0xAB", consistency_level=ConsistencyLevel.SERIAL))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_lwt_shutdown(manager: ManagerClient):
|
||||
"""
|
||||
This is a regression test for #26355:
|
||||
* Start a cluster with two nodes (s0, s1) and a tablet-based table.
|
||||
* Inject `paxos_state_learn_after_mutate` on s0 for this table.
|
||||
* Inject `storage_proxy::stop` on s0 to ensure that s0 shutdown does not
|
||||
complete before the LWT write on s0 finishes.
|
||||
* Execute an LWT with `cl_learn=1` on s0 and wait for it to succeed.
|
||||
This LWT leaves a background write to s0 storage.
|
||||
* Begin s0 shutdown and wait until it reaches “storage proxy RPC verbs”,
|
||||
but not “paxos store”. In other words, storage_proxy::remote should start
|
||||
draining, but the paxos store should not yet be destroyed since it is still
|
||||
in use.
|
||||
* Release the `database_apply_wait` injection.
|
||||
* Verify that shutdown completes successfully.
|
||||
* After s0 restarts, a node-local read on s0 must return the updated value.
|
||||
"""
|
||||
cmdline = [
|
||||
'--logger-log-level', 'paxos=trace'
|
||||
]
|
||||
[s0, s1] = await manager.servers_add(2, cmdline=cmdline, property_file=[
|
||||
{'dc': 'my_dc', 'rack': 'r1'},
|
||||
{'dc': 'my_dc', 'rack': 'r2'}
|
||||
])
|
||||
(cql, [h0, _]) = await manager.get_ready_cql([s0, s1])
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int)")
|
||||
|
||||
logger.info(f"Enable 'paxos_state_learn_after_mutate' injection on {s0.ip_addr}")
|
||||
await manager.api.enable_injection(s0.ip_addr, 'paxos_state_learn_after_mutate',
|
||||
False, parameters={'cf_name': 'test'})
|
||||
|
||||
logger.info("Run an LWT")
|
||||
await cql.run_async(SimpleStatement(f"INSERT INTO {ks}.test (pk, v) VALUES (1, 2) IF NOT EXISTS",
|
||||
consistency_level=ConsistencyLevel.ONE),
|
||||
host=h0)
|
||||
|
||||
logger.info("Open log")
|
||||
log = await manager.server_open_log(s0.server_id)
|
||||
logger.info("Wait for 'paxos_state_learn_after_mutate' injection")
|
||||
await log.wait_for('paxos_state_learn_after_mutate: waiting for message')
|
||||
|
||||
logger.info(f"Enable 'storage_proxy::stop' injection on {s0.ip_addr}")
|
||||
await manager.api.enable_injection(s0.ip_addr, 'storage_proxy::stop', True)
|
||||
|
||||
logger.info("Start node shutdown")
|
||||
stop_task = asyncio.create_task(manager.server_stop_gracefully(s0.server_id))
|
||||
await log.wait_for('Shutting down storage proxy RPC verbs')
|
||||
# assert len(await log.grep('Shutting down paxos store')) == 0
|
||||
|
||||
logger.info("Trigger storage_proxy::stop and database_apply_wait")
|
||||
await manager.api.message_injection(s0.ip_addr, "paxos_state_learn_after_mutate")
|
||||
await manager.api.message_injection(s0.ip_addr, 'storage_proxy::stop')
|
||||
|
||||
logger.info("Waiting for paxos store shutdown")
|
||||
await log.wait_for('Shutting down paxos store')
|
||||
|
||||
logger.info("Waiting for the node shutdown")
|
||||
await stop_task
|
||||
|
||||
logger.info("Restarting server")
|
||||
await manager.server_start(s0.server_id)
|
||||
|
||||
logger.info("Reconnecting the driver")
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logger.info("Wait for cql and get hosts")
|
||||
[h0, _] = await wait_for_cql_and_get_hosts(cql, [s0, s1], time.time() + 60)
|
||||
|
||||
logger.info("Run simple non-paxos read")
|
||||
rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
|
||||
consistency_level=ConsistencyLevel.ONE),
|
||||
host=h0)
|
||||
assert len(rows) == 1
|
||||
row = rows[0]
|
||||
assert row.pk == 1
|
||||
assert row.v == 2
|
||||
|
||||
Reference in New Issue
Block a user