diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 185f08fba0..6c6a27fa45 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -6408,8 +6408,8 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t std::vector>> results; schema_ptr schema = local_schema_registry().get(cmd->schema_version); auto p = shared_from_this(); - auto& cf= _db.local().find_column_family(schema); - auto pcf = _db.local().get_config().cache_hit_rate_read_balancing() ? &cf : nullptr; + auto cf = _db.local().find_column_family(schema).shared_from_this(); + auto pcf = _db.local().get_config().cache_hit_rate_read_balancing() ? &*cf : nullptr; if (_features.range_scan_data_variant) { cmd->slice.options.set(); @@ -6434,6 +6434,9 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t concurrency_factor = std::max(size_t(1), ranges.size()); while (i != ranges.end()) { + co_await utils::get_local_injector().inject( + "query_partition_key_range_concurrent_scan_pause", + utils::wait_for_message(std::chrono::seconds(30))); dht::partition_range& range = *i; host_id_vector_replica_set live_endpoints = get_endpoints_for_reading(*schema, *erm, end_token(range), node_local_only); host_id_vector_replica_set merged_preferred_replicas = preferred_replicas_for_range(*i); @@ -6558,7 +6561,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t throw; } - exec.push_back(::make_shared(schema, cf.shared_from_this(), p, erm, cmd, std::move(range), cl, std::move(filtered_endpoints), trace_state, permit, std::monostate())); + exec.push_back(::make_shared(schema, cf, p, erm, cmd, std::move(range), cl, std::move(filtered_endpoints), trace_state, permit, std::monostate())); ranges_per_exec.emplace(exec.back().get(), std::move(merged_ranges)); } diff --git a/test/boost/storage_proxy_test.cc b/test/boost/storage_proxy_test.cc index 07e385aaa7..e36a2acade 100644 --- a/test/boost/storage_proxy_test.cc +++ b/test/boost/storage_proxy_test.cc @@ -14,9 +14,14 @@ #include #include "test/lib/cql_test_env.hh" +#include "test/lib/cql_assertions.hh" +#include "test/lib/eventually.hh" +#include "transport/messages/result_message.hh" +#include "types/types.hh" #include "service/storage_proxy.hh" #include "query_ranges_to_vnodes.hh" #include "schema/schema_builder.hh" +#include "utils/error_injection.hh" BOOST_AUTO_TEST_SUITE(storage_proxy_test) @@ -138,4 +143,60 @@ SEASTAR_THREAD_TEST_CASE(test_split_stats) { stats2->register_metrics_for("DC1", ep1); } +SEASTAR_TEST_CASE(test_drop_table_during_range_scan) { +#ifdef SCYLLA_ENABLE_ERROR_INJECTION + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("CREATE TABLE tbl (a int PRIMARY KEY, b int)").get(); + for (int i = 0; i < 100; i++) { + e.execute_cql(format("INSERT INTO tbl (a, b) VALUES ({}, {})", i, i * 10)).get(); + } + + utils::get_local_injector().enable("query_partition_key_range_concurrent_scan_pause"); + + // Start scan in background + auto scan_fut = e.execute_cql("SELECT * FROM tbl"); + + // Wait until the scan hits the pause point + REQUIRE_EVENTUALLY_EQUAL(std::function([] { + return utils::get_local_injector().waiters("query_partition_key_range_concurrent_scan_pause"); + }), size_t(1)); + + // Get current schema version before drop + auto get_schema_version = [&e] { + auto msg = e.execute_cql("SELECT schema_version FROM system.local WHERE key='local'").get(); + auto rows = dynamic_pointer_cast(msg); + auto& rs = rows->rs().result_set(); + return value_cast(uuid_type->deserialize(*rs.rows()[0][0])); + }; + auto schema_version_before_drop = get_schema_version(); + + // Drop the table while scan is paused (run in background since drop + // may wait for in-flight reads to complete) + auto drop_fut = e.execute_cql("DROP TABLE tbl"); + + // Wait for schema version to change in system.local (indicates drop is fully applied) + REQUIRE_EVENTUALLY_EQUAL(std::function([&] { + return get_schema_version() != schema_version_before_drop; + }), true); + + // Resume the scan after table is gone + utils::get_local_injector().receive_message("query_partition_key_range_concurrent_scan_pause"); + utils::get_local_injector().disable("query_partition_key_range_concurrent_scan_pause"); + + // Wait for drop future + drop_fut.get(); + + // The scan should either complete or throw, but not crash + try { + scan_fut.get(); + } catch (...) { + // Expected - table was dropped mid-scan + } + }); +#else + std::cerr << "Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n"; + return make_ready_future<>(); +#endif +} + BOOST_AUTO_TEST_SUITE_END()