Merge 'Introduce a queue of global topology requests.' from Gleb Natapov

Currently only one global topology request (such as truncate, cdc repair, cleanup and alter table) can be pending. If one is already pending others will be rejected with an error. This is not very user friendly, so this series introduces a queue of global requests which allows queuing many global topology requests simultaneously.

Fixes: #16822

No need to backport since this is a new feature.

Closes scylladb/scylladb#24293

* https://github.com/scylladb/scylladb:
  topology coordinator: simplify truncate handling in case request queue feature is disabled
  topology coordinator: fix indentation after the previous patch
  topology coordinator: allow running multiple global commands in parallel
  topology coordinator: Implement global topology request queue
  topology coordinator: Do not cancel global requests in cancel_all_requests
  topology coordinator: store request type for each global command
  topology request: make it possible to hold global request types in request_type field
  topology coordinator: move alter table global request parameters into topology_request table
  topology coordinator: move cleanup global command to report completion through topology_request table
  topology coordinator: no need to create updates vector explicitly
  topology coordinator: use topology_request_tracking_mutation_builder::done() instead of open code it
  topology coordinator: handle error during new_cdc_generation command processing
  topology coordinator: remove unneeded semicolon
  topology coordinator: fix indentation after the last commit
  topology coordinator: move new_cdc_generation topology request to use topology_request table for completion
  gms/feature_service: add TOPOLOGY_GLOBAL_REQUEST_QUEUE feature flag
This commit is contained in:
Patryk Jędrzejczak
2025-06-23 12:58:48 +02:00
committed by Avi Kivity
14 changed files with 376 additions and 113 deletions

View File

@@ -12,8 +12,10 @@
#include <seastar/core/coroutine.hh>
#include <seastar/core/on_internal_error.hh>
#include <stdexcept>
#include <vector>
#include "alter_keyspace_statement.hh"
#include "locator/tablets.hh"
#include "mutation/canonical_mutation.hh"
#include "prepared_statement.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
@@ -232,7 +234,7 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
// and we'll unnecessarily trigger the processing path for ALTER tablets KS,
// when in reality nothing or only schema is being changed
if (changes_tablets(qp)) {
if (!qp.topology_global_queue_empty()) {
if (!qp.proxy().features().topology_global_request_queue && !qp.topology_global_queue_empty()) {
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
}
@@ -243,9 +245,13 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
qp.db().real_database().validate_keyspace_update(*ks_md_update);
service::topology_mutation_builder builder(ts);
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
builder.set_global_topology_request_id(global_request_id);
builder.set_new_keyspace_rf_change_data(_name, ks_options);
if (!qp.proxy().features().topology_global_request_queue) {
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
builder.set_global_topology_request_id(global_request_id);
builder.set_new_keyspace_rf_change_data(_name, ks_options);
} else {
builder.queue_global_topology_request_id(global_request_id);
};
service::topology_change change{{builder.build()}};
auto topo_schema = qp.db().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
@@ -253,9 +259,13 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
return cm.to_mutation(topo_schema);
});
service::topology_request_tracking_mutation_builder rtbuilder{global_request_id};
service::topology_request_tracking_mutation_builder rtbuilder{global_request_id, qp.proxy().features().topology_requests_type_column};
rtbuilder.set("done", false)
.set("start_time", db_clock::now());
.set("start_time", db_clock::now())
.set("request_type", service::global_topology_request::keyspace_rf_change);
if (qp.proxy().features().topology_global_request_queue) {
rtbuilder.set_new_keyspace_rf_change_data(_name, ks_options);
}
service::topology_change req_change{{rtbuilder.build()}};
auto topo_req_schema = qp.db().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY_REQUESTS);

View File

@@ -260,8 +260,8 @@ schema_ptr system_keyspace::topology() {
.with_column("request_id", timeuuid_type)
.with_column("ignore_nodes", set_type_impl::get_instance(uuid_type, true), column_kind::static_column)
.with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column)
.with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column)
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false), column_kind::static_column)
.with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column) // deprecated
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false), column_kind::static_column) // deprecated
.with_column("version", long_type, column_kind::static_column)
.with_column("fence_version", long_type, column_kind::static_column)
.with_column("transition_state", utf8_type, column_kind::static_column)
@@ -273,6 +273,7 @@ schema_ptr system_keyspace::topology() {
.with_column("session", uuid_type, column_kind::static_column)
.with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column)
.with_column("upgrade_state", utf8_type, column_kind::static_column)
.with_column("global_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
.set_comment("Current state of topology change machine")
.with_hash_version()
.build();
@@ -292,6 +293,8 @@ schema_ptr system_keyspace::topology_requests() {
.with_column("error", utf8_type)
.with_column("end_time", timestamp_type)
.with_column("truncate_table_id", uuid_type)
.with_column("new_keyspace_rf_change_ks_name", utf8_type)
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false))
.set_comment("Topology request tracking")
.with_hash_version()
.build();
@@ -3294,6 +3297,12 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
ret.global_request_id = some_row.get_as<utils::UUID>("global_topology_request_id");
}
if (some_row.has("global_requests")) {
for (auto&& v : deserialize_set_column(*topology(), some_row, "global_requests")) {
ret.global_requests_queue.push_back(value_cast<utils::UUID>(v));
}
}
if (some_row.has("enabled_features")) {
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
}
@@ -3469,7 +3478,13 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
entry.initiating_host = row.get_as<utils::UUID>("initiating_host");
}
if (row.has("request_type")) {
entry.request_type = service::topology_request_from_string(row.get_as<sstring>("request_type"));
auto rts = row.get_as<sstring>("request_type");
auto rt = service::try_topology_request_from_string(rts);
if (rt) {
entry.request_type = *rt;
} else {
entry.request_type = service::global_topology_request_from_string(rts);
}
}
if (row.has("start_time")) {
entry.start_time = row.get_as<db_clock::time_point>("start_time");
@@ -3486,6 +3501,11 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
if (row.has("truncate_table_id")) {
entry.truncate_table_id = table_id(row.get_as<utils::UUID>("truncate_table_id"));
}
if (row.has("new_keyspace_rf_change_data")) {
entry.new_keyspace_rf_change_ks_name = row.get_as<sstring>("new_keyspace_rf_change_ks_name");
entry.new_keyspace_rf_change_data = row.get_map<sstring,sstring>("new_keyspace_rf_change_data");
}
return entry;
}

View File

@@ -395,13 +395,17 @@ public:
struct topology_requests_entry {
utils::UUID id;
utils::UUID initiating_host;
std::optional<service::topology_request> request_type;
std::variant<std::monostate, service::topology_request, service::global_topology_request> request_type;
db_clock::time_point start_time;
bool done;
sstring error;
db_clock::time_point end_time;
db_clock::time_point ts;
table_id truncate_table_id;
// The name of the KS that is being the target of the scheduled ALTER KS statement
std::optional<sstring> new_keyspace_rf_change_ks_name;
// The KS options to be used when executing the scheduled ALTER KS statement
std::optional<std::unordered_map<sstring, sstring>> new_keyspace_rf_change_data;
};
using topology_requests_entries = std::unordered_map<utils::UUID, system_keyspace::topology_requests_entry>;

View File

@@ -668,6 +668,7 @@ CREATE TABLE system.topology (
new_cdc_generation_data_uuid timeuuid static,
new_keyspace_rf_change_ks_name text static,
new_keyspace_rf_change_data frozen<map<text, text>> static,
    global_requests set<timeuuid> static,
PRIMARY KEY (key, host_id)
)
```
@@ -701,6 +702,8 @@ There are also a few static columns for cluster-global properties:
- `upgrade_state` - describes the progress of the upgrade to raft-based topology.
- `new_keyspace_rf_change_ks_name` - the name of the KS that is being the target of the scheduled ALTER KS statement
- `new_keyspace_rf_change_data` - the KS options to be used when executing the scheduled ALTER KS statement
- `global_requests` - contains a list of ids of pending global requests; the information about each request (type and parameters)
can be obtained from the topology_requests table by using the request's id as a lookup key.
# Join procedure

View File

@@ -171,6 +171,7 @@ public:
gms::feature repair_based_tablet_rebuild { *this, "REPAIR_BASED_TABLET_REBUILD"sv };
gms::feature enforced_raft_rpc_scheduling_group { *this, "ENFORCED_RAFT_RPC_SCHEDULING_GROUP"sv };
gms::feature load_and_stream_abort_rpc_message { *this, "LOAD_AND_STREAM_ABORT_RPC_MESSAGE"sv };
gms::feature topology_global_request_queue { *this, "TOPOLOGY_GLOBAL_REQUEST_QUEUE"sv };
public:
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;

View File

@@ -14,24 +14,34 @@
#include "tasks/task_handler.hh"
#include "tasks/virtual_task_hint.hh"
#include "utils/error_injection.hh"
#include <variant>
#include "utils/overloaded_functor.hh"
using namespace std::chrono_literals;
namespace node_ops {
static sstring request_type_to_task_type(const std::optional<service::topology_request>& request_type) {
return request_type.transform([] (auto type) -> sstring {
switch (type) {
case service::topology_request::join:
return "bootstrap";
case service::topology_request::remove:
return "remove node";
case service::topology_request::leave:
return "decommission";
default:
return fmt::to_string(type);
static sstring request_type_to_task_type(const std::variant<std::monostate, service::topology_request, service::global_topology_request>& request_type) {
return std::visit(overloaded_functor {
[] (const service::topology_request& type) -> sstring {
switch (type) {
case service::topology_request::join:
return "bootstrap";
case service::topology_request::remove:
return "remove node";
case service::topology_request::leave:
return "decommission";
default:
return fmt::to_string(type);
}
},
[] (const service::global_topology_request type) -> sstring {
return fmt::to_string(type);
},
[] (const std::monostate type) -> sstring {
return "";
}
}).value_or("");
}, request_type);
}
static tasks::task_manager::task_state get_state(const db::system_keyspace::topology_requests_entry& entry) {
@@ -114,7 +124,7 @@ future<std::optional<tasks::virtual_task_hint>> node_ops_virtual_task::contains(
}
auto entry = co_await _ss._sys_ks.local().get_topology_request_entry(task_id.uuid(), false);
co_return bool(entry.id) && entry.request_type ? empty_hint : std::nullopt;
co_return bool(entry.id) && std::holds_alternative<service::topology_request>(entry.request_type) ? empty_hint : std::nullopt;
}
future<tasks::is_abortable> node_ops_virtual_task::is_abortable(tasks::virtual_task_hint) const {

View File

@@ -23,6 +23,7 @@
#include "db/consistency_level.hh"
#include "db/commitlog/commitlog.hh"
#include "storage_proxy.hh"
#include "service/topology_state_machine.hh"
#include "unimplemented.hh"
#include "mutation/mutation.hh"
#include "mutation/frozen_mutation.hh"
@@ -1070,44 +1071,51 @@ private:
const table_id table_id = _sp.local_db().find_uuid(ks_name, cf_name);
// Check if we already have a truncate queued for the same table. This can happen when a truncate has timed out
// and the client retried by issuing the same truncate again. In this case, instead of failing the request with
// an "Another global topology request is ongoing" error, we can wait for the already queued request to complete.
// Note that we can not do this for a truncate which the topology coordinator has already started processing,
// only for a truncate which is still waiting.
std::optional<global_topology_request>& global_request = _topology_state_machine._topology.global_request;
if (global_request.has_value()) {
if (*global_request == global_topology_request::truncate_table) {
std::optional<topology::transition_state>& tstate = _topology_state_machine._topology.tstate;
if (!tstate || *tstate != topology::transition_state::truncate_table) {
const utils::UUID& ongoing_global_request_id = *_topology_state_machine._topology.global_request_id;
const auto topology_requests_entry = co_await _sys_ks.local().get_topology_request_entry(ongoing_global_request_id, true);
if (topology_requests_entry.truncate_table_id == table_id) {
global_request_id = ongoing_global_request_id;
slogger.info("Ongoing TRUNCATE for table {}.{} (global request ID {}) detected; waiting for it to complete",
ks_name, cf_name, global_request_id);
break;
if (!_sp._features.topology_global_request_queue) {
// Check if we already have a truncate queued for the same table. This can happen when a truncate has timed out
// and the client retried by issuing the same truncate again. In this case, instead of failing the request with
// an "Another global topology request is ongoing" error, we can wait for the already queued request to complete.
// Note that we can not do this for a truncate which the topology coordinator has already started processing,
// only for a truncate which is still waiting.
if (_topology_state_machine._topology.global_request) {
utils::UUID ongoing_global_request_id = _topology_state_machine._topology.global_request_id.value();
const auto topology_requests_entry = co_await _sys_ks.local().get_topology_request_entry(ongoing_global_request_id, true);
auto global_request = std::get<service::global_topology_request>(topology_requests_entry.request_type);
if (global_request == global_topology_request::truncate_table) {
std::optional<topology::transition_state>& tstate = _topology_state_machine._topology.tstate;
if (!tstate || *tstate != topology::transition_state::truncate_table) {
if (topology_requests_entry.truncate_table_id == table_id) {
global_request_id = ongoing_global_request_id;
slogger.info("Ongoing TRUNCATE for table {}.{} (global request ID {}) detected; waiting for it to complete",
ks_name, cf_name, global_request_id);
break;
}
}
}
slogger.warn("Another global topology request ({}) is ongoing during attempt to TRUNCATE table {}.{}",
global_request, ks_name, cf_name);
throw exceptions::invalid_request_exception(::format("Another global topology request is ongoing during attempt to TRUNCATE table {}.{}, please retry.",
ks_name, cf_name));
}
slogger.warn("Another global topology request ({}) is ongoing during attempt to TRUNCATE table {}.{}",
*global_request, ks_name, cf_name);
throw exceptions::invalid_request_exception(::format("Another global topology request is ongoing during attempt to TRUNCATE table {}.{}, please retry.",
ks_name, cf_name));
}
global_request_id = guard.new_group0_state_id();
std::vector<canonical_mutation> updates;
updates.emplace_back(topology_mutation_builder(guard.write_timestamp())
.set_global_topology_request(global_topology_request::truncate_table)
.set_global_topology_request_id(global_request_id)
.build());
topology_mutation_builder builder(guard.write_timestamp());
if (!_sp._features.topology_global_request_queue) {
builder.set_global_topology_request(global_topology_request::truncate_table)
.set_global_topology_request_id(global_request_id);
} else {
builder.queue_global_topology_request_id(global_request_id);
}
updates.emplace_back(builder.build());
updates.emplace_back(topology_request_tracking_mutation_builder(global_request_id)
updates.emplace_back(topology_request_tracking_mutation_builder(global_request_id, _sp._features.topology_requests_type_column)
.set_truncate_table_data(table_id)
.set("done", false)
.set("start_time", db_clock::now())
.set("request_type", global_topology_request::truncate_table)
.build());
slogger.info("Creating TRUNCATE global topology request for table {}.{}", ks_name, cf_name);

View File

@@ -4728,13 +4728,13 @@ future<> storage_service::do_drain() {
future<> storage_service::do_cluster_cleanup() {
auto& raft_server = _group0->group0_server();
auto holder = _group0->hold_group0_gate();
utils::UUID request_id;
while (true) {
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
auto curr_req = _topology_state_machine._topology.global_request;
if (curr_req && *curr_req != global_topology_request::cleanup) {
// FIXME: replace this with a queue
if (!_feature_service.topology_global_request_queue && curr_req && *curr_req != global_topology_request::cleanup) {
throw std::runtime_error{
"topology coordinator: cluster cleanup: a different topology request is already pending, try again later"};
}
@@ -4753,8 +4753,20 @@ future<> storage_service::do_cluster_cleanup() {
rtlogger.info("cluster cleanup requested");
topology_mutation_builder builder(guard.write_timestamp());
builder.set_global_topology_request(global_topology_request::cleanup);
topology_change change{{builder.build()}};
std::vector<canonical_mutation> muts;
if (_feature_service.topology_global_request_queue) {
request_id = guard.new_group0_state_id();
builder.queue_global_topology_request_id(request_id);
topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
rtbuilder.set("done", false)
.set("start_time", db_clock::now())
.set("request_type", global_topology_request::cleanup);
muts.push_back(rtbuilder.build());
} else {
builder.set_global_topology_request(global_topology_request::cleanup);
}
muts.push_back(builder.build());
topology_change change{std::move(muts)};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup: cluster cleanup requested"));
try {
@@ -4766,7 +4778,18 @@ future<> storage_service::do_cluster_cleanup() {
break;
}
// Wait cleanup finishes on all nodes
if (request_id) {
// Wait until request completes
auto error = co_await wait_for_topology_request_completion(request_id);
if (!error.empty()) {
auto err = fmt::format("Cleanup failed. See earlier errors ({}). Request ID: {}", error, request_id);
rtlogger.error("{}", err);
throw std::runtime_error(err);
}
}
// The wait above only waits until the command is processed by the topology coordinator, which starts
// the cleanup process, but we still need to wait for the cleanup to complete here.
co_await _topology_state_machine.event.when([this] {
return std::all_of(_topology_state_machine._topology.normal_nodes.begin(), _topology_state_machine._topology.normal_nodes.end(), [] (auto& n) {
return n.second.cleanup == cleanup_status::clean;
@@ -4979,15 +5002,30 @@ future<> storage_service::raft_rebuild(utils::optional_param sdc_param) {
future<> storage_service::raft_check_and_repair_cdc_streams() {
std::optional<cdc::generation_id_v2> last_committed_gen;
utils::UUID request_id;
while (true) {
rtlogger.info("request check_and_repair_cdc_streams, refreshing topology");
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
auto curr_req = _topology_state_machine._topology.global_request;
std::optional<global_topology_request> curr_req;
if (_topology_state_machine._topology.global_request) {
curr_req = *_topology_state_machine._topology.global_request;
request_id = _topology_state_machine._topology.global_request_id.value();
} else if (!_topology_state_machine._topology.global_requests_queue.empty()) {
request_id = _topology_state_machine._topology.global_requests_queue[0];
auto req_entry = co_await _sys_ks.local().get_topology_request_entry(request_id, true);
curr_req = std::get<global_topology_request>(req_entry.request_type);
} else {
request_id = utils::UUID{};
}
if (curr_req && *curr_req != global_topology_request::new_cdc_generation) {
// FIXME: replace this with a queue
throw std::runtime_error{
"check_and_repair_cdc_streams: a different topology request is already pending, try again later"};
if (!_feature_service.topology_global_request_queue) {
throw std::runtime_error{
"check_and_repair_cdc_streams: a different topology request is already pending, try again later"};
} else {
request_id = utils::UUID{};
}
}
if (_topology_state_machine._topology.committed_cdc_generations.empty()) {
@@ -5002,27 +5040,62 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
cdc_log.info("CDC generation {} needs repair, requesting a new one", last_committed_gen);
}
topology_mutation_builder builder(guard.write_timestamp());
builder.set_global_topology_request(global_topology_request::new_cdc_generation);
topology_change change{{builder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
::format("request check+repair CDC generation from {}", _group0->group0_server().id()));
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
} catch (group0_concurrent_modification&) {
rtlogger.info("request check+repair CDC: concurrent operation is detected, retrying.");
continue;
// With the global request queue, coalescing requests should not be needed, but test_cdc_generation_publishing assumes that multiple new_cdc_generation
// commands will be coalesced here, so keep doing that until the test is fixed.
if (!request_id) {
topology_mutation_builder builder(guard.write_timestamp());
std::vector<canonical_mutation> muts;
if (_feature_service.topology_global_request_queue) {
request_id = guard.new_group0_state_id();
topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
builder.queue_global_topology_request_id(request_id);
rtbuilder.set("done", false)
.set("start_time", db_clock::now())
.set("request_type", global_topology_request::new_cdc_generation);
muts.push_back(rtbuilder.build());
} else {
builder.set_global_topology_request(global_topology_request::new_cdc_generation);
}
muts.push_back(builder.build());
topology_change change{std::move(muts)};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
::format("request check+repair CDC generation from {}", _group0->group0_server().id()));
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
} catch (group0_concurrent_modification&) {
rtlogger.info("request check+repair CDC: concurrent operation is detected, retrying.");
continue;
}
}
break;
}
// Wait until we commit a new CDC generation.
co_await _topology_state_machine.event.when([this, &last_committed_gen] {
if (request_id) {
// Wait until request completes
auto error = co_await wait_for_topology_request_completion(request_id);
if (!error.empty()) {
auto err = fmt::format("Check and repair cdc stream failed. See earlier errors ({}). Request ID: {}", error, request_id);
rtlogger.error("{}", err);
throw std::runtime_error(err);
}
auto gen = _topology_state_machine._topology.committed_cdc_generations.empty()
? std::nullopt
: std::optional(_topology_state_machine._topology.committed_cdc_generations.back());
return last_committed_gen != gen;
});
if (last_committed_gen == gen) {
on_internal_error(rtlogger, "Wrong generation after complation of check and repair cdc stream");
}
} else {
// Wait until we commit a new CDC generation.
co_await _topology_state_machine.event.when([this, &last_committed_gen] {
auto gen = _topology_state_machine._topology.committed_cdc_generations.empty()
? std::nullopt
: std::optional(_topology_state_machine._topology.committed_cdc_generations.back());
return last_committed_gen != gen;
});
}
}
future<> storage_service::rebuild(utils::optional_param source_dc) {

View File

@@ -874,7 +874,20 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// Precondition: there is no node request and no ongoing topology transition
// (checked under the guard we're holding).
future<> handle_global_request(group0_guard guard) {
switch (_topo_sm._topology.global_request.value()) {
global_topology_request req;
utils::UUID req_id;
db::system_keyspace::topology_requests_entry req_entry;
if (_topo_sm._topology.global_request) {
req = *_topo_sm._topology.global_request;
req_id = _topo_sm._topology.global_request_id.value();
} else {
assert(_feature_service.topology_global_request_queue);
req_id = _topo_sm._topology.global_requests_queue[0];
req_entry = co_await _sys_ks.get_topology_request_entry(req_id, true);
req = std::get<global_topology_request>(req_entry.request_type);
}
switch (req) {
case global_topology_request::new_cdc_generation: {
rtlogger.info("new CDC generation requested");
@@ -889,24 +902,33 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// after the generation is committed prevents this from happening. The second request would have
// no effect - it would just overwrite the first request.
builder.set_transition_state(topology::transition_state::commit_cdc_generation)
.set_new_cdc_generation_data_uuid(gen_uuid);
.set_new_cdc_generation_data_uuid(gen_uuid)
.set_global_topology_request(req)
.set_global_topology_request_id(req_id)
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id);
auto reason = ::format(
"insert CDC generation data (UUID: {})", gen_uuid);
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
}
break;
case global_topology_request::cleanup:
co_await start_cleanup_on_dirty_nodes(std::move(guard), true);
co_await start_cleanup_on_dirty_nodes(std::move(guard), req_id);
break;
case global_topology_request::keyspace_rf_change: {
rtlogger.info("keyspace_rf_change requested");
sstring ks_name = *_topo_sm._topology.new_keyspace_rf_change_ks_name;
std::unordered_map<sstring, sstring> saved_ks_props = *_topo_sm._topology.new_keyspace_rf_change_data;
sstring ks_name;
std::unordered_map<sstring, sstring> saved_ks_props;
if (_topo_sm._topology.new_keyspace_rf_change_ks_name) {
ks_name = *_topo_sm._topology.new_keyspace_rf_change_ks_name;
saved_ks_props = *_topo_sm._topology.new_keyspace_rf_change_data;
} else {
ks_name = *req_entry.new_keyspace_rf_change_ks_name;
saved_ks_props = *req_entry.new_keyspace_rf_change_data;
}
cql3::statements::ks_prop_defs new_ks_props{std::map<sstring, sstring>{saved_ks_props.begin(), saved_ks_props.end()}};
auto repl_opts = new_ks_props.get_replication_options();
repl_opts.erase(cql3::statements::ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY);
utils::UUID req_uuid = *_topo_sm._topology.global_request_id;
std::vector<canonical_mutation> updates;
sstring error;
if (_db.has_keyspace(ks_name)) {
@@ -954,8 +976,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.set_version(_topo_sm._topology.version + 1)
.del_global_topology_request()
.del_global_topology_request_id()
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
.build()));
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_uuid)
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id)
.done(error)
.build()));
if (error.empty()) {
@@ -979,12 +1002,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
break;
case global_topology_request::truncate_table: {
rtlogger.info("TRUNCATE TABLE requested");
std::vector<canonical_mutation> updates;
updates.push_back(topology_mutation_builder(guard.write_timestamp())
.set_transition_state(topology::transition_state::truncate_table)
.set_session(session_id(_topo_sm._topology.global_request_id.value()))
.build());
co_await update_topology_state(std::move(guard), std::move(updates), "TRUNCATE TABLE requested");
topology_mutation_builder builder(guard.write_timestamp());
builder.set_transition_state(topology::transition_state::truncate_table)
.set_global_topology_request(req)
.set_global_topology_request_id(req_id)
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
.set_session(session_id(req_id));
co_await update_topology_state(std::move(guard), {builder.build()}, "TRUNCATE TABLE requested");
}
break;
}
@@ -1841,8 +1865,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.del_global_topology_request_id()
.build());
updates.push_back(topology_request_tracking_mutation_builder(global_request_id)
.set("end_time", db_clock::now())
.set("done", true)
.done()
.build());
try {
@@ -1874,7 +1897,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
guard = std::get<group0_guard>(std::move(work));
if (_topo_sm._topology.global_request) {
if (_topo_sm._topology.global_request || !_topo_sm._topology.global_requests_queue.empty()) {
return std::make_pair(true, std::move(guard));
}
@@ -1906,17 +1929,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
auto node_builder = builder.with_node(id).del("topology_request");
auto done_msg = fmt::format("Canceled. Dead nodes: {}", dead_nodes);
rtbuilder.done(done_msg);
if (_topo_sm._topology.global_request_id) {
try {
utils::UUID uuid = utils::UUID{*_topo_sm._topology.global_request_id};
topology_request_tracking_mutation_builder rt_global_req_builder{uuid};
rt_global_req_builder.done(done_msg)
.set("end_time", db_clock::now());
muts.emplace_back(rt_global_req_builder.build());
} catch (...) {
rtlogger.warn("failed to cancel topology global request: {}", std::current_exception());
}
}
switch (req) {
case topology_request::replace:
[[fallthrough]];
@@ -1978,13 +1990,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
if (auto* cleanup = std::get_if<start_cleanup>(&work)) {
co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard), false);
co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard), utils::UUID{});
co_return true;
}
guard = std::get<group0_guard>(std::move(work));
if (_topo_sm._topology.global_request) {
if (_topo_sm._topology.global_request || !_topo_sm._topology.global_requests_queue.empty()) {
co_await handle_global_request(std::move(guard));
co_return true;
}
@@ -2156,6 +2168,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// Note: if there was a replace or removenode going on, we'd need to put the replaced/removed
// node into `exclude_nodes` parameter in `exec_global_command`, but CDC generations are never
// introduced during replace/remove.
topology_mutation_builder builder(guard.write_timestamp());
if (_topo_sm._topology.global_request == global_topology_request::new_cdc_generation) {
builder.del_global_topology_request();
builder.del_global_topology_request_id();
builder.del_transition_state();
}
try {
guard = co_await exec_global_command(std::move(guard), raft_topology_cmd::command::barrier, {_raft.id()});
} catch (term_changed_error&) {
@@ -2168,6 +2187,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
rtlogger.error("transition_state::commit_cdc_generation, "
"raft_topology_cmd::command::barrier failed, error {}", std::current_exception());
_rollback = fmt::format("Failed to commit cdc generation: {}", std::current_exception());
}
if (_rollback) {
if (_topo_sm._topology.global_request == global_topology_request::new_cdc_generation && _feature_service.topology_global_request_queue) {
topology_request_tracking_mutation_builder rtbuilder(*_topo_sm._topology.global_request_id);
// If this is a global command, fail it right away since there is nothing to roll back
rtbuilder.done(*_rollback);
_rollback.reset();
co_await update_topology_state(std::move(guard), {builder.build(), rtbuilder.build()}, "committed new CDC generation command failed");
}
break;
}
@@ -2221,18 +2250,22 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// in the middle of a CDC generation switch (when they are prepared to switch but not
// committed) - they won't coordinate CDC-enabled writes until they reconnect to the
// majority and commit.
topology_mutation_builder builder(guard.write_timestamp());
std::vector<canonical_mutation> updates;
builder.add_new_committed_cdc_generation(cdc_gen_id);
if (_topo_sm._topology.global_request == global_topology_request::new_cdc_generation) {
builder.del_global_topology_request();
builder.del_transition_state();
if (_feature_service.topology_global_request_queue) {
topology_request_tracking_mutation_builder rtbuilder(*_topo_sm._topology.global_request_id);
rtbuilder.done();
updates.push_back(rtbuilder.build());
}
} else {
builder.set_transition_state(topology::transition_state::write_both_read_old);
builder.set_session(session_id(guard.new_group0_state_id()));
builder.set_version(_topo_sm._topology.version + 1);
}
updates.push_back(builder.build());
auto str = ::format("committed new CDC generation, ID: {}", cdc_gen_id);
co_await update_topology_state(std::move(guard), {builder.build()}, std::move(str));
co_await update_topology_state(std::move(guard), std::move(updates), std::move(str));
}
break;
case topology::transition_state::tablet_draining:
@@ -2795,7 +2828,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
"Topology coordinator is called for node {} in state 'left'", node.id));
break;
}
};
}
std::variant<join_node_response_params::accepted, join_node_response_params::rejected>
validate_joining_node(const node_to_work_on& node) {
@@ -2889,14 +2922,21 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
return muts;
}
future<> start_cleanup_on_dirty_nodes(group0_guard guard, bool global_request) {
future<> start_cleanup_on_dirty_nodes(group0_guard guard, utils::UUID global_request_id) {
auto& topo = _topo_sm._topology;
std::vector<canonical_mutation> muts;
muts.reserve(topo.normal_nodes.size() + size_t(global_request));
muts.reserve(topo.normal_nodes.size() + size_t(bool(global_request_id)));
if (global_request) {
if (global_request_id) {
topology_mutation_builder builder(guard.write_timestamp());
builder.del_global_topology_request();
if (_feature_service.topology_global_request_queue) {
topology_request_tracking_mutation_builder rtbuilder(global_request_id);
builder.del_global_topology_request_id()
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, global_request_id);
rtbuilder.done();
muts.emplace_back(rtbuilder.build());
}
muts.emplace_back(builder.build());
}
for (auto& [id, rs] : topo.normal_nodes) {

View File

@@ -91,6 +91,11 @@ Builder& topology_mutation_builder_base<Builder>::set(const char* cell, topology
return apply_atomic(cell, sstring{::format("{}", value)});
}
// Serialize a global_topology_request enum value to its textual form and
// store it atomically in the given cell.
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, global_topology_request value) {
    auto text = sstring{::format("{}", value)};
    return apply_atomic(cell, std::move(text));
}
template<typename Builder>
Builder& topology_mutation_builder_base<Builder>::set(const char* cell, const sstring& value) {
return apply_atomic(cell, value);
@@ -239,6 +244,18 @@ topology_mutation_builder& topology_mutation_builder::set_global_topology_reques
return apply_atomic("global_topology_request_id", value);
}
// Append the given request id to the "global_requests" queue column
// (a set-valued cell updated, not overwritten).
topology_mutation_builder& topology_mutation_builder::queue_global_topology_request_id(const utils::UUID& value) {
    std::vector<data_value> ids;
    ids.emplace_back(value);
    return apply_set("global_requests", collection_apply_mode::update, ids);
}
// Drop the head of the global request queue, but only when it is still the
// expected request `id`; otherwise leave the mutation untouched. The queue
// column is rewritten wholesale with the remaining ids.
topology_mutation_builder& topology_mutation_builder::drop_first_global_topology_request_id(const std::vector<utils::UUID>& values, utils::UUID& id) {
    if (values.empty() || values.front() != id) {
        // Nothing to drop: the queue is empty or its head is not the request we completed.
        return *this;
    }
    auto remaining = std::span(values.begin() + 1, values.size() - 1);
    return apply_set("global_requests", collection_apply_mode::overwrite, remaining);
}
topology_mutation_builder& topology_mutation_builder::set_upgrade_state(topology::upgrade_state_type value) {
return apply_atomic("upgrade_state", ::format("{}", value));
}
@@ -305,6 +322,10 @@ topology_request_tracking_mutation_builder& topology_request_tracking_mutation_b
return _set_type ? builder_base::set(cell, value) : *this;
}
// Record the global request type in the tracking row, unless this builder
// was constructed with request-type recording disabled (_set_type == false).
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set(const char* cell, global_topology_request value) {
    if (!_set_type) {
        return *this;
    }
    return builder_base::set(cell, value);
}
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::done(std::optional<sstring> error) {
set("end_time", db_clock::now());
if (error) {
@@ -318,6 +339,16 @@ topology_request_tracking_mutation_builder& topology_request_tracking_mutation_b
return *this;
}
// Store the parameters of a keyspace RF-change request: the keyspace name
// and the requested replication factor per datacenter.
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set_new_keyspace_rf_change_data(
        const sstring& ks_name, const std::map<sstring, sstring>& rf_per_dc) {
    apply_atomic("new_keyspace_rf_change_ks_name", ks_name);
    const auto& map_col_type = schema().get_column_definition("new_keyspace_rf_change_data")->type;
    auto dc_to_rf = map_type_impl::native_type(rf_per_dc.begin(), rf_per_dc.end());
    apply_atomic("new_keyspace_rf_change_data", make_map_value(map_col_type, std::move(dc_to_rf)));
    return *this;
}
template class topology_mutation_builder_base<topology_mutation_builder>;
template class topology_mutation_builder_base<topology_node_mutation_builder>;
template class topology_mutation_builder_base<topology_request_tracking_mutation_builder>;

View File

@@ -46,6 +46,7 @@ protected:
Builder& apply_set(const char* cell, collection_apply_mode apply_mode, const C& c);
Builder& set(const char* cell, node_state value);
Builder& set(const char* cell, topology_request value);
Builder& set(const char* cell, global_topology_request value);
Builder& set(const char* cell, const sstring& value);
Builder& set(const char* cell, const raft::server_id& value);
Builder& set(const char* cell, const uint32_t& value);
@@ -126,6 +127,8 @@ public:
topology_mutation_builder& del_session();
topology_mutation_builder& del_global_topology_request();
topology_mutation_builder& del_global_topology_request_id();
topology_mutation_builder& queue_global_topology_request_id(const utils::UUID& value);
topology_mutation_builder& drop_first_global_topology_request_id(const std::vector<utils::UUID>&, utils::UUID&);
topology_node_mutation_builder& with_node(raft::server_id);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};
@@ -149,8 +152,11 @@ public:
using builder_base::set;
using builder_base::del;
topology_request_tracking_mutation_builder& set(const char* cell, topology_request value);
topology_request_tracking_mutation_builder& set(const char* cell, global_topology_request value);
topology_request_tracking_mutation_builder& done(std::optional<sstring> error = std::nullopt);
topology_request_tracking_mutation_builder& set_truncate_table_data(const table_id& table_id);
topology_request_tracking_mutation_builder& set_new_keyspace_rf_change_data(const sstring& ks_name, const std::map<sstring, sstring>& rf_per_dc);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};

View File

@@ -192,12 +192,20 @@ static std::unordered_map<topology_request, sstring> topology_request_to_name_ma
{topology_request::rebuild, "rebuild"}
};
topology_request topology_request_from_string(const sstring& s) {
// Reverse-map a request name to its topology_request value. Returns
// std::nullopt for an unknown name instead of throwing.
std::optional<topology_request> try_topology_request_from_string(const sstring& s) {
    for (const auto& [request, name] : topology_request_to_name_map) {
        if (name == s) {
            return request;
        }
    }
    return std::nullopt;
}
// Like try_topology_request_from_string(), but an unknown name is an error.
topology_request topology_request_from_string(const sstring& s) {
    if (auto request = try_topology_request_from_string(s)) {
        return *request;
    }
    throw std::runtime_error(fmt::format("cannot map name {} to topology_request", s));
}

View File

@@ -163,6 +163,9 @@ struct topology {
// Pending global topology request's id, which is a new group0's state id
std::optional<utils::UUID> global_request_id;
// A queue of pending global topology requests' ids. Replaces the single request id above
std::vector<utils::UUID> global_requests_queue;
// The IDs of the committed CDC generations sorted by timestamps.
// The obsolete generations may not be in this list as they are continually deleted.
std::vector<cdc::generation_id_v2> committed_cdc_generations;
@@ -283,6 +286,7 @@ struct topology_request_state {
topology::transition_state transition_state_from_string(const sstring& s);
node_state node_state_from_string(const sstring& s);
std::optional<topology_request> try_topology_request_from_string(const sstring& s);
topology_request topology_request_from_string(const sstring& s);
global_topology_request global_topology_request_from_string(const sstring&);
cleanup_status cleanup_status_from_string(const sstring& s);

View File

@@ -254,9 +254,6 @@ async def test_truncate_while_truncate_already_waiting(manager: ManagerClient):
# Run another truncate on the same table while the timedout one is still waiting
truncate_future = cql.run_async(f'TRUNCATE TABLE {ks}.test', host=hosts[1])
# Make sure the second truncate re-used the existing global topology request
await s1_log.wait_for(f'Ongoing TRUNCATE for table {ks}.test')
# Release streaming
await manager.api.message_injection(servers[1].ip_addr, 'migration_streaming_wait')
@@ -302,3 +299,51 @@ async def test_replay_position_check_during_truncate(manager):
await manager.api.message_injection(server.ip_addr, "database_truncate_wait")
await s1_log.wait_for(f"database_truncate_wait: message received", from_mark=s1_mark)
await truncate_task
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_parallel_truncate(manager: ManagerClient):
    # Exercises the global topology request queue: two TRUNCATE requests on
    # different tables are issued while tablet migration streaming is blocked
    # by an error injection, so both must be queued simultaneously and both
    # must complete once streaming is released.
    logger.info('Bootstrapping cluster')
    # 'migration_streaming_wait' pauses tablet streaming until the matching
    # message_injection() call below releases it.
    cfg = { 'tablets_mode_for_new_keyspaces': 'enabled',
            'error_injections_at_startup': ['migration_streaming_wait']
          }
    servers = []
    servers.append(await manager.server_add(config=cfg))
    cql = manager.get_cql()

    # Create a keyspace with tablets and initial_tablets == 2, then insert data
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f'CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);')
        await cql.run_async(f'CREATE TABLE {ks}.test1 (pk int PRIMARY KEY, c int);')
        keys = range(1024)
        await asyncio.gather(*[cql.run_async(f'INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});') for k in keys])
        await asyncio.gather(*[cql.run_async(f'INSERT INTO {ks}.test1 (pk, c) VALUES ({k}, {k});') for k in keys])

        # Add a node to the cluster. This will cause the load balancer to migrate one tablet to the new node
        servers.append(await manager.server_add(config=cfg))

        hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
        s1_log = await manager.server_open_log(servers[1].server_id)

        # Wait for tablet streaming to start
        await s1_log.wait_for('migration_streaming_wait: start')

        # Both truncates are started while the migration is in flight; with
        # FallthroughRetryPolicy any server-side error surfaces immediately
        # instead of being retried.
        tf1 = cql.run_async(SimpleStatement(f'TRUNCATE TABLE {ks}.test', retry_policy=FallthroughRetryPolicy()))
        tf2 = cql.run_async(SimpleStatement(f'TRUNCATE TABLE {ks}.test1', retry_policy=FallthroughRetryPolicy()))

        # Release streaming
        await manager.api.message_injection(servers[1].ip_addr, 'migration_streaming_wait')

        # Wait for the joined truncate to complete
        await tf1
        await tf2

        # Check if we have any data
        row = await cql.run_async(SimpleStatement(f'SELECT COUNT(*) FROM {ks}.test', consistency_level=ConsistencyLevel.ALL))
        assert row[0].count == 0
        row = await cql.run_async(SimpleStatement(f'SELECT COUNT(*) FROM {ks}.test1', consistency_level=ConsistencyLevel.ALL))
        assert row[0].count == 0