diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 76242488cc..fc3777bfa3 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -36,6 +36,7 @@ #include "streaming/stream_mutation_fragments_cmd.hh" #include "consumer.hh" #include "readers/generating_v2.hh" +#include "utils/error_injection.hh" namespace streaming { @@ -167,6 +168,9 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { // Make sure the table with cf_id is still present at this point. // Close the sink in case the table is dropped. auto& table = _db.local().find_column_family(cf_id); + utils::get_local_injector().inject("stream_mutation_fragments_table_dropped", [this] () { + _db.local().find_column_family(table_id::create_null_id()); + }); auto erm = table.get_effective_replication_map(); auto op = table.stream_in_progress(); //FIXME: discarded future. diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index b69e0bfa02..d461653960 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -31,6 +31,7 @@ #include "replica/database.hh" #include "repair/table_check.hh" #include "gms/feature_service.hh" +#include "utils/error_injection.hh" namespace streaming { @@ -224,8 +225,11 @@ future<> stream_transfer_task::execute() { }).then([this, id, plan_id] { _mutation_done_sent = true; sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr); - }).handle_exception([plan_id, id] (std::exception_ptr ep) { + }).handle_exception([plan_id, id, &sm] (std::exception_ptr ep) { sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep); + utils::get_local_injector().inject("stream_mutation_fragments_table_dropped", [&sm] () { + sm.db().find_column_family(table_id::create_null_id()); + }); std::rethrow_exception(ep); }); }); diff --git a/test/topology_custom/test_table_drop.py b/test/topology_custom/test_table_drop.py new file mode 100644 index 0000000000..421a8817c7 --- /dev/null +++ b/test/topology_custom/test_table_drop.py @@ -0,0 +1,11 @@ +from test.pylib.manager_client import ManagerClient +import pytest + +@pytest.mark.asyncio +async def test_drop_table_during_streaming_receiver_side(manager: ManagerClient): + servers = [await manager.server_add(config={ + 'error_injections_at_startup': ['stream_mutation_fragments_table_dropped'], + 'enable_repair_based_node_ops': False, + 'enable_user_defined_functions': False, + 'experimental_features': [] + }) for _ in range(2)]