Compare commits
33 Commits
master
...
scylla-202
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f3297824e3 | ||
|
|
4eb220d3ab | ||
|
|
c9de7d68f2 | ||
|
|
b535f44db2 | ||
|
|
ec1dd1bf31 | ||
|
|
7b30f487dd | ||
|
|
c3c489d3d4 | ||
|
|
6fb6bb8dc7 | ||
|
|
02c038efa8 | ||
|
|
d3175671b7 | ||
|
|
4651c44747 | ||
|
|
0e67f6f6c2 | ||
|
|
859d9dd3b1 | ||
|
|
a25bd068bf | ||
|
|
9bc487e79e | ||
|
|
41dc86ffa8 | ||
|
|
f78a352a29 | ||
|
|
b647dbd547 | ||
|
|
0e7d3b4eb9 | ||
|
|
c8043e05c1 | ||
|
|
54fb9ed03b | ||
|
|
f60c54df77 | ||
|
|
f1ec51133e | ||
|
|
648fe6a4e8 | ||
|
|
1bd536a228 | ||
|
|
d5b11098e8 | ||
|
|
775916132e | ||
|
|
76bf279e0e | ||
|
|
61364624e3 | ||
|
|
6e6c00dcfe | ||
|
|
c26eb8ef14 | ||
|
|
8a1d09862e | ||
|
|
e64bb3819c |
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=2025.3.0-dev
|
||||
VERSION=2025.3.0-rc1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -3161,6 +3161,22 @@
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/raft_topology/cmd_rpc_status",
|
||||
"operations":[
|
||||
{
|
||||
"method":"GET",
|
||||
"summary":"Get information about currently running topology cmd rpc",
|
||||
"type":"string",
|
||||
"nickname":"raft_topology_get_cmd_status",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"models":{
|
||||
|
||||
@@ -1670,6 +1670,18 @@ rest_raft_topology_upgrade_status(sharded<service::storage_service>& ss, std::un
|
||||
co_return sstring(format("{}", ustate));
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_raft_topology_get_cmd_status(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
|
||||
const auto status = co_await ss.invoke_on(0, [] (auto& ss) {
|
||||
return ss.get_topology_cmd_status();
|
||||
});
|
||||
if (status.active_dst.empty()) {
|
||||
co_return sstring("none");
|
||||
}
|
||||
co_return sstring(fmt::format("{}[{}]: {}", status.current, status.index, fmt::join(status.active_dst, ",")));
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_move_tablet(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
|
||||
@@ -1902,6 +1914,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
ss::reload_raft_topology_state.set(r, rest_bind(rest_reload_raft_topology_state, ss, group0_client));
|
||||
ss::upgrade_to_raft_topology.set(r, rest_bind(rest_upgrade_to_raft_topology, ss));
|
||||
ss::raft_topology_upgrade_status.set(r, rest_bind(rest_raft_topology_upgrade_status, ss));
|
||||
ss::raft_topology_get_cmd_status.set(r, rest_bind(rest_raft_topology_get_cmd_status, ss));
|
||||
ss::move_tablet.set(r, rest_bind(rest_move_tablet, ctx, ss));
|
||||
ss::add_tablet_replica.set(r, rest_bind(rest_add_tablet_replica, ctx, ss));
|
||||
ss::del_tablet_replica.set(r, rest_bind(rest_del_tablet_replica, ctx, ss));
|
||||
|
||||
@@ -245,12 +245,18 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
qp.db().real_database().validate_keyspace_update(*ks_md_update);
|
||||
|
||||
service::topology_mutation_builder builder(ts);
|
||||
service::topology_request_tracking_mutation_builder rtbuilder{global_request_id, qp.proxy().features().topology_requests_type_column};
|
||||
rtbuilder.set("done", false)
|
||||
.set("start_time", db_clock::now());
|
||||
if (!qp.proxy().features().topology_global_request_queue) {
|
||||
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
|
||||
builder.set_global_topology_request_id(global_request_id);
|
||||
builder.set_new_keyspace_rf_change_data(_name, ks_options);
|
||||
} else {
|
||||
builder.queue_global_topology_request_id(global_request_id);
|
||||
rtbuilder.set("request_type", service::global_topology_request::keyspace_rf_change)
|
||||
.set_new_keyspace_rf_change_data(_name, ks_options);
|
||||
|
||||
};
|
||||
service::topology_change change{{builder.build()}};
|
||||
|
||||
@@ -259,13 +265,6 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
return cm.to_mutation(topo_schema);
|
||||
});
|
||||
|
||||
service::topology_request_tracking_mutation_builder rtbuilder{global_request_id, qp.proxy().features().topology_requests_type_column};
|
||||
rtbuilder.set("done", false)
|
||||
.set("start_time", db_clock::now())
|
||||
.set("request_type", service::global_topology_request::keyspace_rf_change);
|
||||
if (qp.proxy().features().topology_global_request_queue) {
|
||||
rtbuilder.set_new_keyspace_rf_change_data(_name, ks_options);
|
||||
}
|
||||
service::topology_change req_change{{rtbuilder.build()}};
|
||||
|
||||
auto topo_req_schema = qp.db().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY_REQUESTS);
|
||||
|
||||
@@ -36,7 +36,7 @@
|
||||
|
||||
static logging::logger blogger("batchlog_manager");
|
||||
|
||||
const uint32_t db::batchlog_manager::replay_interval;
|
||||
const std::chrono::seconds db::batchlog_manager::replay_interval;
|
||||
const uint32_t db::batchlog_manager::page_size;
|
||||
|
||||
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, db::system_keyspace& sys_ks, batchlog_manager_config config)
|
||||
@@ -116,7 +116,8 @@ future<> db::batchlog_manager::batchlog_replay_loop() {
|
||||
} catch (...) {
|
||||
blogger.error("Exception in batch replay: {}", std::current_exception());
|
||||
}
|
||||
delay = std::chrono::milliseconds(replay_interval);
|
||||
delay = utils::get_local_injector().is_enabled("short_batchlog_manager_replay_interval") ?
|
||||
std::chrono::seconds(1) : replay_interval;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -132,6 +133,8 @@ future<> db::batchlog_manager::drain() {
|
||||
_sem.broken();
|
||||
}
|
||||
|
||||
co_await _qp.proxy().abort_batch_writes();
|
||||
|
||||
co_await std::move(_loop_done);
|
||||
blogger.info("Drained");
|
||||
}
|
||||
@@ -173,6 +176,11 @@ future<> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cle
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
|
||||
if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
|
||||
blogger.debug("Skipping batch replay due to skip_batch_replay injection");
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
|
||||
// check version of serialization format
|
||||
if (!row.has("version")) {
|
||||
blogger.warn("Skipping logged batch because of unknown version");
|
||||
@@ -242,7 +250,8 @@ future<> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cle
|
||||
// send to partially or wholly fail in actually sending stuff. Since we don't
|
||||
// have hints (yet), send with CL=ALL, and hope we can re-do this soon.
|
||||
// See below, we use retry on write failure.
|
||||
return _qp.proxy().mutate(mutations, db::consistency_level::ALL, db::no_timeout, nullptr, empty_service_permit(), db::allow_per_partition_rate_limit::no);
|
||||
auto timeout = db::timeout_clock::now() + write_timeout;
|
||||
return _qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
|
||||
});
|
||||
}).then_wrapped([this, id](future<> batch_result) {
|
||||
try {
|
||||
|
||||
@@ -43,8 +43,9 @@ public:
|
||||
using post_replay_cleanup = bool_class<class post_replay_cleanup_tag>;
|
||||
|
||||
private:
|
||||
static constexpr uint32_t replay_interval = 60 * 1000; // milliseconds
|
||||
static constexpr std::chrono::seconds replay_interval = std::chrono::seconds(60);
|
||||
static constexpr uint32_t page_size = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
|
||||
static constexpr std::chrono::seconds write_timeout = std::chrono::seconds(300);
|
||||
|
||||
using clock_type = lowres_clock;
|
||||
|
||||
|
||||
@@ -1230,7 +1230,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, sstable_summary_ratio(this, "sstable_summary_ratio", value_status::Used, 0.0005, "Enforces that 1 byte of summary is written for every N (2000 by default)"
|
||||
"bytes written to data file. Value must be between 0 and 1.")
|
||||
, components_memory_reclaim_threshold(this, "components_memory_reclaim_threshold", liveness::LiveUpdate, value_status::Used, .2, "Ratio of available memory for all in-memory components of SSTables in a shard beyond which the memory will be reclaimed from components until it falls back under the threshold. Currently, this limit is only enforced for bloom filters.")
|
||||
, large_memory_allocation_warning_threshold(this, "large_memory_allocation_warning_threshold", value_status::Used, (size_t(128) << 10) + 1, "Warn about memory allocations above this size; set to zero to disable.")
|
||||
, large_memory_allocation_warning_threshold(this, "large_memory_allocation_warning_threshold", value_status::Used, size_t(1) << 20, "Warn about memory allocations above this size; set to zero to disable.")
|
||||
, enable_deprecated_partitioners(this, "enable_deprecated_partitioners", value_status::Used, false, "Enable the byteordered and random partitioners. These partitioners are deprecated and will be removed in a future version.")
|
||||
, enable_keyspace_column_family_metrics(this, "enable_keyspace_column_family_metrics", value_status::Used, false, "Enable per keyspace and per column family metrics reporting.")
|
||||
, enable_node_aggregated_table_metrics(this, "enable_node_aggregated_table_metrics", value_status::Used, true, "Enable aggregated per node, per keyspace and per table metrics reporting, applicable if enable_keyspace_column_family_metrics is false.")
|
||||
|
||||
@@ -481,7 +481,8 @@ Creating a new user-defined type is done using a ``CREATE TYPE`` statement defin
|
||||
field_definition: `identifier` `cql_type`
|
||||
|
||||
A UDT has a name (``udt_name``), which is used to declare columns of that type, and is a set of named and typed fields. Each field can be of any
|
||||
type, including collections or other UDTs. UDTs and collections inside collections must always be frozen (no matter which version of ScyllaDB you are using).
|
||||
type, including collections or other UDTs.
|
||||
Similar to collections, a UDT can be frozen or non-frozen. A frozen UDT is immutable and can only be updated as a whole. Nested UDTs or UDTs used in keys must always be frozen.
|
||||
|
||||
For example::
|
||||
|
||||
@@ -506,26 +507,15 @@ For example::
|
||||
|
||||
CREATE TABLE superheroes (
|
||||
name frozen<full_name> PRIMARY KEY,
|
||||
home frozen<address>
|
||||
home address
|
||||
);
|
||||
|
||||
.. note::
|
||||
|
||||
- Attempting to create an already existing type will result in an error unless the ``IF NOT EXISTS`` option is used. If it is used, the statement will be a no-op if the type already exists.
|
||||
- A type is intrinsically bound to the keyspace in which it is created and can only be used in that keyspace. At creation, if the type name is prefixed by a keyspace name, it is created in that keyspace. Otherwise, it is created in the current keyspace.
|
||||
- As of ScyllaDB Open Source 3.2, UDTs not inside collections do not have to be frozen, but in all versions prior to ScyllaDB Open Source 3.2, and in all ScyllaDB Enterprise versions, UDTs **must** be frozen.
|
||||
|
||||
|
||||
A non-frozen UDT example with ScyllaDB Open Source 3.2 and higher::
|
||||
|
||||
CREATE TYPE ut (a int, b int);
|
||||
CREATE TABLE cf (a int primary key, b ut);
|
||||
|
||||
Same UDT in versions prior::
|
||||
|
||||
CREATE TYPE ut (a int, b int);
|
||||
CREATE TABLE cf (a int primary key, b frozen<ut>);
|
||||
|
||||
UDT literals
|
||||
~~~~~~~~~~~~
|
||||
|
||||
|
||||
@@ -157,7 +157,7 @@ will leave the recovery mode and remove the obsolete internal Raft data.
|
||||
|
||||
After completing this step, Raft should be fully functional.
|
||||
|
||||
#. Replace all dead nodes from the cluster using the
|
||||
#. Replace all dead nodes in the cluster using the
|
||||
:doc:`node replacement procedure </operating-scylla/procedures/cluster-management/replace-dead-node/>`.
|
||||
|
||||
.. note::
|
||||
|
||||
@@ -335,18 +335,25 @@ void tablet_metadata::drop_tablet_map(table_id id) {
|
||||
}
|
||||
|
||||
future<> tablet_metadata::clear_gently() {
|
||||
for (auto&& [id, map] : _tablets) {
|
||||
const auto shard = map.get_owner_shard();
|
||||
co_await smp::submit_to(shard, [map = std::move(map)] () mutable {
|
||||
auto map_ptr = map.release();
|
||||
// Other copies exist; we simply drop ours, no need to clear anything.
|
||||
if (map_ptr.use_count() > 1) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return const_cast<tablet_map&>(*map_ptr).clear_gently().finally([map_ptr = std::move(map_ptr)] { });
|
||||
});
|
||||
tablet_logger.debug("tablet_metadata::clear_gently {}", fmt::ptr(this));
|
||||
// First, sort the tablet maps per shard to avoid destruction of all foreign tablet map ptrs
|
||||
// on this shard. We don't use sharded<> here since it will require a similar
|
||||
// submit_to to each shard owner per tablet-map.
|
||||
std::vector<std::vector<tablet_map_ptr>> tablet_maps_per_shard;
|
||||
tablet_maps_per_shard.resize(smp::count);
|
||||
for (auto& [_, map_ptr] : _tablets) {
|
||||
tablet_maps_per_shard[map_ptr.get_owner_shard()].emplace_back(std::move(map_ptr));
|
||||
}
|
||||
_tablets.clear();
|
||||
|
||||
// Now destroy the foreign tablet map pointers on each shard.
|
||||
co_await smp::invoke_on_all([&] -> future<> {
|
||||
for (auto& map_ptr : tablet_maps_per_shard[this_shard_id()]) {
|
||||
auto map = map_ptr.release();
|
||||
co_await utils::clear_gently(map);
|
||||
}
|
||||
});
|
||||
|
||||
co_return;
|
||||
}
|
||||
|
||||
|
||||
@@ -357,6 +357,7 @@ future<std::unique_ptr<token_metadata_impl>> token_metadata_impl::clone_only_tok
|
||||
}
|
||||
|
||||
future<> token_metadata_impl::clear_gently() noexcept {
|
||||
_version_tracker = {};
|
||||
co_await utils::clear_gently(_token_to_endpoint_map);
|
||||
co_await utils::clear_gently(_normal_token_owners);
|
||||
co_await utils::clear_gently(_bootstrap_tokens);
|
||||
@@ -834,16 +835,30 @@ token_metadata::token_metadata(std::unique_ptr<token_metadata_impl> impl)
|
||||
{
|
||||
}
|
||||
|
||||
token_metadata::token_metadata(config cfg)
|
||||
: _impl(std::make_unique<token_metadata_impl>(cfg))
|
||||
token_metadata::token_metadata(shared_token_metadata& stm, config cfg)
|
||||
: _shared_token_metadata(&stm)
|
||||
, _impl(std::make_unique<token_metadata_impl>(std::move(cfg)))
|
||||
{
|
||||
}
|
||||
|
||||
token_metadata::~token_metadata() = default;
|
||||
token_metadata::~token_metadata() {
|
||||
clear_and_dispose_impl();
|
||||
}
|
||||
|
||||
token_metadata::token_metadata(token_metadata&&) noexcept = default;
|
||||
|
||||
token_metadata& token_metadata::token_metadata::operator=(token_metadata&&) noexcept = default;
|
||||
token_metadata& token_metadata::token_metadata::operator=(token_metadata&& o) noexcept {
|
||||
if (this != &o) {
|
||||
clear_and_dispose_impl();
|
||||
_shared_token_metadata = std::exchange(o._shared_token_metadata, nullptr);
|
||||
_impl = std::exchange(o._impl, nullptr);
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
void token_metadata::set_shared_token_metadata(shared_token_metadata& stm) {
|
||||
_shared_token_metadata = &stm;
|
||||
}
|
||||
|
||||
const std::vector<token>&
|
||||
token_metadata::sorted_tokens() const {
|
||||
@@ -1027,6 +1042,15 @@ token_metadata::clone_after_all_left() const noexcept {
|
||||
co_return token_metadata(co_await _impl->clone_after_all_left());
|
||||
}
|
||||
|
||||
void token_metadata::clear_and_dispose_impl() noexcept {
|
||||
if (!_shared_token_metadata) {
|
||||
return;
|
||||
}
|
||||
if (auto impl = std::exchange(_impl, nullptr)) {
|
||||
_shared_token_metadata->clear_and_dispose(std::move(impl));
|
||||
}
|
||||
}
|
||||
|
||||
future<> token_metadata::clear_gently() noexcept {
|
||||
return _impl->clear_gently();
|
||||
}
|
||||
@@ -1143,6 +1167,17 @@ version_tracker shared_token_metadata::new_tracker(token_metadata::version_t ver
|
||||
return tracker;
|
||||
}
|
||||
|
||||
future<> shared_token_metadata::stop() noexcept {
|
||||
co_await _background_dispose_gate.close();
|
||||
}
|
||||
|
||||
void shared_token_metadata::clear_and_dispose(std::unique_ptr<token_metadata_impl> impl) noexcept {
|
||||
// Safe to drop the future since the gate is closed in stop()
|
||||
if (auto gh = _background_dispose_gate.try_hold()) {
|
||||
(void)impl->clear_gently().finally([i = std::move(impl), gh = std::move(gh)] {});
|
||||
}
|
||||
}
|
||||
|
||||
void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {
|
||||
if (_shared->get_ring_version() >= tmptr->get_ring_version()) {
|
||||
on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing ring_version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version()));
|
||||
@@ -1154,6 +1189,7 @@ void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {
|
||||
_stale_versions_in_use = _versions_barrier.advance_and_await();
|
||||
}
|
||||
|
||||
tmptr->set_shared_token_metadata(*this);
|
||||
_shared = std::move(tmptr);
|
||||
_shared->set_version_tracker(new_tracker(_shared->get_version()));
|
||||
|
||||
@@ -1216,7 +1252,7 @@ future<> shared_token_metadata::mutate_on_all_shards(sharded<shared_token_metada
|
||||
|
||||
std::vector<mutable_token_metadata_ptr> pending_token_metadata_ptr;
|
||||
pending_token_metadata_ptr.resize(smp::count);
|
||||
auto tmptr = make_token_metadata_ptr(co_await stm.local().get()->clone_async());
|
||||
auto tmptr = stm.local().make_token_metadata_ptr(co_await stm.local().get()->clone_async());
|
||||
auto& tm = *tmptr;
|
||||
// bump the token_metadata ring_version
|
||||
// to invalidate cached token/replication mappings
|
||||
@@ -1227,7 +1263,7 @@ future<> shared_token_metadata::mutate_on_all_shards(sharded<shared_token_metada
|
||||
// Apply the mutated token_metadata only after successfully cloning it on all shards.
|
||||
pending_token_metadata_ptr[base_shard] = tmptr;
|
||||
co_await smp::invoke_on_others(base_shard, [&] () -> future<> {
|
||||
pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tm.clone_async());
|
||||
pending_token_metadata_ptr[this_shard_id()] = stm.local().make_token_metadata_ptr(co_await tm.clone_async());
|
||||
});
|
||||
|
||||
co_await stm.invoke_on_all([&] (shared_token_metadata& stm) {
|
||||
|
||||
@@ -47,7 +47,7 @@ class abstract_replication_strategy;
|
||||
|
||||
using token = dht::token;
|
||||
|
||||
class token_metadata;
|
||||
class shared_token_metadata;
|
||||
class tablet_metadata;
|
||||
|
||||
struct host_id_or_endpoint {
|
||||
@@ -166,6 +166,7 @@ private:
|
||||
};
|
||||
|
||||
class token_metadata final {
|
||||
shared_token_metadata* _shared_token_metadata = nullptr;
|
||||
std::unique_ptr<token_metadata_impl> _impl;
|
||||
private:
|
||||
friend class token_metadata_ring_splitter;
|
||||
@@ -178,7 +179,7 @@ public:
|
||||
using version_t = service::topology::version_t;
|
||||
using version_tracker_t = version_tracker;
|
||||
|
||||
token_metadata(config cfg);
|
||||
token_metadata(shared_token_metadata& stm, config cfg);
|
||||
explicit token_metadata(std::unique_ptr<token_metadata_impl> impl);
|
||||
token_metadata(token_metadata&&) noexcept; // Can't use "= default;" - hits some static_assert in unique_ptr
|
||||
token_metadata& operator=(token_metadata&&) noexcept;
|
||||
@@ -355,6 +356,11 @@ public:
|
||||
friend class shared_token_metadata;
|
||||
private:
|
||||
void set_version_tracker(version_tracker_t tracker);
|
||||
|
||||
void set_shared_token_metadata(shared_token_metadata& stm);
|
||||
|
||||
// Clears and disposes the token metadata impl in the background, if present.
|
||||
void clear_and_dispose_impl() noexcept;
|
||||
};
|
||||
|
||||
struct topology_change_info {
|
||||
@@ -371,12 +377,8 @@ struct topology_change_info {
|
||||
using token_metadata_lock = semaphore_units<>;
|
||||
using token_metadata_lock_func = noncopyable_function<future<token_metadata_lock>() noexcept>;
|
||||
|
||||
template <typename... Args>
|
||||
mutable_token_metadata_ptr make_token_metadata_ptr(Args... args) {
|
||||
return make_lw_shared<token_metadata>(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
class shared_token_metadata {
|
||||
class shared_token_metadata : public peering_sharded_service<shared_token_metadata> {
|
||||
named_gate _background_dispose_gate{"shared_token_metadata::background_dispose_gate"};
|
||||
mutable_token_metadata_ptr _shared;
|
||||
token_metadata_lock_func _lock_func;
|
||||
std::chrono::steady_clock::duration _stall_detector_threshold = std::chrono::seconds(2);
|
||||
@@ -408,7 +410,7 @@ public:
|
||||
// used to construct the shared object as a sharded<> instance
|
||||
// lock_func returns semaphore_units<>
|
||||
explicit shared_token_metadata(token_metadata_lock_func lock_func, token_metadata::config cfg)
|
||||
: _shared(make_token_metadata_ptr(std::move(cfg)))
|
||||
: _shared(make_lw_shared<token_metadata>(*this, cfg))
|
||||
, _lock_func(std::move(lock_func))
|
||||
, _versions_barrier("shared_token_metadata::versions_barrier")
|
||||
{
|
||||
@@ -418,6 +420,17 @@ public:
|
||||
shared_token_metadata(const shared_token_metadata& x) = delete;
|
||||
shared_token_metadata(shared_token_metadata&& x) = default;
|
||||
|
||||
future<> stop() noexcept;
|
||||
|
||||
mutable_token_metadata_ptr make_token_metadata_ptr() {
|
||||
return make_lw_shared<token_metadata>(*this, token_metadata::config{_shared->get_topology().get_config()});
|
||||
}
|
||||
|
||||
mutable_token_metadata_ptr make_token_metadata_ptr(token_metadata&& tm) {
|
||||
tm.set_shared_token_metadata(*this);
|
||||
return make_lw_shared<token_metadata>(std::move(tm));
|
||||
}
|
||||
|
||||
token_metadata_ptr get() const noexcept {
|
||||
return _shared;
|
||||
}
|
||||
@@ -467,6 +480,8 @@ public:
|
||||
// Must be called on shard 0.
|
||||
static future<> mutate_on_all_shards(sharded<shared_token_metadata>& stm, seastar::noncopyable_function<future<> (token_metadata&)> func);
|
||||
|
||||
void clear_and_dispose(std::unique_ptr<token_metadata_impl> impl) noexcept;
|
||||
|
||||
private:
|
||||
// for testing only, unsafe to be called without awaiting get_lock() first
|
||||
void mutate_token_metadata_for_test(seastar::noncopyable_function<void (token_metadata&)> func);
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:f69e30ac03713e439d4f9fe347aafe2201d8605880358d3142b6f6bc706c3014
|
||||
size 5966816
|
||||
oid sha256:4c7c513b0a83214e35598a41db10ddb9a4266a63f640d2a49e35d646061969b1
|
||||
size 5990560
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:9ec68edb2980fae1fcf63046b399f30b882fc7b77b4bc316c7055f75820d26f1
|
||||
size 5975376
|
||||
oid sha256:ad22fc390a168892eda150ed19966405e2ceb3393cbf507ebd5df0f1332a869c
|
||||
size 6009316
|
||||
|
||||
@@ -2244,7 +2244,7 @@ future<> repair_service::replace_with_repair(std::unordered_map<sstring, locator
|
||||
auto reason = streaming::stream_reason::replace;
|
||||
// update a cloned version of tmptr
|
||||
// no need to set the original version
|
||||
auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm));
|
||||
auto cloned_tmptr = _db.local().get_shared_token_metadata().make_token_metadata_ptr(std::move(cloned_tm));
|
||||
cloned_tmptr->update_topology(tmptr->get_my_id(), myloc, locator::node::state::replacing);
|
||||
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, tmptr->get_my_id());
|
||||
auto source_dc = utils::optional_param(myloc.dc);
|
||||
@@ -2283,7 +2283,8 @@ future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_nam
|
||||
}
|
||||
table_id tid = t->schema()->id();
|
||||
// Invoke group0 read barrier before obtaining erm pointer so that it sees all prior metadata changes
|
||||
auto dropped = co_await streaming::table_sync_and_check(_db.local(), _mm, tid);
|
||||
auto dropped = !utils::get_local_injector().enter("repair_tablets_no_sync") &&
|
||||
co_await streaming::table_sync_and_check(_db.local(), _mm, tid);
|
||||
if (dropped) {
|
||||
rlogger.debug("repair[{}] Table {}.{} does not exist anymore", rid.uuid(), keyspace_name, table_name);
|
||||
continue;
|
||||
@@ -2292,11 +2293,15 @@ future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_nam
|
||||
while (true) {
|
||||
_repair_module->check_in_shutdown();
|
||||
erm = t->get_effective_replication_map();
|
||||
auto local_version = erm->get_token_metadata().get_version();
|
||||
const locator::tablet_map& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(tid);
|
||||
if (!tmap.has_transitions()) {
|
||||
if (!tmap.has_transitions() && co_await container().invoke_on(0, [local_version] (repair_service& rs) {
|
||||
// We need to ensure that there is no ongoing global request.
|
||||
return local_version == rs._tsm.local()._topology.version && !rs._tsm.local()._topology.is_busy();
|
||||
})) {
|
||||
break;
|
||||
}
|
||||
rlogger.info("repair[{}] Table {}.{} has tablet transitions, waiting for topology to quiesce", rid.uuid(), keyspace_name, table_name);
|
||||
rlogger.info("repair[{}] Topology is busy, waiting for it to quiesce", rid.uuid());
|
||||
erm = nullptr;
|
||||
co_await container().invoke_on(0, [] (repair_service& rs) {
|
||||
return rs._tsm.local().await_not_busy();
|
||||
|
||||
@@ -1448,7 +1448,9 @@ private:
|
||||
size_t row_bytes = co_await get_repair_rows_size(row_list);
|
||||
_metrics.tx_row_nr += row_list.size();
|
||||
_metrics.tx_row_bytes += row_bytes;
|
||||
for (repair_row& r : row_list) {
|
||||
while (!row_list.empty()) {
|
||||
repair_row r = std::move(row_list.front());
|
||||
row_list.pop_front();
|
||||
const auto& dk_with_hash = r.get_dk_with_hash();
|
||||
// No need to search from the beginning of the rows. Looking at the end of repair_rows_on_wire is enough.
|
||||
if (rows.empty()) {
|
||||
|
||||
@@ -355,7 +355,7 @@ database::view_update_read_concurrency_sem() {
|
||||
return *sem;
|
||||
}
|
||||
|
||||
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm,
|
||||
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::shared_token_metadata& stm,
|
||||
compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, sstable_compressor_factory& scf, const abort_source& abort, utils::cross_shard_barrier barrier)
|
||||
: _stats(make_lw_shared<db_stats>())
|
||||
, _user_types(std::make_shared<db_user_types_storage>(*this))
|
||||
|
||||
@@ -1599,7 +1599,7 @@ private:
|
||||
service::migration_notifier& _mnotifier;
|
||||
gms::feature_service& _feat;
|
||||
std::vector<std::any> _listeners;
|
||||
const locator::shared_token_metadata& _shared_token_metadata;
|
||||
locator::shared_token_metadata& _shared_token_metadata;
|
||||
lang::manager& _lang_manager;
|
||||
|
||||
reader_concurrency_semaphore_group _reader_concurrency_semaphores_group;
|
||||
@@ -1684,7 +1684,7 @@ public:
|
||||
// (keyspace/table definitions, column mappings etc.)
|
||||
future<> parse_system_tables(distributed<service::storage_proxy>&, sharded<db::system_keyspace>&);
|
||||
|
||||
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm,
|
||||
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::shared_token_metadata& stm,
|
||||
compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, sstable_compressor_factory&, const abort_source& abort,
|
||||
utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */);
|
||||
database(database&&) = delete;
|
||||
@@ -1719,7 +1719,7 @@ public:
|
||||
return _compaction_manager;
|
||||
}
|
||||
|
||||
const locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; }
|
||||
locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; }
|
||||
locator::token_metadata_ptr get_token_metadata_ptr() const { return _shared_token_metadata.get(); }
|
||||
const locator::token_metadata& get_token_metadata() const { return *_shared_token_metadata.get(); }
|
||||
|
||||
|
||||
@@ -1101,26 +1101,23 @@ private:
|
||||
|
||||
global_request_id = guard.new_group0_state_id();
|
||||
|
||||
std::vector<canonical_mutation> updates;
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
topology_request_tracking_mutation_builder trbuilder(global_request_id, _sp._features.topology_requests_type_column);
|
||||
trbuilder.set_truncate_table_data(table_id)
|
||||
.set("done", false)
|
||||
.set("start_time", db_clock::now());
|
||||
|
||||
if (!_sp._features.topology_global_request_queue) {
|
||||
builder.set_global_topology_request(global_topology_request::truncate_table)
|
||||
.set_global_topology_request_id(global_request_id);
|
||||
} else {
|
||||
builder.queue_global_topology_request_id(global_request_id);
|
||||
trbuilder.set("request_type", global_topology_request::truncate_table);
|
||||
}
|
||||
updates.emplace_back(builder.build());
|
||||
|
||||
updates.emplace_back(topology_request_tracking_mutation_builder(global_request_id, _sp._features.topology_requests_type_column)
|
||||
.set_truncate_table_data(table_id)
|
||||
.set("done", false)
|
||||
.set("start_time", db_clock::now())
|
||||
.set("request_type", global_topology_request::truncate_table)
|
||||
.build());
|
||||
|
||||
slogger.info("Creating TRUNCATE global topology request for table {}.{}", ks_name, cf_name);
|
||||
|
||||
topology_change change{std::move(updates)};
|
||||
topology_change change{{builder.build(), trbuilder.build()}};
|
||||
sstring reason = "Truncating table";
|
||||
group0_command g0_cmd = _group0_client.prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
@@ -1615,6 +1612,10 @@ public:
|
||||
return _type == db::write_type::VIEW;
|
||||
}
|
||||
|
||||
bool is_batch() const noexcept {
|
||||
return _type == db::write_type::BATCH;
|
||||
}
|
||||
|
||||
void set_cdc_operation_result_tracker(lw_shared_ptr<cdc::operation_result_tracker> tracker) {
|
||||
_cdc_operation_result_tracker = std::move(tracker);
|
||||
}
|
||||
@@ -2120,7 +2121,7 @@ paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& conte
|
||||
// create_write_response_handler is overloaded for paxos::proposal and will
|
||||
// create cas_mutation holder, which consequently will ensure paxos::learn is
|
||||
// used.
|
||||
auto f = _proxy->mutate_internal(std::move(m), db::consistency_level::ANY, false, tr_state, _permit, _timeout)
|
||||
auto f = _proxy->mutate_internal(std::move(m), db::consistency_level::ANY, tr_state, _permit, _timeout)
|
||||
.then(utils::result_into_future<result<>>);
|
||||
|
||||
// TODO: provided commits did not invalidate the prepare we just did above (which they
|
||||
@@ -2472,7 +2473,7 @@ future<> paxos_response_handler::learn_decision(lw_shared_ptr<paxos::proposal> d
|
||||
return v.schema()->id() == base_tbl_id;
|
||||
});
|
||||
if (!mutations.empty()) {
|
||||
f_cdc = _proxy->mutate_internal(std::move(mutations), _cl_for_learn, false, tr_state, _permit, _timeout, std::move(tracker))
|
||||
f_cdc = _proxy->mutate_internal(std::move(mutations), _cl_for_learn, tr_state, _permit, _timeout, {}, std::move(tracker))
|
||||
.then(utils::result_into_future<result<>>);
|
||||
}
|
||||
}
|
||||
@@ -2480,7 +2481,7 @@ future<> paxos_response_handler::learn_decision(lw_shared_ptr<paxos::proposal> d
|
||||
|
||||
// Path for the "base" mutations
|
||||
std::array<std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, shared_from_this(), _key.token())};
|
||||
future<> f_lwt = _proxy->mutate_internal(std::move(m), _cl_for_learn, false, tr_state, _permit, _timeout)
|
||||
future<> f_lwt = _proxy->mutate_internal(std::move(m), _cl_for_learn, tr_state, _permit, _timeout)
|
||||
.then(utils::result_into_future<result<>>);
|
||||
|
||||
co_await when_all_succeed(std::move(f_cdc), std::move(f_lwt)).discard_result();
|
||||
@@ -3071,6 +3072,10 @@ struct hint_wrapper {
|
||||
mutation mut;
|
||||
};
|
||||
|
||||
// Distinct wrapper type around a mutation. Being its own type lets it select
// a dedicated create_write_response_handler() overload and fmt formatter.
struct batchlog_replay_mutation {
|
||||
// The wrapped mutation.
mutation mut;
|
||||
};
|
||||
|
||||
struct read_repair_mutation {
|
||||
std::unordered_map<locator::host_id, std::optional<mutation>> value;
|
||||
locator::effective_replication_map_ptr ermp;
|
||||
@@ -3084,6 +3089,12 @@ template <> struct fmt::formatter<service::hint_wrapper> : fmt::formatter<string
|
||||
}
|
||||
};
|
||||
|
||||
// fmt formatter for service::batchlog_replay_mutation: prints the wrapped
// mutation surrounded by "batchlog_replay_mutation{" and "}".
template <> struct fmt::formatter<service::batchlog_replay_mutation> : fmt::formatter<string_view> {
|
||||
auto format(const service::batchlog_replay_mutation& h, fmt::format_context& ctx) const {
|
||||
// "{{" and "}}" are escaped literal braces; the inner "{}" is the mutation itself.
return fmt::format_to(ctx.out(), "batchlog_replay_mutation{{{}}}", h.mut);
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<service::read_repair_mutation> : fmt::formatter<string_view> {
|
||||
auto format(const service::read_repair_mutation& m, fmt::format_context& ctx) const {
|
||||
@@ -3449,6 +3460,12 @@ storage_proxy::create_write_response_handler(const hint_wrapper& h, db::consiste
|
||||
std::move(permit), allow_limit, is_cancellable::yes);
|
||||
}
|
||||
|
||||
// Overload selected for batchlog_replay_mutation: builds a shared_mutation
// holder from the wrapped mutation and forwards everything to
// create_write_response_handler_helper(), requesting a cancellable handler.
result<storage_proxy::response_id_type>
storage_proxy::create_write_response_handler(const batchlog_replay_mutation& m, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
    const auto& replayed = m.mut;
    auto holder = std::make_unique<shared_mutation>(replayed);
    return create_write_response_handler_helper(
            replayed.schema(),
            replayed.token(),
            std::move(holder),
            cl,
            type,
            tr_state,
            std::move(permit),
            allow_limit,
            is_cancellable::yes);
}
|
||||
|
||||
result<storage_proxy::response_id_type>
|
||||
storage_proxy::create_write_response_handler(const read_repair_mutation& mut, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
|
||||
host_id_vector_replica_set endpoints;
|
||||
@@ -3843,7 +3860,7 @@ future<result<>> storage_proxy::do_mutate(std::vector<mutation> mutations, db::c
|
||||
}).begin();
|
||||
return seastar::when_all_succeed(
|
||||
mutate_counters(std::ranges::subrange(mutations.begin(), mid), cl, tr_state, permit, timeout),
|
||||
mutate_internal(std::ranges::subrange(mid, mutations.end()), cl, false, tr_state, permit, timeout, std::move(cdc_tracker), allow_limit)
|
||||
mutate_internal(std::ranges::subrange(mid, mutations.end()), cl, tr_state, permit, timeout, {}, std::move(cdc_tracker), allow_limit)
|
||||
).then([] (std::tuple<result<>> res) {
|
||||
// For now, only mutate_internal returns a result<>
|
||||
return std::get<0>(std::move(res));
|
||||
@@ -3852,8 +3869,10 @@ future<result<>> storage_proxy::do_mutate(std::vector<mutation> mutations, db::c
|
||||
|
||||
future<> storage_proxy::replicate_counter_from_leader(mutation m, db::consistency_level cl, tracing::trace_state_ptr tr_state,
|
||||
clock_type::time_point timeout, service_permit permit) {
|
||||
// we need to pass correct db::write_type in case of a timeout so that
|
||||
// client doesn't attempt to retry the request.
|
||||
// FIXME: do not send the mutation to itself, it has already been applied (it is not incorrect to do so, though)
|
||||
return mutate_internal(std::array<mutation, 1>{std::move(m)}, cl, true, std::move(tr_state), std::move(permit), timeout)
|
||||
return mutate_internal(std::array<mutation, 1>{std::move(m)}, cl, std::move(tr_state), std::move(permit), timeout, db::write_type::COUNTER)
|
||||
.then(utils::result_into_future<result<>>);
|
||||
}
|
||||
|
||||
@@ -3864,8 +3883,8 @@ future<> storage_proxy::replicate_counter_from_leader(mutation m, db::consistenc
|
||||
*/
|
||||
template<typename Range>
|
||||
future<result<>>
|
||||
storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, bool counters, tracing::trace_state_ptr tr_state, service_permit permit,
|
||||
std::optional<clock_type::time_point> timeout_opt, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker,
|
||||
storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state, service_permit permit,
|
||||
std::optional<clock_type::time_point> timeout_opt, std::optional<db::write_type> type_opt, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker,
|
||||
db::allow_per_partition_rate_limit allow_limit) {
|
||||
if (std::ranges::empty(mutations)) {
|
||||
return make_ready_future<result<>>(bo::success());
|
||||
@@ -3874,12 +3893,10 @@ storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, bool c
|
||||
slogger.trace("mutate cl={}", cl);
|
||||
mlogger.trace("mutations={}", mutations);
|
||||
|
||||
// If counters is set it means that we are replicating counter shards. There
|
||||
// is no need for special handling anymore, since the leader has already
|
||||
// done its job, but we need to return correct db::write_type in case of
|
||||
// a timeout so that client doesn't attempt to retry the request.
|
||||
auto type = counters ? db::write_type::COUNTER
|
||||
: (std::next(std::begin(mutations)) == std::end(mutations) ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH);
|
||||
// the parameter type_opt allows to pass a specific type if needed for
|
||||
// special handling, e.g. counters. otherwise, a default type is used.
|
||||
auto type = type_opt.value_or(std::next(std::begin(mutations)) == std::end(mutations) ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH);
|
||||
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
|
||||
@@ -4065,6 +4082,7 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
|
||||
};
|
||||
future<> async_remove_from_batchlog() {
|
||||
// delete batch
|
||||
utils::get_local_injector().inject("storage_proxy_fail_remove_from_batchlog", [] { throw std::runtime_error("Error injection: failing remove from batchlog"); });
|
||||
auto key = partition_key::from_exploded(*_schema, {uuid_type->decompose(_batch_uuid)});
|
||||
auto now = service::client_state(service::client_state::internal_tag()).get_timestamp();
|
||||
mutation m(_schema, key);
|
||||
@@ -4136,13 +4154,15 @@ mutation storage_proxy::do_get_batchlog_mutation_for(schema_ptr schema, const st
|
||||
for (auto& m : fm) {
|
||||
ser::serialize(out, m);
|
||||
}
|
||||
return to_bytes(out.linearize());
|
||||
return std::move(out).to_managed_bytes();
|
||||
}();
|
||||
|
||||
mutation m(schema, key);
|
||||
m.set_cell(clustering_key_prefix::make_empty(), to_bytes("version"), version, timestamp);
|
||||
m.set_cell(clustering_key_prefix::make_empty(), to_bytes("written_at"), now, timestamp);
|
||||
m.set_cell(clustering_key_prefix::make_empty(), to_bytes("data"), data_value(std::move(data)), timestamp);
|
||||
// Avoid going through data_value and therefore `bytes`, as it can be large (#24809).
|
||||
auto cdef_data = schema->get_column_definition(to_bytes("data"));
|
||||
m.set_cell(clustering_key_prefix::make_empty(), *cdef_data, atomic_cell::make_live(*cdef_data->type, timestamp, std::move(data)));
|
||||
|
||||
return m;
|
||||
}
|
||||
@@ -4248,7 +4268,16 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s,
|
||||
|
||||
future<> storage_proxy::send_hint_to_all_replicas(frozen_mutation_and_schema fm_a_s) {
|
||||
std::array<hint_wrapper, 1> ms{hint_wrapper { fm_a_s.fm.unfreeze(fm_a_s.s) }};
|
||||
return mutate_internal(std::move(ms), db::consistency_level::ALL, false, nullptr, empty_service_permit())
|
||||
return mutate_internal(std::move(ms), db::consistency_level::ALL, nullptr, empty_service_permit())
|
||||
.then(utils::result_into_future<result<>>);
|
||||
}
|
||||
|
||||
// Wraps each of the given mutations in a batchlog_replay_mutation and submits
// the whole set through mutate_internal() with consistency level ALL, the
// caller-supplied timeout and write type BATCH. The result<> produced by
// mutate_internal() is converted into a plain future<>.
future<> storage_proxy::send_batchlog_replay_to_all_replicas(std::vector<mutation> mutations, clock_type::time_point timeout) {
    std::vector<batchlog_replay_mutation> wrapped;
    wrapped.reserve(mutations.size());
    // `mutations` is our own by-value copy, so its elements can be moved from.
    for (auto& mut : mutations) {
        wrapped.push_back(batchlog_replay_mutation{std::move(mut)});
    }

    return mutate_internal(std::move(wrapped), db::consistency_level::ALL, nullptr, empty_service_permit(), timeout, db::write_type::BATCH)
            .then(utils::result_into_future<result<>>);
}
|
||||
|
||||
@@ -4431,7 +4460,7 @@ future<result<>> storage_proxy::schedule_repair(locator::effective_replication_m
|
||||
std::views::transform([ermp] (auto& v) { return read_repair_mutation{std::move(v), ermp}; }) |
|
||||
// The transform above is destructive, materialize into a vector to make the range re-iterable.
|
||||
std::ranges::to<std::vector<read_repair_mutation>>()
|
||||
, cl, false, std::move(trace_state), std::move(permit));
|
||||
, cl, std::move(trace_state), std::move(permit));
|
||||
}
|
||||
|
||||
class abstract_read_resolver {
|
||||
@@ -6964,6 +6993,12 @@ future<> storage_proxy::abort_view_writes() {
|
||||
});
|
||||
}
|
||||
|
||||
// Invokes cancel_write_handlers() with a predicate that selects the write
// handlers whose is_batch() is true. The call is made from inside async(),
// whose returned future is handed back to the caller.
future<> storage_proxy::abort_batch_writes() {
    return async([this] {
        auto selects_batch_handler = [] (const abstract_write_response_handler& handler) {
            return handler.is_batch();
        };
        cancel_write_handlers(selects_batch_handler);
    });
}
|
||||
|
||||
future<>
|
||||
storage_proxy::stop() {
|
||||
return make_ready_future<>();
|
||||
|
||||
@@ -87,6 +87,7 @@ class mutation_holder;
|
||||
class client_state;
|
||||
class migration_manager;
|
||||
struct hint_wrapper;
|
||||
struct batchlog_replay_mutation;
|
||||
struct read_repair_mutation;
|
||||
|
||||
using replicas_per_token_range = std::unordered_map<dht::token_range, std::vector<locator::host_id>>;
|
||||
@@ -340,6 +341,7 @@ private:
|
||||
const host_id_vector_topology_change& pending_endpoints, host_id_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable);
|
||||
result<response_id_type> create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
result<response_id_type> create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
result<response_id_type> create_write_response_handler(const batchlog_replay_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
result<response_id_type> create_write_response_handler(const read_repair_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
result<response_id_type> create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
|
||||
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
|
||||
@@ -427,7 +429,7 @@ private:
|
||||
void unthrottle();
|
||||
void handle_read_error(std::variant<exceptions::coordinator_exception_container, std::exception_ptr> failure, bool range);
|
||||
template<typename Range>
|
||||
future<result<>> mutate_internal(Range mutations, db::consistency_level cl, bool counter_write, tracing::trace_state_ptr tr_state, service_permit permit, std::optional<clock_type::time_point> timeout_opt = { }, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker = { }, db::allow_per_partition_rate_limit allow_limit = db::allow_per_partition_rate_limit::no);
|
||||
future<result<>> mutate_internal(Range mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state, service_permit permit, std::optional<clock_type::time_point> timeout_opt = { }, std::optional<db::write_type> type = { }, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker = { }, db::allow_per_partition_rate_limit allow_limit = db::allow_per_partition_rate_limit::no);
|
||||
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> query_nonsingular_mutations_locally(
|
||||
schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range_vector&& pr, tracing::trace_state_ptr trace_state,
|
||||
clock_type::time_point timeout);
|
||||
@@ -631,6 +633,8 @@ public:
|
||||
|
||||
future<> send_hint_to_all_replicas(frozen_mutation_and_schema fm_a_s);
|
||||
|
||||
future<> send_batchlog_replay_to_all_replicas(std::vector<mutation> mutations, clock_type::time_point timeout);
|
||||
|
||||
// Send a mutation to one specific remote target.
|
||||
// Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without
|
||||
// hinted handoff support, and just one target. See also
|
||||
@@ -705,6 +709,7 @@ public:
|
||||
void allow_replaying_hints() noexcept;
|
||||
future<> drain_hints_for_left_nodes();
|
||||
future<> abort_view_writes();
|
||||
future<> abort_batch_writes();
|
||||
|
||||
future<> change_hints_host_filter(db::hints::host_filter new_filter);
|
||||
const db::hints::host_filter& get_hints_host_filter() const;
|
||||
|
||||
@@ -111,7 +111,6 @@
|
||||
#include "node_ops/task_manager_module.hh"
|
||||
#include "service/task_manager_module.hh"
|
||||
#include "service/topology_mutation.hh"
|
||||
#include "service/topology_coordinator.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
|
||||
@@ -740,9 +739,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
|
||||
auto saved_tmpr = get_token_metadata_ptr();
|
||||
{
|
||||
auto tmlock = co_await get_token_metadata_lock();
|
||||
auto tmptr = make_token_metadata_ptr(token_metadata::config {
|
||||
get_token_metadata().get_topology().get_config()
|
||||
});
|
||||
auto tmptr = _shared_token_metadata.make_token_metadata_ptr();
|
||||
tmptr->invalidate_cached_rings();
|
||||
|
||||
tmptr->set_version(_topology_state_machine._topology.version);
|
||||
@@ -1134,7 +1131,8 @@ future<> storage_service::raft_state_monitor_fiber(raft::server& raft, gate::hol
|
||||
_tablet_allocator.local(),
|
||||
get_ring_delay(),
|
||||
_lifecycle_notifier,
|
||||
_feature_service);
|
||||
_feature_service,
|
||||
_topology_cmd_rpc_tracker);
|
||||
}
|
||||
} catch (...) {
|
||||
rtlogger.info("raft_state_monitor_fiber aborted with {}", std::current_exception());
|
||||
@@ -3146,9 +3144,10 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
|
||||
try {
|
||||
auto base_shard = this_shard_id();
|
||||
pending_token_metadata_ptr[base_shard] = tmptr;
|
||||
auto& sharded_token_metadata = _shared_token_metadata.container();
|
||||
// clone a local copy of updated token_metadata on all other shards
|
||||
co_await smp::invoke_on_others(base_shard, [&, tmptr] () -> future<> {
|
||||
pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tmptr->clone_async());
|
||||
pending_token_metadata_ptr[this_shard_id()] = sharded_token_metadata.local().make_token_metadata_ptr(co_await tmptr->clone_async());
|
||||
});
|
||||
|
||||
// Precalculate new effective_replication_map for all keyspaces
|
||||
@@ -5747,7 +5746,7 @@ future<> storage_service::snitch_reconfigured() {
|
||||
|
||||
future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft::term_t term, uint64_t cmd_index, const raft_topology_cmd& cmd) {
|
||||
raft_topology_cmd_result result;
|
||||
rtlogger.debug("topology cmd rpc {} is called", cmd.cmd);
|
||||
rtlogger.info("topology cmd rpc {} is called index={}", cmd.cmd, cmd_index);
|
||||
|
||||
try {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
@@ -6077,6 +6076,9 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
} catch (...) {
|
||||
rtlogger.error("raft_topology_cmd {} failed with: {}", cmd.cmd, std::current_exception());
|
||||
}
|
||||
|
||||
rtlogger.info("topology cmd rpc {} completed with status={} index={}",
|
||||
cmd.cmd, (result.status == raft_topology_cmd_result::command_status::success) ? "suceeded" : "failed", cmd_index);
|
||||
co_return result;
|
||||
}
|
||||
|
||||
|
||||
@@ -48,6 +48,7 @@
|
||||
#include "timestamp.hh"
|
||||
#include "utils/user_provided_param.hh"
|
||||
#include "utils/sequenced_set.hh"
|
||||
#include "service/topology_coordinator.hh"
|
||||
|
||||
class node_ops_cmd_request;
|
||||
class node_ops_cmd_response;
|
||||
@@ -282,12 +283,12 @@ private:
|
||||
future<> snitch_reconfigured();
|
||||
|
||||
future<mutable_token_metadata_ptr> get_mutable_token_metadata_ptr() noexcept {
|
||||
return _shared_token_metadata.get()->clone_async().then([] (token_metadata tm) {
|
||||
return _shared_token_metadata.get()->clone_async().then([this] (token_metadata tm) {
|
||||
// bump the token_metadata ring_version
|
||||
// to invalidate cached token/replication mappings
|
||||
// when the modified token_metadata is committed.
|
||||
tm.invalidate_cached_rings();
|
||||
return make_ready_future<mutable_token_metadata_ptr>(make_token_metadata_ptr(std::move(tm)));
|
||||
return _shared_token_metadata.make_token_metadata_ptr(std::move(tm));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -873,6 +874,11 @@ private:
|
||||
std::optional<shared_future<>> _rebuild_result;
|
||||
std::unordered_map<raft::server_id, std::optional<shared_future<>>> _remove_result;
|
||||
tablet_op_registry _tablet_ops;
|
||||
// This tracks active topology cmd rpc. There can be only one active
|
||||
// cmd running and by inspecting this structure it can be checked which
|
||||
// cmd is currently executing and which nodes have not replied yet.
|
||||
// Needed for debugging.
|
||||
topology_coordinator_cmd_rpc_tracker _topology_cmd_rpc_tracker;
|
||||
struct {
|
||||
raft::term_t term{0};
|
||||
uint64_t last_index{0};
|
||||
@@ -941,6 +947,10 @@ public:
|
||||
// Waits for topology state in which none of tablets has replaced_id as a replica.
|
||||
// Must be called on shard 0.
|
||||
future<> await_tablets_rebuilt(raft::server_id replaced_id);
|
||||
|
||||
// Returns a copy of the tracker describing the topology cmd rpc state
// (_topology_cmd_rpc_tracker). The value is returned by copy, so the caller
// gets a snapshot that is independent of later updates to the member.
// Const-qualified because it only reads the member.
topology_coordinator_cmd_rpc_tracker get_topology_cmd_status() const {
    return _topology_cmd_rpc_tracker;
}
|
||||
private:
|
||||
// Tracks progress of the upgrade to topology coordinator.
|
||||
future<> _upgrade_to_topology_coordinator_fiber = make_ready_future<>();
|
||||
|
||||
@@ -842,7 +842,7 @@ public:
|
||||
db_clock::duration repair_time_diff;
|
||||
};
|
||||
|
||||
std::vector<repair_plan> plans;
|
||||
utils::chunked_vector<repair_plan> plans;
|
||||
auto migration_tablet_ids = co_await mplan.get_migration_tablet_ids();
|
||||
for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
|
||||
auto& tmap = *tmap_;
|
||||
|
||||
@@ -147,6 +147,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
|
||||
group0_voter_handler _voter_handler;
|
||||
|
||||
topology_coordinator_cmd_rpc_tracker& _topology_cmd_rpc_tracker;
|
||||
|
||||
// Returns a reference to the token metadata obtained by dereferencing
// whatever _shared_tm.get() currently yields.
// NOTE(review): the reference's lifetime is tied to the object returned by
// _shared_tm.get() — confirm callers do not hold it across metadata updates.
const locator::token_metadata& get_token_metadata() const noexcept {
|
||||
return *_shared_tm.get();
|
||||
}
|
||||
@@ -389,6 +391,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
future<> exec_direct_command_helper(raft::server_id id, uint64_t cmd_index, const raft_topology_cmd& cmd) {
|
||||
rtlogger.debug("send {} command with term {} and index {} to {}",
|
||||
cmd.cmd, _term, cmd_index, id);
|
||||
_topology_cmd_rpc_tracker.active_dst.emplace(id);
|
||||
auto _ = seastar::defer([this, id] { _topology_cmd_rpc_tracker.active_dst.erase(id); });
|
||||
|
||||
auto result = _db.get_token_metadata().get_topology().is_me(to_host_id(id)) ?
|
||||
co_await _raft_topology_cmd_handler(_term, cmd_index, cmd) :
|
||||
co_await ser::storage_service_rpc_verbs::send_raft_topology_cmd(
|
||||
@@ -403,12 +408,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
auto id = node.id;
|
||||
release_node(std::move(node));
|
||||
const auto cmd_index = ++_last_cmd_index;
|
||||
_topology_cmd_rpc_tracker.current = cmd.cmd;
|
||||
_topology_cmd_rpc_tracker.index = cmd_index;
|
||||
co_await exec_direct_command_helper(id, cmd_index, cmd);
|
||||
co_return retake_node(co_await start_operation(), id);
|
||||
};
|
||||
|
||||
future<> exec_global_command_helper(auto nodes, const raft_topology_cmd& cmd) {
|
||||
const auto cmd_index = ++_last_cmd_index;
|
||||
_topology_cmd_rpc_tracker.current = cmd.cmd;
|
||||
_topology_cmd_rpc_tracker.index = cmd_index;
|
||||
auto f = co_await coroutine::as_future(
|
||||
seastar::parallel_for_each(std::move(nodes), [this, &cmd, cmd_index] (raft::server_id id) {
|
||||
return exec_direct_command_helper(id, cmd_index, cmd);
|
||||
@@ -1730,6 +1739,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
|
||||
future<> handle_tablet_resize_finalization(group0_guard g) {
|
||||
co_await utils::get_local_injector().inject("handle_tablet_resize_finalization_wait", [] (auto& handler) -> future<> {
|
||||
rtlogger.info("handle_tablet_resize_finalization: waiting");
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{60});
|
||||
});
|
||||
|
||||
// Executes a global barrier to guarantee that any process (e.g. repair) holding stale version
|
||||
// of token metadata will complete before we update topology.
|
||||
auto guard = co_await global_tablet_token_metadata_barrier(std::move(g));
|
||||
@@ -2988,7 +3002,8 @@ public:
|
||||
raft_topology_cmd_handler_type raft_topology_cmd_handler,
|
||||
tablet_allocator& tablet_allocator,
|
||||
std::chrono::milliseconds ring_delay,
|
||||
gms::feature_service& feature_service)
|
||||
gms::feature_service& feature_service,
|
||||
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker)
|
||||
: _sys_dist_ks(sys_dist_ks), _gossiper(gossiper), _messaging(messaging)
|
||||
, _shared_tm(shared_tm), _sys_ks(sys_ks), _db(db)
|
||||
, _group0(group0), _topo_sm(topo_sm), _as(as)
|
||||
@@ -3000,6 +3015,7 @@ public:
|
||||
, _ring_delay(ring_delay)
|
||||
, _group0_holder(_group0.hold_group0_gate())
|
||||
, _voter_handler(group0, topo_sm._topology, gossiper, feature_service)
|
||||
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
|
||||
, _async_gate("topology_coordinator")
|
||||
{}
|
||||
|
||||
@@ -3614,7 +3630,8 @@ future<> run_topology_coordinator(
|
||||
tablet_allocator& tablet_allocator,
|
||||
std::chrono::milliseconds ring_delay,
|
||||
endpoint_lifecycle_notifier& lifecycle_notifier,
|
||||
gms::feature_service& feature_service) {
|
||||
gms::feature_service& feature_service,
|
||||
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker) {
|
||||
|
||||
topology_coordinator coordinator{
|
||||
sys_dist_ks, gossiper, messaging, shared_tm,
|
||||
@@ -3622,7 +3639,8 @@ future<> run_topology_coordinator(
|
||||
std::move(raft_topology_cmd_handler),
|
||||
tablet_allocator,
|
||||
ring_delay,
|
||||
feature_service};
|
||||
feature_service,
|
||||
topology_cmd_rpc_tracker};
|
||||
|
||||
std::exception_ptr ex;
|
||||
lifecycle_notifier.register_subscriber(&coordinator);
|
||||
|
||||
@@ -62,6 +62,12 @@ future<> wait_for_gossiper(raft::server_id id, const gms::gossiper& g, seastar::
|
||||
using raft_topology_cmd_handler_type = noncopyable_function<future<raft_topology_cmd_result>(
|
||||
raft::term_t, uint64_t, const raft_topology_cmd&)>;
|
||||
|
||||
struct topology_coordinator_cmd_rpc_tracker {
|
||||
raft_topology_cmd::command current;
|
||||
uint64_t index;
|
||||
std::set<raft::server_id> active_dst;
|
||||
};
|
||||
|
||||
future<> run_topology_coordinator(
|
||||
seastar::sharded<db::system_distributed_keyspace>& sys_dist_ks, gms::gossiper& gossiper,
|
||||
netw::messaging_service& messaging, locator::shared_token_metadata& shared_tm,
|
||||
@@ -71,6 +77,7 @@ future<> run_topology_coordinator(
|
||||
tablet_allocator& tablet_allocator,
|
||||
std::chrono::milliseconds ring_delay,
|
||||
endpoint_lifecycle_notifier& lifecycle_notifier,
|
||||
gms::feature_service& feature_service);
|
||||
gms::feature_service& feature_service,
|
||||
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker);
|
||||
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include <functional>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include <seastar/util/defer.hh>
|
||||
#include <seastar/util/closeable.hh>
|
||||
|
||||
#include "locator/types.hh"
|
||||
#include "test/lib/scylla_test_case.hh"
|
||||
@@ -213,6 +214,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_sketch) {
|
||||
.local_dc_rack = locator::endpoint_dc_rack::default_location
|
||||
}
|
||||
});
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, node1_shard_count);
|
||||
|
||||
@@ -280,6 +280,7 @@ void simple_test() {
|
||||
tm_cfg.topo_cfg.this_endpoint = my_address;
|
||||
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
|
||||
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
|
||||
std::vector<ring_point> ring_points = {
|
||||
{ 1.0, inet_address("192.100.10.1") },
|
||||
@@ -363,6 +364,7 @@ void heavy_origin_test() {
|
||||
|
||||
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); },
|
||||
locator::token_metadata::config{locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
|
||||
std::vector<int> dc_racks = {2, 4, 8};
|
||||
std::vector<int> dc_endpoints = {128, 256, 512};
|
||||
@@ -476,6 +478,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
|
||||
|
||||
// Initialize the token_metadata
|
||||
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
auto& topo = tm.get_topology();
|
||||
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
||||
@@ -567,6 +570,7 @@ static void test_random_balancing(sharded<snitch_ptr>& snitch, gms::inet_address
|
||||
|
||||
// Initialize the token_metadata
|
||||
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
auto& topo = tm.get_topology();
|
||||
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
||||
@@ -897,6 +901,7 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
|
||||
for (size_t run = 0; run < RUNS; ++run) {
|
||||
semaphore sem(1);
|
||||
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
|
||||
std::unordered_set<dht::token> random_tokens;
|
||||
while (random_tokens.size() < nodes.size() * VNODES) {
|
||||
@@ -1043,6 +1048,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_compare_endpoints) {
|
||||
|
||||
semaphore sem(1);
|
||||
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
auto& topo = tm.get_topology();
|
||||
generate_topology(topo, datacenters, nodes);
|
||||
@@ -1087,6 +1093,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_sort_by_proximity) {
|
||||
tm_cfg.topo_cfg.local_dc_rack = locator::endpoint_dc_rack::default_location;
|
||||
semaphore sem(1);
|
||||
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
generate_topology(tm.get_topology(), datacenters, nodes);
|
||||
return make_ready_future();
|
||||
@@ -1122,6 +1129,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
|
||||
.local_dc_rack = ip1_dc_rack,
|
||||
}
|
||||
});
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
|
||||
// get_location() should work before any node is added
|
||||
|
||||
@@ -1249,6 +1257,7 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
|
||||
|
||||
// Initialize the token_metadata
|
||||
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
auto& topo = tm.get_topology();
|
||||
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
||||
@@ -1401,6 +1410,7 @@ void test_complex_rack_aware_view_pairing_test(bool more_or_less) {
|
||||
|
||||
// Initialize the token_metadata
|
||||
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
auto& topo = tm.get_topology();
|
||||
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
||||
|
||||
@@ -14,7 +14,9 @@
|
||||
#include <fmt/std.h>
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/util/closeable.hh>
|
||||
#include "seastarx.hh"
|
||||
|
||||
#include "service/qos/qos_common.hh"
|
||||
#include "test/lib/scylla_test_case.hh"
|
||||
#include "test/lib/test_utils.hh"
|
||||
@@ -107,6 +109,7 @@ SEASTAR_THREAD_TEST_CASE(subscriber_simple) {
|
||||
sl_options.shares.emplace<int32_t>(1000);
|
||||
scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg", 1.0).get();
|
||||
locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
|
||||
auto stop_tm = deferred_stop(tm);
|
||||
sharded<abort_source> as;
|
||||
as.start().get();
|
||||
auto stop_as = defer([&as] { as.stop().get(); });
|
||||
@@ -180,6 +183,7 @@ SEASTAR_THREAD_TEST_CASE(too_many_service_levels) {
|
||||
sl_options.workload = service_level_options::workload_type::interactive;
|
||||
scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg1", 1.0).get();
|
||||
locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
|
||||
auto stop_tm = deferred_stop(tm);
|
||||
sharded<abort_source> as;
|
||||
as.start().get();
|
||||
auto stop_as = defer([&as] { as.stop().get(); });
|
||||
@@ -256,6 +260,7 @@ SEASTAR_THREAD_TEST_CASE(add_remove_bad_sequence) {
|
||||
sl_options.shares.emplace<int32_t>(1000);
|
||||
scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg3", 1.0).get();
|
||||
locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
|
||||
auto stop_tm = deferred_stop(tm);
|
||||
sharded<abort_source> as;
|
||||
as.start().get();
|
||||
auto stop_as = defer([&as] { as.stop().get(); });
|
||||
@@ -282,6 +287,7 @@ SEASTAR_THREAD_TEST_CASE(verify_unset_shares_in_cache_when_service_level_created
|
||||
sl_options.shares.emplace<int32_t>(1000);
|
||||
scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg", 1.0).get();
|
||||
locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
|
||||
auto stop_tm = deferred_stop(tm);
|
||||
sharded<abort_source> as;
|
||||
|
||||
as.start().get();
|
||||
|
||||
@@ -52,9 +52,11 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
|
||||
}
|
||||
};
|
||||
|
||||
auto& stm = e.shared_token_metadata().local();
|
||||
|
||||
{
|
||||
// Ring with minimum token
|
||||
auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{e.shared_token_metadata().local().get()->get_topology().get_config()});
|
||||
auto tmptr = stm.make_token_metadata_ptr();
|
||||
const auto host_id = locator::host_id{utils::UUID(0, 1)};
|
||||
tmptr->update_topology(host_id, locator::endpoint_dc_rack{"dc1", "rack1"}, locator::node::state::normal);
|
||||
tmptr->update_normal_tokens(std::unordered_set<dht::token>({dht::minimum_token()}), host_id).get();
|
||||
@@ -69,7 +71,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
|
||||
}
|
||||
|
||||
{
|
||||
auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{e.shared_token_metadata().local().get()->get_topology().get_config()});
|
||||
auto tmptr = stm.make_token_metadata_ptr();
|
||||
const auto id1 = locator::host_id{utils::UUID(0, 1)};
|
||||
const auto id2 = locator::host_id{utils::UUID(0, 2)};
|
||||
tmptr->update_topology(id1, locator::endpoint_dc_rack{"dc1", "rack1"}, locator::node::state::normal);
|
||||
|
||||
@@ -799,6 +799,7 @@ SEASTAR_TEST_CASE(test_get_shard) {
|
||||
.local_dc_rack = locator::endpoint_dc_rack::default_location
|
||||
}
|
||||
});
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
|
||||
tablet_id tid(0);
|
||||
tablet_id tid1(0);
|
||||
@@ -1048,7 +1049,7 @@ SEASTAR_TEST_CASE(test_sharder) {
|
||||
|
||||
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
|
||||
|
||||
token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } });
|
||||
token_metadata tokm(e.get_shared_token_metadata().local(), token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } });
|
||||
tokm.get_topology().add_or_update_endpoint(h1);
|
||||
|
||||
std::vector<tablet_id> tablet_ids;
|
||||
@@ -1263,7 +1264,14 @@ SEASTAR_TEST_CASE(test_intranode_sharding) {
|
||||
|
||||
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
|
||||
|
||||
token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } });
|
||||
locator::token_metadata::config tm_cfg;
|
||||
tm_cfg.topo_cfg.this_host_id = h1;
|
||||
tm_cfg.topo_cfg.local_dc_rack = endpoint_dc_rack::default_location;
|
||||
semaphore sem(1);
|
||||
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
auto tmptr = stm.make_token_metadata_ptr();
|
||||
auto& tokm = *tmptr;
|
||||
tokm.get_topology().add_or_update_endpoint(h1);
|
||||
|
||||
auto leaving_replica = tablet_replica{h1, 5};
|
||||
@@ -3606,6 +3614,7 @@ static void execute_tablet_for_new_rf_test(calculate_tablet_replicas_for_new_rf_
|
||||
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
|
||||
tm_cfg.topo_cfg.this_host_id = test_config.ring_points[0].id;
|
||||
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
|
||||
// Initialize the token_metadata
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/util/closeable.hh>
|
||||
#include "test/lib/scylla_test_case.hh"
|
||||
#include "test/lib/test_utils.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
@@ -31,13 +32,11 @@ namespace {
|
||||
};
|
||||
}
|
||||
|
||||
mutable_token_metadata_ptr create_token_metadata(host_id this_host_id) {
|
||||
return make_lw_shared<token_metadata>(token_metadata::config {
|
||||
topology::config {
|
||||
.this_host_id = this_host_id,
|
||||
.local_dc_rack = get_dc_rack(this_host_id)
|
||||
}
|
||||
});
|
||||
token_metadata::config create_token_metadata_config(host_id this_host_id) {
|
||||
return token_metadata::config{topology::config{
|
||||
.this_host_id = this_host_id,
|
||||
.local_dc_rack = get_dc_rack(this_host_id)
|
||||
}};
|
||||
}
|
||||
|
||||
template <typename Strategy>
|
||||
@@ -55,7 +54,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_everywhere_strategy
|
||||
const auto t1 = dht::token::from_int64(10);
|
||||
const auto t2 = dht::token::from_int64(20);
|
||||
|
||||
auto token_metadata = create_token_metadata(e1_id);
|
||||
semaphore sem(1);
|
||||
auto tm_cfg = create_token_metadata_config(e1_id);
|
||||
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
auto token_metadata = stm.make_token_metadata_ptr();
|
||||
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
|
||||
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
|
||||
token_metadata->update_normal_tokens({t1}, e1_id).get();
|
||||
@@ -75,7 +78,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) {
|
||||
const auto e1_id = gen_id(1);
|
||||
const auto e2_id = gen_id(2);
|
||||
|
||||
auto token_metadata = create_token_metadata(e1_id);
|
||||
semaphore sem(1);
|
||||
auto tm_cfg = create_token_metadata_config(e1_id);
|
||||
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
auto token_metadata = stm.make_token_metadata_ptr();
|
||||
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
|
||||
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
|
||||
token_metadata->update_normal_tokens({t1}, e1_id).get();
|
||||
@@ -103,7 +110,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) {
|
||||
const auto e2_id = gen_id(2);
|
||||
const auto e3_id = gen_id(3);
|
||||
|
||||
auto token_metadata = create_token_metadata(e1_id);
|
||||
semaphore sem(1);
|
||||
auto tm_cfg = create_token_metadata_config(e1_id);
|
||||
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
auto token_metadata = stm.make_token_metadata_ptr();
|
||||
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
|
||||
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
|
||||
token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal);
|
||||
@@ -133,7 +144,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) {
|
||||
const auto e2_id = gen_id(2);
|
||||
const auto e3_id = gen_id(3);
|
||||
|
||||
auto token_metadata = create_token_metadata(e1_id);
|
||||
semaphore sem(1);
|
||||
auto tm_cfg = create_token_metadata_config(e1_id);
|
||||
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
auto token_metadata = stm.make_token_metadata_ptr();
|
||||
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
|
||||
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
|
||||
token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal);
|
||||
@@ -165,7 +180,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) {
|
||||
const auto e3_id = gen_id(3);
|
||||
const auto e4_id = gen_id(4);
|
||||
|
||||
auto token_metadata = create_token_metadata(e1_id);
|
||||
semaphore sem(1);
|
||||
auto tm_cfg = create_token_metadata_config(e1_id);
|
||||
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
auto token_metadata = stm.make_token_metadata_ptr();
|
||||
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
|
||||
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
|
||||
token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal);
|
||||
@@ -201,7 +220,11 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas
|
||||
const auto e2_id = gen_id(2);
|
||||
const auto e3_id = gen_id(3);
|
||||
|
||||
auto token_metadata = create_token_metadata(e1_id);
|
||||
semaphore sem(1);
|
||||
auto tm_cfg = create_token_metadata_config(e1_id);
|
||||
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
auto token_metadata = stm.make_token_metadata_ptr();
|
||||
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal);
|
||||
token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal);
|
||||
token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal);
|
||||
@@ -254,7 +277,11 @@ SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) {
|
||||
const auto e1_id1 = gen_id(1);
|
||||
const auto e1_id2 = gen_id(2);
|
||||
|
||||
auto token_metadata = create_token_metadata(e1_id2);
|
||||
semaphore sem(1);
|
||||
auto tm_cfg = create_token_metadata_config(e1_id2);
|
||||
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
auto token_metadata = stm.make_token_metadata_ptr();
|
||||
token_metadata->update_topology(e1_id1, get_dc_rack(e1_id1), node::state::being_replaced);
|
||||
token_metadata->update_normal_tokens({t1}, e1_id1).get();
|
||||
|
||||
|
||||
204
test/cluster/test_batchlog_manager.py
Normal file
204
test/cluster/test_batchlog_manager.py
Normal file
@@ -0,0 +1,204 @@
|
||||
#
|
||||
# Copyright (C) 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import pytest
|
||||
import logging
|
||||
import time
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for
|
||||
from test.cluster.util import new_test_keyspace, reconnect_driver, wait_for_cql_and_get_hosts
|
||||
from test.cluster.conftest import skip_mode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_batchlog_replay_while_a_node_is_down(manager: ManagerClient) -> None:
|
||||
""" Test that batchlog replay handles the case when a node is down while replaying a batch.
|
||||
Reproduces issue #24599.
|
||||
1. Create a cluster with 3 nodes.
|
||||
2. Write a batch and inject an error to fail it before it's removed from the batchlog, so it
|
||||
needs to be replayed.
|
||||
3. Stop server 1.
|
||||
4. Server 0 tries to replay the batch. it sends the mutation to all replicas, but one of them is down,
|
||||
so it should fail.
|
||||
5. Bring server 1 back up.
|
||||
6. Verify that the batch is replayed and removed from the batchlog eventually.
|
||||
"""
|
||||
|
||||
cmdline=['--logger-log-level', 'batchlog_manager=trace']
|
||||
config = {'error_injections_at_startup': ['short_batchlog_manager_replay_interval'], 'write_request_timeout_in_ms': 2000}
|
||||
servers = await manager.servers_add(3, config=config, cmdline=cmdline, auto_rack_dc="dc1")
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v int, PRIMARY KEY (key, c))")
|
||||
|
||||
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "storage_proxy_fail_remove_from_batchlog", one_shot=False) for s in servers])
|
||||
|
||||
# make sure the batch is replayed only after the server is stopped
|
||||
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "skip_batch_replay", one_shot=False) for s in servers])
|
||||
|
||||
s0_log = await manager.server_open_log(servers[0].server_id)
|
||||
|
||||
try:
|
||||
await cql.run_async(f"BEGIN BATCH INSERT INTO {ks}.tab (key, c, v) VALUES (0,0,0); INSERT INTO {ks}.tab (key, c, v) VALUES (1,1,1); APPLY BATCH")
|
||||
except Exception as e:
|
||||
# injected error is expected
|
||||
logger.error(f"Error executing batch: {e}")
|
||||
|
||||
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, "storage_proxy_fail_remove_from_batchlog") for s in servers])
|
||||
|
||||
await manager.server_stop(servers[1].server_id)
|
||||
|
||||
batchlog_row_count = (await cql.run_async("SELECT COUNT(*) FROM system.batchlog", host=hosts[0]))[0].count
|
||||
assert batchlog_row_count > 0
|
||||
|
||||
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, "skip_batch_replay") for s in servers if s != servers[1]])
|
||||
|
||||
# The batch is replayed while server 1 is down
|
||||
await s0_log.wait_for('Replaying batch', timeout=60)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# Bring server 1 back up and verify that eventually the batch is replayed and removed from the batchlog
|
||||
await manager.server_start(servers[1].server_id)
|
||||
|
||||
s0_mark = await s0_log.mark()
|
||||
await s0_log.wait_for('Finished replayAllFailedBatches', timeout=60, from_mark=s0_mark)
|
||||
|
||||
async def batchlog_empty() -> bool:
|
||||
batchlog_row_count = (await cql.run_async("SELECT COUNT(*) FROM system.batchlog", host=hosts[0]))[0].count
|
||||
if batchlog_row_count == 0:
|
||||
return True
|
||||
await wait_for(batchlog_empty, time.time() + 60)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_batchlog_replay_aborted_on_shutdown(manager: ManagerClient) -> None:
|
||||
""" Similar to the previous test, but also verifies that the batchlog replay is aborted on shutdown,
|
||||
and node shutdown is not stuck.
|
||||
1. Create a cluster with 3 nodes.
|
||||
2. Write a batch and inject an error to fail it before it's removed from the batchlog, so it
|
||||
needs to be replayed.
|
||||
3. Stop server 1.
|
||||
4. Server 0 tries to replay the batch. it sends the mutation to all replicas, but one of them is down,
|
||||
so it should fail.
|
||||
5. Shut down server 0 gracefully, which should abort the batchlog replay which is in progress.
|
||||
6. Bring server 0 and server 1 back up.
|
||||
6. Verify that the batch is replayed and removed from the batchlog eventually.
|
||||
"""
|
||||
|
||||
cmdline=['--logger-log-level', 'batchlog_manager=trace']
|
||||
config = {'error_injections_at_startup': ['short_batchlog_manager_replay_interval'], 'write_request_timeout_in_ms': 2000}
|
||||
servers = await manager.servers_add(3, config=config, cmdline=cmdline, auto_rack_dc="dc1")
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v int, PRIMARY KEY (key, c))")
|
||||
|
||||
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "storage_proxy_fail_remove_from_batchlog", one_shot=False) for s in servers])
|
||||
|
||||
# make sure the batch is replayed only after the server is stopped
|
||||
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "skip_batch_replay", one_shot=False) for s in servers])
|
||||
|
||||
s0_log = await manager.server_open_log(servers[0].server_id)
|
||||
|
||||
try:
|
||||
await cql.run_async(f"BEGIN BATCH INSERT INTO {ks}.tab (key, c, v) VALUES (0,0,0); INSERT INTO {ks}.tab (key, c, v) VALUES (1,1,1); APPLY BATCH")
|
||||
except Exception as e:
|
||||
# injected error is expected
|
||||
logger.error(f"Error executing batch: {e}")
|
||||
|
||||
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, "storage_proxy_fail_remove_from_batchlog") for s in servers])
|
||||
|
||||
await manager.server_stop(servers[1].server_id)
|
||||
|
||||
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, "skip_batch_replay") for s in servers if s != servers[1]])
|
||||
|
||||
batchlog_row_count = (await cql.run_async("SELECT COUNT(*) FROM system.batchlog", host=hosts[0]))[0].count
|
||||
assert batchlog_row_count > 0
|
||||
|
||||
# The batch is replayed while server 1 is down
|
||||
await s0_log.wait_for('Replaying batch', timeout=60)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# verify shutdown is not stuck
|
||||
await manager.server_stop_gracefully(servers[0].server_id)
|
||||
await manager.server_start(servers[0].server_id)
|
||||
await manager.server_start(servers[1].server_id)
|
||||
|
||||
cql = await reconnect_driver(manager)
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
async def batchlog_empty() -> bool:
|
||||
batchlog_row_count = (await cql.run_async("SELECT COUNT(*) FROM system.batchlog", host=hosts[0]))[0].count
|
||||
if batchlog_row_count == 0:
|
||||
return True
|
||||
await wait_for(batchlog_empty, time.time() + 60)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_batchlog_replay_includes_cdc(manager: ManagerClient) -> None:
|
||||
""" Test that when a batch is replayed from the batchlog, it includes CDC mutations.
|
||||
1. Create a cluster with a single node.
|
||||
2. Create a table with CDC enabled.
|
||||
3. Write a batch and inject an error to fail it after it's written to the batchlog but before the mutation is applied.
|
||||
4. Wait for the batch to be replayed.
|
||||
5. Verify that the data is written to the base table.
|
||||
6. Verify that CDC mutations are also applied and visible in the CDC log table.
|
||||
"""
|
||||
|
||||
cmdline = ['--logger-log-level', 'batchlog_manager=trace']
|
||||
config = {'error_injections_at_startup': ['short_batchlog_manager_replay_interval'], 'write_request_timeout_in_ms': 2000}
|
||||
|
||||
servers = await manager.servers_add(1, config=config, cmdline=cmdline)
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}") as ks:
|
||||
# Create table with CDC enabled
|
||||
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v int, PRIMARY KEY (key, c)) WITH cdc = {{'enabled': true}}")
|
||||
|
||||
# Enable error injection to make the batch fail after writing to batchlog
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "storage_proxy_fail_remove_from_batchlog", one_shot=False)
|
||||
|
||||
# Execute a batch that will fail due to injection but be written to batchlog
|
||||
try:
|
||||
await cql.run_async(
|
||||
"BEGIN BATCH " +
|
||||
f"INSERT INTO {ks}.tab(key, c, v) VALUES (10, 20, 30); " +
|
||||
f"INSERT INTO {ks}.tab(key, c, v) VALUES (40, 50, 60); " +
|
||||
"APPLY BATCH"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.info(f"Expected error executing batch: {e}")
|
||||
|
||||
await manager.api.disable_injection(servers[0].ip_addr, "storage_proxy_fail_remove_from_batchlog")
|
||||
|
||||
# Wait for data to appear in the base table
|
||||
async def data_written():
|
||||
result1 = await cql.run_async(f"SELECT * FROM {ks}.tab WHERE key = 10 AND c = 20")
|
||||
result2 = await cql.run_async(f"SELECT * FROM {ks}.tab WHERE key = 40 AND c = 50")
|
||||
if len(result1) > 0 and len(result2) > 0:
|
||||
return True
|
||||
await wait_for(data_written, time.time() + 60)
|
||||
|
||||
# Check that CDC log table exists and has the CDC mutations
|
||||
cdc_table_name = f"{ks}.tab_scylla_cdc_log"
|
||||
|
||||
# Wait for CDC mutations to be visible
|
||||
async def cdc_data_present():
|
||||
result1 = await cql.run_async(f"SELECT * FROM {cdc_table_name} WHERE key = 10 ALLOW FILTERING")
|
||||
result2 = await cql.run_async(f"SELECT * FROM {cdc_table_name} WHERE key = 40 ALLOW FILTERING")
|
||||
if len(result1) > 0 and len(result2) > 0:
|
||||
return True
|
||||
await wait_for(cdc_data_present, time.time() + 60)
|
||||
|
||||
result1 = await cql.run_async(f"SELECT * FROM {cdc_table_name} WHERE key = 10 ALLOW FILTERING")
|
||||
assert len(result1) == 1, f"Expected 1 CDC mutation for key 10, got {len(result1)}"
|
||||
|
||||
result2 = await cql.run_async(f"SELECT * FROM {cdc_table_name} WHERE key = 40 ALLOW FILTERING")
|
||||
assert len(result2) == 1, f"Expected 1 CDC mutation for key 40, got {len(result2)}"
|
||||
@@ -1088,6 +1088,46 @@ async def test_tablet_split_finalization_with_migrations(manager: ManagerClient)
|
||||
logger.info("Waiting for migrations to complete")
|
||||
await log.wait_for("Tablet load balancer did not make any plan", from_mark=migration_mark)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_tablet_split_finalization_with_repair(manager: ManagerClient):
|
||||
injection = "handle_tablet_resize_finalization_wait"
|
||||
cfg = {
|
||||
'enable_tablets': True,
|
||||
'error_injections_at_startup': [
|
||||
injection,
|
||||
"repair_tablets_no_sync",
|
||||
'short_tablet_stats_refresh_interval',
|
||||
]
|
||||
}
|
||||
servers = await manager.servers_add(2, config=cfg)
|
||||
|
||||
cql = manager.get_cql()
|
||||
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
|
||||
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int) WITH compaction = {'class': 'NullCompactionStrategy'};")
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k%3});") for k in range(64)])
|
||||
await manager.api.keyspace_flush(servers[0].ip_addr, "test", "test")
|
||||
|
||||
logs = [await manager.server_open_log(s.server_id) for s in servers]
|
||||
marks = [await log.mark() for log in logs]
|
||||
|
||||
logger.info("Trigger split in table")
|
||||
await cql.run_async("ALTER TABLE test.test WITH tablets = {'min_tablet_count': 8};")
|
||||
|
||||
logger.info("Wait for tablets to split")
|
||||
done, pending = await asyncio.wait([asyncio.create_task(log.wait_for('handle_tablet_resize_finalization: waiting', from_mark=mark)) for log, mark in zip(logs, marks)], return_when=asyncio.FIRST_COMPLETED)
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
|
||||
async def repair():
|
||||
await manager.api.client.post(f"/storage_service/repair_async/test", host=servers[0].ip_addr)
|
||||
|
||||
async def check_repair_waits():
|
||||
await logs[0].wait_for("Topology is busy, waiting for it to quiesce", from_mark=marks[0])
|
||||
await manager.api.message_injection(servers[0].ip_addr, injection)
|
||||
|
||||
await asyncio.gather(repair(), check_repair_waits())
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_two_tablets_concurrent_repair_and_migration_repair_writer_level(manager: ManagerClient):
|
||||
|
||||
@@ -388,6 +388,71 @@ async def test_tablet_merge_cross_rack_migrations(manager: ManagerClient, racks)
|
||||
return tablet_count < old_tablet_count or None
|
||||
await wait_for(finished_merging, time.time() + 120)
|
||||
|
||||
# Reproduces #23284
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_tablet_split_merge_with_many_tables(manager: ManagerClient, racks = 2):
|
||||
cmdline = ['--smp', '4', '-m', '2G', '--target-tablet-size-in-bytes', '30000', '--max-task-backlog', '200',]
|
||||
config = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']}
|
||||
|
||||
servers = []
|
||||
rf = racks
|
||||
for rack_id in range(0, racks):
|
||||
rack = f'rack{rack_id+1}'
|
||||
servers.extend(await manager.servers_add(3, config=config, cmdline=cmdline, property_file={'dc': 'mydc', 'rack': rack}))
|
||||
|
||||
cql = manager.get_cql()
|
||||
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': 1}}")
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};")
|
||||
await asyncio.gather(*[cql.run_async(f"CREATE TABLE {ks}.test{i} (pk int PRIMARY KEY, c blob);") for i in range(1, 200)])
|
||||
|
||||
async def check_logs(when):
|
||||
for server in servers:
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
matches = await log.grep("Too long queue accumulated for gossip")
|
||||
if matches:
|
||||
pytest.fail(f"Server {server.server_id} has too long queue accumulated for gossip {when}: {matches=}")
|
||||
|
||||
await check_logs("after creating tables")
|
||||
|
||||
total_keys = 400
|
||||
keys = range(total_keys)
|
||||
insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
|
||||
for pk in keys:
|
||||
value = random.randbytes(2000)
|
||||
cql.execute(insert, [pk, value])
|
||||
|
||||
for server in servers:
|
||||
await manager.api.flush_keyspace(server.ip_addr, ks)
|
||||
|
||||
async def finished_splitting():
|
||||
# FIXME: fragile since it's expecting on-disk size will be enough to produce a few splits.
|
||||
# (raw_data=800k / target_size=30k) = ~26, lower power-of-two is 16. Compression was disabled.
|
||||
# Per-table hints (min_tablet_count) can be used to improve this.
|
||||
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
|
||||
return tablet_count >= 16 or None
|
||||
# Give enough time for split to happen in debug mode
|
||||
await wait_for(finished_splitting, time.time() + 120)
|
||||
|
||||
await check_logs("after split completion")
|
||||
|
||||
delete_keys = range(total_keys - 1)
|
||||
await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
|
||||
keys = range(total_keys - 1, total_keys)
|
||||
|
||||
old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
|
||||
|
||||
for server in servers:
|
||||
await manager.api.flush_keyspace(server.ip_addr, ks)
|
||||
await manager.api.keyspace_compaction(server.ip_addr, ks)
|
||||
|
||||
async def finished_merging():
|
||||
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
|
||||
return tablet_count < old_tablet_count or None
|
||||
await wait_for(finished_merging, time.time() + 120)
|
||||
|
||||
await check_logs("after merge completion")
|
||||
|
||||
# Reproduces use-after-free when migration right after merge, but concurrently to background
|
||||
# merge completion handler.
|
||||
# See: https://github.com/scylladb/scylladb/issues/24045
|
||||
|
||||
@@ -45,15 +45,14 @@ public:
|
||||
template <typename T>
|
||||
seastar::future<T> submit(seastar::noncopyable_function<T()> f) {
|
||||
auto p = seastar::promise<T>();
|
||||
auto fut = p.get_future();
|
||||
auto wrapper = [p = std::move(p), f = std::move(f), shard = seastar::this_shard_id(), &alien = seastar::engine().alien()] () mutable noexcept {
|
||||
auto wrapper = [&p, f = std::move(f), shard = seastar::this_shard_id(), &alien = seastar::engine().alien()] () mutable noexcept {
|
||||
try {
|
||||
auto v = f();
|
||||
seastar::alien::run_on(alien, shard, [v = std::move(v), p = std::move(p)] () mutable noexcept {
|
||||
seastar::alien::run_on(alien, shard, [&p, v = std::move(v)] () mutable noexcept {
|
||||
p.set_value(std::move(v));
|
||||
});
|
||||
} catch (...) {
|
||||
seastar::alien::run_on(alien, shard, [p = std::move(p), ep = std::current_exception()] () mutable noexcept {
|
||||
seastar::alien::run_on(alien, shard, [&p, ep = std::current_exception()] () mutable noexcept {
|
||||
p.set_exception(ep);
|
||||
});
|
||||
}
|
||||
@@ -63,7 +62,7 @@ public:
|
||||
_pending.push(std::move(wrapper));
|
||||
}
|
||||
_cv.notify_one();
|
||||
return fut;
|
||||
co_return co_await p.get_future();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user