mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-08 16:03:20 +00:00
storage_proxy: hold shared pointer to a table object during entire query_partition_key_range_concurrent execution
Otherwise, if the table is dropped in the middle of a scan, the table object may be destroyed while the scan is still running. Fixes https://scylladb.atlassian.net/browse/SCYLLADB-2137 Closes scylladb/scylladb#29988
This commit is contained in:
@@ -6408,8 +6408,8 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results;
|
||||
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
|
||||
auto p = shared_from_this();
|
||||
auto& cf= _db.local().find_column_family(schema);
|
||||
auto pcf = _db.local().get_config().cache_hit_rate_read_balancing() ? &cf : nullptr;
|
||||
auto cf = _db.local().find_column_family(schema).shared_from_this();
|
||||
auto pcf = _db.local().get_config().cache_hit_rate_read_balancing() ? &*cf : nullptr;
|
||||
|
||||
if (_features.range_scan_data_variant) {
|
||||
cmd->slice.options.set<query::partition_slice::option::range_scan_data_variant>();
|
||||
@@ -6434,6 +6434,9 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
|
||||
concurrency_factor = std::max(size_t(1), ranges.size());
|
||||
|
||||
while (i != ranges.end()) {
|
||||
co_await utils::get_local_injector().inject(
|
||||
"query_partition_key_range_concurrent_scan_pause",
|
||||
utils::wait_for_message(std::chrono::seconds(30)));
|
||||
dht::partition_range& range = *i;
|
||||
host_id_vector_replica_set live_endpoints = get_endpoints_for_reading(*schema, *erm, end_token(range), node_local_only);
|
||||
host_id_vector_replica_set merged_preferred_replicas = preferred_replicas_for_range(*i);
|
||||
@@ -6558,7 +6561,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
|
||||
throw;
|
||||
}
|
||||
|
||||
exec.push_back(::make_shared<never_speculating_read_executor>(schema, cf.shared_from_this(), p, erm, cmd, std::move(range), cl, std::move(filtered_endpoints), trace_state, permit, std::monostate()));
|
||||
exec.push_back(::make_shared<never_speculating_read_executor>(schema, cf, p, erm, cmd, std::move(range), cl, std::move(filtered_endpoints), trace_state, permit, std::monostate()));
|
||||
ranges_per_exec.emplace(exec.back().get(), std::move(merged_ranges));
|
||||
}
|
||||
|
||||
|
||||
@@ -14,9 +14,14 @@
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "test/lib/eventually.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "types/types.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "query_ranges_to_vnodes.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(storage_proxy_test)
|
||||
|
||||
@@ -138,4 +143,60 @@ SEASTAR_THREAD_TEST_CASE(test_split_stats) {
|
||||
stats2->register_metrics_for("DC1", ep1);
|
||||
}
|
||||
|
||||
// Regression test: dropping a table while a full range scan over it is paused
// must not crash. The scan is parked on the
// "query_partition_key_range_concurrent_scan_pause" error-injection point, the
// table is dropped, and only then is the scan allowed to resume.
//
// The test only runs when error injection is compiled in; otherwise it prints
// a notice and passes trivially.
SEASTAR_TEST_CASE(test_drop_table_during_range_scan) {
#ifdef SCYLLA_ENABLE_ERROR_INJECTION
    return do_with_cql_env_thread([] (cql_test_env& e) {
        e.execute_cql("CREATE TABLE tbl (a int PRIMARY KEY, b int)").get();
        for (int i = 0; i < 100; i++) {
            e.execute_cql(format("INSERT INTO tbl (a, b) VALUES ({}, {})", i, i * 10)).get();
        }

        utils::get_local_injector().enable("query_partition_key_range_concurrent_scan_pause");

        // Start scan in background
        auto scan_fut = e.execute_cql("SELECT * FROM tbl");

        // Wait until the scan hits the pause point
        REQUIRE_EVENTUALLY_EQUAL(std::function<size_t()>([] {
            return utils::get_local_injector().waiters("query_partition_key_range_concurrent_scan_pause");
        }), size_t(1));

        // Reads the node's current schema version from system.local.
        // Fails the test (instead of dereferencing a null pointer or indexing
        // an empty result) if the query does not return a row.
        auto get_schema_version = [&e] {
            auto msg = e.execute_cql("SELECT schema_version FROM system.local WHERE key='local'").get();
            auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(msg);
            BOOST_REQUIRE(rows);
            auto& rs = rows->rs().result_set();
            BOOST_REQUIRE(!rs.rows().empty());
            BOOST_REQUIRE(!rs.rows()[0].empty());
            return value_cast<utils::UUID>(uuid_type->deserialize(*rs.rows()[0][0]));
        };

        // Get current schema version before drop
        auto schema_version_before_drop = get_schema_version();

        // Drop the table while scan is paused (run in background since drop
        // may wait for in-flight reads to complete)
        auto drop_fut = e.execute_cql("DROP TABLE tbl");

        // Wait for schema version to change in system.local (indicates drop is fully applied)
        REQUIRE_EVENTUALLY_EQUAL(std::function<bool()>([&] {
            return get_schema_version() != schema_version_before_drop;
        }), true);

        // Resume the scan after table is gone
        utils::get_local_injector().receive_message("query_partition_key_range_concurrent_scan_pause");
        utils::get_local_injector().disable("query_partition_key_range_concurrent_scan_pause");

        // Wait for drop future
        drop_fut.get();

        // The scan should either complete or throw, but not crash.
        // Any exception is deliberately ignored: the only thing under test is
        // that resuming the scan after the drop does not bring the process down.
        try {
            scan_fut.get();
        } catch (...) {
            // Expected - table was dropped mid-scan
        }
    });
#else
    std::cerr << "Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n";
    return make_ready_future<>();
#endif
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
Reference in New Issue
Block a user