diff --git a/service/strong_consistency/coordinator.cc b/service/strong_consistency/coordinator.cc index 5546e59289..4d62f33758 100644 --- a/service/strong_consistency/coordinator.cc +++ b/service/strong_consistency/coordinator.cc @@ -339,14 +339,7 @@ future> coordinator::mutate(schema_ptr schema, co_return std::monostate{}; } catch (...) { auto ex = std::current_exception(); - if (try_catch(ex)) { - // Holding raft_server.holder guarantees that the raft::server is not - // aborted until the holder is released. - - on_internal_error(logger, - format("mutate(): add_entry, unexpected exception {}, table {}.{}, tablet {}, term {}", - ex, schema->ks_name(), schema->cf_name(), op.tablet_id, term)); - } else if (try_catch(ex) || try_catch(ex)) { + if (try_catch(ex) || try_catch(ex)) { logger.debug("mutate(): add_entry, got retriable error {}, table {}.{}, tablet {}, term {}", ex, schema->ks_name(), schema->cf_name(), op.tablet_id, term); @@ -379,10 +372,16 @@ future> coordinator::mutate(schema_ptr schema, // * seastar::abort_requested_exception: Can be thrown by create_operation_ctx. // * timed_out_error: Can be thrown by the abort_on_expiry. // * condition_variable_timed_out: Can be thrown by begin_mutate. + // * raft::stopped_error: The raft server was aborted (e.g. table being dropped). // // We handle them collectively here. if (try_catch(ex) || try_catch(ex) - || try_catch(ex) || try_catch(ex)) { + || try_catch(ex) || try_catch(ex) + || try_catch(ex)) { + if (!_db.column_family_exists(schema->id())) { + co_return coroutine::return_exception( + replica::no_such_column_family(schema->ks_name(), schema->cf_name())); + } logger.trace("mutate(): request timed out with error {}, table {}.{}, token {}", ex, schema->ks_name(), schema->cf_name(), token); ++_stats.write_errors_timeout; @@ -468,10 +467,16 @@ auto coordinator::query(schema_ptr schema, // * seastar::abort_requested_exception: Can be thrown by create_operation_ctx. // * timed_out_error: Can be thrown by the abort_on_expiry. // * seastar::condition_variable_timed_out: Can be thrown by begin_read's wait_for_leader. + // * raft::stopped_error: The raft server was aborted (e.g. table being dropped). // // We handle them collectively here. if (try_catch(ex) || try_catch(ex) - || try_catch(ex) || try_catch(ex)) { + || try_catch(ex) || try_catch(ex) + || try_catch(ex)) { + if (!_db.column_family_exists(schema->id())) { + co_return coroutine::return_exception( + replica::no_such_column_family(schema->ks_name(), schema->cf_name())); + } logger.trace("query(): request timed out with error {}, table {}.{}, read cmd {}", ex, schema->ks_name(), schema->cf_name(), cmd); ++_stats.read_errors_timeout; diff --git a/service/strong_consistency/groups_manager.cc b/service/strong_consistency/groups_manager.cc index 0083ea0dec..a275f9ad60 100644 --- a/service/strong_consistency/groups_manager.cc +++ b/service/strong_consistency/groups_manager.cc @@ -184,12 +184,21 @@ void groups_manager::schedule_raft_group_deletion(raft::group_id id, raft_group_ co_await state.server_control_op.get_future(); logger.debug("schedule_raft_group_deletion(): group id {}: starting", id); - co_await g->close(); - logger.debug("schedule_raft_group_deletion(): group id {}: gate closed", id); + // Initiate gate close, then abort the raft server, then wait for the + // gate. Gate close alone would block until all holders are released, + // but in-flight writes may be stuck in add_entry waiting for quorum + // that will never come (other nodes already destroyed their servers). + // Aborting the server causes those stuck operations to throw + // raft::stopped_error, releasing their holders and unblocking the gate. + auto gate_fut = g->close(); + logger.debug("schedule_raft_group_deletion(): group id {}: gate close initiated", id); co_await _raft_gr.abort_server(id); logger.debug("schedule_raft_group_deletion(): group id {}: server aborted", id); + co_await std::move(gate_fut); + logger.debug("schedule_raft_group_deletion(): group id {}: gate closed", id); + co_await std::move(state.leader_info_updater); _raft_gr.destroy_server(id); @@ -429,21 +438,27 @@ future groups_manager::acquire_server(table_id table_id, raft::grou // lw_shared_ptr, so that the table is dropped but the table object // is still alive. // - // Check that the table still exists in the database to turn the - // fatal on_internal_error below into a clean no_such_column_family - // exception. - // - // When the table does exist, we proceed to acquire state.gate->hold(). - // This prevents schedule_raft_group_deletion (which co_awaits gate::close) - // from erasing the group until the DML operation completes. - _db.find_column_family(table_id); + // Check that the table still exists. The table is removed from the + // database (via schema_applier::commit_tables_and_views) BEFORE + // groups_manager::update() is called (which triggers gate closure via + // schedule_raft_group_deletion). Since there's no scheduling point + // between the column_family_exists check and try_hold below, the gate + // cannot be closed if the table exists. + if (!_db.column_family_exists(table_id)) { + return make_exception_future( + replica::no_such_column_family(table_id)); + } const auto it = _raft_groups.find(group_id); if (it == _raft_groups.end()) { on_internal_error(logger, format("raft group {} not found", group_id)); } auto& state = it->second; - return state.server_control_op.get_future(as).then([&state, h = state.gate->hold()] mutable { + auto h = state.gate->try_hold(); + if (!h) { + on_internal_error(logger, format("acquire_server: gate closed for group {} while table {} exists", group_id, table_id)); + } + return state.server_control_op.get_future(as).then([&state, h = std::move(*h)] mutable { return raft_server(state, std::move(h)); }); } diff --git a/test/cluster/test_strong_consistency.py b/test/cluster/test_strong_consistency.py index 32bd8afcf9..c24c31f687 100644 --- a/test/cluster/test_strong_consistency.py +++ b/test/cluster/test_strong_consistency.py @@ -944,7 +944,6 @@ async def test_timed_out_queries(manager: ManagerClient): await cql.run_async(f"INSERT INTO {table} (pk, v) VALUES (11, 13) USING TIMEOUT 100ms") -@pytest.mark.xfail(reason="SCYLLADB-2080: write gets stuck waiting for quorum; fix not yet applied") @pytest.mark.skip_mode(mode="release", reason="error injections are not supported in release mode") async def test_queries_while_dropping_table(manager: ManagerClient): """Verify that in-flight reads and writes are promptly aborted when