diff --git a/db/schema_tables.cc b/db/schema_tables.cc index eddfaa6fa2..a86c24b66e 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -2840,20 +2840,15 @@ void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?", db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY); -future get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) { - shared_ptr results = co_await sys_ks._qp.execute_internal( +future> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) { + shared_ptr results = co_await sys_ks.query_processor().execute_internal( GET_COLUMN_MAPPING_QUERY, db::consistency_level::LOCAL_ONE, {table_id.uuid(), version.uuid()}, cql3::query_processor::cache_internal::no ); if (results->empty()) { - // If we don't have a stored column_mapping for an obsolete schema version - // then it means it's way too old and been cleaned up already. - // Fail the whole learn stage in this case. - co_await coroutine::return_exception(std::runtime_error( - format("Failed to look up column mapping for schema version {}", - version))); + co_return std::nullopt; } std::vector static_columns, regular_columns; for (const auto& row : *results) { @@ -2881,6 +2876,18 @@ future get_column_mapping(db::system_keyspace& sys_ks, ::table_i co_return std::move(cm); } +future get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) { + auto cm_opt = co_await schema_tables::get_column_mapping_if_exists(sys_ks, table_id, version); + if (!cm_opt) { + // If we don't have a stored column_mapping for an obsolete schema version + // then it means it's way too old and been cleaned up already. + co_await coroutine::return_exception(std::runtime_error( + format("Failed to look up column mapping for schema version {}", + version))); + } + co_return std::move(*cm_opt); +} + future column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) { shared_ptr results = co_await sys_ks._qp.execute_internal( GET_COLUMN_MAPPING_QUERY, diff --git a/db/schema_tables.hh b/db/schema_tables.hh index 173648cdc1..7d711257c1 100644 --- a/db/schema_tables.hh +++ b/db/schema_tables.hh @@ -320,6 +320,8 @@ std::optional> get_map(const query::result_set_row& row, const ss future<> store_column_mapping(sharded& proxy, schema_ptr s, bool with_ttl); /// Query column mapping for a given version of the table locally. future get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version); +/// Returns the same result as `get_column_mapping()` wrapped in optional and returns nullopt if the mapping doesn't exist. +future> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version); /// Check that column mapping exists for a given version of the table future column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version); /// Delete matching column mapping entries from the `system.scylla_table_schema_history` table diff --git a/main.cc b/main.cc index 9a38216c59..78dda43f2e 100644 --- a/main.cc +++ b/main.cc @@ -1831,7 +1831,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl checkpoint(stop_signal, "initializing strongly consistent groups manager"); sharded groups_manager; groups_manager.start(std::ref(messaging), std::ref(raft_gr), std::ref(qp), - std::ref(db), std::ref(feature_service)).get(); + std::ref(db), std::ref(mm), std::ref(sys_ks), std::ref(feature_service)).get(); auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [&] { groups_manager.stop().get(); }); diff --git a/raft/fsm.cc b/raft/fsm.cc index 5f10187ffb..4e4025db64 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -8,6 +8,7 @@ #include "fsm.hh" #include #include +#include "raft/raft.hh" #include "utils/assert.hh" #include "utils/error_injection.hh" @@ -205,6 +206,11 @@ void fsm::become_follower(server_id leader) { } void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) { + if (utils::get_local_injector().enter("avoid_being_raft_leader")) { + become_follower(server_id{}); + return; + } + if (!std::holds_alternative(_state)) { _output.state_changed = true; } diff --git a/schema/schema_registry.cc b/schema/schema_registry.cc index cbfab8397e..5ffab20bb6 100644 --- a/schema/schema_registry.cc +++ b/schema/schema_registry.cc @@ -11,6 +11,7 @@ #include #include "schema_registry.hh" +#include "utils/error_injection.hh" #include "utils/log.hh" #include "db/schema_tables.hh" #include "view_info.hh" @@ -103,6 +104,12 @@ schema_ptr schema_registry::learn(schema_ptr s) { } schema_registry_entry& schema_registry::get_entry(table_schema_version v) const { + if (auto ignore_version = utils::get_local_injector().inject_parameter("schema_registry_ignore_version"); ignore_version) { + if (v == table_schema_version{utils::UUID(*ignore_version)}) { + throw schema_version_not_found(v); + } + } + auto i = _entries.find(v); if (i == _entries.end()) { throw schema_version_not_found(v); @@ -142,6 +149,12 @@ future schema_registry::get_or_load(table_schema_version v, const as } schema_ptr schema_registry::get_or_null(table_schema_version v) const { + if (auto ignore_version = utils::get_local_injector().inject_parameter("schema_registry_ignore_version"); ignore_version) { + if (v == table_schema_version{utils::UUID(*ignore_version)}) { + return nullptr; + } + } + auto i = _entries.find(v); if (i == _entries.end()) { return nullptr; diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc index 3cbf869539..2493daaeb9 100644 --- a/service/raft/raft_group_registry.cc +++ b/service/raft/raft_group_registry.cc @@ -124,7 +124,12 @@ void raft_group_registry::init_rpc_verbs() { ser::raft_rpc_verbs::register_raft_append_entries(&_ms, [handle_raft_rpc] (const rpc::client_info& cinfo, rpc::opt_time_point timeout, raft::group_id gid, raft::server_id from, raft::server_id dst, raft::append_request append_request) mutable { - return handle_raft_rpc(cinfo, gid, from, dst, [from, append_request = std::move(append_request), original_shard_id = this_shard_id()] (raft_rpc& rpc) mutable { + return handle_raft_rpc(cinfo, gid, from, dst, [from, append_request = std::move(append_request), original_shard_id = this_shard_id(), gid] (raft_rpc& rpc) mutable { + if (auto ignore_group_id = utils::get_local_injector().inject_parameter("raft_drop_incoming_append_entries_for_specified_group"); ignore_group_id) { + if (gid == raft::group_id{utils::UUID(*ignore_group_id)}) { + return; + } + } if (utils::get_local_injector().enter("raft_drop_incoming_append_entries")) { return; } diff --git a/service/strong_consistency/groups_manager.cc b/service/strong_consistency/groups_manager.cc index 833ed177c8..45501b7ec4 100644 --- a/service/strong_consistency/groups_manager.cc +++ b/service/strong_consistency/groups_manager.cc @@ -8,6 +8,7 @@ #include "groups_manager.hh" +#include "service/migration_manager.hh" #include "service/strong_consistency/state_machine.hh" #include "service/strong_consistency/raft_groups_storage.hh" #include "gms/feature_service.hh" @@ -94,11 +95,13 @@ auto raft_server::begin_mutate() -> begin_mutate_result { groups_manager::groups_manager(netw::messaging_service& ms, raft_group_registry& raft_gr, cql3::query_processor& qp, - replica::database& db, gms::feature_service& features) + replica::database& db, service::migration_manager& mm, db::system_keyspace& sys_ks, gms::feature_service& features) : _ms(ms) , _raft_gr(raft_gr) , _qp(qp) , _db(db) + , _mm(mm) + , _sys_ks(sys_ks) , _features(features) { } @@ -109,7 +112,7 @@ future<> groups_manager::start_raft_group(global_tablet_id tablet, { const auto my_id = to_server_id(tm->get_my_id()); - auto state_machine = make_state_machine(tablet, group_id, _db); + auto state_machine = make_state_machine(tablet, group_id, _db, _mm, _sys_ks); auto& state_machine_ref = *state_machine; auto rpc = std::make_unique(state_machine_ref, _ms, _raft_gr.failure_detector(), group_id, my_id); // Keep a reference to a specific RPC class. diff --git a/service/strong_consistency/groups_manager.hh b/service/strong_consistency/groups_manager.hh index 5c0a61d599..65737f011c 100644 --- a/service/strong_consistency/groups_manager.hh +++ b/service/strong_consistency/groups_manager.hh @@ -13,6 +13,14 @@ #include "service/raft/raft_group_registry.hh" #include "cql3/query_processor.hh" +namespace db { +class system_keyspace; +} + +namespace service { +class migration_manager; +} + namespace service::strong_consistency { class raft_server; @@ -67,6 +75,8 @@ class groups_manager : public peering_sharded_service { raft_group_registry& _raft_gr; cql3::query_processor& _qp; replica::database& _db; + service::migration_manager& _mm; + db::system_keyspace& _sys_ks; gms::feature_service& _features; std::unordered_map _raft_groups = {}; locator::token_metadata_ptr _pending_tm = nullptr; @@ -87,7 +97,7 @@ class groups_manager : public peering_sharded_service { public: groups_manager(netw::messaging_service& ms, raft_group_registry& raft_gr, - cql3::query_processor& qp, replica::database& _db, + cql3::query_processor& qp, replica::database& _db, service::migration_manager& mm, db::system_keyspace& sys_ks, gms::feature_service& features); // Called whenever a new token_metadata is published on this shard. diff --git a/service/strong_consistency/state_machine.cc b/service/strong_consistency/state_machine.cc index 2bc2b7fd0b..e8e80d0214 100644 --- a/service/strong_consistency/state_machine.cc +++ b/service/strong_consistency/state_machine.cc @@ -6,31 +6,53 @@ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ +#include +#include #include "state_machine.hh" +#include "db/schema_tables.hh" +#include "mutation/frozen_mutation.hh" +#include "schema/schema_registry.hh" #include "serializer_impl.hh" #include "idl/strong_consistency/state_machine.dist.hh" #include "idl/strong_consistency/state_machine.dist.impl.hh" #include "replica/database.hh" +#include "service/migration_manager.hh" +#include "db/system_keyspace.hh" +#include "utils/loading_cache.hh" +#include "utils/error_injection.hh" + +using namespace std::chrono_literals; namespace service::strong_consistency { +static logging::logger logger("sc_state_machine"); + class state_machine : public raft_state_machine { locator::global_tablet_id _tablet; raft::group_id _group_id; replica::database& _db; + service::migration_manager& _mm; + db::system_keyspace& _sys_ks; public: state_machine(locator::global_tablet_id tablet, raft::group_id gid, - replica::database& db) + replica::database& db, + service::migration_manager& mm, + db::system_keyspace& sys_ks) : _tablet(tablet) , _group_id(gid) , _db(db) + , _mm(mm) + , _sys_ks(sys_ks) { } future<> apply(std::vector command) override { + static thread_local logging::logger::rate_limit rate_limit(std::chrono::seconds(10)); + try { + co_await utils::get_local_injector().inject("strong_consistency_state_machine_wait_before_apply", utils::wait_for_message(20min)); utils::chunked_vector muts; muts.reserve(command.size()); for (const auto& c: command) { @@ -38,8 +60,18 @@ public: auto cmd = ser::deserialize(is, std::type_identity{}); muts.push_back(std::move(cmd.mutation)); } + // Hold pointers to schemas until `_db.apply()` is finished + auto schemas = co_await get_schema_and_upgrade_mutations(muts); co_await _db.apply(std::move(muts), db::no_timeout); - } catch (...) { + } catch (replica::no_such_column_family&) { + // If the table doesn't exist, it means it was already dropped. + // This cannot happen if the table wasn't created yet on the node + // because the state machine is created only after the table is created + // (see `schema_applier::commit_on_shard()` and `storage_service::commit_token_metadata_change()`). + // In this case, we should just ignore mutations without throwing an error. + logger.log(log_level::warn, rate_limit, "apply(): table {} was already dropped, ignoring mutations", _tablet.table); + } + catch (...) { throw std::runtime_error(::format( "tablet {}, group id {}: error while applying mutations {}", _tablet, _group_id, std::current_exception())); @@ -65,13 +97,96 @@ public: future<> transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) override { throw std::runtime_error("transfer_snapshot() not implemented"); } + +private: + using column_mappings_cache = utils::loading_cache; + using schema_store = std::unordered_map>; + future get_schema_and_upgrade_mutations(utils::chunked_vector& muts) { + // Cache column mappings to avoid querying `system.scylla_table_schema_history` multiple times. + static thread_local column_mappings_cache column_mapping_cache(std::numeric_limits::max(), 1h, logger); + // Stores schema pointer and optional column mapping for each schema version present in the mutations + schema_store schema_mappings; + bool barrier_executed = false; + + auto get_schema = [&] (table_schema_version schema_version) -> future> { + auto schema = local_schema_registry().get_or_null(schema_version); + if (schema) { + co_return std::pair{std::move(schema), nullptr}; + } + + // `_db.find_schema()` may throw `replica::no_such_column_family` if the table was already dropped. + schema = _db.find_schema(_tablet.table); + // The column mapping may be already present in the cache from another `apply()` call + auto cm_ptr = column_mapping_cache.find(schema_version); + if (cm_ptr) { + co_return std::pair{std::move(schema), std::move(cm_ptr)}; + } + + // We may not find the column mapping if the mutation schema is newer than the present schema. + // In this case, we should trigger the barrier to wait for the schema to be updated and then try again. + auto cm_opt = co_await db::schema_tables::get_column_mapping_if_exists(_sys_ks, _tablet.table, schema_version); + if (!cm_opt) { + co_return std::pair{nullptr, nullptr}; + } + + cm_ptr = co_await column_mapping_cache.get_ptr(schema_version, [cm = std::move(*cm_opt)] (auto schema_version) -> future { + co_return std::move(cm); + }); + co_return std::pair{std::move(schema), std::move(cm_ptr)}; + }; + + auto resolve_schema = [&] (const frozen_mutation& mut) -> future { + auto schema_version = mut.schema_version(); + auto it = schema_mappings.find(schema_version); + if (it != schema_mappings.end()) { + co_return &it->second; + } + + auto schema_cm = co_await get_schema(schema_version); + if (!schema_cm.first && !barrier_executed) { + if (utils::get_local_injector().enter("disable_raft_drop_append_entries_for_specified_group")) { + utils::get_local_injector().disable("raft_drop_incoming_append_entries_for_specified_group"); + } + // TODO: pass valid abort source + co_await _mm.get_group0_barrier().trigger(); + barrier_executed = true; + schema_cm = co_await get_schema(schema_version); + } + + if (schema_cm.first) { + const auto [it, _] = schema_mappings.insert({schema_version, std::move(schema_cm)}); + co_return &it->second; + } + co_return nullptr; + }; + + for (auto& m: muts) { + auto schema_entry = co_await resolve_schema(m); + if (!schema_entry) { + // Old schema are TTLed after 10 days (see comment in `schema_applier::finalize_tables_and_views()`), + // so this error theoretically may be triggered if a node is stuck longer than this. + // But in practice we should do a snapshot much earlier, that's why `on_internal_error()` here. + // And if the table was already dropped, `no_such_column_family` will be dropped earlier. + on_internal_error(logger, fmt::format("couldn't find schema for table {} and mutation schema version {}", _tablet.table, m.schema_version())); + } + if (schema_entry->second) { + m = freeze(m.unfreeze_upgrading(schema_entry->first, *schema_entry->second)); + } + } + + // We only need vector of schema pointers but we're returning the whole map + // to avoid another allocation + co_return std::move(schema_mappings); + } }; std::unique_ptr make_state_machine(locator::global_tablet_id tablet, raft::group_id gid, - replica::database& db) + replica::database& db, + service::migration_manager& mm, + db::system_keyspace& sys_ks) { - return std::make_unique(tablet, gid, db); + return std::make_unique(tablet, gid, db, mm, sys_ks); } }; \ No newline at end of file diff --git a/service/strong_consistency/state_machine.hh b/service/strong_consistency/state_machine.hh index 36c0467a02..0186e61f1e 100644 --- a/service/strong_consistency/state_machine.hh +++ b/service/strong_consistency/state_machine.hh @@ -12,6 +12,14 @@ #include "mutation/frozen_mutation.hh" #include "locator/tablets.hh" +namespace db { +class system_keyspace; +} + +namespace service { +class migration_manager; +} + namespace service::strong_consistency { struct raft_command { @@ -19,6 +27,8 @@ struct raft_command { }; std::unique_ptr make_state_machine(locator::global_tablet_id tablet, raft::group_id gid, - replica::database& db); + replica::database& db, + service::migration_manager& mm, + db::system_keyspace& sys_ks); } \ No newline at end of file diff --git a/test/cluster/test_strong_consistency.py b/test/cluster/test_strong_consistency.py index 15287c24d2..8e8b1b42d1 100644 --- a/test/cluster/test_strong_consistency.py +++ b/test/cluster/test_strong_consistency.py @@ -95,6 +95,11 @@ async def assert_no_cross_shard_routing(manager: ManagerClient, server: ServerIn f"but partitioner computed shard {shard_from_partitioner}." ) +async def get_table_raft_group_id(manager: ManagerClient, ks: str, table: str): + table_id = await manager.get_table_id(ks, table) + rows = await manager.get_cql().run_async(f"SELECT raft_group_id FROM system.tablets where table_id = {table_id}") + return str(rows[0].raft_group_id) + @pytest.mark.asyncio async def test_basic_write_read(manager: ManagerClient): @@ -124,9 +129,7 @@ async def test_basic_write_read(manager: ManagerClient): await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") logger.info("Select raft group id for the tablet") - table_id = await manager.get_table_id(ks, 'test') - rows = await cql.run_async(f"SELECT raft_group_id FROM system.tablets where table_id = {table_id}") - group_id = str(rows[0].raft_group_id) + group_id = await get_table_raft_group_id(manager, ks, 'test') logger.info(f"Get current leader for the group {group_id}") leader_host_id = await wait_for_leader(manager, servers[0], group_id) @@ -457,3 +460,103 @@ async def test_sc_persistence_after_crash(manager: ManagerClient): await assert_no_cross_shard_routing(manager, server) await manager.server_stop_gracefully(server.server_id) + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_no_schema_when_apply_write(manager: ManagerClient): + config = { + 'experimental_features': ['strongly-consistent-tables'] + } + cmdline = [ + '--logger-log-level', 'sc_groups_manager=debug', + '--logger-log-level', 'sc_coordinator=debug' + ] + servers = await manager.servers_add(2, config=config, cmdline=cmdline, auto_rack_dc='my_dc') + # We don't want `servers[2]` to be a Raft leader (for both group0 and strong consistency groups), + # because we want `servers[2]` to receive Raft commands from others. + servers += [await manager.server_add(config=config | {'error_injections_at_startup': ['avoid_being_raft_leader']}, cmdline=cmdline, property_file={'dc':'my_dc', 'rack': 'rack3'})] + (cql, hosts) = await manager.get_ready_cql(servers) + host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers]) + + def host_by_host_id(host_id): + for hid, host in zip(host_ids, hosts): + if hid == host_id: + return host + raise RuntimeError(f"Can't find host for host_id {host_id}") + + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1} AND consistency = 'local'") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") + + # Drop incoming append entries from group0 (schema changes) on `servers[2]` after the table is created, + # so strong consistency raft groups are created on the node but it won't receive next alter table mutations. + group0_id = (await cql.run_async("SELECT value FROM system.scylla_local WHERE key = 'raft_group0_id'"))[0].value + await manager.api.enable_injection(servers[2].ip_addr, "raft_drop_incoming_append_entries_for_specified_group", one_shot=False, parameters={'value': group0_id}) + await cql.run_async(f"ALTER TABLE {ks}.test ADD new_col int;", host=hosts[0]) + + group_id = await get_table_raft_group_id(manager, ks, 'test') + leader_host_id = await wait_for_leader(manager, servers[0], group_id) + assert leader_host_id != host_ids[2] + leader_host = host_by_host_id(leader_host_id) + + s2_log = await manager.server_open_log(servers[2].server_id) + s2_mark = await s2_log.mark() + + await manager.api.enable_injection(servers[2].ip_addr, "disable_raft_drop_append_entries_for_specified_group", one_shot=True) + await cql.run_async(f"INSERT INTO {ks}.test (pk, c, new_col) VALUES (10, 20, 30)", host=leader_host) + + await s2_log.wait_for(f"Column definitions for {ks}.test changed", timeout=60, from_mark=s2_mark) + rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk = 10;", host=hosts[2]) + assert len(rows) == 1 + row = rows[0] + assert row.pk == 10 + assert row.c == 20 + assert row.new_col == 30 + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_old_schema_when_apply_write(manager: ManagerClient): + config = { + 'experimental_features': ['strongly-consistent-tables'] + } + cmdline = [ + '--logger-log-level', 'sc_groups_manager=debug', + '--logger-log-level', 'sc_coordinator=debug' + ] + servers = await manager.servers_add(2, config=config, cmdline=cmdline, auto_rack_dc='my_dc') + # We don't want `servers[2]` to be a Raft leader (for both group0 and strong consistency groups), + # because we want `servers[2]` to receive Raft commands from others. + servers += [await manager.server_add(config=config | {'error_injections_at_startup': ['avoid_being_raft_leader']}, cmdline=cmdline, property_file={'dc':'my_dc', 'rack': 'rack3'})] + (cql, hosts) = await manager.get_ready_cql(servers) + host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers]) + + def host_by_host_id(host_id): + for hid, host in zip(host_ids, hosts): + if hid == host_id: + return host + raise RuntimeError(f"Can't find host for host_id {host_id}") + + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1} AND consistency = 'local'") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") + + group_id = await get_table_raft_group_id(manager, ks, 'test') + leader_host_id = await wait_for_leader(manager, servers[0], group_id) + assert leader_host_id != host_ids[2] + leader_host = host_by_host_id(leader_host_id) + + table_schema_version = (await cql.run_async(f"SELECT version FROM system_schema.scylla_tables WHERE keyspace_name = '{ks}' AND table_name = 'test'"))[0].version + + await manager.api.enable_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply", one_shot=False) + await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (10, 20)", host=leader_host) + + await cql.run_async(f"ALTER TABLE {ks}.test ADD new_col int;", host=leader_host) + # Following injection simulates that old schema version was already removed from the memory + await manager.api.enable_injection(servers[2].ip_addr, "schema_registry_ignore_version", one_shot=False, parameters={'value': table_schema_version}) + await manager.api.message_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply") + await manager.api.disable_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply") + + rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk = 10;", host=hosts[2]) + assert len(rows) == 1 + row = rows[0] + assert row.pk == 10 + assert row.c == 20 + assert row.new_col is None diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 46895ca747..2b561361eb 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -960,7 +960,7 @@ private: auto stop_auth_cache = defer_verbose_shutdown("auth cache", [this] { _auth_cache.stop().get(); }); _groups_manager.start(std::ref(_ms), std::ref(_group0_registry), std::ref(_qp), - std::ref(_db), std::ref(_feature_service)).get(); + std::ref(_db), std::ref(_mm), std::ref(_sys_ks), std::ref(_feature_service)).get(); auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [this] { _groups_manager.stop().get(); }); _sc_coordinator.start(std::ref(_groups_manager), std::ref(_db)).get();