From d25be9e3897e4981c70eccbb2257e63438cf57c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jadwiszczak?= Date: Wed, 11 Feb 2026 17:06:30 +0100 Subject: [PATCH 1/7] db/schema_tables: add `get_column_mapping_if_exists()` In scenarios where we want to first check if a column mapping exists and if we don't want to do flow control with exceptions, it is very wasteful to do ``` if (column_mapping_exists()) { get_column_mapping(); } ``` especially in a hot path like `state_machine::apply()` because this will execute 2 internal queries. This commit introduces the `get_column_mapping_if_exists()` function, which simply wraps the result of `get_column_mapping()` in an optional and doesn't throw an exception if the mapping doesn't exist. --- db/schema_tables.cc | 23 +++++++++++++++-------- db/schema_tables.hh | 2 ++ 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 29e288d369..694ea9e372 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -2840,20 +2840,15 @@ void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? 
AND schema_version = ?", db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY); -future get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) { - shared_ptr results = co_await sys_ks._qp.execute_internal( +future> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) { + shared_ptr results = co_await sys_ks.query_processor().execute_internal( GET_COLUMN_MAPPING_QUERY, db::consistency_level::LOCAL_ONE, {table_id.uuid(), version.uuid()}, cql3::query_processor::cache_internal::no ); if (results->empty()) { - // If we don't have a stored column_mapping for an obsolete schema version - // then it means it's way too old and been cleaned up already. - // Fail the whole learn stage in this case. - co_await coroutine::return_exception(std::runtime_error( - format("Failed to look up column mapping for schema version {}", - version))); + co_return std::nullopt; } std::vector static_columns, regular_columns; for (const auto& row : *results) { @@ -2881,6 +2876,18 @@ future get_column_mapping(db::system_keyspace& sys_ks, ::table_i co_return std::move(cm); } +future get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) { + auto cm_opt = co_await schema_tables::get_column_mapping_if_exists(sys_ks, table_id, version); + if (!cm_opt) { + // If we don't have a stored column_mapping for an obsolete schema version + // then it means it's way too old and been cleaned up already. 
+ co_await coroutine::return_exception(std::runtime_error( + format("Failed to look up column mapping for schema version {}", + version))); + } + co_return std::move(*cm_opt); +} + future column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) { shared_ptr results = co_await sys_ks._qp.execute_internal( GET_COLUMN_MAPPING_QUERY, diff --git a/db/schema_tables.hh b/db/schema_tables.hh index 173648cdc1..7d711257c1 100644 --- a/db/schema_tables.hh +++ b/db/schema_tables.hh @@ -320,6 +320,8 @@ std::optional> get_map(const query::result_set_row& row, const ss future<> store_column_mapping(sharded& proxy, schema_ptr s, bool with_ttl); /// Query column mapping for a given version of the table locally. future get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version); +/// Returns the same result as `get_column_mapping()` wrapped in optional and returns nullopt if the mapping doesn't exist. +future> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version); /// Check that column mapping exists for a given version of the table future column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version); /// Delete matching column mapping entries from the `system.scylla_table_schema_history` table From 33a16940be5035f687804aff1f73c247b4c24946 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jadwiszczak?= Date: Wed, 18 Feb 2026 09:28:06 +0100 Subject: [PATCH 2/7] strong_consistency/state_machine: pull necessary dependencies Both migration manager and system keyspace will be used in next commit. The first one is needed to execute group0 read barrier and we need system keyspace to get column mappings. 
--- main.cc | 2 +- service/strong_consistency/groups_manager.cc | 7 +++++-- service/strong_consistency/groups_manager.hh | 12 +++++++++++- service/strong_consistency/state_machine.cc | 16 +++++++++++++--- service/strong_consistency/state_machine.hh | 12 +++++++++++- test/lib/cql_test_env.cc | 2 +- 6 files changed, 42 insertions(+), 9 deletions(-) diff --git a/main.cc b/main.cc index 9a38216c59..78dda43f2e 100644 --- a/main.cc +++ b/main.cc @@ -1831,7 +1831,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl checkpoint(stop_signal, "initializing strongly consistent groups manager"); sharded groups_manager; groups_manager.start(std::ref(messaging), std::ref(raft_gr), std::ref(qp), - std::ref(db), std::ref(feature_service)).get(); + std::ref(db), std::ref(mm), std::ref(sys_ks), std::ref(feature_service)).get(); auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [&] { groups_manager.stop().get(); }); diff --git a/service/strong_consistency/groups_manager.cc b/service/strong_consistency/groups_manager.cc index 833ed177c8..45501b7ec4 100644 --- a/service/strong_consistency/groups_manager.cc +++ b/service/strong_consistency/groups_manager.cc @@ -8,6 +8,7 @@ #include "groups_manager.hh" +#include "service/migration_manager.hh" #include "service/strong_consistency/state_machine.hh" #include "service/strong_consistency/raft_groups_storage.hh" #include "gms/feature_service.hh" @@ -94,11 +95,13 @@ auto raft_server::begin_mutate() -> begin_mutate_result { groups_manager::groups_manager(netw::messaging_service& ms, raft_group_registry& raft_gr, cql3::query_processor& qp, - replica::database& db, gms::feature_service& features) + replica::database& db, service::migration_manager& mm, db::system_keyspace& sys_ks, gms::feature_service& features) : _ms(ms) , _raft_gr(raft_gr) , _qp(qp) , _db(db) + , _mm(mm) + , _sys_ks(sys_ks) , _features(features) { } @@ -109,7 +112,7 @@ future<> 
groups_manager::start_raft_group(global_tablet_id tablet, { const auto my_id = to_server_id(tm->get_my_id()); - auto state_machine = make_state_machine(tablet, group_id, _db); + auto state_machine = make_state_machine(tablet, group_id, _db, _mm, _sys_ks); auto& state_machine_ref = *state_machine; auto rpc = std::make_unique(state_machine_ref, _ms, _raft_gr.failure_detector(), group_id, my_id); // Keep a reference to a specific RPC class. diff --git a/service/strong_consistency/groups_manager.hh b/service/strong_consistency/groups_manager.hh index 5c0a61d599..65737f011c 100644 --- a/service/strong_consistency/groups_manager.hh +++ b/service/strong_consistency/groups_manager.hh @@ -13,6 +13,14 @@ #include "service/raft/raft_group_registry.hh" #include "cql3/query_processor.hh" +namespace db { +class system_keyspace; +} + +namespace service { +class migration_manager; +} + namespace service::strong_consistency { class raft_server; @@ -67,6 +75,8 @@ class groups_manager : public peering_sharded_service { raft_group_registry& _raft_gr; cql3::query_processor& _qp; replica::database& _db; + service::migration_manager& _mm; + db::system_keyspace& _sys_ks; gms::feature_service& _features; std::unordered_map _raft_groups = {}; locator::token_metadata_ptr _pending_tm = nullptr; @@ -87,7 +97,7 @@ class groups_manager : public peering_sharded_service { public: groups_manager(netw::messaging_service& ms, raft_group_registry& raft_gr, - cql3::query_processor& qp, replica::database& _db, + cql3::query_processor& qp, replica::database& _db, service::migration_manager& mm, db::system_keyspace& sys_ks, gms::feature_service& features); // Called whenever a new token_metadata is published on this shard. 
diff --git a/service/strong_consistency/state_machine.cc b/service/strong_consistency/state_machine.cc index 2bc2b7fd0b..1e39c55654 100644 --- a/service/strong_consistency/state_machine.cc +++ b/service/strong_consistency/state_machine.cc @@ -11,6 +11,8 @@ #include "idl/strong_consistency/state_machine.dist.hh" #include "idl/strong_consistency/state_machine.dist.impl.hh" #include "replica/database.hh" +#include "service/migration_manager.hh" +#include "db/system_keyspace.hh" namespace service::strong_consistency { @@ -18,14 +20,20 @@ class state_machine : public raft_state_machine { locator::global_tablet_id _tablet; raft::group_id _group_id; replica::database& _db; + [[maybe_unused]] service::migration_manager& _mm; + [[maybe_unused]] db::system_keyspace& _sys_ks; public: state_machine(locator::global_tablet_id tablet, raft::group_id gid, - replica::database& db) + replica::database& db, + service::migration_manager& mm, + db::system_keyspace& sys_ks) : _tablet(tablet) , _group_id(gid) , _db(db) + , _mm(mm) + , _sys_ks(sys_ks) { } @@ -69,9 +77,11 @@ public: std::unique_ptr make_state_machine(locator::global_tablet_id tablet, raft::group_id gid, - replica::database& db) + replica::database& db, + service::migration_manager& mm, + db::system_keyspace& sys_ks) { - return std::make_unique(tablet, gid, db); + return std::make_unique(tablet, gid, db, mm, sys_ks); } }; \ No newline at end of file diff --git a/service/strong_consistency/state_machine.hh b/service/strong_consistency/state_machine.hh index 36c0467a02..0186e61f1e 100644 --- a/service/strong_consistency/state_machine.hh +++ b/service/strong_consistency/state_machine.hh @@ -12,6 +12,14 @@ #include "mutation/frozen_mutation.hh" #include "locator/tablets.hh" +namespace db { +class system_keyspace; +} + +namespace service { +class migration_manager; +} + namespace service::strong_consistency { struct raft_command { @@ -19,6 +27,8 @@ struct raft_command { }; std::unique_ptr 
make_state_machine(locator::global_tablet_id tablet, raft::group_id gid, - replica::database& db); + replica::database& db, + service::migration_manager& mm, + db::system_keyspace& sys_ks); } \ No newline at end of file diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 46895ca747..2b561361eb 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -960,7 +960,7 @@ private: auto stop_auth_cache = defer_verbose_shutdown("auth cache", [this] { _auth_cache.stop().get(); }); _groups_manager.start(std::ref(_ms), std::ref(_group0_registry), std::ref(_qp), - std::ref(_db), std::ref(_feature_service)).get(); + std::ref(_db), std::ref(_mm), std::ref(_sys_ks), std::ref(_feature_service)).get(); auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [this] { _groups_manager.stop().get(); }); _sc_coordinator.start(std::ref(_groups_manager), std::ref(_db)).get(); From b0cffb2e81eb907d221cf0f9dde74fba3ecd60a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jadwiszczak?= Date: Wed, 18 Feb 2026 09:36:14 +0100 Subject: [PATCH 3/7] strong_consistency/state_machine: find and hold schemas of applying mutations It might happen that a strong consistency command will arrive to a node: - before it knows about the schema - after the schema was changed and the old version was removed from the memory To fix the first case, it's enough to perform a read barrier on group0. In case of the second one, we can use the column mapping to upgrade the mutation to the newer schema. Also, we should hold pointers to schemas until we finish `_db.apply()`, so the schema is valid for the whole time. And potentially we should hold multiple pointers because commands passed to `state_machine::apply()` may contain mutations to different schema versions. This commit relies on the fact that the tablet raft group and its state machine is created only after the table is created locally on the node. 
Fixes SCYLLADB-428 --- service/strong_consistency/state_machine.cc | 104 +++++++++++++++++++- 1 file changed, 101 insertions(+), 3 deletions(-) diff --git a/service/strong_consistency/state_machine.cc b/service/strong_consistency/state_machine.cc index 1e39c55654..f1d94dfe85 100644 --- a/service/strong_consistency/state_machine.cc +++ b/service/strong_consistency/state_machine.cc @@ -6,22 +6,30 @@ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ +#include +#include #include "state_machine.hh" +#include "db/schema_tables.hh" +#include "mutation/frozen_mutation.hh" +#include "schema/schema_registry.hh" #include "serializer_impl.hh" #include "idl/strong_consistency/state_machine.dist.hh" #include "idl/strong_consistency/state_machine.dist.impl.hh" #include "replica/database.hh" #include "service/migration_manager.hh" #include "db/system_keyspace.hh" +#include "utils/loading_cache.hh" namespace service::strong_consistency { +static logging::logger logger("sc_state_machine"); + class state_machine : public raft_state_machine { locator::global_tablet_id _tablet; raft::group_id _group_id; replica::database& _db; - [[maybe_unused]] service::migration_manager& _mm; - [[maybe_unused]] db::system_keyspace& _sys_ks; + service::migration_manager& _mm; + db::system_keyspace& _sys_ks; public: state_machine(locator::global_tablet_id tablet, @@ -38,6 +46,8 @@ public: } future<> apply(std::vector command) override { + static thread_local logging::logger::rate_limit rate_limit(std::chrono::seconds(10)); + try { utils::chunked_vector muts; muts.reserve(command.size()); @@ -46,8 +56,18 @@ public: auto cmd = ser::deserialize(is, std::type_identity{}); muts.push_back(std::move(cmd.mutation)); } + // Hold pointers to schemas until `_db.apply()` is finished + auto schemas = co_await get_schema_and_upgrade_mutations(muts); co_await _db.apply(std::move(muts), db::no_timeout); - } catch (...) 
{ + } catch (replica::no_such_column_family&) { + // If the table doesn't exist, it means it was already dropped. + // This cannot happen if the table wasn't created yet on the node + // because the state machine is created only after the table is created + // (see `schema_applier::commit_on_shard()` and `storage_service::commit_token_metadata_change()`). + // In this case, we should just ignore mutations without throwing an error. + logger.log(log_level::warn, rate_limit, "apply(): table {} was already dropped, ignoring mutations", _tablet.table); + } + catch (...) { throw std::runtime_error(::format( "tablet {}, group id {}: error while applying mutations {}", _tablet, _group_id, std::current_exception())); @@ -73,6 +93,84 @@ public: future<> transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) override { throw std::runtime_error("transfer_snapshot() not implemented"); } + +private: + using column_mappings_cache = utils::loading_cache; + using schema_store = std::unordered_map>; + future get_schema_and_upgrade_mutations(utils::chunked_vector& muts) { + // Cache column mappings to avoid querying `system.scylla_table_schema_history` multiple times. + static thread_local column_mappings_cache column_mapping_cache(std::numeric_limits::max(), 1h, logger); + // Stores schema pointer and optional column mapping for each schema version present in the mutations + schema_store schema_mappings; + bool barrier_executed = false; + + auto get_schema = [&] (table_schema_version schema_version) -> future> { + auto schema = local_schema_registry().get_or_null(schema_version); + if (schema) { + co_return std::pair{std::move(schema), nullptr}; + } + + // `_db.find_schema()` may throw `replica::no_such_column_family` if the table was already dropped. 
+ schema = _db.find_schema(_tablet.table); + // The column mapping may be already present in the cache from another `apply()` call + auto cm_ptr = column_mapping_cache.find(schema_version); + if (cm_ptr) { + co_return std::pair{std::move(schema), std::move(cm_ptr)}; + } + + // We may not find the column mapping if the mutation schema is newer than the present schema. + // In this case, we should trigger the barrier to wait for the schema to be updated and then try again. + auto cm_opt = co_await db::schema_tables::get_column_mapping_if_exists(_sys_ks, _tablet.table, schema_version); + if (!cm_opt) { + co_return std::pair{nullptr, nullptr}; + } + + cm_ptr = co_await column_mapping_cache.get_ptr(schema_version, [cm = std::move(*cm_opt)] (auto schema_version) -> future { + co_return std::move(cm); + }); + co_return std::pair{std::move(schema), std::move(cm_ptr)}; + }; + + auto resolve_schema = [&] (const frozen_mutation& mut) -> future { + auto schema_version = mut.schema_version(); + auto it = schema_mappings.find(schema_version); + if (it != schema_mappings.end()) { + co_return &it->second; + } + + auto schema_cm = co_await get_schema(schema_version); + if (!schema_cm.first && !barrier_executed) { + // TODO: pass valid abort source + co_await _mm.get_group0_barrier().trigger(); + barrier_executed = true; + schema_cm = co_await get_schema(schema_version); + } + + if (schema_cm.first) { + const auto [it, _] = schema_mappings.insert({schema_version, std::move(schema_cm)}); + co_return &it->second; + } + co_return nullptr; + }; + + for (auto& m: muts) { + auto schema_entry = co_await resolve_schema(m); + if (!schema_entry) { + // Old schema are TTLed after 10 days (see comment in `schema_applier::finalize_tables_and_views()`), + // so this error theoretically may be triggered if a node is stuck longer than this. + // But in practice we should do a snapshot much earlier, that's why `on_internal_error()` here. 
+ // And if the table was already dropped, `no_such_column_family` will be dropped earlier. + on_internal_error(logger, fmt::format("couldn't find schema for table {} and mutation schema version {}", _tablet.table, m.schema_version())); + } + if (schema_entry->second) { + m = freeze(m.unfreeze_upgrading(schema_entry->first, *schema_entry->second)); + } + } + + // We only need vector of schema pointers but we're returning the whole map + // to avoid another allocation + co_return std::move(schema_mappings); + } }; std::unique_ptr make_state_machine(locator::global_tablet_id tablet, From 3548b7ad384dd3a97a0341222fb1c227b2c6d571 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jadwiszczak?= Date: Wed, 4 Feb 2026 16:09:54 +0100 Subject: [PATCH 4/7] raft_group_registry: allow to drop append entries requests for specific raft group Similar to `raft_drop_incoming_append_entries`, the new error injection `raft_drop_incoming_append_entries_for_specified_group` skips handler for `raft_append_entries` RPC but it allows to specify id of raft group for which the requests should be dropped. The id of a raft group should be passed in error injection parameters under `value` key. 
--- service/raft/raft_group_registry.cc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc index 3cbf869539..2493daaeb9 100644 --- a/service/raft/raft_group_registry.cc +++ b/service/raft/raft_group_registry.cc @@ -124,7 +124,12 @@ void raft_group_registry::init_rpc_verbs() { ser::raft_rpc_verbs::register_raft_append_entries(&_ms, [handle_raft_rpc] (const rpc::client_info& cinfo, rpc::opt_time_point timeout, raft::group_id gid, raft::server_id from, raft::server_id dst, raft::append_request append_request) mutable { - return handle_raft_rpc(cinfo, gid, from, dst, [from, append_request = std::move(append_request), original_shard_id = this_shard_id()] (raft_rpc& rpc) mutable { + return handle_raft_rpc(cinfo, gid, from, dst, [from, append_request = std::move(append_request), original_shard_id = this_shard_id(), gid] (raft_rpc& rpc) mutable { + if (auto ignore_group_id = utils::get_local_injector().inject_parameter("raft_drop_incoming_append_entries_for_specified_group"); ignore_group_id) { + if (gid == raft::group_id{utils::UUID(*ignore_group_id)}) { + return; + } + } if (utils::get_local_injector().enter("raft_drop_incoming_append_entries")) { return; } From 4795f5840f6527714ed71a49ae82a22bfcd222e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jadwiszczak?= Date: Wed, 18 Feb 2026 08:42:46 +0100 Subject: [PATCH 5/7] test/cluster/test_strong_consistency: extract common function --- test/cluster/test_strong_consistency.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/cluster/test_strong_consistency.py b/test/cluster/test_strong_consistency.py index 15287c24d2..2a42dfb56b 100644 --- a/test/cluster/test_strong_consistency.py +++ b/test/cluster/test_strong_consistency.py @@ -95,6 +95,11 @@ async def assert_no_cross_shard_routing(manager: ManagerClient, server: ServerIn f"but partitioner computed shard {shard_from_partitioner}." 
) +async def get_table_raft_group_id(manager: ManagerClient, ks: str, table: str): + table_id = await manager.get_table_id(ks, table) + rows = await manager.get_cql().run_async(f"SELECT raft_group_id FROM system.tablets where table_id = {table_id}") + return str(rows[0].raft_group_id) + @pytest.mark.asyncio async def test_basic_write_read(manager: ManagerClient): @@ -124,9 +129,7 @@ async def test_basic_write_read(manager: ManagerClient): await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") logger.info("Select raft group id for the tablet") - table_id = await manager.get_table_id(ks, 'test') - rows = await cql.run_async(f"SELECT raft_group_id FROM system.tablets where table_id = {table_id}") - group_id = str(rows[0].raft_group_id) + group_id = await get_table_raft_group_id(manager, ks, 'test') logger.info(f"Get current leader for the group {group_id}") leader_host_id = await wait_for_leader(manager, servers[0], group_id) From 6aef4d35412ad82866989793f4a79721d95aa546 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jadwiszczak?= Date: Wed, 4 Feb 2026 16:20:49 +0100 Subject: [PATCH 6/7] test/cluster/test_strong_consistency: add reproducer for missing schema during apply --- raft/fsm.cc | 6 +++ service/strong_consistency/state_machine.cc | 4 ++ test/cluster/test_strong_consistency.py | 51 +++++++++++++++++++++ 3 files changed, 61 insertions(+) diff --git a/raft/fsm.cc b/raft/fsm.cc index 952334e5d8..54a1caeb5e 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -8,6 +8,7 @@ #include "fsm.hh" #include #include +#include "raft/raft.hh" #include "utils/assert.hh" #include "utils/error_injection.hh" @@ -205,6 +206,11 @@ void fsm::become_follower(server_id leader) { } void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) { + if (utils::get_local_injector().enter("avoid_being_raft_leader")) { + become_follower(server_id{}); + return; + } + if (!std::holds_alternative(_state)) { _output.state_changed = true; } diff --git 
a/service/strong_consistency/state_machine.cc b/service/strong_consistency/state_machine.cc index f1d94dfe85..32d72c2a0f 100644 --- a/service/strong_consistency/state_machine.cc +++ b/service/strong_consistency/state_machine.cc @@ -19,6 +19,7 @@ #include "service/migration_manager.hh" #include "db/system_keyspace.hh" #include "utils/loading_cache.hh" +#include "utils/error_injection.hh" namespace service::strong_consistency { @@ -140,6 +141,9 @@ private: auto schema_cm = co_await get_schema(schema_version); if (!schema_cm.first && !barrier_executed) { + if (utils::get_local_injector().enter("disable_raft_drop_append_entries_for_specified_group")) { + utils::get_local_injector().disable("raft_drop_incoming_append_entries_for_specified_group"); + } // TODO: pass valid abort source co_await _mm.get_group0_barrier().trigger(); barrier_executed = true; diff --git a/test/cluster/test_strong_consistency.py b/test/cluster/test_strong_consistency.py index 2a42dfb56b..bff14ff1b6 100644 --- a/test/cluster/test_strong_consistency.py +++ b/test/cluster/test_strong_consistency.py @@ -460,3 +460,54 @@ async def test_sc_persistence_after_crash(manager: ManagerClient): await assert_no_cross_shard_routing(manager, server) await manager.server_stop_gracefully(server.server_id) + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_no_schema_when_apply_write(manager: ManagerClient): + config = { + 'experimental_features': ['strongly-consistent-tables'] + } + cmdline = [ + '--logger-log-level', 'sc_groups_manager=debug', + '--logger-log-level', 'sc_coordinator=debug' + ] + servers = await manager.servers_add(2, config=config, cmdline=cmdline, auto_rack_dc='my_dc') + # We don't want `servers[2]` to be a Raft leader (for both group0 and strong consistency groups), + # because we want `servers[2]` to receive Raft commands from others. 
+ servers += [await manager.server_add(config=config | {'error_injections_at_startup': ['avoid_being_raft_leader']}, cmdline=cmdline, property_file={'dc':'my_dc', 'rack': 'rack3'})] + (cql, hosts) = await manager.get_ready_cql(servers) + host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers]) + + def host_by_host_id(host_id): + for hid, host in zip(host_ids, hosts): + if hid == host_id: + return host + raise RuntimeError(f"Can't find host for host_id {host_id}") + + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1} AND consistency = 'local'") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") + + # Drop incoming append entries from group0 (schema changes) on `servers[2]` after the table is created, + # so strong consistency raft groups are created on the node but it won't receive next alter table mutations. + group0_id = (await cql.run_async("SELECT value FROM system.scylla_local WHERE key = 'raft_group0_id'"))[0].value + await manager.api.enable_injection(servers[2].ip_addr, "raft_drop_incoming_append_entries_for_specified_group", one_shot=False, parameters={'value': group0_id}) + await cql.run_async(f"ALTER TABLE {ks}.test ADD new_col int;", host=hosts[0]) + + group_id = await get_table_raft_group_id(manager, ks, 'test') + leader_host_id = await wait_for_leader(manager, servers[0], group_id) + assert leader_host_id != host_ids[2] + leader_host = host_by_host_id(leader_host_id) + + s2_log = await manager.server_open_log(servers[2].server_id) + s2_mark = await s2_log.mark() + + await manager.api.enable_injection(servers[2].ip_addr, "disable_raft_drop_append_entries_for_specified_group", one_shot=True) + await cql.run_async(f"INSERT INTO {ks}.test (pk, c, new_col) VALUES (10, 20, 30)", host=leader_host) + + await s2_log.wait_for(f"Column definitions for {ks}.test changed", timeout=60, from_mark=s2_mark) + 
rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk = 10;", host=hosts[2]) + assert len(rows) == 1 + row = rows[0] + assert row.pk == 10 + assert row.c == 20 + assert row.new_col == 30 From 37bbbd3a27e7cb335d17f7ec93b85f4da9db3ebb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jadwiszczak?= Date: Thu, 5 Feb 2026 06:55:31 +0100 Subject: [PATCH 7/7] test/cluster/test_strong_consistency: add reproducer for old schema during apply --- schema/schema_registry.cc | 13 ++++++ service/strong_consistency/state_machine.cc | 3 ++ test/cluster/test_strong_consistency.py | 49 +++++++++++++++++++++ 3 files changed, 65 insertions(+) diff --git a/schema/schema_registry.cc b/schema/schema_registry.cc index cbfab8397e..5ffab20bb6 100644 --- a/schema/schema_registry.cc +++ b/schema/schema_registry.cc @@ -11,6 +11,7 @@ #include #include "schema_registry.hh" +#include "utils/error_injection.hh" #include "utils/log.hh" #include "db/schema_tables.hh" #include "view_info.hh" @@ -103,6 +104,12 @@ schema_ptr schema_registry::learn(schema_ptr s) { } schema_registry_entry& schema_registry::get_entry(table_schema_version v) const { + if (auto ignore_version = utils::get_local_injector().inject_parameter("schema_registry_ignore_version"); ignore_version) { + if (v == table_schema_version{utils::UUID(*ignore_version)}) { + throw schema_version_not_found(v); + } + } + auto i = _entries.find(v); if (i == _entries.end()) { throw schema_version_not_found(v); @@ -142,6 +149,12 @@ future schema_registry::get_or_load(table_schema_version v, const as } schema_ptr schema_registry::get_or_null(table_schema_version v) const { + if (auto ignore_version = utils::get_local_injector().inject_parameter("schema_registry_ignore_version"); ignore_version) { + if (v == table_schema_version{utils::UUID(*ignore_version)}) { + return nullptr; + } + } + auto i = _entries.find(v); if (i == _entries.end()) { return nullptr; diff --git a/service/strong_consistency/state_machine.cc 
b/service/strong_consistency/state_machine.cc index 32d72c2a0f..e8e80d0214 100644 --- a/service/strong_consistency/state_machine.cc +++ b/service/strong_consistency/state_machine.cc @@ -21,6 +21,8 @@ #include "utils/loading_cache.hh" #include "utils/error_injection.hh" +using namespace std::chrono_literals; + namespace service::strong_consistency { static logging::logger logger("sc_state_machine"); @@ -50,6 +52,7 @@ public: static thread_local logging::logger::rate_limit rate_limit(std::chrono::seconds(10)); try { + co_await utils::get_local_injector().inject("strong_consistency_state_machine_wait_before_apply", utils::wait_for_message(20min)); utils::chunked_vector muts; muts.reserve(command.size()); for (const auto& c: command) { diff --git a/test/cluster/test_strong_consistency.py b/test/cluster/test_strong_consistency.py index bff14ff1b6..8e8b1b42d1 100644 --- a/test/cluster/test_strong_consistency.py +++ b/test/cluster/test_strong_consistency.py @@ -511,3 +511,52 @@ async def test_no_schema_when_apply_write(manager: ManagerClient): assert row.pk == 10 assert row.c == 20 assert row.new_col == 30 + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_old_schema_when_apply_write(manager: ManagerClient): + config = { + 'experimental_features': ['strongly-consistent-tables'] + } + cmdline = [ + '--logger-log-level', 'sc_groups_manager=debug', + '--logger-log-level', 'sc_coordinator=debug' + ] + servers = await manager.servers_add(2, config=config, cmdline=cmdline, auto_rack_dc='my_dc') + # We don't want `servers[2]` to be a Raft leader (for both group0 and strong consistency groups), + # because we want `servers[2]` to receive Raft commands from others. 
+ servers += [await manager.server_add(config=config | {'error_injections_at_startup': ['avoid_being_raft_leader']}, cmdline=cmdline, property_file={'dc':'my_dc', 'rack': 'rack3'})] + (cql, hosts) = await manager.get_ready_cql(servers) + host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers]) + + def host_by_host_id(host_id): + for hid, host in zip(host_ids, hosts): + if hid == host_id: + return host + raise RuntimeError(f"Can't find host for host_id {host_id}") + + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1} AND consistency = 'local'") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") + + group_id = await get_table_raft_group_id(manager, ks, 'test') + leader_host_id = await wait_for_leader(manager, servers[0], group_id) + assert leader_host_id != host_ids[2] + leader_host = host_by_host_id(leader_host_id) + + table_schema_version = (await cql.run_async(f"SELECT version FROM system_schema.scylla_tables WHERE keyspace_name = '{ks}' AND table_name = 'test'"))[0].version + + await manager.api.enable_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply", one_shot=False) + await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (10, 20)", host=leader_host) + + await cql.run_async(f"ALTER TABLE {ks}.test ADD new_col int;", host=leader_host) + # Following injection simulates that old schema version was already removed from the memory + await manager.api.enable_injection(servers[2].ip_addr, "schema_registry_ignore_version", one_shot=False, parameters={'value': table_schema_version}) + await manager.api.message_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply") + await manager.api.disable_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply") + + rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk = 10;", 
host=hosts[2]) + assert len(rows) == 1 + row = rows[0] + assert row.pk == 10 + assert row.c == 20 + assert row.new_col is None