diff --git a/service/strong_consistency/coordinator.cc b/service/strong_consistency/coordinator.cc index 3350afa53b..4d5e5a9580 100644 --- a/service/strong_consistency/coordinator.cc +++ b/service/strong_consistency/coordinator.cc @@ -339,14 +339,7 @@ future> coordinator::mutate(schema_ptr schema, co_return std::monostate{}; } catch (...) { auto ex = std::current_exception(); - if (try_catch(ex)) { - // Holding raft_server.holder guarantees that the raft::server is not - // aborted until the holder is released. - - on_internal_error(logger, - format("mutate(): add_entry, unexpected exception {}, table {}.{}, tablet {}, term {}", - ex, schema->ks_name(), schema->cf_name(), op.tablet_id, term)); - } else if (try_catch(ex) || try_catch(ex)) { + if (try_catch(ex) || try_catch(ex)) { logger.debug("mutate(): add_entry, got retriable error {}, table {}.{}, tablet {}, term {}", ex, schema->ks_name(), schema->cf_name(), op.tablet_id, term); @@ -379,10 +372,16 @@ future> coordinator::mutate(schema_ptr schema, // * seastar::abort_requested_exception: Can be thrown by create_operation_ctx. // * timed_out_error: Can be thrown by the abort_on_expiry. // * condition_variable_timed_out: Can be thrown by begin_mutate. + // * raft::stopped_error: The raft server was aborted (e.g. table being dropped). // // We handle them collectively here. if (try_catch(ex) || try_catch(ex) - || try_catch(ex) || try_catch(ex)) { + || try_catch(ex) || try_catch(ex) + || try_catch(ex)) { + if (!_db.column_family_exists(schema->id())) { + co_return coroutine::return_exception( + replica::no_such_column_family(schema->ks_name(), schema->cf_name())); + } logger.trace("mutate(): request timed out with error {}, table {}.{}, token {}", ex, schema->ks_name(), schema->cf_name(), token); ++_stats.write_errors_timeout; @@ -466,10 +465,16 @@ auto coordinator::query(schema_ptr schema, // * seastar::abort_requested_exception: Can be thrown by create_operation_ctx. // * timed_out_error: Can be thrown by the abort_on_expiry. // * seastar::condition_variable_timed_out: Can be thrown by begin_read's wait_for_leader. + // * raft::stopped_error: The raft server was aborted (e.g. table being dropped). // // We handle them collectively here. if (try_catch(ex) || try_catch(ex) - || try_catch(ex) || try_catch(ex)) { + || try_catch(ex) || try_catch(ex) + || try_catch(ex)) { + if (!_db.column_family_exists(schema->id())) { + co_return coroutine::return_exception( + replica::no_such_column_family(schema->ks_name(), schema->cf_name())); + } logger.trace("query(): request timed out with error {}, table {}.{}, read cmd {}", ex, schema->ks_name(), schema->cf_name(), cmd); ++_stats.read_errors_timeout; diff --git a/service/strong_consistency/groups_manager.cc b/service/strong_consistency/groups_manager.cc index 0083ea0dec..2c9db7c301 100644 --- a/service/strong_consistency/groups_manager.cc +++ b/service/strong_consistency/groups_manager.cc @@ -184,12 +184,21 @@ void groups_manager::schedule_raft_group_deletion(raft::group_id id, raft_group_ co_await state.server_control_op.get_future(); logger.debug("schedule_raft_group_deletion(): group id {}: starting", id); - co_await g->close(); - logger.debug("schedule_raft_group_deletion(): group id {}: gate closed", id); + // Initiate gate close, then abort the raft server, then wait for the + // gate. Gate close alone would block until all holders are released, + // but in-flight writes may be stuck in add_entry waiting for quorum + // that will never come (other nodes already destroyed their servers). + // Aborting the server causes those stuck operations to throw + // raft::stopped_error, releasing their holders and unblocking the gate. + auto gate_fut = g->close(); + logger.debug("schedule_raft_group_deletion(): group id {}: gate close initiated", id); co_await _raft_gr.abort_server(id); logger.debug("schedule_raft_group_deletion(): group id {}: server aborted", id); + co_await std::move(gate_fut); + logger.debug("schedule_raft_group_deletion(): group id {}: gate closed", id); + co_await std::move(state.leader_info_updater); _raft_gr.destroy_server(id); @@ -400,7 +409,12 @@ void groups_manager::update(token_metadata_ptr new_tm) { auto srv = raft_server(state, state.gate->hold()); auto res = srv.begin_mutate(aoe.abort_source()); if (auto w = get_if(&res)) { - co_await std::move(w->future); + auto f = co_await coroutine::as_future(std::move(w->future)); + if (f.failed()) { + logger.warn("update(): waiting for leader timed out for tablet {}, " + "group id {}: {}", tablet, id, f.get_exception()); + break; + } } else { break; } @@ -429,21 +443,27 @@ future groups_manager::acquire_server(table_id table_id, raft::grou // lw_shared_ptr, so that the table is dropped but the table object // is still alive. // - // Check that the table still exists in the database to turn the - // fatal on_internal_error below into a clean no_such_column_family - // exception. - // - // When the table does exist, we proceed to acquire state.gate->hold(). - // This prevents schedule_raft_group_deletion (which co_awaits gate::close) - // from erasing the group until the DML operation completes. - _db.find_column_family(table_id); + // Check that the table still exists. The table is removed from the + // database (via schema_applier::commit_tables_and_views) BEFORE + // groups_manager::update() is called (which triggers gate closure via + // schedule_raft_group_deletion). Since there's no scheduling point + // between the column_family_exists check and try_hold below, the gate + // cannot be closed if the table exists. + if (!_db.column_family_exists(table_id)) { + return make_exception_future( + replica::no_such_column_family(table_id)); + } const auto it = _raft_groups.find(group_id); if (it == _raft_groups.end()) { on_internal_error(logger, format("raft group {} not found", group_id)); } auto& state = it->second; - return state.server_control_op.get_future(as).then([&state, h = state.gate->hold()] mutable { + auto h = state.gate->try_hold(); + if (!h) { + on_internal_error(logger, format("acquire_server: gate closed for group {} while table {} exists", group_id, table_id)); + } + return state.server_control_op.get_future(as).then([&state, h = std::move(*h)] mutable { return raft_server(state, std::move(h)); }); } diff --git a/test/cluster/test_strong_consistency.py b/test/cluster/test_strong_consistency.py index 0dfb0ae681..c24c31f687 100644 --- a/test/cluster/test_strong_consistency.py +++ b/test/cluster/test_strong_consistency.py @@ -5,6 +5,7 @@ # from typing import Tuple +import re from test.pylib.manager_client import ManagerClient from test.pylib.util import gather_safely, wait_for, Host @@ -945,75 +946,110 @@ async def test_timed_out_queries(manager: ManagerClient): @pytest.mark.skip_mode(mode="release", reason="error injections are not supported in release mode") async def test_queries_while_dropping_table(manager: ManagerClient): - """ - A simple test that verifies that ongoing reads and writes are not interrupted - by a tablet migration. We simulate the process by dropping a table. + """Verify that in-flight reads and writes are promptly aborted when + a strongly consistent table is dropped. - We verify that: - - New reads and writes are rejected with a proper error. - - Ongoing reads and writes are not interrupted by a tablet migration - (which boils down to successful executions of Raft methods). - - Ongoing reads can still eventually observe that the table has - been dropped. - - Ongoing writes should also finish successfully if they've reached - the strongly consistent coordinator. + Setup: 2 nodes, RF=2, 1 tablet (raft quorum = 2). + + We pause a read (before read_barrier) and a write (before add_entry) + on the leader, then drop the table. The follower destroys its raft group + immediately (no in-flight ops). On the leader, the raft server is aborted + as part of group deletion (SCYLLADB-2080 fix), causing the paused + operations to fail with raft::stopped_error which the coordinator converts + to "no such column family" / "unconfigured table". + + We also verify that new reads/writes after the drop are rejected + immediately. """ - s1 = await manager.server_add(config=DEFAULT_CONFIG, cmdline=DEFAULT_CMDLINE) - cql, _ = await manager.get_ready_cql([s1]) + servers = await manager.servers_add(2, config=DEFAULT_CONFIG, cmdline=DEFAULT_CMDLINE, auto_rack_dc='my_dc') + cql, hosts = await manager.get_ready_cql(servers) + host_ids = [await manager.get_host_id(s.server_id) for s in servers] - log = await manager.server_open_log(s1.server_id) - mark = await log.mark() - - async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1} AND consistency = 'global'") as ks: - table_name = "my_table" + async with new_test_keyspace(manager, + "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}" + " AND tablets = {'initial': 1} AND consistency = 'global'" + ) as ks: + table_name = "tbl" table = f"{ks}.{table_name}" - await cql.run_async(f"CREATE TABLE {table} (pk int PRIMARY KEY, v int)") await cql.run_async(f"INSERT INTO {table} (pk, v) VALUES (0, 13)") - await gather_safely(*[ - manager.api.enable_injection(s1.ip_addr, "sc_coordinator_wait_before_query_read_barrier", one_shot=True), - manager.api.enable_injection(s1.ip_addr, "sc_coordinator_wait_before_add_entry", one_shot=True) - ]) + # Identify the leader node for this tablet's raft group. + group_id = await get_table_raft_group_id(manager, ks, table_name) + leader_host_id = await wait_for_leader(manager, servers[0], group_id) - read_fut = cql.run_async(f"SELECT * FROM {table} WHERE pk = 0") - write_fut = cql.run_async(f"INSERT INTO {table} (pk, v) VALUES (7, 17)") + leader_idx = next(i for i, hid in enumerate(host_ids) if str(hid) == leader_host_id) + leader_server = servers[leader_idx] + leader_host = hosts[leader_idx] + follower_server = servers[1 - leader_idx] - await asyncio.gather(*[ - asyncio.create_task(log.wait_for("sc_coordinator_wait_before_query_read_barrier", from_mark=mark)), - asyncio.create_task(log.wait_for("sc_coordinator_wait_before_add_entry", from_mark=mark)) - ]) + leader_log = await manager.server_open_log(leader_server.server_id) + follower_log = await manager.server_open_log(follower_server.server_id) - mark = await log.mark() + mark_leader = await leader_log.mark() + # Pause both a read and a write on the leader. + await asyncio.gather( + manager.api.enable_injection(leader_server.ip_addr, + "sc_coordinator_wait_before_query_read_barrier", one_shot=True), + manager.api.enable_injection(leader_server.ip_addr, + "sc_coordinator_wait_before_add_entry", one_shot=True)) + + read_fut = asyncio.ensure_future( + cql.run_async(f"SELECT * FROM {table} WHERE pk = 0", host=leader_host)) + write_fut = asyncio.ensure_future( + cql.run_async(f"INSERT INTO {table} (pk, v) VALUES (7, 17)", host=leader_host)) + + # Wait for both to hit their injection points. + await asyncio.gather( + leader_log.wait_for("sc_coordinator_wait_before_query_read_barrier: waiting", + from_mark=mark_leader, timeout=30), + leader_log.wait_for("sc_coordinator_wait_before_add_entry: waiting", + from_mark=mark_leader, timeout=30)) + + mark_leader = await leader_log.mark() + mark_follower = await follower_log.mark() + + # Drop the table. await cql.run_async(f"DROP TABLE {table}") - # Sanity check: The table was really dropped and we can no longer read or write to it. + # Sanity check: new queries are rejected immediately. with pytest.raises(InvalidRequest, match="unconfigured table"): await cql.run_async(f"SELECT * FROM {table} WHERE pk = 0") with pytest.raises(InvalidRequest, match="unconfigured table"): await cql.run_async(f"INSERT INTO {table} (pk, v) VALUES (13, 11)") - # Wait for the Raft group to be removed. This should happen almost immediately - # after dropping the table, but let's avoid any potential cause of flakiness. - await log.wait_for(r"schedule_raft_group_deletion\(\): group id \S+: starting", from_mark=mark) + # Confirm the follower destroyed its raft group. + await follower_log.wait_for("schedule_raft_group_deletion.*raft server.*is destroyed", + from_mark=mark_follower, timeout=30) - await asyncio.gather(*[ - asyncio.create_task(manager.api.message_injection(s1.ip_addr, "sc_coordinator_wait_before_query_read_barrier")), - asyncio.create_task(manager.api.message_injection(s1.ip_addr, "sc_coordinator_wait_before_add_entry")) - ]) + # Resume both operations. + await asyncio.gather( + manager.api.message_injection(leader_server.ip_addr, + "sc_coordinator_wait_before_query_read_barrier"), + manager.api.message_injection(leader_server.ip_addr, + "sc_coordinator_wait_before_add_entry")) - # Make sure that the read noticed that the table had been dropped. - with pytest.raises(Exception, match="Can't find a column family"): + # Both should fail with "no such column family" / "unconfigured table". + # The raft server is aborted as part of group deletion, causing + # stopped_error which the coordinator converts to no_such_column_family. + # Use a timeout to detect the old buggy behavior (write stuck forever). + try: + await asyncio.wait_for(asyncio.shield(write_fut), timeout=15) + except asyncio.TimeoutError: + cql.cluster.shutdown() + for s in servers: + await manager.server_stop(s.server_id, convict=True) + pytest.fail("SCYLLADB-2080: write is stuck waiting for quorum that can never " + "be reached because the follower's raft group was already destroyed. " + "The leader should abort the raft server to unblock the write.") + except InvalidRequest as e: + assert re.search(r"[Uu]nconfigured table|[Nn]o such column family", str(e)), \ + f"Expected 'unconfigured table' or 'no such column family', got: {e}" + + with pytest.raises((InvalidRequest, Exception), match="[Uu]nconfigured table|[Cc]an't find a column family|[Nn]o such column family"): await read_fut - # The groups manager waits for all ongoing queries to finish before - # it aborts the Raft server. Thanks to this, the write will succeed. - # No matter if we get a `no_such_column_family` exception when later - # applying the mutation in the state machine or not, there will be - # no exception thrown, so the write will still look like a success. - # And that's the desired behavior. - await write_fut @pytest.mark.skip_mode(mode="release", reason="error injections are not supported in release mode")