Merge 'Introduce a queue of global topology requests.' from Gleb Natapov
Currently only one global topology request (such as truncate, cdc repair, cleanup and alter table) can be pending. If one is already pending others will be rejected with an error. This is not very user friendly, so this series introduces a queue of global requests which allows queuing many global topology requests simultaneously. Fixes: #16822 No need to backport since this is a new feature. Closes scylladb/scylladb#24293 * https://github.com/scylladb/scylladb: topology coordinator: simplify truncate handling in case request queue feature is disable topology coordinator: fix indentation after the previous patch topology coordinator: allow running multiple global commands in parallel topology coordinator: Implement global topology request queue topology coordinator: Do not cancel global requests in cancel_all_requests topology coordinator: store request type for each global command topology request: make it possible to hold global request types in request_type field topology coordinator: move alter table global request parameters into topology_request table topology coordinator: move cleanup global command to report completion through topology_request table topology coordinator: no need to create updates vector explicitly topology coordinator: use topology_request_tracking_mutation_builder::done() instead of open code it topology coordinator: handle error during new_cdc_generation command processing topology coordinator: remove unneeded semicolon topology coordinator: fix indentation after the last commit topology coordinator: move new_cdc_generation topology request to use topology_request table for completion gms/feature_service: add TOPOLOGY_GLOBAL_REQUEST_QUEUE feature flag
This commit is contained in:
@@ -12,8 +12,10 @@
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include <stdexcept>
|
||||
#include <vector>
|
||||
#include "alter_keyspace_statement.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
#include "prepared_statement.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
@@ -232,7 +234,7 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
// and we'll unnecessarily trigger the processing path for ALTER tablets KS,
|
||||
// when in reality nothing or only schema is being changed
|
||||
if (changes_tablets(qp)) {
|
||||
if (!qp.topology_global_queue_empty()) {
|
||||
if (!qp.proxy().features().topology_global_request_queue && !qp.topology_global_queue_empty()) {
|
||||
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(
|
||||
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
|
||||
}
|
||||
@@ -243,9 +245,13 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
qp.db().real_database().validate_keyspace_update(*ks_md_update);
|
||||
|
||||
service::topology_mutation_builder builder(ts);
|
||||
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
|
||||
builder.set_global_topology_request_id(global_request_id);
|
||||
builder.set_new_keyspace_rf_change_data(_name, ks_options);
|
||||
if (!qp.proxy().features().topology_global_request_queue) {
|
||||
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
|
||||
builder.set_global_topology_request_id(global_request_id);
|
||||
builder.set_new_keyspace_rf_change_data(_name, ks_options);
|
||||
} else {
|
||||
builder.queue_global_topology_request_id(global_request_id);
|
||||
};
|
||||
service::topology_change change{{builder.build()}};
|
||||
|
||||
auto topo_schema = qp.db().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
|
||||
@@ -253,9 +259,13 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
return cm.to_mutation(topo_schema);
|
||||
});
|
||||
|
||||
service::topology_request_tracking_mutation_builder rtbuilder{global_request_id};
|
||||
service::topology_request_tracking_mutation_builder rtbuilder{global_request_id, qp.proxy().features().topology_requests_type_column};
|
||||
rtbuilder.set("done", false)
|
||||
.set("start_time", db_clock::now());
|
||||
.set("start_time", db_clock::now())
|
||||
.set("request_type", service::global_topology_request::keyspace_rf_change);
|
||||
if (qp.proxy().features().topology_global_request_queue) {
|
||||
rtbuilder.set_new_keyspace_rf_change_data(_name, ks_options);
|
||||
}
|
||||
service::topology_change req_change{{rtbuilder.build()}};
|
||||
|
||||
auto topo_req_schema = qp.db().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY_REQUESTS);
|
||||
|
||||
@@ -260,8 +260,8 @@ schema_ptr system_keyspace::topology() {
|
||||
.with_column("request_id", timeuuid_type)
|
||||
.with_column("ignore_nodes", set_type_impl::get_instance(uuid_type, true), column_kind::static_column)
|
||||
.with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column)
|
||||
.with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column)
|
||||
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false), column_kind::static_column)
|
||||
.with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column) // deprecated
|
||||
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false), column_kind::static_column) // deprecated
|
||||
.with_column("version", long_type, column_kind::static_column)
|
||||
.with_column("fence_version", long_type, column_kind::static_column)
|
||||
.with_column("transition_state", utf8_type, column_kind::static_column)
|
||||
@@ -273,6 +273,7 @@ schema_ptr system_keyspace::topology() {
|
||||
.with_column("session", uuid_type, column_kind::static_column)
|
||||
.with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column)
|
||||
.with_column("upgrade_state", utf8_type, column_kind::static_column)
|
||||
.with_column("global_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
|
||||
.set_comment("Current state of topology change machine")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
@@ -292,6 +293,8 @@ schema_ptr system_keyspace::topology_requests() {
|
||||
.with_column("error", utf8_type)
|
||||
.with_column("end_time", timestamp_type)
|
||||
.with_column("truncate_table_id", uuid_type)
|
||||
.with_column("new_keyspace_rf_change_ks_name", utf8_type)
|
||||
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false))
|
||||
.set_comment("Topology request tracking")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
@@ -3294,6 +3297,12 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
|
||||
ret.global_request_id = some_row.get_as<utils::UUID>("global_topology_request_id");
|
||||
}
|
||||
|
||||
if (some_row.has("global_requests")) {
|
||||
for (auto&& v : deserialize_set_column(*topology(), some_row, "global_requests")) {
|
||||
ret.global_requests_queue.push_back(value_cast<utils::UUID>(v));
|
||||
}
|
||||
}
|
||||
|
||||
if (some_row.has("enabled_features")) {
|
||||
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
|
||||
}
|
||||
@@ -3469,7 +3478,13 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
|
||||
entry.initiating_host = row.get_as<utils::UUID>("initiating_host");
|
||||
}
|
||||
if (row.has("request_type")) {
|
||||
entry.request_type = service::topology_request_from_string(row.get_as<sstring>("request_type"));
|
||||
auto rts = row.get_as<sstring>("request_type");
|
||||
auto rt = service::try_topology_request_from_string(rts);
|
||||
if (rt) {
|
||||
entry.request_type = *rt;
|
||||
} else {
|
||||
entry.request_type = service::global_topology_request_from_string(rts);
|
||||
}
|
||||
}
|
||||
if (row.has("start_time")) {
|
||||
entry.start_time = row.get_as<db_clock::time_point>("start_time");
|
||||
@@ -3486,6 +3501,11 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
|
||||
if (row.has("truncate_table_id")) {
|
||||
entry.truncate_table_id = table_id(row.get_as<utils::UUID>("truncate_table_id"));
|
||||
}
|
||||
if (row.has("new_keyspace_rf_change_data")) {
|
||||
entry.new_keyspace_rf_change_ks_name = row.get_as<sstring>("new_keyspace_rf_change_ks_name");
|
||||
entry.new_keyspace_rf_change_data = row.get_map<sstring,sstring>("new_keyspace_rf_change_data");
|
||||
}
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
|
||||
@@ -395,13 +395,17 @@ public:
|
||||
struct topology_requests_entry {
|
||||
utils::UUID id;
|
||||
utils::UUID initiating_host;
|
||||
std::optional<service::topology_request> request_type;
|
||||
std::variant<std::monostate, service::topology_request, service::global_topology_request> request_type;
|
||||
db_clock::time_point start_time;
|
||||
bool done;
|
||||
sstring error;
|
||||
db_clock::time_point end_time;
|
||||
db_clock::time_point ts;
|
||||
table_id truncate_table_id;
|
||||
// The name of the KS that is being the target of the scheduled ALTER KS statement
|
||||
std::optional<sstring> new_keyspace_rf_change_ks_name;
|
||||
// The KS options to be used when executing the scheduled ALTER KS statement
|
||||
std::optional<std::unordered_map<sstring, sstring>> new_keyspace_rf_change_data;
|
||||
};
|
||||
using topology_requests_entries = std::unordered_map<utils::UUID, system_keyspace::topology_requests_entry>;
|
||||
|
||||
|
||||
@@ -668,6 +668,7 @@ CREATE TABLE system.topology (
|
||||
new_cdc_generation_data_uuid timeuuid static,
|
||||
new_keyspace_rf_change_ks_name text static,
|
||||
new_keyspace_rf_change_data frozen<map<text, text>> static,
|
||||
global_requests set<timeuuid_type> static,
|
||||
PRIMARY KEY (key, host_id)
|
||||
)
|
||||
```
|
||||
@@ -701,6 +702,8 @@ There are also a few static columns for cluster-global properties:
|
||||
- `upgrade_state` - describes the progress of the upgrade to raft-based topology.
|
||||
- `new_keyspace_rf_change_ks_name` - the name of the KS that is being the target of the scheduled ALTER KS statement
|
||||
- `new_keyspace_rf_change_data` - the KS options to be used when executing the scheduled ALTER KS statement
|
||||
- `global_requests` - contains a list of ids of pending global requests, the information about requests (type and parameters)
|
||||
can be obtained from topology_requests table by using request's id as a look up key.
|
||||
|
||||
# Join procedure
|
||||
|
||||
|
||||
@@ -171,6 +171,7 @@ public:
|
||||
gms::feature repair_based_tablet_rebuild { *this, "REPAIR_BASED_TABLET_REBUILD"sv };
|
||||
gms::feature enforced_raft_rpc_scheduling_group { *this, "ENFORCED_RAFT_RPC_SCHEDULING_GROUP"sv };
|
||||
gms::feature load_and_stream_abort_rpc_message { *this, "LOAD_AND_STREAM_ABORT_RPC_MESSAGE"sv };
|
||||
gms::feature topology_global_request_queue { *this, "TOPOLOGY_GLOBAL_REQUEST_QUEUE"sv };
|
||||
public:
|
||||
|
||||
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;
|
||||
|
||||
@@ -14,24 +14,34 @@
|
||||
#include "tasks/task_handler.hh"
|
||||
#include "tasks/virtual_task_hint.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include <variant>
|
||||
#include "utils/overloaded_functor.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace node_ops {
|
||||
|
||||
static sstring request_type_to_task_type(const std::optional<service::topology_request>& request_type) {
|
||||
return request_type.transform([] (auto type) -> sstring {
|
||||
switch (type) {
|
||||
case service::topology_request::join:
|
||||
return "bootstrap";
|
||||
case service::topology_request::remove:
|
||||
return "remove node";
|
||||
case service::topology_request::leave:
|
||||
return "decommission";
|
||||
default:
|
||||
return fmt::to_string(type);
|
||||
static sstring request_type_to_task_type(const std::variant<std::monostate, service::topology_request, service::global_topology_request>& request_type) {
|
||||
return std::visit(overloaded_functor {
|
||||
[] (const service::topology_request& type) -> sstring {
|
||||
switch (type) {
|
||||
case service::topology_request::join:
|
||||
return "bootstrap";
|
||||
case service::topology_request::remove:
|
||||
return "remove node";
|
||||
case service::topology_request::leave:
|
||||
return "decommission";
|
||||
default:
|
||||
return fmt::to_string(type);
|
||||
}
|
||||
},
|
||||
[] (const service::global_topology_request type) -> sstring {
|
||||
return fmt::to_string(type);
|
||||
},
|
||||
[] (const std::monostate type) -> sstring {
|
||||
return "";
|
||||
}
|
||||
}).value_or("");
|
||||
}, request_type);
|
||||
}
|
||||
|
||||
static tasks::task_manager::task_state get_state(const db::system_keyspace::topology_requests_entry& entry) {
|
||||
@@ -114,7 +124,7 @@ future<std::optional<tasks::virtual_task_hint>> node_ops_virtual_task::contains(
|
||||
}
|
||||
|
||||
auto entry = co_await _ss._sys_ks.local().get_topology_request_entry(task_id.uuid(), false);
|
||||
co_return bool(entry.id) && entry.request_type ? empty_hint : std::nullopt;
|
||||
co_return bool(entry.id) && std::holds_alternative<service::topology_request>(entry.request_type) ? empty_hint : std::nullopt;
|
||||
}
|
||||
|
||||
future<tasks::is_abortable> node_ops_virtual_task::is_abortable(tasks::virtual_task_hint) const {
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
#include "db/consistency_level.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
#include "storage_proxy.hh"
|
||||
#include "service/topology_state_machine.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "mutation/mutation.hh"
|
||||
#include "mutation/frozen_mutation.hh"
|
||||
@@ -1070,44 +1071,51 @@ private:
|
||||
|
||||
const table_id table_id = _sp.local_db().find_uuid(ks_name, cf_name);
|
||||
|
||||
// Check if we already have a truncate queued for the same table. This can happen when a truncate has timed out
|
||||
// and the client retried by issuing the same truncate again. In this case, instead of failing the request with
|
||||
// an "Another global topology request is ongoing" error, we can wait for the already queued request to complete.
|
||||
// Note that we can not do this for a truncate which the topology coordinator has already started processing,
|
||||
// only for a truncate which is still waiting.
|
||||
std::optional<global_topology_request>& global_request = _topology_state_machine._topology.global_request;
|
||||
if (global_request.has_value()) {
|
||||
if (*global_request == global_topology_request::truncate_table) {
|
||||
std::optional<topology::transition_state>& tstate = _topology_state_machine._topology.tstate;
|
||||
if (!tstate || *tstate != topology::transition_state::truncate_table) {
|
||||
const utils::UUID& ongoing_global_request_id = *_topology_state_machine._topology.global_request_id;
|
||||
const auto topology_requests_entry = co_await _sys_ks.local().get_topology_request_entry(ongoing_global_request_id, true);
|
||||
if (topology_requests_entry.truncate_table_id == table_id) {
|
||||
global_request_id = ongoing_global_request_id;
|
||||
slogger.info("Ongoing TRUNCATE for table {}.{} (global request ID {}) detected; waiting for it to complete",
|
||||
ks_name, cf_name, global_request_id);
|
||||
break;
|
||||
if (!_sp._features.topology_global_request_queue) {
|
||||
// Check if we already have a truncate queued for the same table. This can happen when a truncate has timed out
|
||||
// and the client retried by issuing the same truncate again. In this case, instead of failing the request with
|
||||
// an "Another global topology request is ongoing" error, we can wait for the already queued request to complete.
|
||||
// Note that we can not do this for a truncate which the topology coordinator has already started processing,
|
||||
// only for a truncate which is still waiting.
|
||||
if (_topology_state_machine._topology.global_request) {
|
||||
utils::UUID ongoing_global_request_id = _topology_state_machine._topology.global_request_id.value();
|
||||
const auto topology_requests_entry = co_await _sys_ks.local().get_topology_request_entry(ongoing_global_request_id, true);
|
||||
auto global_request = std::get<service::global_topology_request>(topology_requests_entry.request_type);
|
||||
if (global_request == global_topology_request::truncate_table) {
|
||||
std::optional<topology::transition_state>& tstate = _topology_state_machine._topology.tstate;
|
||||
if (!tstate || *tstate != topology::transition_state::truncate_table) {
|
||||
if (topology_requests_entry.truncate_table_id == table_id) {
|
||||
global_request_id = ongoing_global_request_id;
|
||||
slogger.info("Ongoing TRUNCATE for table {}.{} (global request ID {}) detected; waiting for it to complete",
|
||||
ks_name, cf_name, global_request_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
slogger.warn("Another global topology request ({}) is ongoing during attempt to TRUNCATE table {}.{}",
|
||||
global_request, ks_name, cf_name);
|
||||
throw exceptions::invalid_request_exception(::format("Another global topology request is ongoing during attempt to TRUNCATE table {}.{}, please retry.",
|
||||
ks_name, cf_name));
|
||||
}
|
||||
slogger.warn("Another global topology request ({}) is ongoing during attempt to TRUNCATE table {}.{}",
|
||||
*global_request, ks_name, cf_name);
|
||||
throw exceptions::invalid_request_exception(::format("Another global topology request is ongoing during attempt to TRUNCATE table {}.{}, please retry.",
|
||||
ks_name, cf_name));
|
||||
}
|
||||
|
||||
global_request_id = guard.new_group0_state_id();
|
||||
|
||||
std::vector<canonical_mutation> updates;
|
||||
updates.emplace_back(topology_mutation_builder(guard.write_timestamp())
|
||||
.set_global_topology_request(global_topology_request::truncate_table)
|
||||
.set_global_topology_request_id(global_request_id)
|
||||
.build());
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
if (!_sp._features.topology_global_request_queue) {
|
||||
builder.set_global_topology_request(global_topology_request::truncate_table)
|
||||
.set_global_topology_request_id(global_request_id);
|
||||
} else {
|
||||
builder.queue_global_topology_request_id(global_request_id);
|
||||
}
|
||||
updates.emplace_back(builder.build());
|
||||
|
||||
updates.emplace_back(topology_request_tracking_mutation_builder(global_request_id)
|
||||
updates.emplace_back(topology_request_tracking_mutation_builder(global_request_id, _sp._features.topology_requests_type_column)
|
||||
.set_truncate_table_data(table_id)
|
||||
.set("done", false)
|
||||
.set("start_time", db_clock::now())
|
||||
.set("request_type", global_topology_request::truncate_table)
|
||||
.build());
|
||||
|
||||
slogger.info("Creating TRUNCATE global topology request for table {}.{}", ks_name, cf_name);
|
||||
|
||||
@@ -4728,13 +4728,13 @@ future<> storage_service::do_drain() {
|
||||
future<> storage_service::do_cluster_cleanup() {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
auto holder = _group0->hold_group0_gate();
|
||||
utils::UUID request_id;
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
auto curr_req = _topology_state_machine._topology.global_request;
|
||||
if (curr_req && *curr_req != global_topology_request::cleanup) {
|
||||
// FIXME: replace this with a queue
|
||||
if (!_feature_service.topology_global_request_queue && curr_req && *curr_req != global_topology_request::cleanup) {
|
||||
throw std::runtime_error{
|
||||
"topology coordinator: cluster cleanup: a different topology request is already pending, try again later"};
|
||||
}
|
||||
@@ -4753,8 +4753,20 @@ future<> storage_service::do_cluster_cleanup() {
|
||||
|
||||
rtlogger.info("cluster cleanup requested");
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
builder.set_global_topology_request(global_topology_request::cleanup);
|
||||
topology_change change{{builder.build()}};
|
||||
std::vector<canonical_mutation> muts;
|
||||
if (_feature_service.topology_global_request_queue) {
|
||||
request_id = guard.new_group0_state_id();
|
||||
builder.queue_global_topology_request_id(request_id);
|
||||
topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
|
||||
rtbuilder.set("done", false)
|
||||
.set("start_time", db_clock::now())
|
||||
.set("request_type", global_topology_request::cleanup);
|
||||
muts.push_back(rtbuilder.build());
|
||||
} else {
|
||||
builder.set_global_topology_request(global_topology_request::cleanup);
|
||||
}
|
||||
muts.push_back(builder.build());
|
||||
topology_change change{std::move(muts)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup: cluster cleanup requested"));
|
||||
|
||||
try {
|
||||
@@ -4766,7 +4778,18 @@ future<> storage_service::do_cluster_cleanup() {
|
||||
break;
|
||||
}
|
||||
|
||||
// Wait cleanup finishes on all nodes
|
||||
if (request_id) {
|
||||
// Wait until request completes
|
||||
auto error = co_await wait_for_topology_request_completion(request_id);
|
||||
if (!error.empty()) {
|
||||
auto err = fmt::format("Cleanup failed. See earlier errors ({}). Request ID: {}", error, request_id);
|
||||
rtlogger.error("{}", err);
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
}
|
||||
|
||||
// The wait above only wait until the comand is processed by the topology coordinator which start cleanup process,
|
||||
// but we still need to wait for cleanup to complete here.
|
||||
co_await _topology_state_machine.event.when([this] {
|
||||
return std::all_of(_topology_state_machine._topology.normal_nodes.begin(), _topology_state_machine._topology.normal_nodes.end(), [] (auto& n) {
|
||||
return n.second.cleanup == cleanup_status::clean;
|
||||
@@ -4979,15 +5002,30 @@ future<> storage_service::raft_rebuild(utils::optional_param sdc_param) {
|
||||
|
||||
future<> storage_service::raft_check_and_repair_cdc_streams() {
|
||||
std::optional<cdc::generation_id_v2> last_committed_gen;
|
||||
utils::UUID request_id;
|
||||
|
||||
while (true) {
|
||||
rtlogger.info("request check_and_repair_cdc_streams, refreshing topology");
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
auto curr_req = _topology_state_machine._topology.global_request;
|
||||
std::optional<global_topology_request> curr_req;
|
||||
if (_topology_state_machine._topology.global_request) {
|
||||
curr_req = *_topology_state_machine._topology.global_request;
|
||||
request_id = _topology_state_machine._topology.global_request_id.value();
|
||||
} else if (!_topology_state_machine._topology.global_requests_queue.empty()) {
|
||||
request_id = _topology_state_machine._topology.global_requests_queue[0];
|
||||
auto req_entry = co_await _sys_ks.local().get_topology_request_entry(request_id, true);
|
||||
curr_req = std::get<global_topology_request>(req_entry.request_type);
|
||||
} else {
|
||||
request_id = utils::UUID{};
|
||||
}
|
||||
|
||||
if (curr_req && *curr_req != global_topology_request::new_cdc_generation) {
|
||||
// FIXME: replace this with a queue
|
||||
throw std::runtime_error{
|
||||
"check_and_repair_cdc_streams: a different topology request is already pending, try again later"};
|
||||
if (!_feature_service.topology_global_request_queue) {
|
||||
throw std::runtime_error{
|
||||
"check_and_repair_cdc_streams: a different topology request is already pending, try again later"};
|
||||
} else {
|
||||
request_id = utils::UUID{};
|
||||
}
|
||||
}
|
||||
|
||||
if (_topology_state_machine._topology.committed_cdc_generations.empty()) {
|
||||
@@ -5002,27 +5040,62 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
|
||||
cdc_log.info("CDC generation {} needs repair, requesting a new one", last_committed_gen);
|
||||
}
|
||||
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
builder.set_global_topology_request(global_topology_request::new_cdc_generation);
|
||||
topology_change change{{builder.build()}};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
|
||||
::format("request check+repair CDC generation from {}", _group0->group0_server().id()));
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("request check+repair CDC: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
// With global request queue coalescing requests should not be needed, but test_cdc_generation_publishing assumes that multiple new_cdc_generation
|
||||
// commands will be coalesced here, so do that until the test is fixed.
|
||||
if (!request_id) {
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
std::vector<canonical_mutation> muts;
|
||||
if (_feature_service.topology_global_request_queue) {
|
||||
request_id = guard.new_group0_state_id();
|
||||
topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
|
||||
builder.queue_global_topology_request_id(request_id);
|
||||
rtbuilder.set("done", false)
|
||||
.set("start_time", db_clock::now())
|
||||
.set("request_type", global_topology_request::new_cdc_generation);
|
||||
muts.push_back(rtbuilder.build());
|
||||
} else {
|
||||
builder.set_global_topology_request(global_topology_request::new_cdc_generation);
|
||||
}
|
||||
muts.push_back(builder.build());
|
||||
topology_change change{std::move(muts)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
|
||||
::format("request check+repair CDC generation from {}", _group0->group0_server().id()));
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("request check+repair CDC: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// Wait until we commit a new CDC generation.
|
||||
co_await _topology_state_machine.event.when([this, &last_committed_gen] {
|
||||
if (request_id) {
|
||||
// Wait until request completes
|
||||
auto error = co_await wait_for_topology_request_completion(request_id);
|
||||
|
||||
if (!error.empty()) {
|
||||
auto err = fmt::format("Check and repair cdc stream failed. See earlier errors ({}). Request ID: {}", error, request_id);
|
||||
rtlogger.error("{}", err);
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
|
||||
auto gen = _topology_state_machine._topology.committed_cdc_generations.empty()
|
||||
? std::nullopt
|
||||
: std::optional(_topology_state_machine._topology.committed_cdc_generations.back());
|
||||
return last_committed_gen != gen;
|
||||
});
|
||||
|
||||
if (last_committed_gen == gen) {
|
||||
on_internal_error(rtlogger, "Wrong generation after complation of check and repair cdc stream");
|
||||
}
|
||||
} else {
|
||||
// Wait until we commit a new CDC generation.
|
||||
co_await _topology_state_machine.event.when([this, &last_committed_gen] {
|
||||
auto gen = _topology_state_machine._topology.committed_cdc_generations.empty()
|
||||
? std::nullopt
|
||||
: std::optional(_topology_state_machine._topology.committed_cdc_generations.back());
|
||||
return last_committed_gen != gen;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::rebuild(utils::optional_param source_dc) {
|
||||
|
||||
@@ -874,7 +874,20 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
// Precondition: there is no node request and no ongoing topology transition
|
||||
// (checked under the guard we're holding).
|
||||
future<> handle_global_request(group0_guard guard) {
|
||||
switch (_topo_sm._topology.global_request.value()) {
|
||||
global_topology_request req;
|
||||
utils::UUID req_id;
|
||||
db::system_keyspace::topology_requests_entry req_entry;
|
||||
|
||||
if (_topo_sm._topology.global_request) {
|
||||
req = *_topo_sm._topology.global_request;
|
||||
req_id = _topo_sm._topology.global_request_id.value();
|
||||
} else {
|
||||
assert(_feature_service.topology_global_request_queue);
|
||||
req_id = _topo_sm._topology.global_requests_queue[0];
|
||||
req_entry = co_await _sys_ks.get_topology_request_entry(req_id, true);
|
||||
req = std::get<global_topology_request>(req_entry.request_type);
|
||||
}
|
||||
switch (req) {
|
||||
case global_topology_request::new_cdc_generation: {
|
||||
rtlogger.info("new CDC generation requested");
|
||||
|
||||
@@ -889,24 +902,33 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
// after the generation is committed prevents this from happening. The second request would have
|
||||
// no effect - it would just overwrite the first request.
|
||||
builder.set_transition_state(topology::transition_state::commit_cdc_generation)
|
||||
.set_new_cdc_generation_data_uuid(gen_uuid);
|
||||
.set_new_cdc_generation_data_uuid(gen_uuid)
|
||||
.set_global_topology_request(req)
|
||||
.set_global_topology_request_id(req_id)
|
||||
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id);
|
||||
auto reason = ::format(
|
||||
"insert CDC generation data (UUID: {})", gen_uuid);
|
||||
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
|
||||
}
|
||||
break;
|
||||
case global_topology_request::cleanup:
|
||||
co_await start_cleanup_on_dirty_nodes(std::move(guard), true);
|
||||
co_await start_cleanup_on_dirty_nodes(std::move(guard), req_id);
|
||||
break;
|
||||
case global_topology_request::keyspace_rf_change: {
|
||||
rtlogger.info("keyspace_rf_change requested");
|
||||
sstring ks_name = *_topo_sm._topology.new_keyspace_rf_change_ks_name;
|
||||
std::unordered_map<sstring, sstring> saved_ks_props = *_topo_sm._topology.new_keyspace_rf_change_data;
|
||||
sstring ks_name;
|
||||
std::unordered_map<sstring, sstring> saved_ks_props;
|
||||
if (_topo_sm._topology.new_keyspace_rf_change_ks_name) {
|
||||
ks_name = *_topo_sm._topology.new_keyspace_rf_change_ks_name;
|
||||
saved_ks_props = *_topo_sm._topology.new_keyspace_rf_change_data;
|
||||
} else {
|
||||
ks_name = *req_entry.new_keyspace_rf_change_ks_name;
|
||||
saved_ks_props = *req_entry.new_keyspace_rf_change_data;
|
||||
}
|
||||
cql3::statements::ks_prop_defs new_ks_props{std::map<sstring, sstring>{saved_ks_props.begin(), saved_ks_props.end()}};
|
||||
|
||||
auto repl_opts = new_ks_props.get_replication_options();
|
||||
repl_opts.erase(cql3::statements::ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY);
|
||||
utils::UUID req_uuid = *_topo_sm._topology.global_request_id;
|
||||
std::vector<canonical_mutation> updates;
|
||||
sstring error;
|
||||
if (_db.has_keyspace(ks_name)) {
|
||||
@@ -954,8 +976,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.del_global_topology_request()
|
||||
.del_global_topology_request_id()
|
||||
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
|
||||
.build()));
|
||||
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_uuid)
|
||||
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id)
|
||||
.done(error)
|
||||
.build()));
|
||||
if (error.empty()) {
|
||||
@@ -979,12 +1002,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
break;
|
||||
case global_topology_request::truncate_table: {
|
||||
rtlogger.info("TRUNCATE TABLE requested");
|
||||
std::vector<canonical_mutation> updates;
|
||||
updates.push_back(topology_mutation_builder(guard.write_timestamp())
|
||||
.set_transition_state(topology::transition_state::truncate_table)
|
||||
.set_session(session_id(_topo_sm._topology.global_request_id.value()))
|
||||
.build());
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), "TRUNCATE TABLE requested");
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
builder.set_transition_state(topology::transition_state::truncate_table)
|
||||
.set_global_topology_request(req)
|
||||
.set_global_topology_request_id(req_id)
|
||||
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
|
||||
.set_session(session_id(req_id));
|
||||
co_await update_topology_state(std::move(guard), {builder.build()}, "TRUNCATE TABLE requested");
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -1841,8 +1865,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.del_global_topology_request_id()
|
||||
.build());
|
||||
updates.push_back(topology_request_tracking_mutation_builder(global_request_id)
|
||||
.set("end_time", db_clock::now())
|
||||
.set("done", true)
|
||||
.done()
|
||||
.build());
|
||||
|
||||
try {
|
||||
@@ -1874,7 +1897,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
|
||||
guard = std::get<group0_guard>(std::move(work));
|
||||
|
||||
if (_topo_sm._topology.global_request) {
|
||||
if (_topo_sm._topology.global_request || !_topo_sm._topology.global_requests_queue.empty()) {
|
||||
return std::make_pair(true, std::move(guard));
|
||||
}
|
||||
|
||||
@@ -1906,17 +1929,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
auto node_builder = builder.with_node(id).del("topology_request");
|
||||
auto done_msg = fmt::format("Canceled. Dead nodes: {}", dead_nodes);
|
||||
rtbuilder.done(done_msg);
|
||||
if (_topo_sm._topology.global_request_id) {
|
||||
try {
|
||||
utils::UUID uuid = utils::UUID{*_topo_sm._topology.global_request_id};
|
||||
topology_request_tracking_mutation_builder rt_global_req_builder{uuid};
|
||||
rt_global_req_builder.done(done_msg)
|
||||
.set("end_time", db_clock::now());
|
||||
muts.emplace_back(rt_global_req_builder.build());
|
||||
} catch (...) {
|
||||
rtlogger.warn("failed to cancel topology global request: {}", std::current_exception());
|
||||
}
|
||||
}
|
||||
switch (req) {
|
||||
case topology_request::replace:
|
||||
[[fallthrough]];
|
||||
@@ -1978,13 +1990,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
|
||||
if (auto* cleanup = std::get_if<start_cleanup>(&work)) {
|
||||
co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard), false);
|
||||
co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard), utils::UUID{});
|
||||
co_return true;
|
||||
}
|
||||
|
||||
guard = std::get<group0_guard>(std::move(work));
|
||||
|
||||
if (_topo_sm._topology.global_request) {
|
||||
if (_topo_sm._topology.global_request || !_topo_sm._topology.global_requests_queue.empty()) {
|
||||
co_await handle_global_request(std::move(guard));
|
||||
co_return true;
|
||||
}
|
||||
@@ -2156,6 +2168,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
// Note: if there was a replace or removenode going on, we'd need to put the replaced/removed
|
||||
// node into `exclude_nodes` parameter in `exec_global_command`, but CDC generations are never
|
||||
// introduced during replace/remove.
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
if (_topo_sm._topology.global_request == global_topology_request::new_cdc_generation) {
|
||||
builder.del_global_topology_request();
|
||||
builder.del_global_topology_request_id();
|
||||
builder.del_transition_state();
|
||||
}
|
||||
|
||||
try {
|
||||
guard = co_await exec_global_command(std::move(guard), raft_topology_cmd::command::barrier, {_raft.id()});
|
||||
} catch (term_changed_error&) {
|
||||
@@ -2168,6 +2187,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
rtlogger.error("transition_state::commit_cdc_generation, "
|
||||
"raft_topology_cmd::command::barrier failed, error {}", std::current_exception());
|
||||
_rollback = fmt::format("Failed to commit cdc generation: {}", std::current_exception());
|
||||
}
|
||||
|
||||
if (_rollback) {
|
||||
if (_topo_sm._topology.global_request == global_topology_request::new_cdc_generation && _feature_service.topology_global_request_queue) {
|
||||
topology_request_tracking_mutation_builder rtbuilder(*_topo_sm._topology.global_request_id);
|
||||
// if this is global command fail it since there is nothing to rollback
|
||||
rtbuilder.done(*_rollback);
|
||||
_rollback.reset();
|
||||
co_await update_topology_state(std::move(guard), {builder.build(), rtbuilder.build()}, "committed new CDC generation command failed");
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -2221,18 +2250,22 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
// in the middle of a CDC generation switch (when they are prepared to switch but not
|
||||
// committed) - they won't coordinate CDC-enabled writes until they reconnect to the
|
||||
// majority and commit.
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
std::vector<canonical_mutation> updates;
|
||||
builder.add_new_committed_cdc_generation(cdc_gen_id);
|
||||
if (_topo_sm._topology.global_request == global_topology_request::new_cdc_generation) {
|
||||
builder.del_global_topology_request();
|
||||
builder.del_transition_state();
|
||||
if (_feature_service.topology_global_request_queue) {
|
||||
topology_request_tracking_mutation_builder rtbuilder(*_topo_sm._topology.global_request_id);
|
||||
rtbuilder.done();
|
||||
updates.push_back(rtbuilder.build());
|
||||
}
|
||||
} else {
|
||||
builder.set_transition_state(topology::transition_state::write_both_read_old);
|
||||
builder.set_session(session_id(guard.new_group0_state_id()));
|
||||
builder.set_version(_topo_sm._topology.version + 1);
|
||||
}
|
||||
updates.push_back(builder.build());
|
||||
auto str = ::format("committed new CDC generation, ID: {}", cdc_gen_id);
|
||||
co_await update_topology_state(std::move(guard), {builder.build()}, std::move(str));
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), std::move(str));
|
||||
}
|
||||
break;
|
||||
case topology::transition_state::tablet_draining:
|
||||
@@ -2795,7 +2828,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
"Topology coordinator is called for node {} in state 'left'", node.id));
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
std::variant<join_node_response_params::accepted, join_node_response_params::rejected>
|
||||
validate_joining_node(const node_to_work_on& node) {
|
||||
@@ -2889,14 +2922,21 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
return muts;
|
||||
}
|
||||
|
||||
future<> start_cleanup_on_dirty_nodes(group0_guard guard, bool global_request) {
|
||||
future<> start_cleanup_on_dirty_nodes(group0_guard guard, utils::UUID global_request_id) {
|
||||
auto& topo = _topo_sm._topology;
|
||||
std::vector<canonical_mutation> muts;
|
||||
muts.reserve(topo.normal_nodes.size() + size_t(global_request));
|
||||
muts.reserve(topo.normal_nodes.size() + size_t(bool(global_request_id)));
|
||||
|
||||
if (global_request) {
|
||||
if (global_request_id) {
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
builder.del_global_topology_request();
|
||||
if (_feature_service.topology_global_request_queue) {
|
||||
topology_request_tracking_mutation_builder rtbuilder(global_request_id);
|
||||
builder.del_global_topology_request_id()
|
||||
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, global_request_id);
|
||||
rtbuilder.done();
|
||||
muts.emplace_back(rtbuilder.build());
|
||||
}
|
||||
muts.emplace_back(builder.build());
|
||||
}
|
||||
for (auto& [id, rs] : topo.normal_nodes) {
|
||||
|
||||
@@ -91,6 +91,11 @@ Builder& topology_mutation_builder_base<Builder>::set(const char* cell, topology
|
||||
return apply_atomic(cell, sstring{::format("{}", value)});
|
||||
}
|
||||
|
||||
template<typename Builder>
|
||||
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, global_topology_request value) {
|
||||
return apply_atomic(cell, sstring{::format("{}", value)});
|
||||
}
|
||||
|
||||
template<typename Builder>
|
||||
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const sstring& value) {
|
||||
return apply_atomic(cell, value);
|
||||
@@ -239,6 +244,18 @@ topology_mutation_builder& topology_mutation_builder::set_global_topology_reques
|
||||
return apply_atomic("global_topology_request_id", value);
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::queue_global_topology_request_id(const utils::UUID& value) {
|
||||
return apply_set("global_requests", collection_apply_mode::update, std::vector<data_value>{value});
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::drop_first_global_topology_request_id(const std::vector<utils::UUID>& values, utils::UUID& id) {
|
||||
if (!values.empty() && values[0] == id) {
|
||||
return apply_set("global_requests", collection_apply_mode::overwrite, std::span(values.begin() + 1, values.size() - 1));
|
||||
} else {
|
||||
return *this;
|
||||
}
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::set_upgrade_state(topology::upgrade_state_type value) {
|
||||
return apply_atomic("upgrade_state", ::format("{}", value));
|
||||
}
|
||||
@@ -305,6 +322,10 @@ topology_request_tracking_mutation_builder& topology_request_tracking_mutation_b
|
||||
return _set_type ? builder_base::set(cell, value) : *this;
|
||||
}
|
||||
|
||||
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set(const char* cell, global_topology_request value) {
|
||||
return _set_type ? builder_base::set(cell, value) : *this;
|
||||
}
|
||||
|
||||
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::done(std::optional<sstring> error) {
|
||||
set("end_time", db_clock::now());
|
||||
if (error) {
|
||||
@@ -318,6 +339,16 @@ topology_request_tracking_mutation_builder& topology_request_tracking_mutation_b
|
||||
return *this;
|
||||
}
|
||||
|
||||
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set_new_keyspace_rf_change_data(
|
||||
const sstring& ks_name, const std::map<sstring, sstring>& rf_per_dc) {
|
||||
apply_atomic("new_keyspace_rf_change_ks_name", ks_name);
|
||||
apply_atomic("new_keyspace_rf_change_data",
|
||||
make_map_value(schema().get_column_definition("new_keyspace_rf_change_data")->type,
|
||||
map_type_impl::native_type(rf_per_dc.begin(), rf_per_dc.end())));
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
||||
template class topology_mutation_builder_base<topology_mutation_builder>;
|
||||
template class topology_mutation_builder_base<topology_node_mutation_builder>;
|
||||
template class topology_mutation_builder_base<topology_request_tracking_mutation_builder>;
|
||||
|
||||
@@ -46,6 +46,7 @@ protected:
|
||||
Builder& apply_set(const char* cell, collection_apply_mode apply_mode, const C& c);
|
||||
Builder& set(const char* cell, node_state value);
|
||||
Builder& set(const char* cell, topology_request value);
|
||||
Builder& set(const char* cell, global_topology_request value);
|
||||
Builder& set(const char* cell, const sstring& value);
|
||||
Builder& set(const char* cell, const raft::server_id& value);
|
||||
Builder& set(const char* cell, const uint32_t& value);
|
||||
@@ -126,6 +127,8 @@ public:
|
||||
topology_mutation_builder& del_session();
|
||||
topology_mutation_builder& del_global_topology_request();
|
||||
topology_mutation_builder& del_global_topology_request_id();
|
||||
topology_mutation_builder& queue_global_topology_request_id(const utils::UUID& value);
|
||||
topology_mutation_builder& drop_first_global_topology_request_id(const std::vector<utils::UUID>&, utils::UUID&);
|
||||
topology_node_mutation_builder& with_node(raft::server_id);
|
||||
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
|
||||
};
|
||||
@@ -149,8 +152,11 @@ public:
|
||||
using builder_base::set;
|
||||
using builder_base::del;
|
||||
topology_request_tracking_mutation_builder& set(const char* cell, topology_request value);
|
||||
topology_request_tracking_mutation_builder& set(const char* cell, global_topology_request value);
|
||||
topology_request_tracking_mutation_builder& done(std::optional<sstring> error = std::nullopt);
|
||||
topology_request_tracking_mutation_builder& set_truncate_table_data(const table_id& table_id);
|
||||
topology_request_tracking_mutation_builder& set_new_keyspace_rf_change_data(const sstring& ks_name, const std::map<sstring, sstring>& rf_per_dc);
|
||||
|
||||
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
|
||||
};
|
||||
|
||||
|
||||
@@ -192,12 +192,20 @@ static std::unordered_map<topology_request, sstring> topology_request_to_name_ma
|
||||
{topology_request::rebuild, "rebuild"}
|
||||
};
|
||||
|
||||
topology_request topology_request_from_string(const sstring& s) {
|
||||
std::optional<topology_request> try_topology_request_from_string(const sstring& s) {
|
||||
for (auto&& e : topology_request_to_name_map) {
|
||||
if (e.second == s) {
|
||||
return e.first;
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
topology_request topology_request_from_string(const sstring& s) {
|
||||
auto r = try_topology_request_from_string(s);
|
||||
if (r) {
|
||||
return *r;
|
||||
}
|
||||
throw std::runtime_error(fmt::format("cannot map name {} to topology_request", s));
|
||||
}
|
||||
|
||||
|
||||
@@ -163,6 +163,9 @@ struct topology {
|
||||
// Pending global topology request's id, which is a new group0's state id
|
||||
std::optional<utils::UUID> global_request_id;
|
||||
|
||||
// A queue of pending global topology request's ids. Replaces the single one above
|
||||
std::vector<utils::UUID> global_requests_queue;
|
||||
|
||||
// The IDs of the committed CDC generations sorted by timestamps.
|
||||
// The obsolete generations may not be in this list as they are continually deleted.
|
||||
std::vector<cdc::generation_id_v2> committed_cdc_generations;
|
||||
@@ -283,6 +286,7 @@ struct topology_request_state {
|
||||
|
||||
topology::transition_state transition_state_from_string(const sstring& s);
|
||||
node_state node_state_from_string(const sstring& s);
|
||||
std::optional<topology_request> try_topology_request_from_string(const sstring& s);
|
||||
topology_request topology_request_from_string(const sstring& s);
|
||||
global_topology_request global_topology_request_from_string(const sstring&);
|
||||
cleanup_status cleanup_status_from_string(const sstring& s);
|
||||
|
||||
@@ -254,9 +254,6 @@ async def test_truncate_while_truncate_already_waiting(manager: ManagerClient):
|
||||
# Run another truncate on the same table while the timedout one is still waiting
|
||||
truncate_future = cql.run_async(f'TRUNCATE TABLE {ks}.test', host=hosts[1])
|
||||
|
||||
# Make sure the second truncate re-used the existing global topology request
|
||||
await s1_log.wait_for(f'Ongoing TRUNCATE for table {ks}.test')
|
||||
|
||||
# Release streaming
|
||||
await manager.api.message_injection(servers[1].ip_addr, 'migration_streaming_wait')
|
||||
|
||||
@@ -302,3 +299,51 @@ async def test_replay_position_check_during_truncate(manager):
|
||||
await manager.api.message_injection(server.ip_addr, "database_truncate_wait")
|
||||
await s1_log.wait_for(f"database_truncate_wait: message received", from_mark=s1_mark)
|
||||
await truncate_task
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_parallel_truncate(manager: ManagerClient):
|
||||
|
||||
logger.info('Bootstrapping cluster')
|
||||
cfg = { 'tablets_mode_for_new_keyspaces': 'enabled',
|
||||
'error_injections_at_startup': ['migration_streaming_wait']
|
||||
}
|
||||
|
||||
servers = []
|
||||
servers.append(await manager.server_add(config=cfg))
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
# Create a keyspace with tablets and initial_tablets == 2, then insert data
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
|
||||
await cql.run_async(f'CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);')
|
||||
await cql.run_async(f'CREATE TABLE {ks}.test1 (pk int PRIMARY KEY, c int);')
|
||||
|
||||
keys = range(1024)
|
||||
await asyncio.gather(*[cql.run_async(f'INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});') for k in keys])
|
||||
await asyncio.gather(*[cql.run_async(f'INSERT INTO {ks}.test1 (pk, c) VALUES ({k}, {k});') for k in keys])
|
||||
|
||||
# Add a node to the cluster. This will cause the load balancer to migrate one tablet to the new node
|
||||
servers.append(await manager.server_add(config=cfg))
|
||||
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
s1_log = await manager.server_open_log(servers[1].server_id)
|
||||
|
||||
# Wait for tablet streaming to start
|
||||
await s1_log.wait_for('migration_streaming_wait: start')
|
||||
|
||||
tf1 = cql.run_async(SimpleStatement(f'TRUNCATE TABLE {ks}.test', retry_policy=FallthroughRetryPolicy()))
|
||||
tf2 = cql.run_async(SimpleStatement(f'TRUNCATE TABLE {ks}.test1', retry_policy=FallthroughRetryPolicy()))
|
||||
|
||||
# Release streaming
|
||||
await manager.api.message_injection(servers[1].ip_addr, 'migration_streaming_wait')
|
||||
|
||||
# Wait for the joined truncate to complete
|
||||
await tf1
|
||||
await tf2
|
||||
|
||||
# Check if we have any data
|
||||
row = await cql.run_async(SimpleStatement(f'SELECT COUNT(*) FROM {ks}.test', consistency_level=ConsistencyLevel.ALL))
|
||||
assert row[0].count == 0
|
||||
row = await cql.run_async(SimpleStatement(f'SELECT COUNT(*) FROM {ks}.test1', consistency_level=ConsistencyLevel.ALL))
|
||||
assert row[0].count == 0
|
||||
|
||||
Reference in New Issue
Block a user