mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-20 00:20:47 +00:00
Merge 'strong_consistency/state_machine: ensure and upgrade mutations schema' from Michał Jadwiszczak
This patch fixes 2 issues within strong consistency state machine: - it might happen that apply is called before the schema is delivered to the node - on the other hand, the apply may be called after the schema was changed and purged from the schema registry The first problem is fixed by doing `group0.read_barrier()` before applying the mutations. The second one is solved by upgrading the mutations using column mappings in case the version of the mutations' schema is older. Fixes SCYLLADB-428 Strong consistency is in experimental phase, no need to backport. Closes scylladb/scylladb#28546 * https://github.com/scylladb/scylladb: test/cluster/test_strong_consistency: add reproducer for old schema during apply test/cluster/test_strong_consistency: add reproducer for missing schema during apply test/cluster/test_strong_consistency: extract common function raft_group_registry: allow to drop append entries requests for specific raft group strong_consistency/state_machine: find and hold schemas of applying mutations strong_consistency/state_machine: pull necessary dependencies db/schema_tables: add `get_column_mapping_if_exists()`
This commit is contained in:
@@ -2840,20 +2840,15 @@ void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view
|
||||
static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?",
|
||||
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
|
||||
|
||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) {
|
||||
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks._qp.execute_internal(
|
||||
future<std::optional<column_mapping>> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) {
|
||||
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks.query_processor().execute_internal(
|
||||
GET_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
{table_id.uuid(), version.uuid()},
|
||||
cql3::query_processor::cache_internal::no
|
||||
);
|
||||
if (results->empty()) {
|
||||
// If we don't have a stored column_mapping for an obsolete schema version
|
||||
// then it means it's way too old and been cleaned up already.
|
||||
// Fail the whole learn stage in this case.
|
||||
co_await coroutine::return_exception(std::runtime_error(
|
||||
format("Failed to look up column mapping for schema version {}",
|
||||
version)));
|
||||
co_return std::nullopt;
|
||||
}
|
||||
std::vector<column_definition> static_columns, regular_columns;
|
||||
for (const auto& row : *results) {
|
||||
@@ -2881,6 +2876,18 @@ future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_i
|
||||
co_return std::move(cm);
|
||||
}
|
||||
|
||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) {
|
||||
auto cm_opt = co_await schema_tables::get_column_mapping_if_exists(sys_ks, table_id, version);
|
||||
if (!cm_opt) {
|
||||
// If we don't have a stored column_mapping for an obsolete schema version
|
||||
// then it means it's way too old and been cleaned up already.
|
||||
co_await coroutine::return_exception(std::runtime_error(
|
||||
format("Failed to look up column mapping for schema version {}",
|
||||
version)));
|
||||
}
|
||||
co_return std::move(*cm_opt);
|
||||
}
|
||||
|
||||
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) {
|
||||
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks._qp.execute_internal(
|
||||
GET_COLUMN_MAPPING_QUERY,
|
||||
|
||||
@@ -320,6 +320,8 @@ std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const ss
|
||||
future<> store_column_mapping(sharded<service::storage_proxy>& proxy, schema_ptr s, bool with_ttl);
|
||||
/// Query column mapping for a given version of the table locally.
|
||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||
/// Returns the same result as `get_column_mapping()` wrapped in optional and returns nullopt if the mapping doesn't exist.
|
||||
future<std::optional<column_mapping>> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||
/// Check that column mapping exists for a given version of the table
|
||||
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||
/// Delete matching column mapping entries from the `system.scylla_table_schema_history` table
|
||||
|
||||
2
main.cc
2
main.cc
@@ -1831,7 +1831,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
checkpoint(stop_signal, "initializing strongly consistent groups manager");
|
||||
sharded<service::strong_consistency::groups_manager> groups_manager;
|
||||
groups_manager.start(std::ref(messaging), std::ref(raft_gr), std::ref(qp),
|
||||
std::ref(db), std::ref(feature_service)).get();
|
||||
std::ref(db), std::ref(mm), std::ref(sys_ks), std::ref(feature_service)).get();
|
||||
auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [&] {
|
||||
groups_manager.stop().get();
|
||||
});
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
#include "fsm.hh"
|
||||
#include <random>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include "raft/raft.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
@@ -205,6 +206,11 @@ void fsm::become_follower(server_id leader) {
|
||||
}
|
||||
|
||||
void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) {
|
||||
if (utils::get_local_injector().enter("avoid_being_raft_leader")) {
|
||||
become_follower(server_id{});
|
||||
return;
|
||||
}
|
||||
|
||||
if (!std::holds_alternative<candidate>(_state)) {
|
||||
_output.state_changed = true;
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include <seastar/core/sharded.hh>
|
||||
|
||||
#include "schema_registry.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "utils/log.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
#include "view_info.hh"
|
||||
@@ -103,6 +104,12 @@ schema_ptr schema_registry::learn(schema_ptr s) {
|
||||
}
|
||||
|
||||
schema_registry_entry& schema_registry::get_entry(table_schema_version v) const {
|
||||
if (auto ignore_version = utils::get_local_injector().inject_parameter<std::string_view>("schema_registry_ignore_version"); ignore_version) {
|
||||
if (v == table_schema_version{utils::UUID(*ignore_version)}) {
|
||||
throw schema_version_not_found(v);
|
||||
}
|
||||
}
|
||||
|
||||
auto i = _entries.find(v);
|
||||
if (i == _entries.end()) {
|
||||
throw schema_version_not_found(v);
|
||||
@@ -142,6 +149,12 @@ future<schema_ptr> schema_registry::get_or_load(table_schema_version v, const as
|
||||
}
|
||||
|
||||
schema_ptr schema_registry::get_or_null(table_schema_version v) const {
|
||||
if (auto ignore_version = utils::get_local_injector().inject_parameter<std::string_view>("schema_registry_ignore_version"); ignore_version) {
|
||||
if (v == table_schema_version{utils::UUID(*ignore_version)}) {
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
auto i = _entries.find(v);
|
||||
if (i == _entries.end()) {
|
||||
return nullptr;
|
||||
|
||||
@@ -124,7 +124,12 @@ void raft_group_registry::init_rpc_verbs() {
|
||||
|
||||
ser::raft_rpc_verbs::register_raft_append_entries(&_ms, [handle_raft_rpc] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
|
||||
raft::group_id gid, raft::server_id from, raft::server_id dst, raft::append_request append_request) mutable {
|
||||
return handle_raft_rpc(cinfo, gid, from, dst, [from, append_request = std::move(append_request), original_shard_id = this_shard_id()] (raft_rpc& rpc) mutable {
|
||||
return handle_raft_rpc(cinfo, gid, from, dst, [from, append_request = std::move(append_request), original_shard_id = this_shard_id(), gid] (raft_rpc& rpc) mutable {
|
||||
if (auto ignore_group_id = utils::get_local_injector().inject_parameter<std::string_view>("raft_drop_incoming_append_entries_for_specified_group"); ignore_group_id) {
|
||||
if (gid == raft::group_id{utils::UUID(*ignore_group_id)}) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (utils::get_local_injector().enter("raft_drop_incoming_append_entries")) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#include "groups_manager.hh"
|
||||
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/strong_consistency/state_machine.hh"
|
||||
#include "service/strong_consistency/raft_groups_storage.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
@@ -94,11 +95,13 @@ auto raft_server::begin_mutate() -> begin_mutate_result {
|
||||
|
||||
groups_manager::groups_manager(netw::messaging_service& ms,
|
||||
raft_group_registry& raft_gr, cql3::query_processor& qp,
|
||||
replica::database& db, gms::feature_service& features)
|
||||
replica::database& db, service::migration_manager& mm, db::system_keyspace& sys_ks, gms::feature_service& features)
|
||||
: _ms(ms)
|
||||
, _raft_gr(raft_gr)
|
||||
, _qp(qp)
|
||||
, _db(db)
|
||||
, _mm(mm)
|
||||
, _sys_ks(sys_ks)
|
||||
, _features(features)
|
||||
{
|
||||
}
|
||||
@@ -109,7 +112,7 @@ future<> groups_manager::start_raft_group(global_tablet_id tablet,
|
||||
{
|
||||
const auto my_id = to_server_id(tm->get_my_id());
|
||||
|
||||
auto state_machine = make_state_machine(tablet, group_id, _db);
|
||||
auto state_machine = make_state_machine(tablet, group_id, _db, _mm, _sys_ks);
|
||||
auto& state_machine_ref = *state_machine;
|
||||
auto rpc = std::make_unique<rpc_impl>(state_machine_ref, _ms, _raft_gr.failure_detector(), group_id, my_id);
|
||||
// Keep a reference to a specific RPC class.
|
||||
|
||||
@@ -13,6 +13,14 @@
|
||||
#include "service/raft/raft_group_registry.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
|
||||
namespace db {
|
||||
class system_keyspace;
|
||||
}
|
||||
|
||||
namespace service {
|
||||
class migration_manager;
|
||||
}
|
||||
|
||||
namespace service::strong_consistency {
|
||||
|
||||
class raft_server;
|
||||
@@ -67,6 +75,8 @@ class groups_manager : public peering_sharded_service<groups_manager> {
|
||||
raft_group_registry& _raft_gr;
|
||||
cql3::query_processor& _qp;
|
||||
replica::database& _db;
|
||||
service::migration_manager& _mm;
|
||||
db::system_keyspace& _sys_ks;
|
||||
gms::feature_service& _features;
|
||||
std::unordered_map<raft::group_id, raft_group_state> _raft_groups = {};
|
||||
locator::token_metadata_ptr _pending_tm = nullptr;
|
||||
@@ -87,7 +97,7 @@ class groups_manager : public peering_sharded_service<groups_manager> {
|
||||
|
||||
public:
|
||||
groups_manager(netw::messaging_service& ms, raft_group_registry& raft_gr,
|
||||
cql3::query_processor& qp, replica::database& _db,
|
||||
cql3::query_processor& qp, replica::database& _db, service::migration_manager& mm, db::system_keyspace& sys_ks,
|
||||
gms::feature_service& features);
|
||||
|
||||
// Called whenever a new token_metadata is published on this shard.
|
||||
|
||||
@@ -6,31 +6,53 @@
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <seastar/core/shard_id.hh>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include "state_machine.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
#include "mutation/frozen_mutation.hh"
|
||||
#include "schema/schema_registry.hh"
|
||||
#include "serializer_impl.hh"
|
||||
#include "idl/strong_consistency/state_machine.dist.hh"
|
||||
#include "idl/strong_consistency/state_machine.dist.impl.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "utils/loading_cache.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace service::strong_consistency {
|
||||
|
||||
static logging::logger logger("sc_state_machine");
|
||||
|
||||
class state_machine : public raft_state_machine {
|
||||
locator::global_tablet_id _tablet;
|
||||
raft::group_id _group_id;
|
||||
replica::database& _db;
|
||||
service::migration_manager& _mm;
|
||||
db::system_keyspace& _sys_ks;
|
||||
|
||||
public:
|
||||
state_machine(locator::global_tablet_id tablet,
|
||||
raft::group_id gid,
|
||||
replica::database& db)
|
||||
replica::database& db,
|
||||
service::migration_manager& mm,
|
||||
db::system_keyspace& sys_ks)
|
||||
: _tablet(tablet)
|
||||
, _group_id(gid)
|
||||
, _db(db)
|
||||
, _mm(mm)
|
||||
, _sys_ks(sys_ks)
|
||||
{
|
||||
}
|
||||
|
||||
future<> apply(std::vector<raft::command_cref> command) override {
|
||||
static thread_local logging::logger::rate_limit rate_limit(std::chrono::seconds(10));
|
||||
|
||||
try {
|
||||
co_await utils::get_local_injector().inject("strong_consistency_state_machine_wait_before_apply", utils::wait_for_message(20min));
|
||||
utils::chunked_vector<frozen_mutation> muts;
|
||||
muts.reserve(command.size());
|
||||
for (const auto& c: command) {
|
||||
@@ -38,8 +60,18 @@ public:
|
||||
auto cmd = ser::deserialize(is, std::type_identity<raft_command>{});
|
||||
muts.push_back(std::move(cmd.mutation));
|
||||
}
|
||||
// Hold pointers to schemas until `_db.apply()` is finished
|
||||
auto schemas = co_await get_schema_and_upgrade_mutations(muts);
|
||||
co_await _db.apply(std::move(muts), db::no_timeout);
|
||||
} catch (...) {
|
||||
} catch (replica::no_such_column_family&) {
|
||||
// If the table doesn't exist, it means it was already dropped.
|
||||
// This cannot happen if the table wasn't created yet on the node
|
||||
// because the state machine is created only after the table is created
|
||||
// (see `schema_applier::commit_on_shard()` and `storage_service::commit_token_metadata_change()`).
|
||||
// In this case, we should just ignore mutations without throwing an error.
|
||||
logger.log(log_level::warn, rate_limit, "apply(): table {} was already dropped, ignoring mutations", _tablet.table);
|
||||
}
|
||||
catch (...) {
|
||||
throw std::runtime_error(::format(
|
||||
"tablet {}, group id {}: error while applying mutations {}",
|
||||
_tablet, _group_id, std::current_exception()));
|
||||
@@ -65,13 +97,96 @@ public:
|
||||
future<> transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) override {
|
||||
throw std::runtime_error("transfer_snapshot() not implemented");
|
||||
}
|
||||
|
||||
private:
|
||||
using column_mappings_cache = utils::loading_cache<table_schema_version, column_mapping>;
|
||||
using schema_store = std::unordered_map<table_schema_version, std::pair<schema_ptr, column_mappings_cache::value_ptr>>;
|
||||
future<schema_store> get_schema_and_upgrade_mutations(utils::chunked_vector<frozen_mutation>& muts) {
|
||||
// Cache column mappings to avoid querying `system.scylla_table_schema_history` multiple times.
|
||||
static thread_local column_mappings_cache column_mapping_cache(std::numeric_limits<size_t>::max(), 1h, logger);
|
||||
// Stores schema pointer and optional column mapping for each schema version present in the mutations
|
||||
schema_store schema_mappings;
|
||||
bool barrier_executed = false;
|
||||
|
||||
auto get_schema = [&] (table_schema_version schema_version) -> future<std::pair<schema_ptr, column_mappings_cache::value_ptr>> {
|
||||
auto schema = local_schema_registry().get_or_null(schema_version);
|
||||
if (schema) {
|
||||
co_return std::pair{std::move(schema), nullptr};
|
||||
}
|
||||
|
||||
// `_db.find_schema()` may throw `replica::no_such_column_family` if the table was already dropped.
|
||||
schema = _db.find_schema(_tablet.table);
|
||||
// The column mapping may be already present in the cache from another `apply()` call
|
||||
auto cm_ptr = column_mapping_cache.find(schema_version);
|
||||
if (cm_ptr) {
|
||||
co_return std::pair{std::move(schema), std::move(cm_ptr)};
|
||||
}
|
||||
|
||||
// We may not find the column mapping if the mutation schema is newer than the present schema.
|
||||
// In this case, we should trigger the barrier to wait for the schema to be updated and then try again.
|
||||
auto cm_opt = co_await db::schema_tables::get_column_mapping_if_exists(_sys_ks, _tablet.table, schema_version);
|
||||
if (!cm_opt) {
|
||||
co_return std::pair{nullptr, nullptr};
|
||||
}
|
||||
|
||||
cm_ptr = co_await column_mapping_cache.get_ptr(schema_version, [cm = std::move(*cm_opt)] (auto schema_version) -> future<column_mapping> {
|
||||
co_return std::move(cm);
|
||||
});
|
||||
co_return std::pair{std::move(schema), std::move(cm_ptr)};
|
||||
};
|
||||
|
||||
auto resolve_schema = [&] (const frozen_mutation& mut) -> future<const schema_store::mapped_type*> {
|
||||
auto schema_version = mut.schema_version();
|
||||
auto it = schema_mappings.find(schema_version);
|
||||
if (it != schema_mappings.end()) {
|
||||
co_return &it->second;
|
||||
}
|
||||
|
||||
auto schema_cm = co_await get_schema(schema_version);
|
||||
if (!schema_cm.first && !barrier_executed) {
|
||||
if (utils::get_local_injector().enter("disable_raft_drop_append_entries_for_specified_group")) {
|
||||
utils::get_local_injector().disable("raft_drop_incoming_append_entries_for_specified_group");
|
||||
}
|
||||
// TODO: pass valid abort source
|
||||
co_await _mm.get_group0_barrier().trigger();
|
||||
barrier_executed = true;
|
||||
schema_cm = co_await get_schema(schema_version);
|
||||
}
|
||||
|
||||
if (schema_cm.first) {
|
||||
const auto [it, _] = schema_mappings.insert({schema_version, std::move(schema_cm)});
|
||||
co_return &it->second;
|
||||
}
|
||||
co_return nullptr;
|
||||
};
|
||||
|
||||
for (auto& m: muts) {
|
||||
auto schema_entry = co_await resolve_schema(m);
|
||||
if (!schema_entry) {
|
||||
// Old schema are TTLed after 10 days (see comment in `schema_applier::finalize_tables_and_views()`),
|
||||
// so this error theoretically may be triggered if a node is stuck longer than this.
|
||||
// But in practice we should do a snapshot much earlier, that's why `on_internal_error()` here.
|
||||
// And if the table was already dropped, `no_such_column_family` will be dropped earlier.
|
||||
on_internal_error(logger, fmt::format("couldn't find schema for table {} and mutation schema version {}", _tablet.table, m.schema_version()));
|
||||
}
|
||||
if (schema_entry->second) {
|
||||
m = freeze(m.unfreeze_upgrading(schema_entry->first, *schema_entry->second));
|
||||
}
|
||||
}
|
||||
|
||||
// We only need vector of schema pointers but we're returning the whole map
|
||||
// to avoid another allocation
|
||||
co_return std::move(schema_mappings);
|
||||
}
|
||||
};
|
||||
|
||||
std::unique_ptr<raft_state_machine> make_state_machine(locator::global_tablet_id tablet,
|
||||
raft::group_id gid,
|
||||
replica::database& db)
|
||||
replica::database& db,
|
||||
service::migration_manager& mm,
|
||||
db::system_keyspace& sys_ks)
|
||||
{
|
||||
return std::make_unique<state_machine>(tablet, gid, db);
|
||||
return std::make_unique<state_machine>(tablet, gid, db, mm, sys_ks);
|
||||
}
|
||||
|
||||
};
|
||||
@@ -12,6 +12,14 @@
|
||||
#include "mutation/frozen_mutation.hh"
|
||||
#include "locator/tablets.hh"
|
||||
|
||||
namespace db {
|
||||
class system_keyspace;
|
||||
}
|
||||
|
||||
namespace service {
|
||||
class migration_manager;
|
||||
}
|
||||
|
||||
namespace service::strong_consistency {
|
||||
|
||||
struct raft_command {
|
||||
@@ -19,6 +27,8 @@ struct raft_command {
|
||||
};
|
||||
std::unique_ptr<raft_state_machine> make_state_machine(locator::global_tablet_id tablet,
|
||||
raft::group_id gid,
|
||||
replica::database& db);
|
||||
replica::database& db,
|
||||
service::migration_manager& mm,
|
||||
db::system_keyspace& sys_ks);
|
||||
|
||||
}
|
||||
@@ -95,6 +95,11 @@ async def assert_no_cross_shard_routing(manager: ManagerClient, server: ServerIn
|
||||
f"but partitioner computed shard {shard_from_partitioner}."
|
||||
)
|
||||
|
||||
async def get_table_raft_group_id(manager: ManagerClient, ks: str, table: str):
|
||||
table_id = await manager.get_table_id(ks, table)
|
||||
rows = await manager.get_cql().run_async(f"SELECT raft_group_id FROM system.tablets where table_id = {table_id}")
|
||||
return str(rows[0].raft_group_id)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_basic_write_read(manager: ManagerClient):
|
||||
|
||||
@@ -124,9 +129,7 @@ async def test_basic_write_read(manager: ManagerClient):
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
logger.info("Select raft group id for the tablet")
|
||||
table_id = await manager.get_table_id(ks, 'test')
|
||||
rows = await cql.run_async(f"SELECT raft_group_id FROM system.tablets where table_id = {table_id}")
|
||||
group_id = str(rows[0].raft_group_id)
|
||||
group_id = await get_table_raft_group_id(manager, ks, 'test')
|
||||
|
||||
logger.info(f"Get current leader for the group {group_id}")
|
||||
leader_host_id = await wait_for_leader(manager, servers[0], group_id)
|
||||
@@ -457,3 +460,103 @@ async def test_sc_persistence_after_crash(manager: ManagerClient):
|
||||
await assert_no_cross_shard_routing(manager, server)
|
||||
|
||||
await manager.server_stop_gracefully(server.server_id)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_no_schema_when_apply_write(manager: ManagerClient):
|
||||
config = {
|
||||
'experimental_features': ['strongly-consistent-tables']
|
||||
}
|
||||
cmdline = [
|
||||
'--logger-log-level', 'sc_groups_manager=debug',
|
||||
'--logger-log-level', 'sc_coordinator=debug'
|
||||
]
|
||||
servers = await manager.servers_add(2, config=config, cmdline=cmdline, auto_rack_dc='my_dc')
|
||||
# We don't want `servers[2]` to be a Raft leader (for both group0 and strong consistency groups),
|
||||
# because we want `servers[2]` to receive Raft commands from others.
|
||||
servers += [await manager.server_add(config=config | {'error_injections_at_startup': ['avoid_being_raft_leader']}, cmdline=cmdline, property_file={'dc':'my_dc', 'rack': 'rack3'})]
|
||||
(cql, hosts) = await manager.get_ready_cql(servers)
|
||||
host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers])
|
||||
|
||||
def host_by_host_id(host_id):
|
||||
for hid, host in zip(host_ids, hosts):
|
||||
if hid == host_id:
|
||||
return host
|
||||
raise RuntimeError(f"Can't find host for host_id {host_id}")
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1} AND consistency = 'local'") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
# Drop incoming append entries from group0 (schema changes) on `servers[2]` after the table is created,
|
||||
# so strong consistency raft groups are created on the node but it won't receive next alter table mutations.
|
||||
group0_id = (await cql.run_async("SELECT value FROM system.scylla_local WHERE key = 'raft_group0_id'"))[0].value
|
||||
await manager.api.enable_injection(servers[2].ip_addr, "raft_drop_incoming_append_entries_for_specified_group", one_shot=False, parameters={'value': group0_id})
|
||||
await cql.run_async(f"ALTER TABLE {ks}.test ADD new_col int;", host=hosts[0])
|
||||
|
||||
group_id = await get_table_raft_group_id(manager, ks, 'test')
|
||||
leader_host_id = await wait_for_leader(manager, servers[0], group_id)
|
||||
assert leader_host_id != host_ids[2]
|
||||
leader_host = host_by_host_id(leader_host_id)
|
||||
|
||||
s2_log = await manager.server_open_log(servers[2].server_id)
|
||||
s2_mark = await s2_log.mark()
|
||||
|
||||
await manager.api.enable_injection(servers[2].ip_addr, "disable_raft_drop_append_entries_for_specified_group", one_shot=True)
|
||||
await cql.run_async(f"INSERT INTO {ks}.test (pk, c, new_col) VALUES (10, 20, 30)", host=leader_host)
|
||||
|
||||
await s2_log.wait_for(f"Column definitions for {ks}.test changed", timeout=60, from_mark=s2_mark)
|
||||
rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk = 10;", host=hosts[2])
|
||||
assert len(rows) == 1
|
||||
row = rows[0]
|
||||
assert row.pk == 10
|
||||
assert row.c == 20
|
||||
assert row.new_col == 30
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_old_schema_when_apply_write(manager: ManagerClient):
|
||||
config = {
|
||||
'experimental_features': ['strongly-consistent-tables']
|
||||
}
|
||||
cmdline = [
|
||||
'--logger-log-level', 'sc_groups_manager=debug',
|
||||
'--logger-log-level', 'sc_coordinator=debug'
|
||||
]
|
||||
servers = await manager.servers_add(2, config=config, cmdline=cmdline, auto_rack_dc='my_dc')
|
||||
# We don't want `servers[2]` to be a Raft leader (for both group0 and strong consistency groups),
|
||||
# because we want `servers[2]` to receive Raft commands from others.
|
||||
servers += [await manager.server_add(config=config | {'error_injections_at_startup': ['avoid_being_raft_leader']}, cmdline=cmdline, property_file={'dc':'my_dc', 'rack': 'rack3'})]
|
||||
(cql, hosts) = await manager.get_ready_cql(servers)
|
||||
host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers])
|
||||
|
||||
def host_by_host_id(host_id):
|
||||
for hid, host in zip(host_ids, hosts):
|
||||
if hid == host_id:
|
||||
return host
|
||||
raise RuntimeError(f"Can't find host for host_id {host_id}")
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1} AND consistency = 'local'") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
group_id = await get_table_raft_group_id(manager, ks, 'test')
|
||||
leader_host_id = await wait_for_leader(manager, servers[0], group_id)
|
||||
assert leader_host_id != host_ids[2]
|
||||
leader_host = host_by_host_id(leader_host_id)
|
||||
|
||||
table_schema_version = (await cql.run_async(f"SELECT version FROM system_schema.scylla_tables WHERE keyspace_name = '{ks}' AND table_name = 'test'"))[0].version
|
||||
|
||||
await manager.api.enable_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply", one_shot=False)
|
||||
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (10, 20)", host=leader_host)
|
||||
|
||||
await cql.run_async(f"ALTER TABLE {ks}.test ADD new_col int;", host=leader_host)
|
||||
# Following injection simulates that old schema version was already removed from the memory
|
||||
await manager.api.enable_injection(servers[2].ip_addr, "schema_registry_ignore_version", one_shot=False, parameters={'value': table_schema_version})
|
||||
await manager.api.message_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply")
|
||||
await manager.api.disable_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply")
|
||||
|
||||
rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk = 10;", host=hosts[2])
|
||||
assert len(rows) == 1
|
||||
row = rows[0]
|
||||
assert row.pk == 10
|
||||
assert row.c == 20
|
||||
assert row.new_col is None
|
||||
|
||||
@@ -960,7 +960,7 @@ private:
|
||||
auto stop_auth_cache = defer_verbose_shutdown("auth cache", [this] { _auth_cache.stop().get(); });
|
||||
|
||||
_groups_manager.start(std::ref(_ms), std::ref(_group0_registry), std::ref(_qp),
|
||||
std::ref(_db), std::ref(_feature_service)).get();
|
||||
std::ref(_db), std::ref(_mm), std::ref(_sys_ks), std::ref(_feature_service)).get();
|
||||
auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [this] { _groups_manager.stop().get(); });
|
||||
|
||||
_sc_coordinator.start(std::ref(_groups_manager), std::ref(_db)).get();
|
||||
|
||||
Reference in New Issue
Block a user