mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-20 00:20:47 +00:00
Merge 'strong_consistency/state_machine: ensure and upgrade mutations schema' from Michał Jadwiszczak
This patch fixes 2 issues within strong consistency state machine: - it might happen that apply is called before the schema is delivered to the node - on the other hand, the apply may be called after the schema was changed and purged from the schema registry The first problem is fixed by doing `group0.read_barrier()` before applying the mutations. The second one is solved by upgrading the mutations using column mappings in case the version of the mutations' schema is older. Fixes SCYLLADB-428 Strong consistency is in experimental phase, no need to backport. Closes scylladb/scylladb#28546 * https://github.com/scylladb/scylladb: test/cluster/test_strong_consistency: add reproducer for old schema during apply test/cluster/test_strong_consistency: add reproducer for missing schema during apply test/cluster/test_strong_consistency: extract common function raft_group_registry: allow to drop append entries requests for specific raft group strong_consistency/state_machine: find and hold schemas of applying mutations strong_consistency/state_machine: pull necessary dependencies db/schema_tables: add `get_column_mapping_if_exists()`
This commit is contained in:
@@ -2840,20 +2840,15 @@ void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view
|
|||||||
static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?",
|
static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?",
|
||||||
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
|
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
|
||||||
|
|
||||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) {
|
future<std::optional<column_mapping>> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) {
|
||||||
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks._qp.execute_internal(
|
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks.query_processor().execute_internal(
|
||||||
GET_COLUMN_MAPPING_QUERY,
|
GET_COLUMN_MAPPING_QUERY,
|
||||||
db::consistency_level::LOCAL_ONE,
|
db::consistency_level::LOCAL_ONE,
|
||||||
{table_id.uuid(), version.uuid()},
|
{table_id.uuid(), version.uuid()},
|
||||||
cql3::query_processor::cache_internal::no
|
cql3::query_processor::cache_internal::no
|
||||||
);
|
);
|
||||||
if (results->empty()) {
|
if (results->empty()) {
|
||||||
// If we don't have a stored column_mapping for an obsolete schema version
|
co_return std::nullopt;
|
||||||
// then it means it's way too old and been cleaned up already.
|
|
||||||
// Fail the whole learn stage in this case.
|
|
||||||
co_await coroutine::return_exception(std::runtime_error(
|
|
||||||
format("Failed to look up column mapping for schema version {}",
|
|
||||||
version)));
|
|
||||||
}
|
}
|
||||||
std::vector<column_definition> static_columns, regular_columns;
|
std::vector<column_definition> static_columns, regular_columns;
|
||||||
for (const auto& row : *results) {
|
for (const auto& row : *results) {
|
||||||
@@ -2881,6 +2876,18 @@ future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_i
|
|||||||
co_return std::move(cm);
|
co_return std::move(cm);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) {
|
||||||
|
auto cm_opt = co_await schema_tables::get_column_mapping_if_exists(sys_ks, table_id, version);
|
||||||
|
if (!cm_opt) {
|
||||||
|
// If we don't have a stored column_mapping for an obsolete schema version
|
||||||
|
// then it means it's way too old and been cleaned up already.
|
||||||
|
co_await coroutine::return_exception(std::runtime_error(
|
||||||
|
format("Failed to look up column mapping for schema version {}",
|
||||||
|
version)));
|
||||||
|
}
|
||||||
|
co_return std::move(*cm_opt);
|
||||||
|
}
|
||||||
|
|
||||||
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) {
|
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) {
|
||||||
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks._qp.execute_internal(
|
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks._qp.execute_internal(
|
||||||
GET_COLUMN_MAPPING_QUERY,
|
GET_COLUMN_MAPPING_QUERY,
|
||||||
|
|||||||
@@ -320,6 +320,8 @@ std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const ss
|
|||||||
future<> store_column_mapping(sharded<service::storage_proxy>& proxy, schema_ptr s, bool with_ttl);
|
future<> store_column_mapping(sharded<service::storage_proxy>& proxy, schema_ptr s, bool with_ttl);
|
||||||
/// Query column mapping for a given version of the table locally.
|
/// Query column mapping for a given version of the table locally.
|
||||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||||
|
/// Returns the same result as `get_column_mapping()` wrapped in optional and returns nullopt if the mapping doesn't exist.
|
||||||
|
future<std::optional<column_mapping>> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||||
/// Check that column mapping exists for a given version of the table
|
/// Check that column mapping exists for a given version of the table
|
||||||
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||||
/// Delete matching column mapping entries from the `system.scylla_table_schema_history` table
|
/// Delete matching column mapping entries from the `system.scylla_table_schema_history` table
|
||||||
|
|||||||
2
main.cc
2
main.cc
@@ -1831,7 +1831,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
|||||||
checkpoint(stop_signal, "initializing strongly consistent groups manager");
|
checkpoint(stop_signal, "initializing strongly consistent groups manager");
|
||||||
sharded<service::strong_consistency::groups_manager> groups_manager;
|
sharded<service::strong_consistency::groups_manager> groups_manager;
|
||||||
groups_manager.start(std::ref(messaging), std::ref(raft_gr), std::ref(qp),
|
groups_manager.start(std::ref(messaging), std::ref(raft_gr), std::ref(qp),
|
||||||
std::ref(db), std::ref(feature_service)).get();
|
std::ref(db), std::ref(mm), std::ref(sys_ks), std::ref(feature_service)).get();
|
||||||
auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [&] {
|
auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [&] {
|
||||||
groups_manager.stop().get();
|
groups_manager.stop().get();
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -8,6 +8,7 @@
|
|||||||
#include "fsm.hh"
|
#include "fsm.hh"
|
||||||
#include <random>
|
#include <random>
|
||||||
#include <seastar/core/coroutine.hh>
|
#include <seastar/core/coroutine.hh>
|
||||||
|
#include "raft/raft.hh"
|
||||||
#include "utils/assert.hh"
|
#include "utils/assert.hh"
|
||||||
#include "utils/error_injection.hh"
|
#include "utils/error_injection.hh"
|
||||||
|
|
||||||
@@ -205,6 +206,11 @@ void fsm::become_follower(server_id leader) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) {
|
void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) {
|
||||||
|
if (utils::get_local_injector().enter("avoid_being_raft_leader")) {
|
||||||
|
become_follower(server_id{});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!std::holds_alternative<candidate>(_state)) {
|
if (!std::holds_alternative<candidate>(_state)) {
|
||||||
_output.state_changed = true;
|
_output.state_changed = true;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,7 @@
|
|||||||
#include <seastar/core/sharded.hh>
|
#include <seastar/core/sharded.hh>
|
||||||
|
|
||||||
#include "schema_registry.hh"
|
#include "schema_registry.hh"
|
||||||
|
#include "utils/error_injection.hh"
|
||||||
#include "utils/log.hh"
|
#include "utils/log.hh"
|
||||||
#include "db/schema_tables.hh"
|
#include "db/schema_tables.hh"
|
||||||
#include "view_info.hh"
|
#include "view_info.hh"
|
||||||
@@ -103,6 +104,12 @@ schema_ptr schema_registry::learn(schema_ptr s) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
schema_registry_entry& schema_registry::get_entry(table_schema_version v) const {
|
schema_registry_entry& schema_registry::get_entry(table_schema_version v) const {
|
||||||
|
if (auto ignore_version = utils::get_local_injector().inject_parameter<std::string_view>("schema_registry_ignore_version"); ignore_version) {
|
||||||
|
if (v == table_schema_version{utils::UUID(*ignore_version)}) {
|
||||||
|
throw schema_version_not_found(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
auto i = _entries.find(v);
|
auto i = _entries.find(v);
|
||||||
if (i == _entries.end()) {
|
if (i == _entries.end()) {
|
||||||
throw schema_version_not_found(v);
|
throw schema_version_not_found(v);
|
||||||
@@ -142,6 +149,12 @@ future<schema_ptr> schema_registry::get_or_load(table_schema_version v, const as
|
|||||||
}
|
}
|
||||||
|
|
||||||
schema_ptr schema_registry::get_or_null(table_schema_version v) const {
|
schema_ptr schema_registry::get_or_null(table_schema_version v) const {
|
||||||
|
if (auto ignore_version = utils::get_local_injector().inject_parameter<std::string_view>("schema_registry_ignore_version"); ignore_version) {
|
||||||
|
if (v == table_schema_version{utils::UUID(*ignore_version)}) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
auto i = _entries.find(v);
|
auto i = _entries.find(v);
|
||||||
if (i == _entries.end()) {
|
if (i == _entries.end()) {
|
||||||
return nullptr;
|
return nullptr;
|
||||||
|
|||||||
@@ -124,7 +124,12 @@ void raft_group_registry::init_rpc_verbs() {
|
|||||||
|
|
||||||
ser::raft_rpc_verbs::register_raft_append_entries(&_ms, [handle_raft_rpc] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
|
ser::raft_rpc_verbs::register_raft_append_entries(&_ms, [handle_raft_rpc] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
|
||||||
raft::group_id gid, raft::server_id from, raft::server_id dst, raft::append_request append_request) mutable {
|
raft::group_id gid, raft::server_id from, raft::server_id dst, raft::append_request append_request) mutable {
|
||||||
return handle_raft_rpc(cinfo, gid, from, dst, [from, append_request = std::move(append_request), original_shard_id = this_shard_id()] (raft_rpc& rpc) mutable {
|
return handle_raft_rpc(cinfo, gid, from, dst, [from, append_request = std::move(append_request), original_shard_id = this_shard_id(), gid] (raft_rpc& rpc) mutable {
|
||||||
|
if (auto ignore_group_id = utils::get_local_injector().inject_parameter<std::string_view>("raft_drop_incoming_append_entries_for_specified_group"); ignore_group_id) {
|
||||||
|
if (gid == raft::group_id{utils::UUID(*ignore_group_id)}) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (utils::get_local_injector().enter("raft_drop_incoming_append_entries")) {
|
if (utils::get_local_injector().enter("raft_drop_incoming_append_entries")) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@
|
|||||||
|
|
||||||
#include "groups_manager.hh"
|
#include "groups_manager.hh"
|
||||||
|
|
||||||
|
#include "service/migration_manager.hh"
|
||||||
#include "service/strong_consistency/state_machine.hh"
|
#include "service/strong_consistency/state_machine.hh"
|
||||||
#include "service/strong_consistency/raft_groups_storage.hh"
|
#include "service/strong_consistency/raft_groups_storage.hh"
|
||||||
#include "gms/feature_service.hh"
|
#include "gms/feature_service.hh"
|
||||||
@@ -94,11 +95,13 @@ auto raft_server::begin_mutate() -> begin_mutate_result {
|
|||||||
|
|
||||||
groups_manager::groups_manager(netw::messaging_service& ms,
|
groups_manager::groups_manager(netw::messaging_service& ms,
|
||||||
raft_group_registry& raft_gr, cql3::query_processor& qp,
|
raft_group_registry& raft_gr, cql3::query_processor& qp,
|
||||||
replica::database& db, gms::feature_service& features)
|
replica::database& db, service::migration_manager& mm, db::system_keyspace& sys_ks, gms::feature_service& features)
|
||||||
: _ms(ms)
|
: _ms(ms)
|
||||||
, _raft_gr(raft_gr)
|
, _raft_gr(raft_gr)
|
||||||
, _qp(qp)
|
, _qp(qp)
|
||||||
, _db(db)
|
, _db(db)
|
||||||
|
, _mm(mm)
|
||||||
|
, _sys_ks(sys_ks)
|
||||||
, _features(features)
|
, _features(features)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@@ -109,7 +112,7 @@ future<> groups_manager::start_raft_group(global_tablet_id tablet,
|
|||||||
{
|
{
|
||||||
const auto my_id = to_server_id(tm->get_my_id());
|
const auto my_id = to_server_id(tm->get_my_id());
|
||||||
|
|
||||||
auto state_machine = make_state_machine(tablet, group_id, _db);
|
auto state_machine = make_state_machine(tablet, group_id, _db, _mm, _sys_ks);
|
||||||
auto& state_machine_ref = *state_machine;
|
auto& state_machine_ref = *state_machine;
|
||||||
auto rpc = std::make_unique<rpc_impl>(state_machine_ref, _ms, _raft_gr.failure_detector(), group_id, my_id);
|
auto rpc = std::make_unique<rpc_impl>(state_machine_ref, _ms, _raft_gr.failure_detector(), group_id, my_id);
|
||||||
// Keep a reference to a specific RPC class.
|
// Keep a reference to a specific RPC class.
|
||||||
|
|||||||
@@ -13,6 +13,14 @@
|
|||||||
#include "service/raft/raft_group_registry.hh"
|
#include "service/raft/raft_group_registry.hh"
|
||||||
#include "cql3/query_processor.hh"
|
#include "cql3/query_processor.hh"
|
||||||
|
|
||||||
|
namespace db {
|
||||||
|
class system_keyspace;
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace service {
|
||||||
|
class migration_manager;
|
||||||
|
}
|
||||||
|
|
||||||
namespace service::strong_consistency {
|
namespace service::strong_consistency {
|
||||||
|
|
||||||
class raft_server;
|
class raft_server;
|
||||||
@@ -67,6 +75,8 @@ class groups_manager : public peering_sharded_service<groups_manager> {
|
|||||||
raft_group_registry& _raft_gr;
|
raft_group_registry& _raft_gr;
|
||||||
cql3::query_processor& _qp;
|
cql3::query_processor& _qp;
|
||||||
replica::database& _db;
|
replica::database& _db;
|
||||||
|
service::migration_manager& _mm;
|
||||||
|
db::system_keyspace& _sys_ks;
|
||||||
gms::feature_service& _features;
|
gms::feature_service& _features;
|
||||||
std::unordered_map<raft::group_id, raft_group_state> _raft_groups = {};
|
std::unordered_map<raft::group_id, raft_group_state> _raft_groups = {};
|
||||||
locator::token_metadata_ptr _pending_tm = nullptr;
|
locator::token_metadata_ptr _pending_tm = nullptr;
|
||||||
@@ -87,7 +97,7 @@ class groups_manager : public peering_sharded_service<groups_manager> {
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
groups_manager(netw::messaging_service& ms, raft_group_registry& raft_gr,
|
groups_manager(netw::messaging_service& ms, raft_group_registry& raft_gr,
|
||||||
cql3::query_processor& qp, replica::database& _db,
|
cql3::query_processor& qp, replica::database& _db, service::migration_manager& mm, db::system_keyspace& sys_ks,
|
||||||
gms::feature_service& features);
|
gms::feature_service& features);
|
||||||
|
|
||||||
// Called whenever a new token_metadata is published on this shard.
|
// Called whenever a new token_metadata is published on this shard.
|
||||||
|
|||||||
@@ -6,31 +6,53 @@
|
|||||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <seastar/core/shard_id.hh>
|
||||||
|
#include <seastar/core/on_internal_error.hh>
|
||||||
#include "state_machine.hh"
|
#include "state_machine.hh"
|
||||||
|
#include "db/schema_tables.hh"
|
||||||
|
#include "mutation/frozen_mutation.hh"
|
||||||
|
#include "schema/schema_registry.hh"
|
||||||
#include "serializer_impl.hh"
|
#include "serializer_impl.hh"
|
||||||
#include "idl/strong_consistency/state_machine.dist.hh"
|
#include "idl/strong_consistency/state_machine.dist.hh"
|
||||||
#include "idl/strong_consistency/state_machine.dist.impl.hh"
|
#include "idl/strong_consistency/state_machine.dist.impl.hh"
|
||||||
#include "replica/database.hh"
|
#include "replica/database.hh"
|
||||||
|
#include "service/migration_manager.hh"
|
||||||
|
#include "db/system_keyspace.hh"
|
||||||
|
#include "utils/loading_cache.hh"
|
||||||
|
#include "utils/error_injection.hh"
|
||||||
|
|
||||||
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
namespace service::strong_consistency {
|
namespace service::strong_consistency {
|
||||||
|
|
||||||
|
static logging::logger logger("sc_state_machine");
|
||||||
|
|
||||||
class state_machine : public raft_state_machine {
|
class state_machine : public raft_state_machine {
|
||||||
locator::global_tablet_id _tablet;
|
locator::global_tablet_id _tablet;
|
||||||
raft::group_id _group_id;
|
raft::group_id _group_id;
|
||||||
replica::database& _db;
|
replica::database& _db;
|
||||||
|
service::migration_manager& _mm;
|
||||||
|
db::system_keyspace& _sys_ks;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
state_machine(locator::global_tablet_id tablet,
|
state_machine(locator::global_tablet_id tablet,
|
||||||
raft::group_id gid,
|
raft::group_id gid,
|
||||||
replica::database& db)
|
replica::database& db,
|
||||||
|
service::migration_manager& mm,
|
||||||
|
db::system_keyspace& sys_ks)
|
||||||
: _tablet(tablet)
|
: _tablet(tablet)
|
||||||
, _group_id(gid)
|
, _group_id(gid)
|
||||||
, _db(db)
|
, _db(db)
|
||||||
|
, _mm(mm)
|
||||||
|
, _sys_ks(sys_ks)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> apply(std::vector<raft::command_cref> command) override {
|
future<> apply(std::vector<raft::command_cref> command) override {
|
||||||
|
static thread_local logging::logger::rate_limit rate_limit(std::chrono::seconds(10));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
co_await utils::get_local_injector().inject("strong_consistency_state_machine_wait_before_apply", utils::wait_for_message(20min));
|
||||||
utils::chunked_vector<frozen_mutation> muts;
|
utils::chunked_vector<frozen_mutation> muts;
|
||||||
muts.reserve(command.size());
|
muts.reserve(command.size());
|
||||||
for (const auto& c: command) {
|
for (const auto& c: command) {
|
||||||
@@ -38,8 +60,18 @@ public:
|
|||||||
auto cmd = ser::deserialize(is, std::type_identity<raft_command>{});
|
auto cmd = ser::deserialize(is, std::type_identity<raft_command>{});
|
||||||
muts.push_back(std::move(cmd.mutation));
|
muts.push_back(std::move(cmd.mutation));
|
||||||
}
|
}
|
||||||
|
// Hold pointers to schemas until `_db.apply()` is finished
|
||||||
|
auto schemas = co_await get_schema_and_upgrade_mutations(muts);
|
||||||
co_await _db.apply(std::move(muts), db::no_timeout);
|
co_await _db.apply(std::move(muts), db::no_timeout);
|
||||||
} catch (...) {
|
} catch (replica::no_such_column_family&) {
|
||||||
|
// If the table doesn't exist, it means it was already dropped.
|
||||||
|
// This cannot happen if the table wasn't created yet on the node
|
||||||
|
// because the state machine is created only after the table is created
|
||||||
|
// (see `schema_applier::commit_on_shard()` and `storage_service::commit_token_metadata_change()`).
|
||||||
|
// In this case, we should just ignore mutations without throwing an error.
|
||||||
|
logger.log(log_level::warn, rate_limit, "apply(): table {} was already dropped, ignoring mutations", _tablet.table);
|
||||||
|
}
|
||||||
|
catch (...) {
|
||||||
throw std::runtime_error(::format(
|
throw std::runtime_error(::format(
|
||||||
"tablet {}, group id {}: error while applying mutations {}",
|
"tablet {}, group id {}: error while applying mutations {}",
|
||||||
_tablet, _group_id, std::current_exception()));
|
_tablet, _group_id, std::current_exception()));
|
||||||
@@ -65,13 +97,96 @@ public:
|
|||||||
future<> transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) override {
|
future<> transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) override {
|
||||||
throw std::runtime_error("transfer_snapshot() not implemented");
|
throw std::runtime_error("transfer_snapshot() not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
using column_mappings_cache = utils::loading_cache<table_schema_version, column_mapping>;
|
||||||
|
using schema_store = std::unordered_map<table_schema_version, std::pair<schema_ptr, column_mappings_cache::value_ptr>>;
|
||||||
|
future<schema_store> get_schema_and_upgrade_mutations(utils::chunked_vector<frozen_mutation>& muts) {
|
||||||
|
// Cache column mappings to avoid querying `system.scylla_table_schema_history` multiple times.
|
||||||
|
static thread_local column_mappings_cache column_mapping_cache(std::numeric_limits<size_t>::max(), 1h, logger);
|
||||||
|
// Stores schema pointer and optional column mapping for each schema version present in the mutations
|
||||||
|
schema_store schema_mappings;
|
||||||
|
bool barrier_executed = false;
|
||||||
|
|
||||||
|
auto get_schema = [&] (table_schema_version schema_version) -> future<std::pair<schema_ptr, column_mappings_cache::value_ptr>> {
|
||||||
|
auto schema = local_schema_registry().get_or_null(schema_version);
|
||||||
|
if (schema) {
|
||||||
|
co_return std::pair{std::move(schema), nullptr};
|
||||||
|
}
|
||||||
|
|
||||||
|
// `_db.find_schema()` may throw `replica::no_such_column_family` if the table was already dropped.
|
||||||
|
schema = _db.find_schema(_tablet.table);
|
||||||
|
// The column mapping may be already present in the cache from another `apply()` call
|
||||||
|
auto cm_ptr = column_mapping_cache.find(schema_version);
|
||||||
|
if (cm_ptr) {
|
||||||
|
co_return std::pair{std::move(schema), std::move(cm_ptr)};
|
||||||
|
}
|
||||||
|
|
||||||
|
// We may not find the column mapping if the mutation schema is newer than the present schema.
|
||||||
|
// In this case, we should trigger the barrier to wait for the schema to be updated and then try again.
|
||||||
|
auto cm_opt = co_await db::schema_tables::get_column_mapping_if_exists(_sys_ks, _tablet.table, schema_version);
|
||||||
|
if (!cm_opt) {
|
||||||
|
co_return std::pair{nullptr, nullptr};
|
||||||
|
}
|
||||||
|
|
||||||
|
cm_ptr = co_await column_mapping_cache.get_ptr(schema_version, [cm = std::move(*cm_opt)] (auto schema_version) -> future<column_mapping> {
|
||||||
|
co_return std::move(cm);
|
||||||
|
});
|
||||||
|
co_return std::pair{std::move(schema), std::move(cm_ptr)};
|
||||||
|
};
|
||||||
|
|
||||||
|
auto resolve_schema = [&] (const frozen_mutation& mut) -> future<const schema_store::mapped_type*> {
|
||||||
|
auto schema_version = mut.schema_version();
|
||||||
|
auto it = schema_mappings.find(schema_version);
|
||||||
|
if (it != schema_mappings.end()) {
|
||||||
|
co_return &it->second;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto schema_cm = co_await get_schema(schema_version);
|
||||||
|
if (!schema_cm.first && !barrier_executed) {
|
||||||
|
if (utils::get_local_injector().enter("disable_raft_drop_append_entries_for_specified_group")) {
|
||||||
|
utils::get_local_injector().disable("raft_drop_incoming_append_entries_for_specified_group");
|
||||||
|
}
|
||||||
|
// TODO: pass valid abort source
|
||||||
|
co_await _mm.get_group0_barrier().trigger();
|
||||||
|
barrier_executed = true;
|
||||||
|
schema_cm = co_await get_schema(schema_version);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (schema_cm.first) {
|
||||||
|
const auto [it, _] = schema_mappings.insert({schema_version, std::move(schema_cm)});
|
||||||
|
co_return &it->second;
|
||||||
|
}
|
||||||
|
co_return nullptr;
|
||||||
|
};
|
||||||
|
|
||||||
|
for (auto& m: muts) {
|
||||||
|
auto schema_entry = co_await resolve_schema(m);
|
||||||
|
if (!schema_entry) {
|
||||||
|
// Old schema are TTLed after 10 days (see comment in `schema_applier::finalize_tables_and_views()`),
|
||||||
|
// so this error theoretically may be triggered if a node is stuck longer than this.
|
||||||
|
// But in practice we should do a snapshot much earlier, that's why `on_internal_error()` here.
|
||||||
|
// And if the table was already dropped, `no_such_column_family` will be dropped earlier.
|
||||||
|
on_internal_error(logger, fmt::format("couldn't find schema for table {} and mutation schema version {}", _tablet.table, m.schema_version()));
|
||||||
|
}
|
||||||
|
if (schema_entry->second) {
|
||||||
|
m = freeze(m.unfreeze_upgrading(schema_entry->first, *schema_entry->second));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We only need vector of schema pointers but we're returning the whole map
|
||||||
|
// to avoid another allocation
|
||||||
|
co_return std::move(schema_mappings);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
std::unique_ptr<raft_state_machine> make_state_machine(locator::global_tablet_id tablet,
|
std::unique_ptr<raft_state_machine> make_state_machine(locator::global_tablet_id tablet,
|
||||||
raft::group_id gid,
|
raft::group_id gid,
|
||||||
replica::database& db)
|
replica::database& db,
|
||||||
|
service::migration_manager& mm,
|
||||||
|
db::system_keyspace& sys_ks)
|
||||||
{
|
{
|
||||||
return std::make_unique<state_machine>(tablet, gid, db);
|
return std::make_unique<state_machine>(tablet, gid, db, mm, sys_ks);
|
||||||
}
|
}
|
||||||
|
|
||||||
};
|
};
|
||||||
@@ -12,6 +12,14 @@
|
|||||||
#include "mutation/frozen_mutation.hh"
|
#include "mutation/frozen_mutation.hh"
|
||||||
#include "locator/tablets.hh"
|
#include "locator/tablets.hh"
|
||||||
|
|
||||||
|
namespace db {
|
||||||
|
class system_keyspace;
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace service {
|
||||||
|
class migration_manager;
|
||||||
|
}
|
||||||
|
|
||||||
namespace service::strong_consistency {
|
namespace service::strong_consistency {
|
||||||
|
|
||||||
struct raft_command {
|
struct raft_command {
|
||||||
@@ -19,6 +27,8 @@ struct raft_command {
|
|||||||
};
|
};
|
||||||
std::unique_ptr<raft_state_machine> make_state_machine(locator::global_tablet_id tablet,
|
std::unique_ptr<raft_state_machine> make_state_machine(locator::global_tablet_id tablet,
|
||||||
raft::group_id gid,
|
raft::group_id gid,
|
||||||
replica::database& db);
|
replica::database& db,
|
||||||
|
service::migration_manager& mm,
|
||||||
|
db::system_keyspace& sys_ks);
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -95,6 +95,11 @@ async def assert_no_cross_shard_routing(manager: ManagerClient, server: ServerIn
|
|||||||
f"but partitioner computed shard {shard_from_partitioner}."
|
f"but partitioner computed shard {shard_from_partitioner}."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def get_table_raft_group_id(manager: ManagerClient, ks: str, table: str):
|
||||||
|
table_id = await manager.get_table_id(ks, table)
|
||||||
|
rows = await manager.get_cql().run_async(f"SELECT raft_group_id FROM system.tablets where table_id = {table_id}")
|
||||||
|
return str(rows[0].raft_group_id)
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_basic_write_read(manager: ManagerClient):
|
async def test_basic_write_read(manager: ManagerClient):
|
||||||
|
|
||||||
@@ -124,9 +129,7 @@ async def test_basic_write_read(manager: ManagerClient):
|
|||||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
||||||
|
|
||||||
logger.info("Select raft group id for the tablet")
|
logger.info("Select raft group id for the tablet")
|
||||||
table_id = await manager.get_table_id(ks, 'test')
|
group_id = await get_table_raft_group_id(manager, ks, 'test')
|
||||||
rows = await cql.run_async(f"SELECT raft_group_id FROM system.tablets where table_id = {table_id}")
|
|
||||||
group_id = str(rows[0].raft_group_id)
|
|
||||||
|
|
||||||
logger.info(f"Get current leader for the group {group_id}")
|
logger.info(f"Get current leader for the group {group_id}")
|
||||||
leader_host_id = await wait_for_leader(manager, servers[0], group_id)
|
leader_host_id = await wait_for_leader(manager, servers[0], group_id)
|
||||||
@@ -457,3 +460,103 @@ async def test_sc_persistence_after_crash(manager: ManagerClient):
|
|||||||
await assert_no_cross_shard_routing(manager, server)
|
await assert_no_cross_shard_routing(manager, server)
|
||||||
|
|
||||||
await manager.server_stop_gracefully(server.server_id)
|
await manager.server_stop_gracefully(server.server_id)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||||
|
async def test_no_schema_when_apply_write(manager: ManagerClient):
|
||||||
|
config = {
|
||||||
|
'experimental_features': ['strongly-consistent-tables']
|
||||||
|
}
|
||||||
|
cmdline = [
|
||||||
|
'--logger-log-level', 'sc_groups_manager=debug',
|
||||||
|
'--logger-log-level', 'sc_coordinator=debug'
|
||||||
|
]
|
||||||
|
servers = await manager.servers_add(2, config=config, cmdline=cmdline, auto_rack_dc='my_dc')
|
||||||
|
# We don't want `servers[2]` to be a Raft leader (for both group0 and strong consistency groups),
|
||||||
|
# because we want `servers[2]` to receive Raft commands from others.
|
||||||
|
servers += [await manager.server_add(config=config | {'error_injections_at_startup': ['avoid_being_raft_leader']}, cmdline=cmdline, property_file={'dc':'my_dc', 'rack': 'rack3'})]
|
||||||
|
(cql, hosts) = await manager.get_ready_cql(servers)
|
||||||
|
host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers])
|
||||||
|
|
||||||
|
def host_by_host_id(host_id):
|
||||||
|
for hid, host in zip(host_ids, hosts):
|
||||||
|
if hid == host_id:
|
||||||
|
return host
|
||||||
|
raise RuntimeError(f"Can't find host for host_id {host_id}")
|
||||||
|
|
||||||
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1} AND consistency = 'local'") as ks:
|
||||||
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
||||||
|
|
||||||
|
# Drop incoming append entries from group0 (schema changes) on `servers[2]` after the table is created,
|
||||||
|
# so strong consistency raft groups are created on the node but it won't receive next alter table mutations.
|
||||||
|
group0_id = (await cql.run_async("SELECT value FROM system.scylla_local WHERE key = 'raft_group0_id'"))[0].value
|
||||||
|
await manager.api.enable_injection(servers[2].ip_addr, "raft_drop_incoming_append_entries_for_specified_group", one_shot=False, parameters={'value': group0_id})
|
||||||
|
await cql.run_async(f"ALTER TABLE {ks}.test ADD new_col int;", host=hosts[0])
|
||||||
|
|
||||||
|
group_id = await get_table_raft_group_id(manager, ks, 'test')
|
||||||
|
leader_host_id = await wait_for_leader(manager, servers[0], group_id)
|
||||||
|
assert leader_host_id != host_ids[2]
|
||||||
|
leader_host = host_by_host_id(leader_host_id)
|
||||||
|
|
||||||
|
s2_log = await manager.server_open_log(servers[2].server_id)
|
||||||
|
s2_mark = await s2_log.mark()
|
||||||
|
|
||||||
|
await manager.api.enable_injection(servers[2].ip_addr, "disable_raft_drop_append_entries_for_specified_group", one_shot=True)
|
||||||
|
await cql.run_async(f"INSERT INTO {ks}.test (pk, c, new_col) VALUES (10, 20, 30)", host=leader_host)
|
||||||
|
|
||||||
|
await s2_log.wait_for(f"Column definitions for {ks}.test changed", timeout=60, from_mark=s2_mark)
|
||||||
|
rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk = 10;", host=hosts[2])
|
||||||
|
assert len(rows) == 1
|
||||||
|
row = rows[0]
|
||||||
|
assert row.pk == 10
|
||||||
|
assert row.c == 20
|
||||||
|
assert row.new_col == 30
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_old_schema_when_apply_write(manager: ManagerClient):
    """Reproducer: a strongly-consistent write is applied after its schema version
    is gone from the in-memory schema registry.

    An insert is accepted under the original table schema while `servers[2]` is
    parked before apply (via the `strong_consistency_state_machine_wait_before_apply`
    injection).  The table is then altered, and the `schema_registry_ignore_version`
    injection simulates the old schema version having been purged from memory.
    After releasing the apply, the row must still be readable on `servers[2]`,
    with the post-ALTER column reading as NULL (the write predates it).
    """
    config = {
        'experimental_features': ['strongly-consistent-tables']
    }
    cmdline = [
        '--logger-log-level', 'sc_groups_manager=debug',
        '--logger-log-level', 'sc_coordinator=debug'
    ]
    servers = await manager.servers_add(2, config=config, cmdline=cmdline, auto_rack_dc='my_dc')
    # We don't want `servers[2]` to be a Raft leader (for both group0 and strong consistency groups),
    # because we want `servers[2]` to receive Raft commands from others.
    servers += [await manager.server_add(config=config | {'error_injections_at_startup': ['avoid_being_raft_leader']}, cmdline=cmdline, property_file={'dc':'my_dc', 'rack': 'rack3'})]
    (cql, hosts) = await manager.get_ready_cql(servers)
    host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers])

    def host_by_host_id(host_id):
        # `host_ids` and `hosts` are parallel lists built from the same `servers` order.
        for hid, host in zip(host_ids, hosts):
            if hid == host_id:
                return host
        raise RuntimeError(f"Can't find host for host_id {host_id}")

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1} AND consistency = 'local'") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        group_id = await get_table_raft_group_id(manager, ks, 'test')
        leader_host_id = await wait_for_leader(manager, servers[0], group_id)
        # `servers[2]` must be a follower so it receives (and delays) the apply.
        assert leader_host_id != host_ids[2]
        leader_host = host_by_host_id(leader_host_id)

        # Remember the pre-ALTER schema version; it is the one the injection will hide.
        table_schema_version = (await cql.run_async(f"SELECT version FROM system_schema.scylla_tables WHERE keyspace_name = '{ks}' AND table_name = 'test'"))[0].version

        # Park servers[2]'s state machine before it applies the upcoming insert.
        await manager.api.enable_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply", one_shot=False)
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (10, 20)", host=leader_host)

        await cql.run_async(f"ALTER TABLE {ks}.test ADD new_col int;", host=leader_host)
        # Following injection simulates that old schema version was already removed from the memory
        await manager.api.enable_injection(servers[2].ip_addr, "schema_registry_ignore_version", one_shot=False, parameters={'value': table_schema_version})
        # Release the parked apply, then drop the barrier for subsequent applies.
        await manager.api.message_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply")
        await manager.api.disable_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply")

        # Read on servers[2]: the mutation written under the old schema must have
        # been upgraded and applied; `new_col` did not exist at write time, so NULL.
        rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk = 10;", host=hosts[2])
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 10
        assert row.c == 20
        assert row.new_col is None
|||||||
@@ -960,7 +960,7 @@ private:
|
|||||||
auto stop_auth_cache = defer_verbose_shutdown("auth cache", [this] { _auth_cache.stop().get(); });
|
auto stop_auth_cache = defer_verbose_shutdown("auth cache", [this] { _auth_cache.stop().get(); });
|
||||||
|
|
||||||
_groups_manager.start(std::ref(_ms), std::ref(_group0_registry), std::ref(_qp),
|
_groups_manager.start(std::ref(_ms), std::ref(_group0_registry), std::ref(_qp),
|
||||||
std::ref(_db), std::ref(_feature_service)).get();
|
std::ref(_db), std::ref(_mm), std::ref(_sys_ks), std::ref(_feature_service)).get();
|
||||||
auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [this] { _groups_manager.stop().get(); });
|
auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [this] { _groups_manager.stop().get(); });
|
||||||
|
|
||||||
_sc_coordinator.start(std::ref(_groups_manager), std::ref(_db)).get();
|
_sc_coordinator.start(std::ref(_groups_manager), std::ref(_db)).get();
|
||||||
|
|||||||
Reference in New Issue
Block a user