Compare commits
49 Commits
copilot/co
...
scylla-6.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d90b81766 | ||
|
|
bc0097688f | ||
|
|
69c1a0e2ca | ||
|
|
c382e19e5e | ||
|
|
b786e6a39a | ||
|
|
706761d8ec | ||
|
|
41e4c39087 | ||
|
|
d5bdef9ee5 | ||
|
|
a4dcf3956e | ||
|
|
858fa914b1 | ||
|
|
ec923171a6 | ||
|
|
0144549cd6 | ||
|
|
0f246bfbc9 | ||
|
|
1a1583a5b6 | ||
|
|
f78b88b59b | ||
|
|
73d46ec548 | ||
|
|
dcee7839d4 | ||
|
|
75477f5661 | ||
|
|
78d7c953b0 | ||
|
|
753fc87efa | ||
|
|
c75dbc1f9c | ||
|
|
96e5ebe28c | ||
|
|
c45e92142e | ||
|
|
d69f0e529a | ||
|
|
86ff3c2aa3 | ||
|
|
efac73109e | ||
|
|
8c975712d3 | ||
|
|
1fdfe11bb0 | ||
|
|
58c06819d7 | ||
|
|
5b604509ce | ||
|
|
abbf0b24a6 | ||
|
|
347857e5e5 | ||
|
|
cd2ca5ef57 | ||
|
|
5a4065ecd5 | ||
|
|
ed4f2ecca4 | ||
|
|
8f80a84e93 | ||
|
|
95abb6d4a7 | ||
|
|
30b0cb4f5d | ||
|
|
97ae704f99 | ||
|
|
738e4c3681 | ||
|
|
ee74fe4e0e | ||
|
|
b2ea946837 | ||
|
|
92e725c467 | ||
|
|
e57d48253f | ||
|
|
47df9f9b05 | ||
|
|
193dc87bd0 | ||
|
|
11d1950957 | ||
|
|
6317325ed5 | ||
|
|
14222ad205 |
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=6.1.0-dev
|
||||
VERSION=6.1.1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "alternator/executor.hh"
|
||||
#include "cql3/selection/selection.hh"
|
||||
#include "cql3/result_set.hh"
|
||||
#include "types/types.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
namespace alternator {
|
||||
@@ -31,11 +32,12 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::serv
|
||||
dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*schema, pk))};
|
||||
std::vector<query::clustering_range> bounds{query::clustering_range::make_open_ended_both_sides()};
|
||||
const column_definition* salted_hash_col = schema->get_column_definition(bytes("salted_hash"));
|
||||
if (!salted_hash_col) {
|
||||
const column_definition* can_login_col = schema->get_column_definition(bytes("can_login"));
|
||||
if (!salted_hash_col || !can_login_col) {
|
||||
co_await coroutine::return_exception(api_error::unrecognized_client(format("Credentials cannot be fetched for: {}", username)));
|
||||
}
|
||||
auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col});
|
||||
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id}, selection->get_query_options());
|
||||
auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col, can_login_col});
|
||||
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id, can_login_col->id}, selection->get_query_options());
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice,
|
||||
proxy.get_max_result_size(partition_slice), query::tombstone_limit(proxy.get_tombstone_limit()));
|
||||
auto cl = auth::password_authenticator::consistency_for_user(username);
|
||||
@@ -51,7 +53,14 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::serv
|
||||
if (result_set->empty()) {
|
||||
co_await coroutine::return_exception(api_error::unrecognized_client(format("User not found: {}", username)));
|
||||
}
|
||||
const managed_bytes_opt& salted_hash = result_set->rows().front().front(); // We only asked for 1 row and 1 column
|
||||
const auto& result = result_set->rows().front();
|
||||
bool can_login = result[1] && value_cast<bool>(boolean_type->deserialize(*result[1]));
|
||||
if (!can_login) {
|
||||
// This is a valid role name, but has "login=False" so should not be
|
||||
// usable for authentication (see #19735).
|
||||
co_await coroutine::return_exception(api_error::unrecognized_client(format("Role {} has login=false so cannot be used for login", username)));
|
||||
}
|
||||
const managed_bytes_opt& salted_hash = result.front();
|
||||
if (!salted_hash) {
|
||||
co_await coroutine::return_exception(api_error::unrecognized_client(format("No password found for user: {}", username)));
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include "alternator/executor.hh"
|
||||
#include "cdc/log.hh"
|
||||
#include "db/config.hh"
|
||||
#include "log.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
@@ -4439,8 +4440,10 @@ future<executor::request_return_type> executor::list_tables(client_state& client
|
||||
|
||||
auto tables = _proxy.data_dictionary().get_tables(); // hold on to temporary, table_names isn't a container, it's a view
|
||||
auto table_names = tables
|
||||
| boost::adaptors::filtered([] (data_dictionary::table t) {
|
||||
return t.schema()->ks_name().find(KEYSPACE_NAME_PREFIX) == 0 && !t.schema()->is_view();
|
||||
| boost::adaptors::filtered([this] (data_dictionary::table t) {
|
||||
return t.schema()->ks_name().find(KEYSPACE_NAME_PREFIX) == 0 &&
|
||||
!t.schema()->is_view() &&
|
||||
!cdc::is_log_for_some_table(_proxy.local_db(), t.schema()->ks_name(), t.schema()->cf_name());
|
||||
})
|
||||
| boost::adaptors::transformed([] (data_dictionary::table t) {
|
||||
return t.schema()->cf_name();
|
||||
|
||||
@@ -211,7 +211,10 @@ protected:
|
||||
sstring local_dc = topology.get_datacenter();
|
||||
std::unordered_set<gms::inet_address> local_dc_nodes = topology.get_datacenter_endpoints().at(local_dc);
|
||||
for (auto& ip : local_dc_nodes) {
|
||||
if (_gossiper.is_alive(ip)) {
|
||||
// Note that it's not enough for the node to be is_alive() - a
|
||||
// node joining the cluster is also "alive" but not responsive to
|
||||
// requests. We need the node to be in normal state. See #19694.
|
||||
if (_gossiper.is_normal(ip)) {
|
||||
// Use the gossiped broadcast_rpc_address if available instead
|
||||
// of the internal IP address "ip". See discussion in #18711.
|
||||
rjson::push_back(results, rjson::from_string(_gossiper.get_rpc_address(ip)));
|
||||
|
||||
@@ -194,6 +194,21 @@
|
||||
"parameters":[]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/system/highest_supported_sstable_version",
|
||||
"operations":[
|
||||
{
|
||||
"method":"GET",
|
||||
"summary":"Get highest supported sstable version",
|
||||
"type":"string",
|
||||
"nickname":"get_highest_supported_sstable_version",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
#include "api/api-doc/system.json.hh"
|
||||
#include "api/api-doc/metrics.json.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
|
||||
#include <rapidjson/document.h>
|
||||
#include <seastar/core/reactor.hh>
|
||||
@@ -182,6 +183,11 @@ void set_system(http_context& ctx, routes& r) {
|
||||
apilog.info("Profile dumped to {}", profile_dest);
|
||||
return make_ready_future<json::json_return_type>(json::json_return_type(json::json_void()));
|
||||
}) ;
|
||||
|
||||
hs::get_highest_supported_sstable_version.set(r, [&ctx] (const_req req) {
|
||||
auto& table = ctx.db.local().find_column_family("system", "local");
|
||||
return seastar::to_sstring(table.get_sstables_manager().get_highest_supported_format());
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -121,7 +121,7 @@ static future<> announce_mutations_with_guard(
|
||||
::service::raft_group0_client& group0_client,
|
||||
std::vector<canonical_mutation> muts,
|
||||
::service::group0_guard group0_guard,
|
||||
seastar::abort_source* as,
|
||||
seastar::abort_source& as,
|
||||
std::optional<::service::raft_timeout> timeout) {
|
||||
auto group0_cmd = group0_client.prepare_command(
|
||||
::service::write_mutations{
|
||||
@@ -137,7 +137,7 @@ future<> announce_mutations_with_batching(
|
||||
::service::raft_group0_client& group0_client,
|
||||
start_operation_func_t start_operation_func,
|
||||
std::function<::service::mutations_generator(api::timestamp_type t)> gen,
|
||||
seastar::abort_source* as,
|
||||
seastar::abort_source& as,
|
||||
std::optional<::service::raft_timeout> timeout) {
|
||||
// account for command's overhead, it's better to use smaller threshold than constantly bounce off the limit
|
||||
size_t memory_threshold = group0_client.max_command_size() * 0.75;
|
||||
@@ -188,7 +188,7 @@ future<> announce_mutations(
|
||||
::service::raft_group0_client& group0_client,
|
||||
const sstring query_string,
|
||||
std::vector<data_value_or_unset> values,
|
||||
seastar::abort_source* as,
|
||||
seastar::abort_source& as,
|
||||
std::optional<::service::raft_timeout> timeout) {
|
||||
auto group0_guard = co_await group0_client.start_operation(as, timeout);
|
||||
auto timestamp = group0_guard.write_timestamp();
|
||||
|
||||
@@ -80,7 +80,7 @@ future<> create_legacy_metadata_table_if_missing(
|
||||
// Execute update query via group0 mechanism, mutations will be applied on all nodes.
|
||||
// Use this function when need to perform read before write on a single guard or if
|
||||
// you have more than one mutation and potentially exceed single command size limit.
|
||||
using start_operation_func_t = std::function<future<::service::group0_guard>(abort_source*)>;
|
||||
using start_operation_func_t = std::function<future<::service::group0_guard>(abort_source&)>;
|
||||
future<> announce_mutations_with_batching(
|
||||
::service::raft_group0_client& group0_client,
|
||||
// since we can operate also in topology coordinator context where we need stronger
|
||||
@@ -88,7 +88,7 @@ future<> announce_mutations_with_batching(
|
||||
// function here
|
||||
start_operation_func_t start_operation_func,
|
||||
std::function<::service::mutations_generator(api::timestamp_type t)> gen,
|
||||
seastar::abort_source* as,
|
||||
seastar::abort_source& as,
|
||||
std::optional<::service::raft_timeout> timeout);
|
||||
|
||||
// Execute update query via group0 mechanism, mutations will be applied on all nodes.
|
||||
@@ -97,7 +97,7 @@ future<> announce_mutations(
|
||||
::service::raft_group0_client& group0_client,
|
||||
const sstring query_string,
|
||||
std::vector<data_value_or_unset> values,
|
||||
seastar::abort_source* as,
|
||||
seastar::abort_source& as,
|
||||
std::optional<::service::raft_timeout> timeout);
|
||||
|
||||
// Appends mutations to a collector, they will be applied later on all nodes via group0 mechanism.
|
||||
|
||||
@@ -136,7 +136,7 @@ future<> password_authenticator::create_default_if_missing() {
|
||||
plogger.info("Created default superuser authentication record.");
|
||||
} else {
|
||||
co_await announce_mutations(_qp, _group0_client, query,
|
||||
{salted_pwd, _superuser}, &_as, ::service::raft_timeout{});
|
||||
{salted_pwd, _superuser}, _as, ::service::raft_timeout{});
|
||||
plogger.info("Created default superuser authentication record.");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -681,7 +681,7 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
|
||||
co_await announce_mutations_with_batching(g0,
|
||||
start_operation_func,
|
||||
std::move(gen),
|
||||
&as,
|
||||
as,
|
||||
std::nullopt);
|
||||
}
|
||||
|
||||
|
||||
@@ -192,7 +192,7 @@ future<> standard_role_manager::create_default_role_if_missing() {
|
||||
{_superuser},
|
||||
cql3::query_processor::cache_internal::no).discard_result();
|
||||
} else {
|
||||
co_await announce_mutations(_qp, _group0_client, query, {_superuser}, &_as, ::service::raft_timeout{});
|
||||
co_await announce_mutations(_qp, _group0_client, query, {_superuser}, _as, ::service::raft_timeout{});
|
||||
}
|
||||
log.info("Created default superuser role '{}'.", _superuser);
|
||||
} catch(const exceptions::unavailable_exception& e) {
|
||||
|
||||
@@ -167,6 +167,7 @@ future<db::commitlog> hint_endpoint_manager::add_store() noexcept {
|
||||
return io_check([name = _hints_dir.c_str()] { return recursive_touch_directory(name); }).then([this] () {
|
||||
commitlog::config cfg;
|
||||
|
||||
cfg.sched_group = _shard_manager.local_db().commitlog()->active_config().sched_group;
|
||||
cfg.commit_log_location = _hints_dir.c_str();
|
||||
cfg.commitlog_segment_size_in_mb = resource_manager::hint_segment_size_in_mb;
|
||||
cfg.commitlog_total_space_in_mb = resource_manager::max_hints_per_ep_size_mb;
|
||||
|
||||
@@ -1673,7 +1673,22 @@ get_view_natural_endpoint(
|
||||
return {};
|
||||
}
|
||||
auto replica = view_endpoints[base_it - base_endpoints.begin()];
|
||||
return view_topology.get_node(replica).endpoint();
|
||||
|
||||
// https://github.com/scylladb/scylladb/issues/19439
|
||||
// With tablets, a node being replaced might transition to "left" state
|
||||
// but still be kept as a replica. In such case, the IP of the replaced
|
||||
// node will be lost and `endpoint()` will return an empty IP here.
|
||||
// As of writing this, storage proxy was not migrated to host IDs yet
|
||||
// (#6403) and hints are not prepared to handle nodes that are left
|
||||
// but are still replicas. Therefore, there is no other sensible option
|
||||
// right now but to give up attempt to send the update or write a hint
|
||||
// to the paired, permanently down replica.
|
||||
const auto ep = view_topology.get_node(replica).endpoint();
|
||||
if (ep != gms::inet_address{}) {
|
||||
return ep;
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
|
||||
|
||||
2
dist/common/scripts/scylla_raid_setup
vendored
2
dist/common/scripts/scylla_raid_setup
vendored
@@ -325,6 +325,8 @@ WantedBy=local-fs.target
|
||||
os.chown(dpath, uid, gid)
|
||||
|
||||
if is_debian_variant():
|
||||
if not shutil.which('update-initramfs'):
|
||||
pkg_install('initramfs-tools')
|
||||
run('update-initramfs -u', shell=True, check=True)
|
||||
|
||||
if not udev_info.uuid_link:
|
||||
|
||||
@@ -123,10 +123,6 @@ the secret key is the `salted_hash`, i.e., the secret key can be found by
|
||||
|
||||
<!--- REMOVE IN FUTURE VERSIONS - Remove the note below in version 6.1 -->
|
||||
|
||||
(Note: If you upgraded from version 5.4 to version 6.0 without
|
||||
[enabling consistent topology updates](../upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology.rst),
|
||||
the table name is `system_auth.roles`.)
|
||||
|
||||
By default, authorization is not enforced at all. It can be turned on
|
||||
by providing an entry in Scylla configuration:
|
||||
`alternator_enforce_authorization: true`
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
If you upgraded from 5.4, you must perform a manual action in order to enable
|
||||
consistent topology changes.
|
||||
See :doc:`the guide for enabling consistent topology changes</upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>` for more details.
|
||||
@@ -60,9 +60,8 @@ In summary, Raft makes schema changes safe, but it requires that a quorum of nod
|
||||
Verifying that the Raft upgrade procedure finished successfully
|
||||
========================================================================
|
||||
|
||||
You may need to perform the following procedure on upgrade if you explicitly
|
||||
disabled the Raft-based schema changes feature in the previous ScyllaDB
|
||||
version. Please consult the upgrade guide.
|
||||
You may need to perform the following procedure as part of
|
||||
the :ref:`manual recovery procedure <recovery-procedure>`.
|
||||
|
||||
The Raft upgrade procedure requires **full cluster availability** to correctly setup the Raft algorithm; after the setup finishes, Raft can proceed with only a majority of nodes, but this initial setup is an exception.
|
||||
An unlucky event, such as a hardware failure, may cause one of your nodes to fail. If this happens before the Raft upgrade procedure finishes, the procedure will get stuck and your intervention will be required.
|
||||
@@ -173,8 +172,6 @@ gossip-based topology.
|
||||
|
||||
The feature is automatically enabled in new clusters.
|
||||
|
||||
.. scylladb_include_flag:: consistent-topology-with-raft-upgrade-info.rst
|
||||
|
||||
Verifying that Raft is Enabled
|
||||
----------------------------------
|
||||
|
||||
|
||||
@@ -62,7 +62,7 @@ The following options are available for all compaction strategies.
|
||||
=====
|
||||
|
||||
``tombstone_compaction_interval`` (default: 86400s (1 day))
|
||||
An SSTable that is suitable for single SSTable compaction, according to tombstone_threshold will not be compacted if it is newer than tombstone_compaction_interval.
|
||||
*tombstone_compaction_interval* is lower-bound for when a new tombstone compaction can start. If an SSTable was compacted at a time *X*, the earliest time it will be considered for tombstone compaction again is *X + tombstone_compaction_interval*. This does not guarantee that sstables will be considered for compaction immediately after tombstone_compaction_interval time has elapsed after the last compaction.
|
||||
|
||||
=====
|
||||
|
||||
|
||||
@@ -6,9 +6,9 @@ You can `build ScyllaDB from source <https://github.com/scylladb/scylladb#build-
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
| ScyllaDB Version / Version |20.04 |22.04 |24.04 | 11 | 8 | 9 |
|
||||
+============================+======+======+======+=======+=======+=======+
|
||||
| 6.0 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
| 6.1 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
| 5.4 | |v| | |v| | |x| | |v| | |v| | |v| |
|
||||
| 6.0 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
|
||||
* The recommended OS for ScyllaDB Open Source is Ubuntu 22.04.
|
||||
|
||||
54
docs/getting-started/_common/setup-after-install.rst
Normal file
54
docs/getting-started/_common/setup-after-install.rst
Normal file
@@ -0,0 +1,54 @@
|
||||
Configure and Run ScyllaDB
|
||||
-------------------------------
|
||||
|
||||
#. Configure the following parameters in the ``/etc/scylla/scylla.yaml`` configuration file.
|
||||
|
||||
* ``cluster_name`` - The name of the cluster. All the nodes in the cluster must have the same
|
||||
cluster name configured.
|
||||
* ``seeds`` - The IP address of the first node. Other nodes will use it as the first contact
|
||||
point to discover the cluster topology when joining the cluster.
|
||||
* ``listen_address`` - The IP address that ScyllaDB uses to connect to other nodes in the cluster.
|
||||
* ``rpc_address`` - The IP address of the interface for CQL client connections.
|
||||
|
||||
#. Run the ``scylla_setup`` script to tune the system settings and determine the optimal configuration.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo scylla_setup
|
||||
|
||||
* The script invokes a set of :ref:`scripts <system-configuration-scripts>` to configure several operating system settings; for example, it sets
|
||||
RAID0 and XFS filesystem.
|
||||
* The script runs a short (up to a few minutes) benchmark on your storage and generates the ``/etc/scylla.d/io.conf``
|
||||
configuration file. When the file is ready, you can start ScyllaDB. ScyllaDB will not run without XFS
|
||||
or ``io.conf`` file.
|
||||
* You can bypass this check by running ScyllaDB in :doc:`developer mode </getting-started/installation-common/dev-mod>`.
|
||||
We recommend against enabling developer mode in production environments to ensure ScyllaDB's maximum performance.
|
||||
|
||||
#. Run ScyllaDB as a service (if not already running).
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo systemctl start scylla-server
|
||||
|
||||
|
||||
Now you can start using ScyllaDB. Here are some tools you may find useful.
|
||||
|
||||
Run nodetool:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool status
|
||||
|
||||
Run cqlsh:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
cqlsh
|
||||
|
||||
Run cassandra-stress:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
cassandra-stress write -mode cql3 native
|
||||
|
||||
|
||||
@@ -154,59 +154,7 @@ Install ScyllaDB
|
||||
sudo yum install scylla-5.2.3
|
||||
|
||||
|
||||
Configure and Run ScyllaDB
|
||||
-------------------------------
|
||||
|
||||
#. Configure the following parameters in the ``/etc/scylla/scylla.yaml`` configuration file.
|
||||
|
||||
* ``cluster_name`` - The name of the cluster. All the nodes in the cluster must have the same
|
||||
cluster name configured.
|
||||
* ``seeds`` - The IP address of the first node. Other nodes will use it as the first contact
|
||||
point to discover the cluster topology when joining the cluster.
|
||||
* ``listen_address`` - The IP address that ScyllaDB uses to connect to other nodes in the cluster.
|
||||
* ``rpc_address`` - The IP address of the interface for CQL client connections.
|
||||
|
||||
#. Run the ``scylla_setup`` script to tune the system settings and determine the optimal configuration.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo scylla_setup
|
||||
|
||||
* The script invokes a set of :ref:`scripts <system-configuration-scripts>` to configure several operating system settings; for example, it sets
|
||||
RAID0 and XFS filesystem.
|
||||
* The script runs a short (up to a few minutes) benchmark on your storage and generates the ``/etc/scylla.d/io.conf``
|
||||
configuration file. When the file is ready, you can start ScyllaDB. ScyllaDB will not run without XFS
|
||||
or ``io.conf`` file.
|
||||
* You can bypass this check by running ScyllaDB in :doc:`developer mode </getting-started/installation-common/dev-mod>`.
|
||||
We recommend against enabling developer mode in production environments to ensure ScyllaDB's maximum performance.
|
||||
|
||||
#. Run ScyllaDB as a service (if not already running).
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo systemctl start scylla-server
|
||||
|
||||
|
||||
Now you can start using ScyllaDB. Here are some tools you may find useful.
|
||||
|
||||
Run nodetool:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool status
|
||||
|
||||
Run cqlsh:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
cqlsh
|
||||
|
||||
Run cassandra-stress:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
cassandra-stress write -mode cql3 native
|
||||
|
||||
.. include:: /getting-started/_common/setup-after-install.rst
|
||||
|
||||
Next Steps
|
||||
------------
|
||||
|
||||
@@ -12,7 +12,7 @@ Prerequisites
|
||||
Ensure that your platform is supported by the ScyllaDB version you want to install.
|
||||
See :doc:`OS Support by Platform and Version </getting-started/os-support/>`.
|
||||
|
||||
Installing ScyllaDB with Web Installer
|
||||
Install ScyllaDB with Web Installer
|
||||
---------------------------------------
|
||||
To install ScyllaDB with Web Installer, run:
|
||||
|
||||
@@ -40,22 +40,24 @@ options to install a different version or ScyllaDB Enterprise:
|
||||
You can run the command with the ``-h`` or ``--help`` flag to print information about the script.
|
||||
|
||||
Examples
|
||||
---------
|
||||
===========
|
||||
|
||||
Installing ScyllaDB Open Source 4.6.1:
|
||||
Installing ScyllaDB Open Source 6.0.1:
|
||||
|
||||
.. code:: console
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-version 4.6.1
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-version 6.0.1
|
||||
|
||||
Installing the latest patch release for ScyllaDB Open Source 4.6:
|
||||
Installing the latest patch release for ScyllaDB Open Source 6.0:
|
||||
|
||||
.. code:: console
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-version 4.6
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-version 6.0
|
||||
|
||||
Installing ScyllaDB Enterprise 2021.1:
|
||||
Installing ScyllaDB Enterprise 2024.1:
|
||||
|
||||
.. code:: console
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-product scylla-enterprise --scylla-version 2021.1
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-product scylla-enterprise --scylla-version 2024.1
|
||||
|
||||
.. include:: /getting-started/_common/setup-after-install.rst
|
||||
@@ -1,7 +1,10 @@
|
||||
.. note::
|
||||
|
||||
This page only applies to clusters where consistent topology updates are not enabled.
|
||||
This page only applies to clusters where consistent topology updates are not enabled.
|
||||
Consistent topology updates are mandatory, so **this page serves troubleshooting purposes**.
|
||||
|
||||
The page does NOT apply if you:
|
||||
|
||||
* Created a cluster with ScyllaDB 6.0 (consistent topology updates are automatically enabled).
|
||||
* Upgraded from ScyllaDB 5.4 and :doc:`manually enabled consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
* Created a cluster with ScyllaDB 6.0 or later (consistent topology updates are automatically enabled).
|
||||
* `Manually enabled consistent topology updates <https://opensource.docs.scylladb.com/branch-6.0/upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology.html>`_
|
||||
after upgrading to 6.0 or before upgrading to 6.1 (required).
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
(Note: If you upgraded from version 5.4 without
|
||||
:doc:`enabling consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`,
|
||||
you must additionally alter the ``system_auth`` keyspace.)
|
||||
@@ -1,3 +0,0 @@
|
||||
.. note::
|
||||
|
||||
If you upgraded your cluster from version 5.4, see :ref:`After Upgrading from 5.4 <add-dc-upgrade-info>`.
|
||||
@@ -1,3 +0,0 @@
|
||||
.. note::
|
||||
|
||||
If you upgraded your cluster from version 5.4, see :ref:`After Upgrading from 5.4 <add-new-node-upgrade-info>`.
|
||||
@@ -1,3 +0,0 @@
|
||||
.. note::
|
||||
|
||||
If you upgraded your cluster from version 5.4, see :ref:`After Upgrading from 5.4 <remove-node-upgrade-info>`.
|
||||
@@ -1,3 +0,0 @@
|
||||
.. note::
|
||||
|
||||
If you upgraded your cluster from version 5.4, see :ref:`After Upgrading from 5.4 <replace-node-upgrade-info>`.
|
||||
@@ -1,24 +0,0 @@
|
||||
|
||||
After Upgrading from 5.4
|
||||
----------------------------
|
||||
|
||||
The procedure described above applies to clusters where consistent topology updates
|
||||
are enabled. The feature is automatically enabled in new clusters.
|
||||
|
||||
If you've upgraded an existing cluster from version 5.4, ensure that you
|
||||
:doc:`manually enabled consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
Without consistent topology updates enabled, you must consider the following
|
||||
limitations while applying the procedure:
|
||||
|
||||
* You can only bootstrap one node at a time. You need to wait until the status
|
||||
of one new node becomes UN (Up Normal) before adding another new node.
|
||||
* If the node starts bootstrapping but fails in the middle, for example, due to
|
||||
a power loss, you can retry bootstrap by restarting the node. If you don't want to
|
||||
retry, or the node refuses to boot on subsequent attempts, consult the
|
||||
:doc:`Handling Membership Change Failures </operating-scylla/procedures/cluster-management/handling-membership-change-failures>`
|
||||
document.
|
||||
* The ``system_auth`` keyspace has not been upgraded to ``system``.
|
||||
As a result, if ``authenticator`` is set to ``PasswordAuthenticator``, you must
|
||||
increase the replication factor of the ``system_auth`` keyspace. It is
|
||||
recommended to set ``system_auth`` replication factor to the number of nodes
|
||||
in each DC.
|
||||
@@ -1,21 +0,0 @@
|
||||
|
||||
After Upgrading from 5.4
|
||||
----------------------------
|
||||
|
||||
The procedure described above applies to clusters where consistent topology updates
|
||||
are enabled. The feature is automatically enabled in new clusters.
|
||||
|
||||
If you've upgraded an existing cluster from version 5.4, ensure that you
|
||||
:doc:`manually enabled consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
Without consistent topology updates enabled, you must consider the following
|
||||
limitations while applying the procedure:
|
||||
|
||||
* It’s essential to ensure the removed node will **never** come back to the cluster,
|
||||
which might adversely affect your data (data resurrection/loss). To prevent the removed
|
||||
node from rejoining the cluster, remove that node from the cluster network or VPC.
|
||||
* You can only remove one node at a time. You need to verify that the node has
|
||||
been removed before removing another one.
|
||||
* If ``nodetool decommission`` starts executing but fails in the middle, for example,
|
||||
due to a power loss, consult the
|
||||
:doc:`Handling Membership Change Failures </operating-scylla/procedures/cluster-management/handling-membership-change-failures>`
|
||||
document.
|
||||
@@ -1,23 +0,0 @@
|
||||
|
||||
----------------------------
|
||||
After Upgrading from 5.4
|
||||
----------------------------
|
||||
|
||||
The procedure described above applies to clusters where consistent topology updates
|
||||
are enabled. The feature is automatically enabled in new clusters.
|
||||
|
||||
If you've upgraded an existing cluster from version 5.4, ensure that you
|
||||
:doc:`manually enabled consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
Without consistent topology updates enabled, you must consider the following
|
||||
limitations while applying the procedure:
|
||||
|
||||
* It’s essential to ensure the replaced (dead) node will never come back to the cluster,
|
||||
which might lead to a split-brain situation. Remove the replaced (dead) node from
|
||||
the cluster network or VPC.
|
||||
* You can only replace one node at a time. You need to wait until the status
|
||||
of the new node becomes UN (Up Normal) before replacing another new node.
|
||||
* If the new node starts and begins the replace operation but then fails in the middle,
|
||||
for example, due to a power loss, you can retry the replace by restarting the node.
|
||||
If you don’t want to retry, or the node refuses to boot on subsequent attempts, consult the
|
||||
:doc:`Handling Membership Change Failures </operating-scylla/procedures/cluster-management/handling-membership-change-failures>`
|
||||
document.
|
||||
@@ -1,8 +1,6 @@
|
||||
Adding a New Data Center Into an Existing ScyllaDB Cluster
|
||||
***********************************************************
|
||||
|
||||
.. scylladb_include_flag:: upgrade-note-add-new-dc.rst
|
||||
|
||||
The following procedure specifies how to add a Data Center (DC) to a live ScyllaDB Cluster, in a single data center, :ref:`multi-availability zone <faq-best-scenario-node-multi-availability-zone>`, or multi-datacenter. Adding a DC out-scales the cluster and provides higher availability (HA).
|
||||
|
||||
The procedure includes:
|
||||
@@ -164,8 +162,6 @@ Add New DC
|
||||
* Keyspace created by the user (which needed to replicate to the new DC).
|
||||
* System: ``system_distributed``, ``system_traces``, for example, replicate the data to three nodes in the new DC.
|
||||
|
||||
.. scylladb_include_flag:: system-auth-alter-info.rst
|
||||
|
||||
For example:
|
||||
|
||||
Before
|
||||
@@ -234,7 +230,3 @@ Additional Resources for Java Clients
|
||||
* `DCAwareRoundRobinPolicy.Builder <https://java-driver.docs.scylladb.com/scylla-3.10.2.x/api/com/datastax/driver/core/policies/DCAwareRoundRobinPolicy.Builder.html>`_
|
||||
* `DCAwareRoundRobinPolicy <https://java-driver.docs.scylladb.com/scylla-3.10.2.x/api/com/datastax/driver/core/policies/DCAwareRoundRobinPolicy.html>`_
|
||||
|
||||
|
||||
.. _add-dc-upgrade-info:
|
||||
|
||||
.. scylladb_include_flag:: upgrade-warning-add-new-node-or-dc.rst
|
||||
|
||||
@@ -2,8 +2,6 @@
|
||||
Adding a New Node Into an Existing ScyllaDB Cluster (Out Scale)
|
||||
=================================================================
|
||||
|
||||
.. scylladb_include_flag:: upgrade-note-add-new-node.rst
|
||||
|
||||
When you add a new node, other nodes in the cluster stream data to the new node. This operation is called bootstrapping and may
|
||||
be time-consuming, depending on the data size and network bandwidth. If using a :ref:`multi-availability-zone <faq-best-scenario-node-multi-availability-zone>`, make sure they are balanced.
|
||||
|
||||
@@ -100,7 +98,3 @@ Procedure
|
||||
|
||||
#. If you are using ScyllaDB Monitoring, update the `monitoring stack <https://monitoring.docs.scylladb.com/stable/install/monitoring_stack.html#configure-scylla-nodes-from-files>`_ to monitor it. If you are using ScyllaDB Manager, make sure you install the `Manager Agent <https://manager.docs.scylladb.com/stable/install-scylla-manager-agent.html>`_, and Manager can access it.
|
||||
|
||||
|
||||
.. _add-new-node-upgrade-info:
|
||||
|
||||
.. scylladb_include_flag:: upgrade-warning-add-new-node-or-dc.rst
|
||||
|
||||
@@ -2,8 +2,6 @@
|
||||
Remove a Node from a ScyllaDB Cluster (Down Scale)
|
||||
***************************************************
|
||||
|
||||
.. scylladb_include_flag:: upgrade-note-remove-node.rst
|
||||
|
||||
You can remove nodes from your cluster to reduce its size.
|
||||
|
||||
-----------------------
|
||||
@@ -83,10 +81,6 @@ the ``nodetool removenode`` operation will fail. To ensure successful operation
|
||||
``nodetool removenode`` (not required when :doc:`Repair Based Node Operations (RBNO) <repair-based-node-operation>` for ``removenode``
|
||||
is enabled).
|
||||
|
||||
.. _remove-node-upgrade-info:
|
||||
|
||||
.. scylladb_include_flag:: upgrade-warning-remove-node.rst
|
||||
|
||||
Additional Information
|
||||
----------------------
|
||||
* :doc:`Nodetool Reference </operating-scylla/nodetool>`
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
Replace a Dead Node in a ScyllaDB Cluster
|
||||
******************************************
|
||||
|
||||
.. scylladb_include_flag:: upgrade-note-replace-node.rst
|
||||
|
||||
Replace dead node operation will cause the other nodes in the cluster to stream data to the node that was replaced. This operation can take some time (depending on the data size and network bandwidth).
|
||||
|
||||
This procedure is for replacing one dead node. You can replace more than one dead node in parallel.
|
||||
@@ -194,7 +192,3 @@ In this case, the node's data will be cleaned after restart. To remedy this, you
|
||||
|
||||
Sometimes the public/ private IP of instance is changed after restart. If so refer to the Replace Procedure_ above.
|
||||
|
||||
|
||||
.. _replace-node-upgrade-info:
|
||||
|
||||
.. scylladb_include_flag:: upgrade-warning-replace-node.rst
|
||||
|
||||
@@ -23,8 +23,6 @@ Alter the following:
|
||||
* Keyspace created by the user.
|
||||
* System: ``system_distributed``, ``system_traces``.
|
||||
|
||||
.. scylladb_include_flag:: system-auth-alter-info.rst
|
||||
|
||||
For example:
|
||||
|
||||
Before
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
.. note::
|
||||
|
||||
If you upgraded your cluster from version 5.4, see :ref:`After Upgrading from 5.4 <authentication-upgrade-info>`.
|
||||
@@ -1,3 +0,0 @@
|
||||
.. note::
|
||||
|
||||
If you upgraded your cluster from version 5.4, see :ref:`After Upgrading from 5.4 <runtime-authentication-upgrade-info>`.
|
||||
@@ -1,20 +0,0 @@
|
||||
|
||||
After Upgrading from 5.4
|
||||
----------------------------
|
||||
|
||||
The procedure described above applies to clusters where consistent topology updates
|
||||
are enabled. The feature is automatically enabled in new clusters.
|
||||
|
||||
If you've upgraded an existing cluster from version 5.4, ensure that you
|
||||
:doc:`manually enabled consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
Without consistent topology updates enabled, you must take additional steps
|
||||
to enable authentication:
|
||||
|
||||
* Before you start the procedure, set the ``system_auth`` keyspace replication factor
|
||||
to the number of nodes in the datacenter via cqlsh. It allows you to ensure that
|
||||
the user's information is kept highly available for the cluster. If ``system_auth``
|
||||
is not equal to the number of nodes and a node fails, the user whose information
|
||||
is on that node will be denied access.
|
||||
* After you start cqlsh with the default superuser username and password, run
|
||||
a repair on the ``system_auth`` keyspace on all the nodes in the cluster, for example:
|
||||
``nodetool repair -pr system_auth``
|
||||
@@ -1,20 +0,0 @@
|
||||
|
||||
After Upgrading from 5.4
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The procedures described above apply to clusters where consistent topology updates
|
||||
are enabled. The feature is automatically enabled in new clusters.
|
||||
|
||||
If you've upgraded an existing cluster from version 5.4, ensure that you
|
||||
:doc:`manually enabled consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
Without consistent topology updates enabled, you must take additional steps
|
||||
to enable or disable authentication without downtime:
|
||||
|
||||
* Before you enable authentication without downtime, set the ``system_auth``
|
||||
keyspace replication factor to the number of nodes in the datacenter via cqlsh.
|
||||
It allows you to ensure that the user's information is kept highly available
|
||||
for the cluster. If ``system_auth`` is not equal to the number of nodes and
|
||||
a node fails, the user whose information is on that node will be denied access.
|
||||
* After you restart the nodes when you enable or disable authentication without
|
||||
downtime, run repair on the ``system_auth`` keyspace, one node at a time on
|
||||
all the nodes in the cluster.
|
||||
@@ -1,8 +1,6 @@
|
||||
Enable Authentication
|
||||
=====================
|
||||
|
||||
.. scylladb_include_flag:: upgrade-note-authentication.rst
|
||||
|
||||
Authentication is the process where login accounts and their passwords are verified, and the user is allowed access to the database. Authentication is done internally within ScyllaDB and is not done with a third party. Users and passwords are created with roles using a ``CREATE ROLE`` statement. Refer to :doc:`Grant Authorization CQL Reference </operating-scylla/security/authorization>` for details.
|
||||
|
||||
The procedure described below enables Authentication on the ScyllaDB servers. It is intended to be used when you do **not** have applications running with ScyllaDB/Cassandra drivers.
|
||||
@@ -39,10 +37,6 @@ Procedure
|
||||
|
||||
#. If you want to create users and roles, continue to :doc:`Enable Authorization </operating-scylla/security/enable-authorization>`.
|
||||
|
||||
.. _authentication-upgrade-info:
|
||||
|
||||
.. scylladb_include_flag:: upgrade-warning-authentication.rst
|
||||
|
||||
Additional Resources
|
||||
--------------------
|
||||
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
Enable and Disable Authentication Without Downtime
|
||||
==================================================
|
||||
|
||||
.. scylladb_include_flag:: upgrade-note-runtime-authentication.rst
|
||||
|
||||
Authentication is the process where login accounts and their passwords are verified, and the user is allowed access into the database. Authentication is done internally within ScyllaDB and is not done with a third party. Users and passwords are created with :doc:`roles </operating-scylla/security/authorization>` using a ``CREATE ROLE`` statement. This procedure enables Authentication on the ScyllaDB servers using a transit state, allowing clients to work with or without Authentication at the same time. In this state, you can update the clients (application using ScyllaDB/Apache Cassandra drivers) one at the time. Once all the clients are using Authentication, you can enforce Authentication on all ScyllaDB nodes as well. If you would rather perform a faster authentication procedure where all clients (application using ScyllaDB/Apache Cassandra drivers) will stop working until they are updated to work with Authentication, refer to :doc:`Enable Authentication </operating-scylla/security/runtime-authentication>`.
|
||||
|
||||
|
||||
@@ -108,6 +106,3 @@ Procedure
|
||||
|
||||
#. Verify that all the client applications are working correctly with authentication disabled.
|
||||
|
||||
.. _runtime-authentication-upgrade-info:
|
||||
|
||||
.. scylladb_include_flag:: upgrade-warning-runtime-authentication.rst
|
||||
@@ -1 +1 @@
|
||||
Perform :doc:`the procedure for enabling consistent topology changes </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`.
|
||||
Perform `the procedure for enabling consistent topology changes <https://opensource.docs.scylladb.com/branch-6.0/upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology.html>`_.
|
||||
@@ -1,3 +1,3 @@
|
||||
:ref:`The Raft upgrade procedure <verify-raft-procedure>`
|
||||
or :doc:`the procedure for enabling consistent topology changes</upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`
|
||||
or `the procedure for enabling consistent topology changes <https://opensource.docs.scylladb.com/branch-6.0/upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology.html>`_
|
||||
got stuck because one of the nodes failed in the middle of the procedure and is irrecoverable.
|
||||
@@ -1,3 +0,0 @@
|
||||
(Note: If you upgraded from version 5.4 without
|
||||
:doc:`enabling consistent topology updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`,
|
||||
the keyspace name is ``system_auth``.)
|
||||
@@ -4,8 +4,6 @@ Reset Authenticator Password
|
||||
This procedure describes what to do when a user loses his password and can not reset it with a superuser role.
|
||||
The procedure requires cluster downtime and as a result, all auth data is deleted.
|
||||
|
||||
.. scylladb_include_flag:: system-auth-name-info.rst
|
||||
|
||||
Procedure
|
||||
.........
|
||||
|
||||
|
||||
@@ -5,12 +5,12 @@ Upgrade ScyllaDB Open Source
|
||||
.. toctree::
|
||||
:hidden:
|
||||
|
||||
ScyllaDB 5.4 to 6.0 <upgrade-guide-from-5.4-to-6.0/index>
|
||||
ScyllaDB 6.0 to 6.1 <upgrade-guide-from-6.0-to-6.1/index>
|
||||
ScyllaDB 6.x Maintenance Upgrade <upgrade-guide-from-6.x.y-to-6.x.z>
|
||||
|
||||
Procedures for upgrading to a newer version of ScyllaDB Open Source.
|
||||
|
||||
* :doc:`ScyllaDB 5.4 to 6.0 <upgrade-guide-from-5.4-to-6.0/index>`
|
||||
* :doc:`ScyllaDB 6.0 to 6.1 <upgrade-guide-from-6.0-to-6.1/index>`
|
||||
* :doc:`ScyllaDB 6.x Maintenance Upgrade <upgrade-guide-from-6.x.y-to-6.x.z>`
|
||||
|
||||
|
||||
@@ -1,113 +0,0 @@
|
||||
=====================================
|
||||
Enable Consistent Topology Updates
|
||||
=====================================
|
||||
|
||||
This article explains how to enable consistent topology changes
|
||||
when you upgrade from version 5.4 to 6.0.
|
||||
|
||||
Introduction
|
||||
============
|
||||
|
||||
ScyllaDB Open Source 6.0 introduces :ref:`consistent topology changes based on Raft <raft-topology-changes>`.
|
||||
Newly created clusters use consistent topology changes right from the start. However - unlike in the case
|
||||
of schema managed on Raft - consistent topology changes are *not* automatically enabled after the cluster
|
||||
was upgraded from an older version of ScyllaDB. If you have such a cluster, then you need to enable
|
||||
consistent topology changes manually with a dedicated upgrade procedure.
|
||||
|
||||
Before running the procedure, you **must** check that the cluster meets some prerequisites
|
||||
and you **must** ensure that some administrative procedures will not be run
|
||||
while the upgrade procedure is in progress.
|
||||
|
||||
.. _enable-raft-topology-6.0-prerequisites:
|
||||
|
||||
Prerequisites
|
||||
=============
|
||||
|
||||
* Make sure that all nodes in the cluster are upgraded to ScyllaDB Open Source 6.0.
|
||||
* Verify that :ref:`schema on raft is enabled <schema-on-raft-enabled>`.
|
||||
* Make sure that all nodes enabled ``SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES`` cluster feature.
|
||||
One way to verify this is to look for the following message in the log:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
features - Feature SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES is enabled
|
||||
|
||||
Alternatively, this can be verified programmatically by checking whether ``value`` column under the key ``enabled_features`` contains the name of the feature in the ``system.scylla_local`` table.
|
||||
For example, this can be done with the following bash script:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
until cqlsh -e "select value from system.scylla_local where key = 'enabled_features'" | grep "SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES"
|
||||
do
|
||||
echo "Upgrade didn't finish yet on the local node, waiting 10 seconds before checking again..."
|
||||
sleep 10
|
||||
done
|
||||
echo "Upgrade completed on the local node"
|
||||
|
||||
* Make sure that all nodes are alive for the duration of the upgrade.
|
||||
|
||||
.. _enable-raft-topology-6.0-forbidden-operations:
|
||||
|
||||
Administrative operations which must not be running during upgrade
|
||||
==================================================================
|
||||
|
||||
Make sure that administrative operations will not be running while upgrade is in progress.
|
||||
In particular, you must abstain from:
|
||||
|
||||
* :doc:`Cluster management procedures </operating-scylla/procedures/cluster-management/index>` (adding, replacing, removing, decommissioning nodes etc.).
|
||||
* Running :doc:`nodetool repair </operating-scylla/nodetool-commands/repair>`.
|
||||
* Running :doc:`nodetool checkAndRepairCdcStreams </operating-scylla/nodetool-commands/checkandrepaircdcstreams>`.
|
||||
* Any modifications of :doc:`authentication </operating-scylla/security/authentication>` and :doc:`authorization </operating-scylla/security/enable-authorization>` settings.
|
||||
* Any change of authorization via :doc:`CQL API </operating-scylla/security/authorization>`.
|
||||
* Doing schema changes.
|
||||
|
||||
Running the procedure
|
||||
=====================
|
||||
|
||||
.. warning::
|
||||
|
||||
Before proceeding, make sure that all the :ref:`prerequisites <enable-raft-topology-6.0-prerequisites>` are met
|
||||
and no :ref:`forbidden administrative operations <enable-raft-topology-6.0-forbidden-operations>` will run
|
||||
during upgrade. Failing to do so may put the cluster in an inconsistent state.
|
||||
|
||||
Starting the upgrade procedure is done by issuing an POST HTTP request to the ``/storage_service/raft_topology/upgrade`` endpoint,
|
||||
to any of the nodes in the cluster.
|
||||
|
||||
For example, you can do it via ``curl``, like this:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
curl -X POST "http://127.0.0.1:10000/storage_service/raft_topology/upgrade"
|
||||
|
||||
Next, wait until all nodes report that upgrade is complete. You can check that a single node finished upgrade in one of two ways:
|
||||
|
||||
* By sending a HTTP ``GET`` request on the ``/storage_service/raft_topology/upgrade`` endpoint. For example, you can do it with ``curl`` like this:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
curl -X GET "http://127.0.0.1:10000/storage_service/raft_topology/upgrade"
|
||||
|
||||
It will return a JSON string which will be equal to ``done`` after the upgrade is complete on this node.
|
||||
|
||||
* By querying the ``upgrade_state`` column in the ``system.topology`` table. You can use ``cqlsh`` to get the value of the column like this:
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
cqlsh -e "select upgrade_state from system.topology"
|
||||
|
||||
The ``upgrade_state`` column should be set to ``done`` after the upgrade is complete on this node:
|
||||
|
||||
After the upgrade is complete on all nodes, wait at least one minute before issuing any topology changes in order to avoid data loss from writes that were started before the upgrade.
|
||||
|
||||
What if upgrade gets stuck?
|
||||
===========================
|
||||
|
||||
If the process gets stuck at some point, first check the status of your cluster:
|
||||
|
||||
- If there are some nodes that are not alive, try to restart them.
|
||||
- If all nodes are alive, ensure that the network is healthy and every node can reach all other nodes.
|
||||
- If all nodes are alive and the network is healthy, perform a :doc:`rolling restart </operating-scylla/procedures/config-change/rolling-restart/>` of the cluster.
|
||||
|
||||
If none of the above solves the issue, perform :ref:`the Raft recovery procedure <recovery-procedure>`.
|
||||
During recovery, the cluster will switch back to gossip-based topology management mechanism.
|
||||
After exiting recovery, you should upgrade the cluster to consistent topology updates using the procedure described in this document.
|
||||
@@ -1,16 +0,0 @@
|
||||
=====================================
|
||||
ScyllaDB 5.4 to 6.0 Upgrade Guide
|
||||
=====================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
Upgrade ScyllaDB <upgrade-guide-from-5.4-to-6.0-generic>
|
||||
Enable Consistent Topology Updates <enable-consistent-topology.rst>
|
||||
Metrics Update <metric-update-5.4-to-6.0>
|
||||
|
||||
|
||||
* :doc:`Upgrade ScyllaDB from 5.4.x to 6.0.y <upgrade-guide-from-5.4-to-6.0-generic>`
|
||||
* :doc:`Enable Consistent Topology Updates <enable-consistent-topology>`
|
||||
* :doc:`ScyllaDB Metrics Update - ScyllaDB 5.4 to 6.0 <metric-update-5.4-to-6.0>`
|
||||
@@ -1,64 +0,0 @@
|
||||
.. |SRC_VERSION| replace:: 5.4
|
||||
.. |NEW_VERSION| replace:: 6.0
|
||||
|
||||
ScyllaDB Metric Update - ScyllaDB |SRC_VERSION| to |NEW_VERSION|
|
||||
================================================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
|
||||
|
||||
The following metrics are new in ScyllaDB |NEW_VERSION|:
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
:header-rows: 1
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* - scylla_column_family_tablet_count
|
||||
- Tablet count
|
||||
* - scylla_cql_replication_strategy_fail_list_violations
|
||||
- Counts the number of replication_strategy_fail_list guardrail violations,
|
||||
i.e., attempts to set a forbidden replication strategy in a keyspace via
|
||||
CREATE/ALTER KEYSPACE.
|
||||
* - scylla_cql_replication_strategy_warn_list_violations
|
||||
- Counts the number of replication_strategy_warn_list guardrail violations,
|
||||
i.e., attempts to set a discouraged replication strategy in a keyspace
|
||||
via CREATE/ALTER KEYSPACE.
|
||||
* - scylla_load_balancer_resizes_emitted
|
||||
- Number of resizes produced by the load balancer
|
||||
* - scylla_load_balancer_resizes_finalized
|
||||
- Number of resizes finalized by the load balancer.
|
||||
* - scylla_reactor_fstream_read_bytes_blocked
|
||||
- Counts the number of bytes read from disk that could not be satisfied
|
||||
from read-ahead buffers, and had to block. Indicates short streams or
|
||||
incorrect read ahead configuration.
|
||||
* - scylla_reactor_fstream_read_bytes
|
||||
- Counts bytes read from disk file streams. A high rate indicates high disk
|
||||
activity. Divide by fstream_reads to determine the average read size.
|
||||
* - scylla_reactor_fstream_reads_ahead_bytes_discarded
|
||||
- Counts the number of buffered bytes that were read ahead of time and were
|
||||
discarded because they were not needed, wasting disk bandwidth. Indicates
|
||||
over-eager read ahead configuration.
|
||||
* - scylla_reactor_fstream_reads_aheads_discarded
|
||||
- Counts the number of times a buffer that was read ahead of time and was
|
||||
discarded because it was not needed, wasting disk bandwidth. Indicates
|
||||
over-eager read ahead configuration.
|
||||
* - scylla_reactor_fstream_reads_blocked
|
||||
- Counts the number of times a disk read could not be satisfied from
|
||||
read-ahead buffers, and had to block. Indicates short streams or
|
||||
incorrect read ahead configuration.
|
||||
* - scylla_reactor_fstream_reads
|
||||
- Counts reads from disk file streams. A high rate indicates high disk
|
||||
activity. Contrast with other fstream_read* counters to locate bottlenecks.
|
||||
* - scylla_tablets_count
|
||||
- Tablet count
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
=====================================
|
||||
ScyllaDB 6.0 to 6.1 Upgrade Guide
|
||||
=====================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
Upgrade ScyllaDB <upgrade-guide-from-6.0-to-6.1-generic>
|
||||
Metrics Update <metric-update-6.0-to-6.1>
|
||||
|
||||
* :doc:`Upgrade ScyllaDB from 6.0.x to 6.1.y <upgrade-guide-from-6.0-to-6.1-generic>`
|
||||
* :doc:`ScyllaDB Metrics Update - ScyllaDB 6.0 to 6.1 <metric-update-6.0-to-6.1>`
|
||||
@@ -0,0 +1,57 @@
|
||||
.. |SRC_VERSION| replace:: 6.0
|
||||
.. |NEW_VERSION| replace:: 6.1
|
||||
|
||||
ScyllaDB Metric Update - ScyllaDB |SRC_VERSION| to |NEW_VERSION|
|
||||
================================================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
|
||||
|
||||
New Metrics
|
||||
------------
|
||||
|
||||
The following metrics are new in ScyllaDB |NEW_VERSION|:
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
:header-rows: 1
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* - scylla_database_total_view_updates_on_wrong_node
|
||||
- The total number of view updates which are computed on the wrong node.
|
||||
* - scylla_raft_apply_index
|
||||
- The applied index.
|
||||
* - scylla_raft_commit_index
|
||||
- The commit index.
|
||||
* - scylla_raft_log_last_term
|
||||
- The term of the last log entry.
|
||||
* - scylla_raft_log_last_index
|
||||
- The index of the last log entry.
|
||||
* - scylla_raft_snapshot_last_index
|
||||
- The index of the snapshot.
|
||||
* - scylla_raft_snapshot_last_term
|
||||
- The term of the snapshot.
|
||||
* - scylla_raft_state
|
||||
- The current state: 0 - follower, 1 - candidate, 2 - leader
|
||||
* - scylla_storage_proxy_replica_received_hints_bytes_total
|
||||
- The total size of hints and MV hints received by this node.
|
||||
* - scylla_storage_proxy_replica_received_hints_total
|
||||
- The number of hints and MV hints received by this node.
|
||||
* - scylla_storage_proxy_stats::REPLICA_STATS_CATEGORY_view_update_backlog
|
||||
- Tracks the size of ``scylla_database_view_update_backlog`` and is used
|
||||
instead of that one to calculate the max backlog across all shards, which
|
||||
is then used by other nodes to calculate appropriate throttling delays if it grows
|
||||
too large. If it's notably different from ``scylla_database_view_update_backlog``,
|
||||
it means that we're currently processing a write that generated a large number
|
||||
of view updates.
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,35 +1,35 @@
|
||||
.. |SCYLLA_NAME| replace:: ScyllaDB
|
||||
|
||||
.. |SRC_VERSION| replace:: 5.4
|
||||
.. |NEW_VERSION| replace:: 6.0
|
||||
.. |SRC_VERSION| replace:: 6.0
|
||||
.. |NEW_VERSION| replace:: 6.1
|
||||
|
||||
.. |DEBIAN_SRC_REPO| replace:: Debian
|
||||
.. _DEBIAN_SRC_REPO: https://www.scylladb.com/download/?platform=debian-10&version=scylla-5.4
|
||||
.. _DEBIAN_SRC_REPO: https://www.scylladb.com/download/?platform=debian-10&version=scylla-6.0
|
||||
|
||||
.. |UBUNTU_SRC_REPO| replace:: Ubuntu
|
||||
.. _UBUNTU_SRC_REPO: https://www.scylladb.com/download/?platform=ubuntu-20.04&version=scylla-5.4
|
||||
.. _UBUNTU_SRC_REPO: https://www.scylladb.com/download/?platform=ubuntu-20.04&version=scylla-6.0
|
||||
|
||||
.. |SCYLLA_DEB_SRC_REPO| replace:: ScyllaDB deb repo (|DEBIAN_SRC_REPO|_, |UBUNTU_SRC_REPO|_)
|
||||
|
||||
.. |SCYLLA_RPM_SRC_REPO| replace:: ScyllaDB rpm repo
|
||||
.. _SCYLLA_RPM_SRC_REPO: https://www.scylladb.com/download/?platform=centos&version=scylla-5.4
|
||||
.. _SCYLLA_RPM_SRC_REPO: https://www.scylladb.com/download/?platform=centos&version=scylla-6.0
|
||||
|
||||
.. |DEBIAN_NEW_REPO| replace:: Debian
|
||||
.. _DEBIAN_NEW_REPO: https://www.scylladb.com/download/?platform=debian-10&version=scylla-6.0
|
||||
.. _DEBIAN_NEW_REPO: https://www.scylladb.com/download/?platform=debian-10&version=scylla-6.1
|
||||
|
||||
.. |UBUNTU_NEW_REPO| replace:: Ubuntu
|
||||
.. _UBUNTU_NEW_REPO: https://www.scylladb.com/download/?platform=ubuntu-20.04&version=scylla-6.0
|
||||
.. _UBUNTU_NEW_REPO: https://www.scylladb.com/download/?platform=ubuntu-20.04&version=scylla-6.1
|
||||
|
||||
.. |SCYLLA_DEB_NEW_REPO| replace:: ScyllaDB deb repo (|DEBIAN_NEW_REPO|_, |UBUNTU_NEW_REPO|_)
|
||||
|
||||
.. |SCYLLA_RPM_NEW_REPO| replace:: ScyllaDB rpm repo
|
||||
.. _SCYLLA_RPM_NEW_REPO: https://www.scylladb.com/download/?platform=centos&version=scylla-6.0
|
||||
.. _SCYLLA_RPM_NEW_REPO: https://www.scylladb.com/download/?platform=centos&version=scylla-6.1
|
||||
|
||||
.. |ROLLBACK| replace:: rollback
|
||||
.. _ROLLBACK: ./#rollback-procedure
|
||||
|
||||
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 5.4 to 6.0
|
||||
.. _SCYLLA_METRICS: ../metric-update-5.4-to-6.0
|
||||
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 6.0 to 6.1
|
||||
.. _SCYLLA_METRICS: ../metric-update-6.0-to-6.1
|
||||
|
||||
=============================================================================
|
||||
Upgrade |SCYLLA_NAME| from |SRC_VERSION| to |NEW_VERSION|
|
||||
@@ -47,6 +47,20 @@ It also applies when using ScyllaDB official image on EC2, GCP, or Azure.
|
||||
Before You Upgrade ScyllaDB
|
||||
==============================
|
||||
|
||||
**Ensure Consistent Topology Changes Are Enabled**
|
||||
|
||||
In ScyllaDB 6.1, the Raft-based *consistent topology changes* feature is mandatory.
|
||||
|
||||
* If you enabled the feature after upgrading from 5.4 to 6.0 or created your
|
||||
cluster with version 6.0, no action is required before upgrading to 6.1.
|
||||
* If you did not enable the feature after upgrading from 5.4 to 6.0, you must
|
||||
enable the feature before upgrading to 6.1 by following
|
||||
the `Enable Consistent Topology Updates <https://opensource.docs.scylladb.com/branch-6.0/upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology.html>`_
|
||||
procedure.
|
||||
|
||||
To verify if the *consistent topology changes* feature is enabled on your cluster,
|
||||
see :ref:`Verifying that Raft is Enabled - Consistent Topology Changes <verifying-consistent-topology-changes-enabled>`.
|
||||
|
||||
**Upgrade Your Driver**
|
||||
|
||||
If you're using a :doc:`ScyllaDB driver </using-scylla/drivers/cql-drivers/index>`,
|
||||
@@ -66,11 +80,6 @@ We recommend upgrading the Monitoring Stack to the latest version.
|
||||
See the ScyllaDB Release Notes for the latest updates. The Release Notes are published
|
||||
at the `ScyllaDB Community Forum <https://forum.scylladb.com/>`_.
|
||||
|
||||
.. note::
|
||||
|
||||
In ScyllaDB 6.0, Raft-based consistent schema management for new and existing
|
||||
deployments is enabled by default and cannot be disabled.
|
||||
|
||||
Upgrade Procedure
|
||||
=================
|
||||
|
||||
@@ -95,13 +104,6 @@ node before validating that the node you upgraded is up and running the new vers
|
||||
or remove nodes.
|
||||
* Not to apply schema changes.
|
||||
|
||||
**After** the upgrade:
|
||||
|
||||
* You may need to verify that Raft has been successfully initiated in your cluster.
|
||||
* You need to enable consistent topology updates.
|
||||
|
||||
See :ref:`After Upgrading Every Node <upgrade-5.4-6.0-after-upgrading-nodes>` for details.
|
||||
|
||||
Upgrade Steps
|
||||
=============
|
||||
|
||||
@@ -237,61 +239,6 @@ Validate
|
||||
|
||||
Once you are sure the node upgrade was successful, move to the next node in the cluster.
|
||||
|
||||
.. _upgrade-5.4-6.0-after-upgrading-nodes:
|
||||
|
||||
After Upgrading Every Node
|
||||
===============================
|
||||
|
||||
After you have upgraded every node, perform the following procedures.
|
||||
|
||||
#. Validate Raft setup. This step only applies if you manually disabled
|
||||
the ``consistent_cluster_management`` option before upgrading to version 5.4.
|
||||
|
||||
In ScyllaDB 6.0, Raft-based consistent schema management for new and existing
|
||||
deployments is enabled by default and cannot be disabled.
|
||||
You need to verify if Raft was successfully initiated in your cluster
|
||||
**before** you proceed to the next step.
|
||||
See :ref:`Validate Raft Setup <upgrade-5.4-6.0-validate-raft-setup>` for instructions.
|
||||
|
||||
#. Enable the Raft-based consistent topology updates feature. See
|
||||
:doc:`Enable Consistent Topology Updates </upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`
|
||||
for instructions.
|
||||
|
||||
.. _upgrade-5.4-6.0-validate-raft-setup:
|
||||
|
||||
Validate Raft Setup
|
||||
-------------------------
|
||||
|
||||
.. note::
|
||||
|
||||
Skip this step if you upgraded from 5.2 to 5.4 with default settings. This
|
||||
section only applies if you manually disabled the ``consistent_cluster_management``
|
||||
option before upgrading from version 5.2. to 5.4.
|
||||
|
||||
Enabling Raft causes the ScyllaDB cluster to start an internal Raft
|
||||
initialization procedure as soon as every node is upgraded to the new version.
|
||||
The goal of that procedure is to initialize data structures used by the Raft
|
||||
algorithm to consistently manage cluster-wide metadata, such as table schemas.
|
||||
|
||||
Assuming you performed the rolling upgrade procedure correctly (in particular,
|
||||
ensuring that the schema is synchronized on every step), and if there are no
|
||||
problems with cluster connectivity, that internal procedure should take a few
|
||||
seconds to finish. However, the procedure requires full cluster availability.
|
||||
If one of the nodes fails before the procedure finishes (for example, due to
|
||||
a hardware problem), the process may get stuck, which may prevent schema or
|
||||
topology changes in your cluster.
|
||||
|
||||
Therefore, following the rolling upgrade, you must verify that the internal
|
||||
Raft initialization procedure has finished successfully by checking the logs
|
||||
of every ScyllaDB node. If the process gets stuck, manual intervention is
|
||||
required.
|
||||
|
||||
Refer to the
|
||||
:ref:`Verifying that the internal Raft upgrade procedure finished successfully <verify-raft-procedure>`
|
||||
section for instructions on verifying that the procedure was successful and
|
||||
proceeding if it gets stuck.
|
||||
|
||||
|
||||
Rollback Procedure
|
||||
==================
|
||||
|
||||
@@ -209,8 +209,8 @@ Two time series helper tables were introduced that will help simplify the queryi
|
||||
|
||||
``sessions_time_idx`` is for querying regular traces. Another table, the ``node_slow_log_time_idx`` table, is for querying slow query records.
|
||||
|
||||
``sessions_time_idx`` and ``node_slow_log_time`` table column descriptions
|
||||
==========================================================================
|
||||
``sessions_time_idx`` and ``node_slow_log_time_idx`` table column descriptions
|
||||
===============================================================================
|
||||
|
||||
* ``minute``: the minute, from epoch time, from when the record was taken.
|
||||
* ``started_at``: a timestamp taken when the tracing session has begun.
|
||||
|
||||
@@ -430,6 +430,9 @@ future<tablet_replica_set> network_topology_strategy::add_tablets_in_dc(schema_p
|
||||
auto& candidate = existing.empty() ?
|
||||
new_racks.emplace_back(rack) : existing_racks.emplace_back(rack);
|
||||
for (const auto& node : nodes) {
|
||||
if (!node->is_normal()) {
|
||||
continue;
|
||||
}
|
||||
const auto& host_id = node->host_id();
|
||||
if (!existing.contains(host_id)) {
|
||||
candidate.nodes.emplace_back(host_id, load.get_load(host_id));
|
||||
|
||||
@@ -3339,9 +3339,7 @@ repair_service::insert_repair_meta(
|
||||
reason,
|
||||
compaction_time] (schema_ptr s) {
|
||||
auto& db = get_db();
|
||||
auto& cf = db.local().find_column_family(s->id());
|
||||
return db.local().obtain_reader_permit(cf, "repair-meta", db::no_timeout, {}).then([s = std::move(s),
|
||||
&cf,
|
||||
return db.local().obtain_reader_permit(db.local().find_column_family(s->id()), "repair-meta", db::no_timeout, {}).then([s = std::move(s),
|
||||
this,
|
||||
from,
|
||||
repair_meta_id,
|
||||
@@ -3354,7 +3352,7 @@ repair_service::insert_repair_meta(
|
||||
compaction_time] (reader_permit permit) mutable {
|
||||
node_repair_meta_id id{from, repair_meta_id};
|
||||
auto rm = seastar::make_shared<repair_meta>(*this,
|
||||
cf,
|
||||
get_db().local().find_column_family(s->id()),
|
||||
s,
|
||||
std::move(permit),
|
||||
range,
|
||||
|
||||
@@ -1766,8 +1766,8 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
|
||||
auto slice = query::partition_slice(std::move(cr_ranges), std::move(static_columns),
|
||||
std::move(regular_columns), { }, { }, query::max_rows);
|
||||
|
||||
return do_with(std::move(slice), std::move(m), std::vector<locked_cell>(),
|
||||
[this, &cf, timeout, trace_state = std::move(trace_state), op = cf.write_in_progress()] (const query::partition_slice& slice, mutation& m, std::vector<locked_cell>& locks) mutable {
|
||||
return do_with(std::move(slice), std::move(m), cf.write_in_progress(), std::vector<locked_cell>(),
|
||||
[this, &cf, timeout, trace_state = std::move(trace_state)] (const query::partition_slice& slice, mutation& m, const utils::phased_barrier::operation& op, std::vector<locked_cell>& locks) mutable {
|
||||
tracing::trace(trace_state, "Acquiring counter locks");
|
||||
return cf.lock_counter_cells(m, timeout).then([&, m_schema = cf.schema(), trace_state = std::move(trace_state), timeout, this] (std::vector<locked_cell> lcs) mutable {
|
||||
locks = std::move(lcs);
|
||||
|
||||
@@ -485,6 +485,14 @@ inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracke
|
||||
}
|
||||
|
||||
void compaction_group::backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables) {
|
||||
// If group was closed / is being closed, it's ok to ignore request to adjust backlog tracker,
|
||||
// since that might result in an exception due to the group being deregistered from compaction
|
||||
// manager already. And the group is being removed anyway, so that won't have any practical
|
||||
// impact.
|
||||
if (_async_gate.is_closed()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto& tracker = get_backlog_tracker();
|
||||
tracker.replace_sstables(old_sstables, new_sstables);
|
||||
}
|
||||
@@ -3628,7 +3636,13 @@ future<> storage_group::stop() noexcept {
|
||||
auto closed_gate_fut = _async_gate.close();
|
||||
|
||||
// Synchronizes with in-flight writes if any, and also takes care of flushing if needed.
|
||||
co_await coroutine::parallel_for_each(compaction_groups(), [] (const compaction_group_ptr& cg_ptr) {
|
||||
|
||||
// The reason we have to stop main cg first, is because an ongoing split always run in main cg
|
||||
// and output will be written to left and right groups. If either left or right are stopped before
|
||||
// main, split completion will add sstable to a closed group, and that might in turn trigger an
|
||||
// exception while running under row_cache::external_updater::execute, resulting in node crash.
|
||||
co_await _main_cg->stop();
|
||||
co_await coroutine::parallel_for_each(_split_ready_groups, [] (const compaction_group_ptr& cg_ptr) {
|
||||
return cg_ptr->stop();
|
||||
});
|
||||
co_await std::move(closed_gate_fut);
|
||||
|
||||
@@ -906,7 +906,7 @@ future<> migration_manager::announce_with_raft(std::vector<mutation> schema, gro
|
||||
},
|
||||
guard, std::move(description));
|
||||
|
||||
return _group0_client.add_entry(std::move(group0_cmd), std::move(guard), &_as);
|
||||
return _group0_client.add_entry(std::move(group0_cmd), std::move(guard), _as);
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_without_raft(std::vector<mutation> schema, group0_guard guard) {
|
||||
@@ -993,7 +993,7 @@ future<> migration_manager::announce<topology_change>(std::vector<mutation> sche
|
||||
|
||||
future<group0_guard> migration_manager::start_group0_operation() {
|
||||
assert(this_shard_id() == 0);
|
||||
return _group0_client.start_operation(&_as, raft_timeout{});
|
||||
return _group0_client.start_operation(_as, raft_timeout{});
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
|
||||
*/
|
||||
#pragma once
|
||||
#include "seastar/core/semaphore.hh"
|
||||
#include "service/paxos/proposal.hh"
|
||||
#include "log.hh"
|
||||
#include "utils/digest_algorithm.hh"
|
||||
@@ -31,6 +32,7 @@ private:
|
||||
|
||||
class key_lock_map {
|
||||
using semaphore = basic_semaphore<semaphore_default_exception_factory, clock_type>;
|
||||
using semaphore_units = semaphore_units<semaphore_default_exception_factory, clock_type>;
|
||||
using map = std::unordered_map<dht::token, semaphore>;
|
||||
|
||||
semaphore& get_semaphore_for_key(const dht::token& key);
|
||||
@@ -46,22 +48,15 @@ private:
|
||||
key_lock_map& _map;
|
||||
dht::token _key;
|
||||
clock_type::time_point _timeout;
|
||||
bool _locked = false;
|
||||
key_lock_map::semaphore_units _units;
|
||||
public:
|
||||
future<> lock() {
|
||||
auto f = _map.get_semaphore_for_key(_key).wait(_timeout, 1);
|
||||
_locked = true;
|
||||
return f;
|
||||
future<> lock () {
|
||||
return get_units(_map.get_semaphore_for_key(_key), 1, _timeout).then([this] (auto&& u) { _units = std::move(u); });
|
||||
}
|
||||
guard(key_lock_map& map, const dht::token& key, clock_type::time_point timeout) : _map(map), _key(key), _timeout(timeout) {};
|
||||
guard(guard&& o) noexcept : _map(o._map), _key(std::move(o._key)), _timeout(o._timeout), _locked(o._locked) {
|
||||
o._locked = false;
|
||||
}
|
||||
guard(guard&& o) = default;
|
||||
~guard() {
|
||||
if (_locked) {
|
||||
_map.get_semaphore_for_key(_key).signal(1);
|
||||
_map.release_semaphore_for_key(_key);
|
||||
}
|
||||
_map.release_semaphore_for_key(_key);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -519,7 +519,7 @@ future<> service_level_controller::migrate_to_v2(size_t nodes_count, db::system_
|
||||
val_binders_str += ", ?";
|
||||
}
|
||||
|
||||
auto guard = co_await group0_client.start_operation(&as);
|
||||
auto guard = co_await group0_client.start_operation(as);
|
||||
|
||||
std::vector<mutation> migration_muts;
|
||||
for (const auto& row: *rows) {
|
||||
@@ -554,7 +554,7 @@ future<> service_level_controller::migrate_to_v2(size_t nodes_count, db::system_
|
||||
.mutations{migration_muts.begin(), migration_muts.end()},
|
||||
};
|
||||
auto group0_cmd = group0_client.prepare_command(change, guard, "migrate service levels to v2");
|
||||
co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), &as);
|
||||
co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), as);
|
||||
}
|
||||
|
||||
future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) {
|
||||
|
||||
@@ -155,7 +155,7 @@ semaphore& raft_group0_client::operation_mutex() {
|
||||
return _operation_mutex;
|
||||
}
|
||||
|
||||
future<> raft_group0_client::add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as,
|
||||
future<> raft_group0_client::add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source& as,
|
||||
std::optional<raft_timeout> timeout)
|
||||
{
|
||||
if (this_shard_id() != 0) {
|
||||
@@ -239,7 +239,7 @@ static utils::UUID generate_group0_state_id(utils::UUID prev_state_id) {
|
||||
return utils::UUID_gen::get_random_time_UUID_from_micros(std::chrono::microseconds{ts});
|
||||
}
|
||||
|
||||
future<group0_guard> raft_group0_client::start_operation(seastar::abort_source* as, std::optional<raft_timeout> timeout) {
|
||||
future<group0_guard> raft_group0_client::start_operation(seastar::abort_source& as, std::optional<raft_timeout> timeout) {
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(logger, "start_group0_operation: must run on shard 0");
|
||||
}
|
||||
@@ -251,12 +251,12 @@ future<group0_guard> raft_group0_client::start_operation(seastar::abort_source*
|
||||
auto [upgrade_lock_holder, upgrade_state] = co_await get_group0_upgrade_state();
|
||||
switch (upgrade_state) {
|
||||
case group0_upgrade_state::use_post_raft_procedures: {
|
||||
auto operation_holder = co_await get_units(_operation_mutex, 1);
|
||||
co_await _raft_gr.group0_with_timeouts().read_barrier(as, timeout);
|
||||
auto operation_holder = co_await get_units(_operation_mutex, 1, as);
|
||||
co_await _raft_gr.group0_with_timeouts().read_barrier(&as, timeout);
|
||||
|
||||
// Take `_group0_read_apply_mutex` *after* read barrier.
|
||||
// Read barrier may wait for `group0_state_machine::apply` which also takes this mutex.
|
||||
auto read_apply_holder = co_await hold_read_apply_mutex();
|
||||
auto read_apply_holder = co_await hold_read_apply_mutex(as);
|
||||
|
||||
auto observed_group0_state_id = co_await _sys_ks.get_last_group0_state_id();
|
||||
auto new_group0_state_id = generate_group0_state_id(observed_group0_state_id);
|
||||
@@ -546,7 +546,7 @@ static future<> add_write_mutations_entry(
|
||||
std::string_view description,
|
||||
std::vector<canonical_mutation> muts,
|
||||
::service::group0_guard group0_guard,
|
||||
seastar::abort_source* as,
|
||||
seastar::abort_source& as,
|
||||
std::optional<::service::raft_timeout> timeout) {
|
||||
logger.trace("add_write_mutations_entry: {} mutations with description {}",
|
||||
muts.size(), description);
|
||||
@@ -582,7 +582,7 @@ future<> group0_batch::commit(::service::raft_group0_client& group0_client, seas
|
||||
// when producer expects substantial number or size of mutations it should use generator
|
||||
if (_generators.size() == 0) {
|
||||
std::vector<canonical_mutation> cmuts = {_muts.begin(), _muts.end()};
|
||||
co_return co_await add_write_mutations_entry(group0_client, description, std::move(cmuts), std::move(*_guard), &as, timeout);
|
||||
co_return co_await add_write_mutations_entry(group0_client, description, std::move(cmuts), std::move(*_guard), as, timeout);
|
||||
}
|
||||
// raft doesn't support streaming so we need to materialize all mutations in memory
|
||||
co_await materialize_mutations();
|
||||
@@ -591,7 +591,7 @@ future<> group0_batch::commit(::service::raft_group0_client& group0_client, seas
|
||||
}
|
||||
std::vector<canonical_mutation> cmuts = {_muts.begin(), _muts.end()};
|
||||
_muts.clear();
|
||||
co_await add_write_mutations_entry(group0_client, description, std::move(cmuts), std::move(*_guard), &as, timeout);
|
||||
co_await add_write_mutations_entry(group0_client, description, std::move(cmuts), std::move(*_guard), as, timeout);
|
||||
}
|
||||
|
||||
future<std::pair<std::vector<mutation>, ::service::group0_guard>> group0_batch::extract() && {
|
||||
|
||||
@@ -109,7 +109,7 @@ public:
|
||||
// Call after `system_keyspace` is initialized.
|
||||
future<> init();
|
||||
|
||||
future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as, std::optional<raft_timeout> timeout = std::nullopt);
|
||||
future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source& as, std::optional<raft_timeout> timeout = std::nullopt);
|
||||
|
||||
future<> add_entry_unguarded(group0_command group0_cmd, seastar::abort_source* as);
|
||||
|
||||
@@ -133,7 +133,7 @@ public:
|
||||
// FIXME?: this is kind of annoying for the user.
|
||||
// we could forward the call to shard 0, have group0_guard keep a foreign_ptr to the internal data structures on shard 0,
|
||||
// and add_entry would again forward to shard 0.
|
||||
future<group0_guard> start_operation(seastar::abort_source* as, std::optional<raft_timeout> timeout = std::nullopt);
|
||||
future<group0_guard> start_operation(seastar::abort_source& as, std::optional<raft_timeout> timeout = std::nullopt);
|
||||
|
||||
template<typename Command>
|
||||
requires std::same_as<Command, broadcast_table_query> || std::same_as<Command, write_mutations>
|
||||
|
||||
@@ -514,11 +514,11 @@ raft_server_with_timeouts::run_with_timeout(Op&& op, const char* op_name,
|
||||
}
|
||||
|
||||
future<> raft_server_with_timeouts::add_entry(raft::command command, raft::wait_type type,
|
||||
seastar::abort_source* as, std::optional<raft_timeout> timeout)
|
||||
seastar::abort_source& as, std::optional<raft_timeout> timeout)
|
||||
{
|
||||
return run_with_timeout([&](abort_source* as) {
|
||||
return _group_server.server->add_entry(std::move(command), type, as);
|
||||
}, "add_entry", as, timeout);
|
||||
}, "add_entry", &as, timeout);
|
||||
}
|
||||
|
||||
future<> raft_server_with_timeouts::modify_config(std::vector<raft::config_member> add, std::vector<raft::server_id> del,
|
||||
|
||||
@@ -92,7 +92,7 @@ class raft_server_with_timeouts {
|
||||
run_with_timeout(Op&& op, const char* op_name, seastar::abort_source* as, std::optional<raft_timeout> timeout);
|
||||
public:
|
||||
raft_server_with_timeouts(raft_server_for_group& group_server, raft_group_registry& registry);
|
||||
future<> add_entry(raft::command command, raft::wait_type type, seastar::abort_source* as, std::optional<raft_timeout> timeout);
|
||||
future<> add_entry(raft::command command, raft::wait_type type, seastar::abort_source& as, std::optional<raft_timeout> timeout);
|
||||
future<> modify_config(std::vector<raft::config_member> add, std::vector<raft::server_id> del, seastar::abort_source* as, std::optional<raft_timeout> timeout);
|
||||
future<bool> trigger_snapshot(seastar::abort_source* as, std::optional<raft_timeout> timeout);
|
||||
future<> read_barrier(seastar::abort_source* as, std::optional<raft_timeout> timeout);
|
||||
|
||||
@@ -6283,6 +6283,8 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
|
||||
|
||||
auto l = co_await paxos::paxos_state::get_cas_lock(token, write_timeout);
|
||||
|
||||
co_await utils::get_local_injector().inject("cas_timeout_after_lock", write_timeout + std::chrono::milliseconds(100));
|
||||
|
||||
while (true) {
|
||||
// Finish the previous PAXOS round, if any, and, as a side effect, compute
|
||||
// a ballot (round identifier) which is a) unique b) has good chances of being
|
||||
|
||||
@@ -1019,7 +1019,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
|
||||
|
||||
{
|
||||
// The scope for the guard
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as);
|
||||
auto me = _topology_state_machine._topology.find(server.id());
|
||||
// Recheck that cleanup is needed after the barrier
|
||||
if (!me || me->second.cleanup != cleanup_status::running) {
|
||||
@@ -1056,7 +1056,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
|
||||
rtlogger.info("cleanup ended");
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as);
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
builder.with_node(server.id()).set("cleanup_status", cleanup_status::clean);
|
||||
|
||||
@@ -1064,7 +1064,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup completed for {}", server.id()));
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as);
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("cleanup flag clearing: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -1281,7 +1281,7 @@ future<> storage_service::raft_initialize_discovery_leader(const join_node_reque
|
||||
}
|
||||
|
||||
rtlogger.info("adding myself as the first node to the topology");
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
auto insert_join_request_mutations = build_mutation_from_join_params(params, guard);
|
||||
|
||||
@@ -1304,7 +1304,7 @@ future<> storage_service::raft_initialize_discovery_leader(const join_node_reque
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
|
||||
"bootstrap: adding myself as the first node to the topology");
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("bootstrap: concurrent operation is detected, retrying.");
|
||||
}
|
||||
@@ -1353,7 +1353,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
|
||||
while (true) {
|
||||
rtlogger.info("refreshing topology to check if it's synchronized with local metadata");
|
||||
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
if (synchronized()) {
|
||||
break;
|
||||
@@ -1391,7 +1391,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
|
||||
std::move(change), guard, ::format("{}: update topology with local metadata", raft_server.id()));
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("update topology with local metadata:"
|
||||
" concurrent operation is detected, retrying.");
|
||||
@@ -1428,7 +1428,7 @@ future<> storage_service::start_upgrade_to_raft_topology() {
|
||||
}
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::not_upgraded) {
|
||||
co_return;
|
||||
@@ -1441,7 +1441,7 @@ future<> storage_service::start_upgrade_to_raft_topology() {
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, "upgrade: start");
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("upgrade: concurrent operation is detected, retrying.");
|
||||
@@ -1782,6 +1782,8 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
|
||||
|
||||
set_mode(mode::JOINING);
|
||||
|
||||
co_await utils::get_local_injector().inject("delay_bootstrap_120s", std::chrono::seconds(120));
|
||||
|
||||
if (raft_server) { // Raft is enabled. Check if we need to bootstrap ourself using raft
|
||||
rtlogger.info("topology changes are using raft");
|
||||
|
||||
@@ -3565,7 +3567,7 @@ future<> storage_service::raft_decommission() {
|
||||
utils::UUID request_id;
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
auto it = _topology_state_machine._topology.find(raft_server.id());
|
||||
if (!it) {
|
||||
@@ -3595,7 +3597,7 @@ future<> storage_service::raft_decommission() {
|
||||
|
||||
request_id = guard.new_group0_state_id();
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("decommission: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -3603,13 +3605,16 @@ future<> storage_service::raft_decommission() {
|
||||
break;
|
||||
}
|
||||
|
||||
rtlogger.info("decommission: waiting for completion (request ID: {})", request_id);
|
||||
auto error = co_await wait_for_topology_request_completion(request_id);
|
||||
|
||||
if (error.empty()) {
|
||||
// Need to set it otherwise gossiper will try to send shutdown on exit
|
||||
rtlogger.info("decommission: successfully removed from topology (request ID: {}), updating gossip status", request_id);
|
||||
co_await _gossiper.add_local_application_state(std::pair(gms::application_state::STATUS, gms::versioned_value::left({}, _gossiper.now().time_since_epoch().count())));
|
||||
rtlogger.info("Decommission succeeded. Request ID: {}", request_id);
|
||||
} else {
|
||||
auto err = fmt::format("Decommission failed. See earlier errors ({})", error);
|
||||
auto err = fmt::format("Decommission failed. See earlier errors ({}). Request ID: {}", error, request_id);
|
||||
rtlogger.error("{}", err);
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
@@ -3820,6 +3825,8 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
|
||||
// Step 3: Prepare to sync data
|
||||
ctl.prepare(node_ops_cmd::bootstrap_prepare).get();
|
||||
|
||||
utils::get_local_injector().inject("delay_bootstrap_120s", std::chrono::seconds(120)).get();
|
||||
|
||||
// Step 5: Sync data for bootstrap
|
||||
_repair.local().bootstrap_with_repair(get_token_metadata_ptr(), bootstrap_tokens).get();
|
||||
on_streaming_finished();
|
||||
@@ -3898,7 +3905,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
|
||||
utils::UUID request_id;
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
auto it = _topology_state_machine._topology.find(id);
|
||||
|
||||
@@ -3955,7 +3962,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
|
||||
try {
|
||||
// Make non voter during request submission for better HA
|
||||
co_await _group0->make_nonvoters(ignored_ids, _group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("removenode: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -3964,19 +3971,21 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
|
||||
break;
|
||||
}
|
||||
|
||||
rtlogger.info("removenode: wait for completion");
|
||||
rtlogger.info("removenode: waiting for completion (request ID: {})", request_id);
|
||||
|
||||
// Wait until request completes
|
||||
auto error = co_await wait_for_topology_request_completion(request_id);
|
||||
|
||||
if (error.empty()) {
|
||||
rtlogger.info("removenode: successfully removed from topology (request ID: {}), removing from group 0 configuration", request_id);
|
||||
try {
|
||||
co_await _group0->remove_from_raft_config(id);
|
||||
} catch (raft::not_a_member&) {
|
||||
rtlogger.info("removenode: already removed from the raft config by the topology coordinator");
|
||||
}
|
||||
rtlogger.info("Removenode succeeded. Request ID: {}", request_id);
|
||||
} else {
|
||||
auto err = fmt::format("Removenode failed. See earlier errors ({})", error);
|
||||
auto err = fmt::format("Removenode failed. See earlier errors ({}). Request ID: {}", error, request_id);
|
||||
rtlogger.error("{}", err);
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
@@ -4529,7 +4538,7 @@ future<> storage_service::do_cluster_cleanup() {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
auto curr_req = _topology_state_machine._topology.global_request;
|
||||
if (curr_req && *curr_req != global_topology_request::cleanup) {
|
||||
@@ -4557,7 +4566,7 @@ future<> storage_service::do_cluster_cleanup() {
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup: cluster cleanup requested"));
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("cleanup: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -4587,11 +4596,11 @@ future<sstring> storage_service::wait_for_topology_request_completion(utils::UUI
|
||||
}
|
||||
|
||||
future<> storage_service::wait_for_topology_not_busy() {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
while (_topology_state_machine._topology.is_busy()) {
|
||||
release_guard(std::move(guard));
|
||||
co_await _topology_state_machine.event.wait();
|
||||
guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4600,7 +4609,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
|
||||
utils::UUID request_id;
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
auto it = _topology_state_machine._topology.find(raft_server.id());
|
||||
if (!it) {
|
||||
@@ -4633,7 +4642,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
|
||||
request_id = guard.new_group0_state_id();
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("rebuild: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -4653,7 +4662,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
|
||||
|
||||
while (true) {
|
||||
rtlogger.info("request check_and_repair_cdc_streams, refreshing topology");
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
auto curr_req = _topology_state_machine._topology.global_request;
|
||||
if (curr_req && *curr_req != global_topology_request::new_cdc_generation) {
|
||||
// FIXME: replace this with a queue
|
||||
@@ -4679,7 +4688,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
|
||||
::format("request check+repair CDC generation from {}", _group0->group0_server().id()));
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("request check+repair CDC: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -5251,7 +5260,7 @@ future<> storage_service::process_tablet_split_candidate(table_id table) {
|
||||
while (!_async_gate.is_closed() && !_group0_as.abort_requested()) {
|
||||
try {
|
||||
// Ensures that latest changes to tablet metadata, in group0, are visible
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as);
|
||||
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
|
||||
if (!tmap.needs_split()) {
|
||||
release_guard(std::move(guard));
|
||||
@@ -5497,6 +5506,8 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
if (!_topology_state_machine._topology.normal_nodes.empty()) { // stream only if there is a node in normal state
|
||||
co_await retrier(_bootstrap_result, coroutine::lambda([&] () -> future<> {
|
||||
if (is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap)) {
|
||||
co_await utils::get_local_injector().inject("delay_bootstrap_120s", std::chrono::seconds(120));
|
||||
|
||||
co_await _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens);
|
||||
} else {
|
||||
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(),
|
||||
@@ -5644,8 +5655,10 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (const raft::request_aborted& e) {
|
||||
rtlogger.warn("raft_topology_cmd {} failed with: {}", cmd.cmd, e);
|
||||
} catch (...) {
|
||||
rtlogger.error("raft_topology_cmd failed with: {}", std::current_exception());
|
||||
rtlogger.error("raft_topology_cmd {} failed with: {}", cmd.cmd, std::current_exception());
|
||||
}
|
||||
co_return result;
|
||||
}
|
||||
@@ -6193,7 +6206,7 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
|
||||
|
||||
future<> storage_service::transit_tablet(table_id table, dht::token token, noncopyable_function<std::tuple<std::vector<canonical_mutation>, sstring>(const locator::tablet_map&, api::timestamp_type)> prepare_mutations) {
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
while (_topology_state_machine._topology.is_busy()) {
|
||||
const auto tstate = *_topology_state_machine._topology.tstate;
|
||||
@@ -6204,7 +6217,7 @@ future<> storage_service::transit_tablet(table_id table, dht::token token, nonco
|
||||
rtlogger.debug("transit_tablet(): topology state machine is busy: {}", tstate);
|
||||
release_guard(std::move(guard));
|
||||
co_await _topology_state_machine.event.wait();
|
||||
guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
}
|
||||
|
||||
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
|
||||
@@ -6226,7 +6239,7 @@ future<> storage_service::transit_tablet(table_id table, dht::token token, nonco
|
||||
topology_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.debug("transit_tablet(): concurrent modification, retrying");
|
||||
@@ -6251,7 +6264,7 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
|
||||
}
|
||||
|
||||
while (true) {
|
||||
group0_guard guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
std::vector<canonical_mutation> updates;
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
@@ -6263,7 +6276,7 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
|
||||
topology_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.debug("set_tablet_balancing_enabled(): concurrent modification");
|
||||
@@ -6404,7 +6417,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
|
||||
}
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
if (const auto *p = _topology_state_machine._topology.find(params.host_id)) {
|
||||
const auto& rs = p->second;
|
||||
@@ -6458,7 +6471,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
|
||||
try {
|
||||
// Make replaced node and ignored nodes non voters earlier for better HA
|
||||
co_await _group0->make_nonvoters(ignored_nodes_from_join_params(params), _group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("join_node_request: concurrent operation is detected, retrying.");
|
||||
|
||||
@@ -294,7 +294,10 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
};
|
||||
|
||||
future<group0_guard> start_operation() {
|
||||
auto guard = co_await _group0.client().start_operation(&_as);
|
||||
rtlogger.debug("obtaining group 0 guard...");
|
||||
auto guard = co_await _group0.client().start_operation(_as);
|
||||
rtlogger.debug("guard taken, prev_state_id: {}, new_state_id: {}, coordinator term: {}, current Raft term: {}",
|
||||
guard.observed_group0_state_id(), guard.new_group0_state_id(), _term, _raft.get_current_term());
|
||||
|
||||
if (_term != _raft.get_current_term()) {
|
||||
throw term_changed_error{};
|
||||
@@ -337,7 +340,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
rtlogger.trace("update_topology_state mutations: {}", updates);
|
||||
topology_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), &_as);
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("race while changing state: {}. Retrying", reason);
|
||||
throw;
|
||||
@@ -763,8 +766,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
rtlogger.info("keyspace_rf_change requested");
|
||||
while (true) {
|
||||
sstring ks_name = *_topo_sm._topology.new_keyspace_rf_change_ks_name;
|
||||
auto& ks = _db.find_keyspace(ks_name);
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
std::unordered_map<sstring, sstring> saved_ks_props = *_topo_sm._topology.new_keyspace_rf_change_data;
|
||||
cql3::statements::ks_prop_defs new_ks_props{std::map<sstring, sstring>{saved_ks_props.begin(), saved_ks_props.end()}};
|
||||
|
||||
@@ -773,35 +774,41 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
utils::UUID req_uuid = *_topo_sm._topology.global_request_id;
|
||||
std::vector<canonical_mutation> updates;
|
||||
sstring error;
|
||||
size_t unimportant_init_tablet_count = 2; // must be a power of 2
|
||||
locator::tablet_map new_tablet_map{unimportant_init_tablet_count};
|
||||
if (_db.has_keyspace(ks_name)) {
|
||||
auto& ks = _db.find_keyspace(ks_name);
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
size_t unimportant_init_tablet_count = 2; // must be a power of 2
|
||||
locator::tablet_map new_tablet_map{unimportant_init_tablet_count};
|
||||
|
||||
for (const auto& table : ks.metadata()->tables()) {
|
||||
try {
|
||||
locator::tablet_map old_tablets = tmptr->tablets().get_tablet_map(table->id());
|
||||
locator::replication_strategy_params params{repl_opts, old_tablets.tablet_count()};
|
||||
auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params);
|
||||
new_tablet_map = co_await new_strategy->maybe_as_tablet_aware()->reallocate_tablets(table, tmptr, old_tablets);
|
||||
} catch (const std::exception& e) {
|
||||
error = e.what();
|
||||
rtlogger.error("Couldn't process global_topology_request::keyspace_rf_change, error: {},"
|
||||
"desired new ks opts: {}", error, new_ks_props.get_replication_options());
|
||||
updates.clear(); // remove all tablets mutations ...
|
||||
break; // ... and only create mutations deleting the global req
|
||||
for (const auto& table : ks.metadata()->tables()) {
|
||||
try {
|
||||
locator::tablet_map old_tablets = tmptr->tablets().get_tablet_map(table->id());
|
||||
locator::replication_strategy_params params{repl_opts, old_tablets.tablet_count()};
|
||||
auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params);
|
||||
new_tablet_map = co_await new_strategy->maybe_as_tablet_aware()->reallocate_tablets(table, tmptr, old_tablets);
|
||||
} catch (const std::exception& e) {
|
||||
error = e.what();
|
||||
rtlogger.error("Couldn't process global_topology_request::keyspace_rf_change, error: {},"
|
||||
"desired new ks opts: {}", error, new_ks_props.get_replication_options());
|
||||
updates.clear(); // remove all tablets mutations ...
|
||||
break; // ... and only create mutations deleting the global req
|
||||
}
|
||||
|
||||
replica::tablet_mutation_builder tablet_mutation_builder(guard.write_timestamp(), table->id());
|
||||
co_await new_tablet_map.for_each_tablet([&](locator::tablet_id tablet_id, const locator::tablet_info& tablet_info) -> future<> {
|
||||
auto last_token = new_tablet_map.get_last_token(tablet_id);
|
||||
updates.emplace_back(co_await make_canonical_mutation_gently(
|
||||
replica::tablet_mutation_builder(guard.write_timestamp(), table->id())
|
||||
.set_new_replicas(last_token, tablet_info.replicas)
|
||||
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
|
||||
.set_transition(last_token, locator::tablet_transition_kind::rebuild)
|
||||
.build()
|
||||
));
|
||||
co_await coroutine::maybe_yield();
|
||||
});
|
||||
}
|
||||
|
||||
replica::tablet_mutation_builder tablet_mutation_builder(guard.write_timestamp(), table->id());
|
||||
co_await new_tablet_map.for_each_tablet([&](locator::tablet_id tablet_id, const locator::tablet_info& tablet_info) -> future<> {
|
||||
auto last_token = new_tablet_map.get_last_token(tablet_id);
|
||||
updates.emplace_back(co_await make_canonical_mutation_gently(
|
||||
replica::tablet_mutation_builder(guard.write_timestamp(), table->id())
|
||||
.set_new_replicas(last_token, tablet_info.replicas)
|
||||
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
|
||||
.set_transition(last_token, locator::tablet_transition_kind::rebuild)
|
||||
.build()
|
||||
));
|
||||
co_await coroutine::maybe_yield();
|
||||
});
|
||||
} else {
|
||||
error = "Can't ALTER keyspace " + ks_name + ", keyspace doesn't exist";
|
||||
}
|
||||
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
@@ -829,7 +836,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
mixed_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), &_as);
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("handle_global_request(): concurrent modification, retrying");
|
||||
@@ -2338,7 +2345,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
topology_mutation_builder builder(node.guard.write_timestamp());
|
||||
builder.with_node(id).set("cleanup_status", cleanup_status::needed);
|
||||
muts.emplace_back(builder.build());
|
||||
rtlogger.trace("mark node {} as needed cleanup", id);
|
||||
rtlogger.debug("mark node {} as needed for cleanup", id);
|
||||
}
|
||||
}
|
||||
return muts;
|
||||
@@ -2359,7 +2366,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
builder.with_node(id).set("cleanup_status", cleanup_status::running);
|
||||
muts.emplace_back(builder.build());
|
||||
rtlogger.trace("mark node {} as cleanup running", id);
|
||||
rtlogger.debug("mark node {} as cleanup running", id);
|
||||
}
|
||||
}
|
||||
if (!muts.empty()) {
|
||||
@@ -2604,7 +2611,7 @@ future<> topology_coordinator::build_coordinator_state(group0_guard guard) {
|
||||
if (auth_version < db::system_keyspace::auth_version_t::v2) {
|
||||
rtlogger.info("migrating system_auth keyspace data");
|
||||
co_await auth::migrate_to_auth_v2(_sys_ks, _group0.client(),
|
||||
[this] (abort_source*) { return start_operation();}, _as);
|
||||
[this] (abort_source&) { return start_operation();}, _as);
|
||||
}
|
||||
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
@@ -2894,6 +2901,12 @@ future<> topology_coordinator::run() {
|
||||
co_await await_event();
|
||||
rtlogger.debug("topology coordinator fiber got an event");
|
||||
}
|
||||
co_await utils::get_local_injector().inject("wait-after-topology-coordinator-gets-event", [] (auto& handler) -> future<> {
|
||||
rtlogger.info("wait-after-topology-coordinator-gets-event injection hit");
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{30});
|
||||
rtlogger.info("wait-after-topology-coordinator-gets-event injection done");
|
||||
});
|
||||
|
||||
} catch (...) {
|
||||
sleep = handle_topology_coordinator_error(std::current_exception());
|
||||
}
|
||||
|
||||
@@ -558,29 +558,18 @@ bool sstable_directory::compare_sstable_storage_prefix(const sstring& prefix_a,
|
||||
return size_a == size_b && sstring::traits_type::compare(prefix_a.begin(), prefix_b.begin(), size_a) == 0;
|
||||
}
|
||||
|
||||
future<std::pair<sstring, sstring>> sstable_directory::create_pending_deletion_log(const std::vector<shared_sstable>& ssts) {
|
||||
future<std::unordered_map<sstring, sstring>> sstable_directory::create_pending_deletion_log(const std::vector<shared_sstable>& ssts) {
|
||||
return seastar::async([&ssts] {
|
||||
shared_sstable first = nullptr;
|
||||
min_max_tracker<generation_type> gen_tracker;
|
||||
std::unordered_map<sstring, min_max_tracker<generation_type>> gen_trackers;
|
||||
std::unordered_map<sstring, sstring> res;
|
||||
|
||||
for (const auto& sst : ssts) {
|
||||
gen_tracker.update(sst->generation());
|
||||
|
||||
if (first == nullptr) {
|
||||
first = sst;
|
||||
} else {
|
||||
// All sstables are assumed to be in the same column_family, hence
|
||||
// sharing their base directory. Since lexicographical comparison of
|
||||
// paths is not the same as their actually equivalence, this should
|
||||
// rather check for fs::equivalent call on _storage.prefix()-s. But
|
||||
// since we know that the worst thing filesystem storage driver can
|
||||
// do is to prepend/drop the trailing slash, it should be enough to
|
||||
// compare prefixes of both ... prefixes
|
||||
assert(compare_sstable_storage_prefix(first->_storage->prefix(), sst->_storage->prefix()));
|
||||
}
|
||||
auto prefix = sst->_storage->prefix();
|
||||
gen_trackers[prefix].update(sst->generation());
|
||||
}
|
||||
|
||||
sstring pending_delete_dir = first->_storage->prefix() + "/" + sstables::pending_delete_dir;
|
||||
for (const auto& [prefix, gen_tracker] : gen_trackers) {
|
||||
sstring pending_delete_dir = prefix + "/" + sstables::pending_delete_dir;
|
||||
sstring pending_delete_log = format("{}/sstables-{}-{}.log", pending_delete_dir, gen_tracker.min(), gen_tracker.max());
|
||||
sstring tmp_pending_delete_log = pending_delete_log + ".tmp";
|
||||
sstlog.trace("Writing {}", tmp_pending_delete_log);
|
||||
@@ -611,11 +600,13 @@ future<std::pair<sstring, sstring>> sstable_directory::create_pending_deletion_l
|
||||
dir_f.flush().get();
|
||||
close_dir.close_now();
|
||||
sstlog.debug("{} written successfully.", pending_delete_log);
|
||||
res.emplace(std::move(pending_delete_dir), std::move(pending_delete_log));
|
||||
} catch (...) {
|
||||
sstlog.warn("Error while writing {}: {}. Ignoring.", pending_delete_log, std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
return std::make_pair<sstring, sstring>(std::move(pending_delete_log), first->_storage->prefix());
|
||||
return res;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -287,8 +287,9 @@ public:
|
||||
|
||||
// Creates the deletion log for atomic deletion of sstables (helper for the
|
||||
// above function that's also used by tests)
|
||||
// Returns a pair of "logilfe name" and "directory with sstables"
|
||||
static future<std::pair<sstring, sstring>> create_pending_deletion_log(const std::vector<shared_sstable>& ssts);
|
||||
// Returns an unordered_map of <directory with sstables, logfile_name> for every sstable prefix.
|
||||
// Currently, atomicity is guranteed only within each unique prefix and not across prefixes (See #18862)
|
||||
static future<std::unordered_map<sstring, sstring>> create_pending_deletion_log(const std::vector<shared_sstable>& ssts);
|
||||
|
||||
static bool compare_sstable_storage_prefix(const sstring& a, const sstring& b) noexcept;
|
||||
};
|
||||
|
||||
@@ -2986,6 +2986,7 @@ sstable::unlink(storage::sync_dir sync) noexcept {
|
||||
|
||||
co_await std::move(remove_fut);
|
||||
_stats.on_delete();
|
||||
_manager.on_unlink(this);
|
||||
}
|
||||
|
||||
thread_local sstables_stats::stats sstables_stats::_shard_stats;
|
||||
|
||||
@@ -323,6 +323,11 @@ void sstables_manager::validate_new_keyspace_storage_options(const data_dictiona
|
||||
}, so.value);
|
||||
}
|
||||
|
||||
void sstables_manager::on_unlink(sstable* sst) {
|
||||
// Remove the sst from manager's reclaimed list to prevent any attempts to reload its components.
|
||||
_reclaimed.erase(*sst);
|
||||
}
|
||||
|
||||
sstables_registry::~sstables_registry() = default;
|
||||
|
||||
} // namespace sstables
|
||||
|
||||
@@ -188,6 +188,9 @@ public:
|
||||
|
||||
void validate_new_keyspace_storage_options(const data_dictionary::storage_options&);
|
||||
|
||||
// To be called by the sstable to signal its unlinking
|
||||
void on_unlink(sstable* sst);
|
||||
|
||||
private:
|
||||
void add(sstable* sst);
|
||||
// Transition the sstable to the "inactive" state. It has no
|
||||
|
||||
@@ -470,32 +470,27 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
class filesystem_atomic_delete_ctx : public atomic_delete_context_impl {
|
||||
public:
|
||||
sstring log;
|
||||
sstring directory;
|
||||
filesystem_atomic_delete_ctx(sstring l, sstring dir) noexcept : log(std::move(l)), directory(std::move(dir)) {}
|
||||
};
|
||||
|
||||
future<atomic_delete_context> filesystem_storage::atomic_delete_prepare(const std::vector<shared_sstable>& ssts) const {
|
||||
auto [ pending_delete_log, sst_directory ] = co_await sstable_directory::create_pending_deletion_log(ssts);
|
||||
co_return std::make_unique<filesystem_atomic_delete_ctx>(std::move(pending_delete_log), std::move(sst_directory));
|
||||
return sstable_directory::create_pending_deletion_log(ssts);
|
||||
}
|
||||
|
||||
future<> filesystem_storage::atomic_delete_complete(atomic_delete_context ctx_) const {
|
||||
auto& ctx = static_cast<filesystem_atomic_delete_ctx&>(*ctx_);
|
||||
future<> filesystem_storage::atomic_delete_complete(atomic_delete_context ctx) const {
|
||||
co_await coroutine::parallel_for_each(ctx, [] (const auto& x) -> future<> {
|
||||
const auto& dir = x.first;
|
||||
const auto& log = x.second;
|
||||
|
||||
co_await sync_directory(ctx.directory);
|
||||
co_await sync_directory(dir);
|
||||
|
||||
// Once all sstables are deleted, the log file can be removed.
|
||||
// Note: the log file will be removed also if unlink failed to remove
|
||||
// any sstable and ignored the error.
|
||||
try {
|
||||
co_await remove_file(ctx.log);
|
||||
sstlog.debug("{} removed.", ctx.log);
|
||||
} catch (...) {
|
||||
sstlog.warn("Error removing {}: {}. Ignoring.", ctx.log, std::current_exception());
|
||||
}
|
||||
// Once all sstables are deleted, the log file can be removed.
|
||||
// Note: the log file will be removed also if unlink failed to remove
|
||||
// any sstable and ignored the error.
|
||||
try {
|
||||
co_await remove_file(log);
|
||||
sstlog.debug("{} removed.", log);
|
||||
} catch (...) {
|
||||
sstlog.warn("Error removing {}: {}. Ignoring.", log, std::current_exception());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> filesystem_storage::remove_by_registry_entry(entry_descriptor desc) {
|
||||
@@ -610,7 +605,7 @@ future<> s3_storage::wipe(const sstable& sst, sync_dir) noexcept {
|
||||
|
||||
future<atomic_delete_context> s3_storage::atomic_delete_prepare(const std::vector<shared_sstable>&) const {
|
||||
// FIXME -- need atomicity, see #13567
|
||||
co_return nullptr;
|
||||
co_return atomic_delete_context{};
|
||||
}
|
||||
|
||||
future<> s3_storage::atomic_delete_complete(atomic_delete_context ctx) const {
|
||||
|
||||
@@ -32,11 +32,8 @@ class delayed_commit_changes;
|
||||
class sstable;
|
||||
class sstables_manager;
|
||||
class entry_descriptor;
|
||||
class atomic_delete_context_impl {
|
||||
public:
|
||||
virtual ~atomic_delete_context_impl() {}
|
||||
};
|
||||
using atomic_delete_context = std::unique_ptr<atomic_delete_context_impl>;
|
||||
|
||||
using atomic_delete_context = std::unordered_map<sstring, sstring>;
|
||||
|
||||
class storage {
|
||||
friend class test;
|
||||
|
||||
126
test/alternator/test_cql_rbac.py
Normal file
126
test/alternator/test_cql_rbac.py
Normal file
@@ -0,0 +1,126 @@
|
||||
# Copyright 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
# Tests for how CQL's Role-Based Access Control (RBAC) commands - CREATE ROLE,
|
||||
# GRANT, REVOKE, etc., can be used on Alternator for authentication and for
|
||||
# authorization. For example if the low-level name of an Alternator table "x"
|
||||
# is alternator_x.x, and a certain user is not granted permission to "modify"
|
||||
# keyspace alternator_x, Alternator write requests (PutItem, UpdateItem,
|
||||
# DeleteItem, BatchWriteItem) by that user will be denied.
|
||||
#
|
||||
# Because this file is all about testing the Scylla-only CQL-based RBAC,
|
||||
# all tests in this file are skipped when running against Amazon DynamoDB.
|
||||
|
||||
import pytest
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from test.alternator.util import is_aws, unique_table_name
|
||||
|
||||
from cassandra.auth import PlainTextAuthProvider
|
||||
from cassandra.cluster import Cluster, ConsistencyLevel, ExecutionProfile, EXEC_PROFILE_DEFAULT, NoHostAvailable
|
||||
from cassandra.policies import RoundRobinPolicy
|
||||
import re
|
||||
|
||||
# This file is all about testing RBAC as configured via CQL, so we need to
|
||||
# connect to CQL to set these tests up. The "cql" fixture below enables that.
|
||||
# If we're not testing Scylla, or the CQL port is not available on the same
|
||||
# IP address as the Alternator IP address, a test using this fixture will
|
||||
# be skipped with a message about the CQL API not being available.
|
||||
@pytest.fixture(scope="module")
|
||||
def cql(dynamodb):
|
||||
if is_aws(dynamodb):
|
||||
pytest.skip('Scylla-only CQL API not supported by AWS')
|
||||
url = dynamodb.meta.client._endpoint.host
|
||||
host, = re.search(r'.*://([^:]*):', url).groups()
|
||||
profile = ExecutionProfile(
|
||||
load_balancing_policy=RoundRobinPolicy(),
|
||||
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
|
||||
serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL)
|
||||
cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile},
|
||||
contact_points=[host],
|
||||
port=9042,
|
||||
protocol_version=4,
|
||||
auth_provider=PlainTextAuthProvider(username='cassandra', password='cassandra'),
|
||||
)
|
||||
try:
|
||||
ret = cluster.connect()
|
||||
# "BEGIN BATCH APPLY BATCH" is the closest to do-nothing I could find
|
||||
ret.execute("BEGIN BATCH APPLY BATCH")
|
||||
except NoHostAvailable:
|
||||
pytest.skip('Could not connect to Scylla-only CQL API')
|
||||
yield ret
|
||||
cluster.shutdown()
|
||||
|
||||
# new_role() is a context manager for temporarily creating a new role with
|
||||
# a unique name and returning its name and the secret key needed to connect
|
||||
# to it with the DynamoDB API.
|
||||
# The "login" and "superuser" flags are passed to the CREATE ROLE statement.
|
||||
@contextmanager
|
||||
def new_role(cql, login=True, superuser=False):
|
||||
# The role name is not a table's name but it doesn't matter. Because our
|
||||
# unique_table_name() uses (deliberately) a non-lower-case character, the
|
||||
# role name has to be quoted in double quotes when used in CQL below.
|
||||
role = unique_table_name()
|
||||
# The password set for the new role is identical to the user name (not
|
||||
# very secure ;-)) - but we later need to retrieve the "salted hash" of
|
||||
# this password, which serves in Alternator as the secret key of the role.
|
||||
cql.execute(f"CREATE ROLE \"{role}\" WITH PASSWORD = '{role}' AND SUPERUSER = {superuser} AND LOGIN = {login}")
|
||||
# Newer Scylla places the "roles" table in the "system" keyspace, but
|
||||
# older versions used "system_auth_v2" or "system_auth"
|
||||
key = None
|
||||
for ks in ['system', 'system_auth_v2', 'system_auth']:
|
||||
try:
|
||||
e = list(cql.execute(f"SELECT salted_hash FROM {ks}.roles WHERE role = '{role}'"))
|
||||
if e != []:
|
||||
key = e[0].salted_hash
|
||||
if key is not None:
|
||||
break
|
||||
except:
|
||||
pass
|
||||
assert key is not None
|
||||
try:
|
||||
yield (role, key)
|
||||
finally:
|
||||
cql.execute(f'DROP ROLE "{role}"')
|
||||
|
||||
# Create a new DynamoDB API resource (connection object) similar to the
|
||||
# existing "dynamodb" resource - but authenticating with the given role
|
||||
# and key.
|
||||
@contextmanager
|
||||
def new_dynamodb(dynamodb, role, key):
|
||||
url = dynamodb.meta.client._endpoint.host
|
||||
config = dynamodb.meta.client._client_config
|
||||
verify = not url.startswith('https')
|
||||
ret = boto3.resource('dynamodb', endpoint_url=url, verify=verify,
|
||||
aws_access_key_id=role, aws_secret_access_key=key,
|
||||
region_name='us-east-1', config=config)
|
||||
try:
|
||||
yield ret
|
||||
finally:
|
||||
ret.meta.client.close()
|
||||
|
||||
# A basic test for creating a new role. The ListTables operation is allowed
|
||||
# to any role, so it should work in the new role when given the right password
|
||||
# and fail with the wrong password.
|
||||
def test_new_role(dynamodb, cql):
|
||||
with new_role(cql) as (role, key):
|
||||
with new_dynamodb(dynamodb, role, key) as d:
|
||||
# ListTables should not fail (we don't care what is the result)
|
||||
d.meta.client.list_tables()
|
||||
# Trying to use the wrong key for the new role should fail to perform
|
||||
# any request. The new_dynamodb() function can't detect the error,
|
||||
# it is detected when attempting to perform a request with it.
|
||||
with new_dynamodb(dynamodb, role, 'wrongkey') as d:
|
||||
with pytest.raises(ClientError, match='UnrecognizedClientException'):
|
||||
d.meta.client.list_tables()
|
||||
|
||||
# A role without "login" permissions cannot be used to authenticate requests.
|
||||
# Reproduces #19735.
|
||||
def test_login_false(dynamodb, cql):
|
||||
with new_role(cql, login=False) as (role, key):
|
||||
with new_dynamodb(dynamodb, role, key) as d:
|
||||
with pytest.raises(ClientError, match='UnrecognizedClientException.*login=false'):
|
||||
d.meta.client.list_tables()
|
||||
@@ -14,7 +14,7 @@ import pytest
|
||||
from boto3.dynamodb.types import TypeDeserializer
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from test.alternator.util import unique_table_name, create_test_table, new_test_table, random_string, freeze
|
||||
from test.alternator.util import unique_table_name, create_test_table, new_test_table, random_string, freeze, list_tables
|
||||
|
||||
# All tests in this file are expected to fail with tablets due to #16317.
|
||||
# To ensure that Alternator Streams is still being tested, instead of
|
||||
@@ -1581,6 +1581,26 @@ def test_stream_arn_unchanging(dynamodb, dynamodbstreams):
|
||||
assert len(streams['Streams']) == 1
|
||||
assert streams['Streams'][0]['StreamArn'] == arn
|
||||
|
||||
# Enabling a stream shouldn't cause any extra table to appear in ListTables.
|
||||
# In issue #19911, enabling streams on a table called xyz caused the name
|
||||
# "xyz_scylla_cdc_log" to appear in ListTables. The following test creates
|
||||
# a table with a long unique name, and ensures that only one table containing
|
||||
# this name as a substring is listed.
|
||||
# In test_gsi.py and test_lsi.py we have similar tests for GSI and LSI.
|
||||
# Reproduces #19911
|
||||
def test_stream_list_tables(dynamodb):
|
||||
with new_test_table(dynamodb,
|
||||
Tags=TAGS,
|
||||
StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
|
||||
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
|
||||
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, ]
|
||||
) as table:
|
||||
# Check that the long and unique table name (created by
|
||||
# unique_table_name()) isn't a substring of any table name,
|
||||
# except of course the table itself:
|
||||
for listed_name in list_tables(dynamodb):
|
||||
assert table.name == listed_name or table.name not in listed_name
|
||||
|
||||
# TODO: tests on multiple partitions
|
||||
# TODO: write a test that disabling the stream and re-enabling it works, but
|
||||
# requires the user to wait for the first stream to become DISABLED before
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
|
||||
#include "readers/from_mutations_v2.hh"
|
||||
#include "utils/bloom_filter.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
SEASTAR_TEST_CASE(test_sstable_reclaim_memory_from_components_and_reload_reclaimed_components) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
@@ -52,6 +53,11 @@ std::pair<shared_sstable, size_t> create_sstable_with_bloom_filter(test_env& env
|
||||
return {sst, sst_bf_memory};
|
||||
}
|
||||
|
||||
void dispose_and_stop_tracking_bf_memory(shared_sstable&& sst, test_env_sstables_manager& mgr) {
|
||||
mgr.remove_sst_from_reclaimed(sst.get());
|
||||
shared_sstable::dispose(sst.release().release());
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_and_reload_of_bloom_filter) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
simple_schema ss;
|
||||
@@ -89,7 +95,7 @@ SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_and_reload_of_bloom_filter)
|
||||
|
||||
// Test auto reload - disposing sst3 should trigger reload of the
|
||||
// smallest filter in the reclaimed list, which is sst1's bloom filter.
|
||||
shared_sstable::dispose(sst3.release().release());
|
||||
dispose_and_stop_tracking_bf_memory(std::move(sst3), sst_mgr);
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst1->filter_memory_size(), sst1_bf_memory);
|
||||
// only sst4's bloom filter memory should be reported as reclaimed
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst4_bf_memory);
|
||||
@@ -154,7 +160,7 @@ SEASTAR_TEST_CASE(test_bloom_filter_reclaim_during_reload) {
|
||||
utils::get_local_injector().enable("reload_reclaimed_components/pause", true);
|
||||
|
||||
// dispose sst2 to trigger reload of sst1's bloom filter
|
||||
shared_sstable::dispose(sst2.release().release());
|
||||
dispose_and_stop_tracking_bf_memory(std::move(sst2), sst_mgr);
|
||||
// _total_reclaimable_memory will be updated when the reload begins; wait for it.
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_reclaimable_memory(), sst1_bf_memory);
|
||||
|
||||
@@ -170,8 +176,8 @@ SEASTAR_TEST_CASE(test_bloom_filter_reclaim_during_reload) {
|
||||
// resume reloading sst1 filter
|
||||
utils::get_local_injector().receive_message("reload_reclaimed_components/pause");
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst1->filter_memory_size(), sst1_bf_memory);
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst3_bf_memory);
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_reclaimable_memory(), sst1_bf_memory);
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst3_bf_memory);
|
||||
|
||||
utils::get_local_injector().disable("reload_reclaimed_components/pause");
|
||||
}, {
|
||||
@@ -223,3 +229,57 @@ SEASTAR_TEST_CASE(test_bloom_filters_with_bad_partition_estimate) {
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
SEASTAR_TEST_CASE(test_bloom_filter_reload_after_unlink) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
|
||||
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
|
||||
return;
|
||||
#endif
|
||||
simple_schema ss;
|
||||
auto schema = ss.schema();
|
||||
|
||||
auto mut = mutation(schema, ss.make_pkey(1));
|
||||
mut.partition().apply_insert(*schema, ss.make_ckey(1), ss.new_timestamp());
|
||||
|
||||
// bloom filter will be reclaimed automatically due to low memory
|
||||
auto sst = make_sstable_containing(env.make_sstable(schema), {mut});
|
||||
auto& sst_mgr = env.manager();
|
||||
BOOST_REQUIRE_EQUAL(sst->filter_memory_size(), 0);
|
||||
auto memory_reclaimed = sst_mgr.get_total_memory_reclaimed();
|
||||
|
||||
// manager's reclaimed set has the sst now
|
||||
auto& reclaimed_set = sst_mgr.get_reclaimed_set();
|
||||
BOOST_REQUIRE_EQUAL(reclaimed_set.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(reclaimed_set.begin()->get_filename(), sst->get_filename());
|
||||
|
||||
// hold a copy of shared sst object in async thread to test reload after unlink
|
||||
utils::get_local_injector().enable("test_bloom_filter_reload_after_unlink");
|
||||
auto async_sst_holder = seastar::async([sst] {
|
||||
// do nothing just hold a copy of sst and wait for message signalling test completion
|
||||
utils::get_local_injector().inject("test_bloom_filter_reload_after_unlink", [] (auto& handler) {
|
||||
auto ret = handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{5});
|
||||
return ret;
|
||||
}).get();
|
||||
});
|
||||
|
||||
// unlink the sst and release the object
|
||||
sst->unlink().get();
|
||||
sst.release();
|
||||
|
||||
// reclaimed set should be now empty but the total memory reclaimed should
|
||||
// be still the same as the sst object is not deactivated yet due to a copy
|
||||
// being alive in the async thread.
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_reclaimed_set().size(), 0);
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), memory_reclaimed);
|
||||
|
||||
// message async thread to complete waiting and thus release its copy of sst, triggering deactivation
|
||||
utils::get_local_injector().receive_message("test_bloom_filter_reload_after_unlink");
|
||||
async_sst_holder.get();
|
||||
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_memory_reclaimed(), 0);
|
||||
}, {
|
||||
// set available memory = 0 to force reclaim the bloom filter
|
||||
.available_memory = 0
|
||||
});
|
||||
};
|
||||
|
||||
@@ -252,7 +252,7 @@ SEASTAR_TEST_CASE(test_group0_batch) {
|
||||
};
|
||||
|
||||
auto do_transaction = [&] (std::function<future<>(service::group0_batch&)> f) -> future<> {
|
||||
auto guard = co_await rclient.start_operation(&as);
|
||||
auto guard = co_await rclient.start_operation(as);
|
||||
service::group0_batch mc(std::move(guard));
|
||||
co_await f(mc);
|
||||
co_await std::move(mc).commit(rclient, as, ::service::raft_timeout{});
|
||||
@@ -273,7 +273,7 @@ SEASTAR_TEST_CASE(test_group0_batch) {
|
||||
|
||||
// test extract
|
||||
{
|
||||
auto guard = co_await rclient.start_operation(&as);
|
||||
auto guard = co_await rclient.start_operation(as);
|
||||
service::group0_batch mc(std::move(guard));
|
||||
mc.add_mutation(co_await insert_mut(1, 2));
|
||||
mc.add_generator([&] (api::timestamp_type t) -> ::service::mutations_generator {
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
*/
|
||||
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <seastar/core/smp.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/util/file.hh>
|
||||
@@ -145,7 +146,7 @@ static void with_sstable_directory(
|
||||
wrapped_test_env env_wrap,
|
||||
noncopyable_function<void (sharded<sstable_directory>&)> func) {
|
||||
|
||||
testlog.debug("with_sstable_directory: {}", path);
|
||||
testlog.debug("with_sstable_directory: {}/{}", path, state);
|
||||
|
||||
sharded<sstables::directory_semaphore> sstdir_sem;
|
||||
sstdir_sem.start(1).get();
|
||||
@@ -782,14 +783,24 @@ SEASTAR_THREAD_TEST_CASE(test_system_datadir_layout) {
|
||||
|
||||
SEASTAR_TEST_CASE(test_pending_log_garbage_collection) {
|
||||
return sstables::test_env::do_with_sharded_async([] (auto& env) {
|
||||
for (auto state : {sstables::sstable_state::normal, sstables::sstable_state::staging}) {
|
||||
auto base = env.local().tempdir().path() / fmt::to_string(table_id::create_random_id());
|
||||
auto dir = base / fmt::to_string(state);
|
||||
recursive_touch_directory(dir.native()).get();
|
||||
|
||||
auto new_sstable = [&] {
|
||||
return env.local().make_sstable(test_table_schema(), dir.native());
|
||||
};
|
||||
std::vector<shared_sstable> ssts_to_keep;
|
||||
for (int i = 0; i < 2; i++) {
|
||||
ssts_to_keep.emplace_back(make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env.local()))));
|
||||
ssts_to_keep.emplace_back(make_sstable_for_this_shard(new_sstable));
|
||||
}
|
||||
testlog.debug("SSTables to keep: {}", ssts_to_keep);
|
||||
std::vector<shared_sstable> ssts_to_remove;
|
||||
for (int i = 0; i < 3; i++) {
|
||||
ssts_to_remove.emplace_back(make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env.local()))));
|
||||
ssts_to_remove.emplace_back(make_sstable_for_this_shard(new_sstable));
|
||||
}
|
||||
testlog.debug("SSTables to remove: {}", ssts_to_remove);
|
||||
|
||||
// Now start atomic deletion -- create the pending deletion log for all
|
||||
// three sstables, move TOC file for one of them into temporary-TOC, and
|
||||
@@ -799,7 +810,16 @@ SEASTAR_TEST_CASE(test_pending_log_garbage_collection) {
|
||||
rename_file(test(ssts_to_remove[2]).filename(sstables::component_type::TOC).native(), test(ssts_to_remove[2]).filename(sstables::component_type::TemporaryTOC).native()).get();
|
||||
remove_file(test(ssts_to_remove[2]).filename(sstables::component_type::Data).native()).get();
|
||||
|
||||
with_sstable_directory(env, [&] (sharded<sstables::sstable_directory>& sstdir) {
|
||||
// mimic distributed_loader table_populator::start order
|
||||
// as the pending_delete_dir is now shared, at the table base directory
|
||||
if (state != sstables::sstable_state::normal) {
|
||||
with_sstable_directory(base, sstables::sstable_state::normal, env, [&] (sharded<sstables::sstable_directory>& sstdir) {
|
||||
auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true, .garbage_collect = true });
|
||||
BOOST_REQUIRE_NO_THROW(expect_ok.get());
|
||||
});
|
||||
}
|
||||
|
||||
with_sstable_directory(base, state, env, [&] (sharded<sstables::sstable_directory>& sstdir) {
|
||||
auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true, .garbage_collect = true });
|
||||
BOOST_REQUIRE_NO_THROW(expect_ok.get());
|
||||
|
||||
@@ -827,5 +847,6 @@ SEASTAR_TEST_CASE(test_pending_log_garbage_collection) {
|
||||
|
||||
BOOST_REQUIRE_EQUAL(expected, collected);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -44,5 +44,7 @@ custom_args:
|
||||
- '-c1 -m256M'
|
||||
commitlog_cleanup_test:
|
||||
- '-c1 -m2G'
|
||||
bloom_filter_test:
|
||||
- '-c1'
|
||||
run_in_debug:
|
||||
- logalloc_standard_allocator_segment_pool_backend_test
|
||||
|
||||
@@ -1315,6 +1315,65 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) {
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_table_creation_during_decommission) {
|
||||
// Verifies that new table doesn't get tablets allocated on a node being decommissioned
|
||||
// which may leave them on replicas absent in topology post decommission.
|
||||
do_with_cql_env_thread([](auto& e) {
|
||||
inet_address ip1("192.168.0.1");
|
||||
inet_address ip2("192.168.0.2");
|
||||
inet_address ip3("192.168.0.3");
|
||||
inet_address ip4("192.168.0.4");
|
||||
|
||||
auto host1 = host_id(next_uuid());
|
||||
auto host2 = host_id(next_uuid());
|
||||
auto host3 = host_id(next_uuid());
|
||||
auto host4 = host_id(next_uuid());
|
||||
locator::endpoint_dc_rack dcrack = { "datacenter1", "rack1" };
|
||||
|
||||
semaphore sem(1);
|
||||
shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config {
|
||||
locator::topology::config {
|
||||
.this_endpoint = ip1,
|
||||
.local_dc_rack = dcrack
|
||||
}
|
||||
});
|
||||
|
||||
const unsigned shard_count = 1;
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
||||
tm.update_host_id(host1, ip1);
|
||||
tm.update_host_id(host2, ip2);
|
||||
tm.update_host_id(host3, ip3);
|
||||
tm.update_host_id(host4, ip4);
|
||||
tm.update_topology(host1, dcrack, std::nullopt, shard_count);
|
||||
tm.update_topology(host2, dcrack, std::nullopt, shard_count);
|
||||
tm.update_topology(host3, dcrack, node::state::being_decommissioned, shard_count);
|
||||
tm.update_topology(host4, dcrack, node::state::left, shard_count);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
|
||||
sstring ks_name = "test_ks";
|
||||
sstring table_name = "table1";
|
||||
e.execute_cql(format("create keyspace {} with replication = "
|
||||
"{{'class': 'NetworkTopologyStrategy', '{}': 1}} "
|
||||
"and tablets = {{'enabled': true, 'initial': 8}}", ks_name, dcrack.dc)).get();
|
||||
e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get();
|
||||
auto s = e.local_db().find_schema(ks_name, table_name);
|
||||
|
||||
auto* rs = e.local_db().find_keyspace(ks_name).get_replication_strategy().maybe_as_tablet_aware();
|
||||
BOOST_REQUIRE(rs);
|
||||
auto tmap = rs->allocate_tablets_for_new_table(s, stm.get(), 8).get();
|
||||
|
||||
tmap.for_each_tablet([&](auto tid, auto& tinfo) {
|
||||
for (auto& replica : tinfo.replicas) {
|
||||
BOOST_REQUIRE_NE(replica.host, host3);
|
||||
BOOST_REQUIRE_NE(replica.host, host4);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}, tablet_cql_test_config()).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) {
|
||||
// Verifies that load balancer moves tablets out of the decommissioned node.
|
||||
// The scenario is such that replication constraints of tablets can be satisfied after decommission.
|
||||
|
||||
@@ -644,7 +644,10 @@ private:
|
||||
_sl_controller.invoke_on_all(&qos::service_level_controller::start).get();
|
||||
|
||||
_sys_ks.start(std::ref(_qp), std::ref(_db)).get();
|
||||
auto stop_sys_kd = defer([this] { _sys_ks.stop().get(); });
|
||||
auto stop_sys_kd = defer([this] {
|
||||
_sys_ks.invoke_on_all(&db::system_keyspace::shutdown).get();
|
||||
_sys_ks.stop().get();
|
||||
});
|
||||
|
||||
replica::distributed_loader::init_system_keyspace(_sys_ks, _erm_factory, _db).get();
|
||||
_db.local().init_schema_commitlog();
|
||||
@@ -848,9 +851,6 @@ private:
|
||||
}
|
||||
|
||||
group0_client.init().get();
|
||||
auto stop_system_keyspace = defer([this] {
|
||||
_sys_ks.invoke_on_all(&db::system_keyspace::shutdown).get();
|
||||
});
|
||||
|
||||
auto shutdown_db = defer([this] {
|
||||
_db.invoke_on_all(&replica::database::shutdown).get();
|
||||
@@ -985,7 +985,7 @@ private:
|
||||
config.is_superuser = true;
|
||||
config.can_login = true;
|
||||
|
||||
auto as = &abort_sources.local();
|
||||
auto& as = abort_sources.local();
|
||||
auto guard = group0_client.start_operation(as).get();
|
||||
service::group0_batch mc{std::move(guard)};
|
||||
auth::create_role(
|
||||
@@ -994,7 +994,7 @@ private:
|
||||
config,
|
||||
auth::authentication_options(),
|
||||
mc).get();
|
||||
std::move(mc).commit(group0_client, *as, ::service::raft_timeout{}).get();
|
||||
std::move(mc).commit(group0_client, as, ::service::raft_timeout{}).get();
|
||||
} catch (const auth::role_already_exists&) {
|
||||
// The default user may already exist if this `cql_test_env` is starting with previously populated data.
|
||||
}
|
||||
@@ -1060,7 +1060,7 @@ future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, cql_tes
|
||||
void do_with_mc(cql_test_env& env, std::function<void(service::group0_batch&)> func) {
|
||||
seastar::abort_source as;
|
||||
auto& g0 = env.get_raft_group0_client();
|
||||
auto guard = g0.start_operation(&as).get();
|
||||
auto guard = g0.start_operation(as).get();
|
||||
auto mc = service::group0_batch(std::move(guard));
|
||||
func(mc);
|
||||
std::move(mc).commit(g0, as, std::nullopt).get();
|
||||
|
||||
@@ -57,6 +57,14 @@ public:
|
||||
size_t get_total_reclaimable_memory() {
|
||||
return _total_reclaimable_memory;
|
||||
}
|
||||
|
||||
void remove_sst_from_reclaimed(sstable* sst) {
|
||||
_reclaimed.erase(*sst);
|
||||
}
|
||||
|
||||
auto& get_reclaimed_set() {
|
||||
return _reclaimed;
|
||||
}
|
||||
};
|
||||
|
||||
class test_env_compaction_manager {
|
||||
|
||||
@@ -149,6 +149,7 @@ SCYLLA_CMDLINE_OPTIONS = [
|
||||
'--abort-on-ebadf', '1',
|
||||
'--logger-log-level', 'raft_topology=debug',
|
||||
'--logger-log-level', 'query_processor=debug',
|
||||
'--logger-log-level', 'group0_raft_sm=trace',
|
||||
]
|
||||
|
||||
# [--smp, 1], [--smp, 2] -> [--smp, 2]
|
||||
|
||||
@@ -70,6 +70,8 @@ def check_child_parent_relationship(rest_api, status_tree, parent, allow_no_chil
|
||||
def drain_module_tasks(rest_api, module_name):
|
||||
tasks = [task for task in list_tasks(rest_api, module_name, True)]
|
||||
for task in tasks:
|
||||
# Wait for task and unregister it.
|
||||
resp = rest_api.send("GET", f"task_manager/wait_task/{task['task_id']}")
|
||||
resp = rest_api.send("GET", f"task_manager/task_status/{task['task_id']}")
|
||||
# The task may be already unregistered.
|
||||
assert resp.status_code == requests.codes.ok or resp.status_code == requests.codes.bad_request, "Invalid status code"
|
||||
|
||||
@@ -5,3 +5,9 @@
|
||||
def test_system_uptime_ms(rest_api):
|
||||
resp = rest_api.send('GET', "system/uptime_ms")
|
||||
resp.raise_for_status()
|
||||
|
||||
|
||||
def test_system_highest_sstable_format(rest_api):
|
||||
resp = rest_api.send('GET', "system/highest_supported_sstable_version")
|
||||
resp.raise_for_status()
|
||||
assert resp.json() == "me"
|
||||
|
||||
@@ -108,8 +108,6 @@ def test_task_manager_wait(rest_api):
|
||||
|
||||
x.join()
|
||||
|
||||
assert_task_does_not_exist(rest_api, task0)
|
||||
|
||||
def test_task_manager_ttl(rest_api):
|
||||
with new_test_module(rest_api):
|
||||
args0 = {"keyspace": "keyspace0", "table": "table0"}
|
||||
|
||||
@@ -48,8 +48,8 @@ async def test_coordinator_queue_management(manager: ManagerClient):
|
||||
|
||||
await wait_for_first_completed([l.wait_for("received request to join from host_id", m) for l, m in zip(logs[:3], marks[:3])])
|
||||
|
||||
marks[0] = await logs[0].wait_for("raft_topology - removenode: wait for completion", marks[0])
|
||||
marks[0] = await logs[0].wait_for("raft_topology - removenode: wait for completion", marks[0])
|
||||
marks[0] = await logs[0].wait_for("raft_topology - removenode: waiting for completion", marks[0])
|
||||
marks[0] = await logs[0].wait_for("raft_topology - removenode: waiting for completion", marks[0])
|
||||
|
||||
[await manager.api.message_injection(s.ip_addr, inj) for s in servers[:3]]
|
||||
|
||||
@@ -68,10 +68,7 @@ async def test_coordinator_queue_management(manager: ManagerClient):
|
||||
|
||||
await wait_for_first_completed([l.wait_for("received request to join from host_id", m) for l, m in zip(logs[:3], marks[:3])])
|
||||
|
||||
# FIXME: we aren't actually awaiting this log -- this line is missing an `await`.
|
||||
# But this log was actually removed in commit d576ed31dce292997d1cf32af5a9e89768b154d7.
|
||||
# Should we be waiting for something else, or for nothing at all?
|
||||
logs[1].wait_for("raft_topology - decommission: wait for completion", marks[1])
|
||||
await logs[1].wait_for("raft_topology - decommission: waiting for completion", marks[1])
|
||||
|
||||
[await manager.api.message_injection(s.ip_addr, inj) for s in servers[:3]]
|
||||
|
||||
|
||||
37
test/topology_custom/test_lwt_semaphore.py
Normal file
37
test/topology_custom/test_lwt_semaphore.py
Normal file
@@ -0,0 +1,37 @@
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from test.pylib.rest_client import inject_error
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
import pytest
|
||||
from cassandra.protocol import WriteTimeout
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cas_semaphore(manager):
|
||||
""" This is a regression test for scylladb/scylladb#19698 """
|
||||
servers = await manager.servers_add(1, cmdline=['--smp', '1', '--write-request-timeout-in-ms', '500'])
|
||||
|
||||
host = await wait_for_cql_and_get_hosts(manager.cql, {servers[0]}, time.time() + 60)
|
||||
|
||||
await manager.cql.run_async("CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
|
||||
await manager.cql.run_async("CREATE TABLE test.test (a int PRIMARY KEY, b int)")
|
||||
|
||||
async with inject_error(manager.api, servers[0].ip_addr, 'cas_timeout_after_lock'):
|
||||
res = [manager.cql.run_async(f"INSERT INTO test.test (a) VALUES (0) IF NOT EXISTS", host=host[0]) for r in range(10)]
|
||||
try:
|
||||
await asyncio.gather(*res)
|
||||
except WriteTimeout:
|
||||
pass
|
||||
|
||||
res = [manager.cql.run_async(f"INSERT INTO test.test (a) VALUES (0) IF NOT EXISTS", host=host[0]) for r in range(10)]
|
||||
await asyncio.gather(*res)
|
||||
|
||||
metrics = await manager.metrics.query(servers[0].ip_addr)
|
||||
contention = metrics.get(name="scylla_storage_proxy_coordinator_cas_write_contention_count")
|
||||
|
||||
assert contention == None
|
||||
72
test/topology_custom/test_mv_tablets_empty_ip.py
Normal file
72
test/topology_custom/test_mv_tablets_empty_ip.py
Normal file
@@ -0,0 +1,72 @@
|
||||
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
import asyncio
|
||||
import pytest
|
||||
import time
|
||||
import logging
|
||||
|
||||
from cassandra.cluster import ConnectionException, NoHostAvailable # type: ignore
|
||||
|
||||
from test.pylib.scylla_cluster import ReplaceConfig
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.topology.conftest import skip_mode
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Regression test for https://github.com/scylladb/scylladb/issues/19439.
|
||||
# Creates a tabled-enabled keyspace with a base table and a materialized view,
|
||||
# continuously writes to it and replaces one of the nodes. During replacement,
|
||||
# nodes should not crash.
|
||||
#
|
||||
# RF needs to be smaller than the cluster size in order ensure appearance of
|
||||
# remote view updates.
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_mv_tablets_empty_ip(manager: ManagerClient):
|
||||
cfg = {'enable_tablets': True}
|
||||
servers = await manager.servers_add(4, config = cfg)
|
||||
|
||||
cql = manager.get_cql()
|
||||
await cql.run_async("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}")
|
||||
await cql.run_async("CREATE TABLE ks.t (pk int primary key, v int)")
|
||||
await cql.run_async("CREATE materialized view ks.t_view AS select pk, v from ks.t where v is not null primary key (v, pk)")
|
||||
|
||||
stop_event = asyncio.Event()
|
||||
concurrency = 10
|
||||
async def do_writes(start_it) -> int:
|
||||
iteration = start_it
|
||||
while not stop_event.is_set():
|
||||
start_time = time.time()
|
||||
try:
|
||||
await cql.run_async(f"insert into ks.t (pk, v) values ({iteration}, {iteration+1})")
|
||||
except NoHostAvailable as e:
|
||||
for _, err in e.errors.items():
|
||||
# ConnectionException can be raised when the node is shutting down.
|
||||
if not isinstance(err, ConnectionException):
|
||||
logger.error(f"Write started {time.time() - start_time}s ago failed: {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Write started {time.time() - start_time}s ago failed: {e}")
|
||||
raise
|
||||
iteration += concurrency
|
||||
await asyncio.sleep(0.01)
|
||||
return iteration
|
||||
|
||||
logger.info("Starting to write")
|
||||
tasks = [asyncio.create_task(do_writes(i)) for i in range(concurrency)]
|
||||
|
||||
logger.info("Stopping the last node")
|
||||
await manager.server_stop_gracefully(servers[-1].server_id)
|
||||
replace_cfg = ReplaceConfig(replaced_id = servers[-1].server_id, reuse_ip_addr = False, use_host_id = True)
|
||||
|
||||
logger.info("Replacing the last node")
|
||||
await manager.server_add(replace_cfg=replace_cfg, config = cfg)
|
||||
|
||||
logger.info("Stopping writes")
|
||||
stop_event.set()
|
||||
await asyncio.gather(*tasks)
|
||||
@@ -12,8 +12,8 @@ from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
|
||||
from test.topology.conftest import skip_mode
|
||||
from test.topology.util import reconnect_driver, enter_recovery_state, wait_for_upgrade_state, \
|
||||
wait_until_upgrade_finishes, delete_raft_data_and_upgrade_state, log_run_time
|
||||
from test.topology.util import (delete_raft_data_and_upgrade_state, enter_recovery_state, log_run_time,
|
||||
reconnect_driver, wait_for_upgrade_state, wait_until_upgrade_finishes)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -77,8 +77,7 @@ async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
|
||||
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
|
||||
|
||||
logging.info(f"Restarting {others}")
|
||||
await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in others))
|
||||
cql = await reconnect_driver(manager)
|
||||
await manager.rolling_restart(others)
|
||||
|
||||
logging.info(f"{others} restarted, waiting until driver reconnects to them")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
|
||||
@@ -100,11 +99,11 @@ async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
|
||||
logging.info(f"Removing {srv1} using {others[0]}")
|
||||
await manager.remove_node(others[0].server_id, srv1.server_id)
|
||||
|
||||
logging.info(f"Deleting Raft data and upgrade state on {hosts} and restarting")
|
||||
logging.info(f"Deleting Raft data and upgrade state on {hosts}")
|
||||
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
|
||||
|
||||
await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in others))
|
||||
cql = await reconnect_driver(manager)
|
||||
logging.info(f"Restarting {others}")
|
||||
await manager.rolling_restart(others)
|
||||
|
||||
logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
|
||||
|
||||
68
test/topology_custom/test_tablets_cql.py
Normal file
68
test/topology_custom/test_tablets_cql.py
Normal file
@@ -0,0 +1,68 @@
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
from cassandra.protocol import InvalidRequest
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import inject_error_one_shot
|
||||
from test.topology.conftest import skip_mode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_alter_dropped_tablets_keyspace(manager: ManagerClient) -> None:
|
||||
config = {
|
||||
'enable_tablets': 'true'
|
||||
}
|
||||
|
||||
logger.info("starting a node (the leader)")
|
||||
servers = [await manager.server_add(config=config)]
|
||||
|
||||
logger.info("starting a second node (the follower)")
|
||||
servers += [await manager.server_add(config=config)]
|
||||
|
||||
await manager.get_cql().run_async("create keyspace ks with "
|
||||
"replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} and "
|
||||
"tablets = {'enabled': true}")
|
||||
await manager.get_cql().run_async("create table ks.t (pk int primary key)")
|
||||
|
||||
logger.info(f"injecting wait-after-topology-coordinator-gets-event into the leader node {servers[0]}")
|
||||
injection_handler = await inject_error_one_shot(manager.api, servers[0].ip_addr,
|
||||
'wait-after-topology-coordinator-gets-event')
|
||||
|
||||
async def alter_tablets_ks_without_waiting_to_complete():
|
||||
res = await manager.get_cql().run_async("select data_center from system.local")
|
||||
# ALTER tablets KS only accepts a specific DC, it rejects the generic 'replication_factor' tag
|
||||
this_dc = res[0].data_center
|
||||
await manager.get_cql().run_async("alter keyspace ks "
|
||||
f"with replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': 1}}")
|
||||
|
||||
# by creating a task this way we ensure it's immediately executed, but we won't wait until it's completed
|
||||
task = asyncio.create_task(alter_tablets_ks_without_waiting_to_complete())
|
||||
|
||||
logger.info(f"waiting for the leader node {servers[0]} to start handling the keyspace-rf-change request")
|
||||
leader_log_file = await manager.server_open_log(servers[0].server_id)
|
||||
await leader_log_file.wait_for("wait-after-topology-coordinator-gets-event injection hit", timeout=10)
|
||||
|
||||
logger.info(f"dropping KS from the follower node {servers[1]} so that the leader, which hangs on injected sleep, "
|
||||
f"wakes up with the drop applied")
|
||||
host = manager.get_cql().cluster.metadata.get_host(servers[1].ip_addr)
|
||||
await manager.get_cql().run_async("drop keyspace ks", host=host)
|
||||
|
||||
logger.info("Waking up the leader to continue processing ALTER with KS that doesn't exist (has been just dropped)")
|
||||
await injection_handler.message()
|
||||
|
||||
matches = await leader_log_file.grep("topology change coordinator fiber got error "
|
||||
"data_dictionary::no_such_keyspace \(Can't find a keyspace ks\)")
|
||||
assert not matches
|
||||
|
||||
with pytest.raises(InvalidRequest, match="Can't ALTER keyspace ks, keyspace doesn't exist") as e:
|
||||
await task
|
||||
@@ -23,6 +23,8 @@ import requests
|
||||
import json
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for
|
||||
from test.topology.conftest import skip_mode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -206,13 +208,102 @@ async def test_localnodes_broadcast_rpc_address(manager: ManagerClient):
|
||||
}
|
||||
servers = await manager.servers_add(2, config=config)
|
||||
for server in servers:
|
||||
url = f"http://{server.ip_addr}:{config['alternator_port']}/localnodes"
|
||||
response = requests.get(url, verify=False)
|
||||
assert response.ok
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
# We expect /localnodes to return ["1.2.3.4", "1.2.3.4"]
|
||||
# (since we configured both nodes with the same broadcast_rpc_address):
|
||||
assert j == ['1.2.3.4', '1.2.3.4']
|
||||
# (since we configured both nodes with the same broadcast_rpc_address).
|
||||
# We need the retry loop below because the second node might take a
|
||||
# bit of time to bootstrap after coming up, and only then will it
|
||||
# appear on /localnodes (see #19694).
|
||||
url = f"http://{server.ip_addr}:{config['alternator_port']}/localnodes"
|
||||
timeout = time.time() + 60
|
||||
while True:
|
||||
assert time.time() < timeout
|
||||
response = requests.get(url, verify=False)
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
if j == ['1.2.3.4', '1.2.3.4']:
|
||||
break # done
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_localnodes_drained_node(manager: ManagerClient):
|
||||
"""Test that if in a cluster one node is brought down with "nodetool drain"
|
||||
a "/localnodes" request should NOT return that node. This test does
|
||||
NOT reproduce issue #19694 - a DRAINED node is not considered is_alive()
|
||||
and even before the fix of that issue, "/localnodes" didn't return it.
|
||||
"""
|
||||
# Start a cluster with two nodes and verify that at this point,
|
||||
# "/localnodes" on the first node returns both nodes.
|
||||
# We the retry loop below because the second node might take a
|
||||
# bit of time to bootstrap after coming up, and only then will it
|
||||
# appear on /localnodes (see #19694).
|
||||
servers = await manager.servers_add(2, config=alternator_config)
|
||||
localnodes_request = f"http://{servers[0].ip_addr}:{alternator_config['alternator_port']}/localnodes"
|
||||
async def check_localnodes_two():
|
||||
response = requests.get(localnodes_request)
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
|
||||
return True
|
||||
elif set(j).issubset({servers[0].ip_addr, servers[1].ip_addr}):
|
||||
return None # try again
|
||||
else:
|
||||
return False
|
||||
assert await wait_for(check_localnodes_two, time.time() + 60)
|
||||
# Now "nodetool" drain on the second node, leaving the second node
|
||||
# in DRAINED state.
|
||||
await manager.api.client.post("/storage_service/drain", host=servers[1].ip_addr)
|
||||
# After that, "/localnodes" should no longer return the second node.
|
||||
# It might take a short while until the first node learns what happened
|
||||
# to node 1, so we may need to retry for a while
|
||||
async def check_localnodes_one():
|
||||
response = requests.get(localnodes_request)
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
|
||||
return None # try again
|
||||
elif set(j) == {servers[0].ip_addr}:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
assert await wait_for(check_localnodes_one, time.time() + 60)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_localnodes_joining_nodes(manager: ManagerClient):
|
||||
"""Test that if a cluster is being enlarged and a node is coming up but
|
||||
not yet responsive, a "/localnodes" request should NOT return that node.
|
||||
Reproduces issue #19694.
|
||||
"""
|
||||
# Start a cluster with one node, and then bring up a second node,
|
||||
# pausing its bootstrap (with an injection) in JOINING state.
|
||||
# We need to start the second node in the background, because server_add()
|
||||
# will wait for the bootstrap to complete - which we don't want to do.
|
||||
server = await manager.server_add(config=alternator_config)
|
||||
task = asyncio.create_task(manager.server_add(config=alternator_config | {'error_injections_at_startup': ['delay_bootstrap_120s']}))
|
||||
# Sleep until the first node knows of the second one as a "live node"
|
||||
# (we check this with the REST API's /gossiper/endpoint/live.
|
||||
async def check_two_live_nodes():
|
||||
j = await manager.api.client.get_json("/gossiper/endpoint/live", host=server.ip_addr)
|
||||
if len(j) == 1:
|
||||
return None # try again
|
||||
elif len(j) == 2:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
assert await wait_for(check_two_live_nodes, time.time() + 60)
|
||||
|
||||
# At this point the second node is live, but hasn't finished bootstrapping
|
||||
# (we delayed that with the injection). So the "/localnodes" should still
|
||||
# return just one node - not both. Reproduces #19694 (two nodes used to
|
||||
# be returned)
|
||||
localnodes_request = f"http://{server.ip_addr}:{alternator_config['alternator_port']}/localnodes"
|
||||
response = requests.get(localnodes_request)
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
assert len(j) == 1
|
||||
# Ending the test here will kill both servers. We don't wait for the
|
||||
# second server to finish its long injection-caused bootstrap delay,
|
||||
# so we don't check here that when the second server finally comes up,
|
||||
# both nodes will finally be visible in /localnodes. This case is checked
|
||||
# in other tests, where bootstrap finishes normally - we don't need to
|
||||
# check this case again here.
|
||||
task.cancel()
|
||||
|
||||
# TODO: add a more thorough test for /localnodes, creating a cluster with
|
||||
# multiple nodes in multiple data centers, and check that we can get a list
|
||||
|
||||
Submodule tools/python3 updated: 18fa79ee96...ea49f0caeb
Reference in New Issue
Block a user