Compare commits
38 Commits
debug_form
...
scylla-5.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
908a82bea0 | ||
|
|
39158f55d0 | ||
|
|
22c1685b3d | ||
|
|
9ba6fc73f1 | ||
|
|
f2e2c0127a | ||
|
|
363ea87f51 | ||
|
|
c49fd6f176 | ||
|
|
3114589a30 | ||
|
|
34f68a4c0f | ||
|
|
b336e11f59 | ||
|
|
9ef73d7e36 | ||
|
|
8700a72b4c | ||
|
|
886dd3e1d2 | ||
|
|
f565f3de06 | ||
|
|
76ff6d981c | ||
|
|
f924f59055 | ||
|
|
d5cef05810 | ||
|
|
e0f4e99e9b | ||
|
|
6795715011 | ||
|
|
aa9e91c376 | ||
|
|
ddfb9ebab2 | ||
|
|
d58a3e4d16 | ||
|
|
2ebac52d2d | ||
|
|
b536614913 | ||
|
|
85df0fd2b1 | ||
|
|
cdf9fe7023 | ||
|
|
8ff4717fd0 | ||
|
|
291b1f6e7f | ||
|
|
b2699743cc | ||
|
|
50ae73a4bd | ||
|
|
c3dd4a2b87 | ||
|
|
0f9fe61d91 | ||
|
|
59d30ff241 | ||
|
|
fb82dff89e | ||
|
|
b588b19620 | ||
|
|
608ef92a71 | ||
|
|
d2732b2663 | ||
|
|
34ab98e1be |
@@ -72,7 +72,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=5.2.0-dev
|
||||
VERSION=5.2.0-rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -145,19 +145,24 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
|
||||
auto table = find_table(_proxy, request);
|
||||
auto db = _proxy.data_dictionary();
|
||||
auto cfs = db.get_tables();
|
||||
auto i = cfs.begin();
|
||||
auto e = cfs.end();
|
||||
|
||||
if (limit < 1) {
|
||||
throw api_error::validation("Limit must be 1 or more");
|
||||
}
|
||||
|
||||
// TODO: the unordered_map here is not really well suited for partial
|
||||
// querying - we're sorting on local hash order, and creating a table
|
||||
// between queries may or may not miss info. But that should be rare,
|
||||
// and we can probably expect this to be a single call.
|
||||
// # 12601 (maybe?) - sort the set of tables on ID. This should ensure we never
|
||||
// generate duplicates in a paged listing here. Can obviously miss things if they
|
||||
// are added between paged calls and end up with a "smaller" UUID/ARN, but that
|
||||
// is to be expected.
|
||||
std::sort(cfs.begin(), cfs.end(), [](const data_dictionary::table& t1, const data_dictionary::table& t2) {
|
||||
return t1.schema()->id().uuid() < t2.schema()->id().uuid();
|
||||
});
|
||||
|
||||
auto i = cfs.begin();
|
||||
auto e = cfs.end();
|
||||
|
||||
if (streams_start) {
|
||||
i = std::find_if(i, e, [&](data_dictionary::table t) {
|
||||
i = std::find_if(i, e, [&](const data_dictionary::table& t) {
|
||||
return t.schema()->id().uuid() == streams_start
|
||||
&& cdc::get_base_table(db.real_database(), *t.schema())
|
||||
&& is_alternator_keyspace(t.schema()->ks_name())
|
||||
|
||||
@@ -409,7 +409,9 @@ public:
|
||||
l0_old_ssts.push_back(std::move(sst));
|
||||
}
|
||||
}
|
||||
_l0_scts.replace_sstables(std::move(l0_old_ssts), std::move(l0_new_ssts));
|
||||
if (l0_old_ssts.size() || l0_new_ssts.size()) {
|
||||
_l0_scts.replace_sstables(std::move(l0_old_ssts), std::move(l0_new_ssts));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -553,4 +553,16 @@ murmur3_partitioner_ignore_msb_bits: 12
|
||||
# WARNING: It's unsafe to set this to false if the node previously booted
|
||||
# with the schema commit log enabled. In such case, some schema changes
|
||||
# may be lost if the node was not cleanly stopped.
|
||||
force_schema_commit_log: true
|
||||
force_schema_commit_log: true
|
||||
|
||||
# Use Raft to consistently manage schema information in the cluster.
|
||||
# Refer to https://docs.scylladb.com/master/architecture/raft.html for more details.
|
||||
# The 'Handling Failures' section is especially important.
|
||||
#
|
||||
# Once enabled in a cluster, this cannot be turned off.
|
||||
# If you want to bootstrap a new cluster without Raft, make sure to set this to `false`
|
||||
# before starting your nodes for the first time.
|
||||
#
|
||||
# A cluster not using Raft can be 'upgraded' to use Raft. Refer to the aforementioned
|
||||
# documentation, section 'Enabling Raft in ScyllaDB 5.2 and further', for the procedure.
|
||||
consistent_cluster_management: true
|
||||
|
||||
@@ -80,7 +80,7 @@ public:
|
||||
|
||||
virtual sstring assignment_testable_source_context() const override {
|
||||
auto&& name = _type->field_name(_field);
|
||||
auto sname = sstring(reinterpret_cast<const char*>(name.begin(), name.size()));
|
||||
auto sname = std::string_view(reinterpret_cast<const char*>(name.data()), name.size());
|
||||
return format("{}.{}", _selected, sname);
|
||||
}
|
||||
|
||||
|
||||
@@ -2116,6 +2116,9 @@ future<> db::commitlog::segment_manager::do_pending_deletes() {
|
||||
clogger.debug("Discarding segments {}", ftd);
|
||||
|
||||
for (auto& [f, mode] : ftd) {
|
||||
// `f.remove_file()` resets known_size to 0, so remember the size here,
|
||||
// in order to subtract it from total_size_on_disk accurately.
|
||||
size_t size = f.known_size();
|
||||
try {
|
||||
if (f) {
|
||||
co_await f.close();
|
||||
@@ -2132,7 +2135,6 @@ future<> db::commitlog::segment_manager::do_pending_deletes() {
|
||||
}
|
||||
}
|
||||
|
||||
auto size = f.known_size();
|
||||
auto usage = totals.total_size_on_disk;
|
||||
auto next_usage = usage - size;
|
||||
|
||||
@@ -2165,7 +2167,7 @@ future<> db::commitlog::segment_manager::do_pending_deletes() {
|
||||
// or had such an exception that we consider the file dead
|
||||
// anyway. In either case we _remove_ the file size from
|
||||
// footprint, because it is no longer our problem.
|
||||
totals.total_size_on_disk -= f.known_size();
|
||||
totals.total_size_on_disk -= size;
|
||||
}
|
||||
|
||||
// #8376 - if we had an error in recycling (disk rename?), and no elements
|
||||
|
||||
@@ -401,6 +401,10 @@ public:
|
||||
named_value<uint64_t> wasm_udf_yield_fuel;
|
||||
named_value<uint64_t> wasm_udf_total_fuel;
|
||||
named_value<size_t> wasm_udf_memory_limit;
|
||||
// wasm_udf_reserved_memory is static because the options in db::config
|
||||
// are parsed using seastar::app_template, while this option is used for
|
||||
// configuring the Seastar memory subsystem.
|
||||
static constexpr size_t wasm_udf_reserved_memory = 50 * 1024 * 1024;
|
||||
|
||||
seastar::logging_settings logging_settings(const log_cli::options&) const;
|
||||
|
||||
|
||||
@@ -2276,7 +2276,10 @@ public:
|
||||
add_partition(mutation_sink, "trace_probability", format("{:.2}", tracing::tracing::get_local_tracing_instance().get_trace_probability()));
|
||||
co_await add_partition(mutation_sink, "memory", [this] () {
|
||||
struct stats {
|
||||
uint64_t total = 0;
|
||||
// take the pre-reserved memory into account, as seastar only returns
|
||||
// the stats of memory managed by the seastar allocator, but we instruct
|
||||
// it to reserve addition memory for system.
|
||||
uint64_t total = db::config::wasm_udf_reserved_memory;
|
||||
uint64_t free = 0;
|
||||
static stats reduce(stats a, stats b) { return stats{a.total + b.total, a.free + b.free}; }
|
||||
};
|
||||
|
||||
@@ -85,29 +85,25 @@ future<row_locker::lock_holder>
|
||||
row_locker::lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& cpk, bool exclusive, db::timeout_clock::time_point timeout, stats& stats) {
|
||||
mylog.debug("taking shared lock on partition {}, and {} lock on row {} in it", pk, (exclusive ? "exclusive" : "shared"), cpk);
|
||||
auto tracker = latency_stats_tracker(exclusive ? stats.exclusive_row : stats.shared_row);
|
||||
auto ck = cpk;
|
||||
// Create a two-level lock entry for the partition if it doesn't exist already.
|
||||
auto i = _two_level_locks.try_emplace(pk, this).first;
|
||||
// The two-level lock entry we've just created is guaranteed to be kept alive as long as it's locked.
|
||||
// Initiating read locking in the background below ensures that even if the two-level lock is currently
|
||||
// write-locked, releasing the write-lock will synchronously engage any waiting
|
||||
// locks and will keep the entry alive.
|
||||
future<lock_type::holder> lock_partition = i->second._partition_lock.hold_read_lock(timeout);
|
||||
auto j = i->second._row_locks.find(cpk);
|
||||
if (j == i->second._row_locks.end()) {
|
||||
// Not yet locked, need to create the lock. This makes a copy of cpk.
|
||||
try {
|
||||
j = i->second._row_locks.emplace(cpk, lock_type()).first;
|
||||
} catch(...) {
|
||||
// If this emplace() failed, e.g., out of memory, we fail. We
|
||||
// could do nothing - the partition lock we already started
|
||||
// taking will be unlocked automatically after being locked.
|
||||
// But it's better form to wait for the work we started, and it
|
||||
// will also allow us to remove the hash-table row we added.
|
||||
return lock_partition.then([ex = std::current_exception()] (auto lock) {
|
||||
// The lock is automatically released when "lock" goes out of scope.
|
||||
// TODO: unlock (lock = {}) now, search for the partition in the
|
||||
// hash table (we know it's still there, because we held the lock until
|
||||
// now) and remove the unused lock from the hash table if still unused.
|
||||
return make_exception_future<row_locker::lock_holder>(std::current_exception());
|
||||
});
|
||||
return lock_partition.then([this, pk = &i->first, row_locks = &i->second._row_locks, ck = std::move(ck), exclusive, tracker = std::move(tracker), timeout] (auto lock1) mutable {
|
||||
auto j = row_locks->find(ck);
|
||||
if (j == row_locks->end()) {
|
||||
// Not yet locked, need to create the lock.
|
||||
j = row_locks->emplace(std::move(ck), lock_type()).first;
|
||||
}
|
||||
}
|
||||
return lock_partition.then([this, pk = &i->first, cpk = &j->first, &row_lock = j->second, exclusive, tracker = std::move(tracker), timeout] (auto lock1) mutable {
|
||||
auto* cpk = &j->first;
|
||||
auto& row_lock = j->second;
|
||||
// Like to the two-level lock entry above, the row_lock entry we've just created
|
||||
// is guaranteed to be kept alive as long as it's locked.
|
||||
// Initiating read/write locking in the background below ensures that.
|
||||
auto lock_row = exclusive ? row_lock.hold_write_lock(timeout) : row_lock.hold_read_lock(timeout);
|
||||
return lock_row.then([this, pk, cpk, exclusive, tracker = std::move(tracker), lock1 = std::move(lock1)] (auto lock2) mutable {
|
||||
lock1.release();
|
||||
|
||||
3
dist/common/scripts/scylla_coredump_setup
vendored
3
dist/common/scripts/scylla_coredump_setup
vendored
@@ -42,7 +42,8 @@ if __name__ == '__main__':
|
||||
if systemd_unit.available('systemd-coredump@.service'):
|
||||
dropin = '''
|
||||
[Service]
|
||||
TimeoutStartSec=infinity
|
||||
RuntimeMaxSec=infinity
|
||||
TimeoutSec=infinity
|
||||
'''[1:-1]
|
||||
os.makedirs('/etc/systemd/system/systemd-coredump@.service.d', exist_ok=True)
|
||||
with open('/etc/systemd/system/systemd-coredump@.service.d/timeout.conf', 'w') as f:
|
||||
|
||||
@@ -1112,14 +1112,14 @@ tls-ssl/index.html: /stable/operating-scylla/security
|
||||
/using-scylla/integrations/integration_kairos/index.html: /stable/using-scylla/integrations/integration-kairos
|
||||
/upgrade/ami_upgrade/index.html: /stable/upgrade/ami-upgrade
|
||||
|
||||
/scylla-cloud/cloud-setup/gcp-vpc-peering/index.html: /stable/scylla-cloud/cloud-setup/GCP/gcp-vpc-peering
|
||||
/scylla-cloud/cloud-setup/GCP/gcp-vcp-peering/index.html: /stable/scylla-cloud/cloud-setup/GCP/gcp-vpc-peering
|
||||
/scylla-cloud/cloud-setup/gcp-vpc-peering/index.html: https://cloud.docs.scylladb.com/stable/cloud-setup/gcp-vpc-peering.html
|
||||
/scylla-cloud/cloud-setup/GCP/gcp-vcp-peering/index.html: https://cloud.docs.scylladb.com/stable/cloud-setup/gcp-vpc-peering.html
|
||||
|
||||
# move scylla cloud for AWS to dedicated directory
|
||||
/scylla-cloud/cloud-setup/aws-vpc-peering/index.html: /stable/scylla-cloud/cloud-setup/AWS/aws-vpc-peering
|
||||
/scylla-cloud/cloud-setup/cloud-prom-proxy/index.html: /stable/scylla-cloud/cloud-setup/AWS/cloud-prom-proxy
|
||||
/scylla-cloud/cloud-setup/outposts/index.html: /stable/scylla-cloud/cloud-setup/AWS/outposts
|
||||
/scylla-cloud/cloud-setup/scylla-cloud-byoa/index.html: /stable/scylla-cloud/cloud-setup/AWS/scylla-cloud-byoa
|
||||
/scylla-cloud/cloud-setup/aws-vpc-peering/index.html: https://cloud.docs.scylladb.com/stable/cloud-setup/aws-vpc-peering.html
|
||||
/scylla-cloud/cloud-setup/cloud-prom-proxy/index.html: https://cloud.docs.scylladb.com/stable/monitoring/cloud-prom-proxy.html
|
||||
/scylla-cloud/cloud-setup/outposts/index.html: https://cloud.docs.scylladb.com/stable/cloud-setup/outposts.html
|
||||
/scylla-cloud/cloud-setup/scylla-cloud-byoa/index.html: https://cloud.docs.scylladb.com/stable/cloud-setup/scylla-cloud-byoa.html
|
||||
/scylla-cloud/cloud-services/scylla_cloud_costs/index.html: /stable/scylla-cloud/cloud-services/scylla-cloud-costs
|
||||
/scylla-cloud/cloud-services/scylla_cloud_managin_versions/index.html: /stable/scylla-cloud/cloud-services/scylla-cloud-managin-versions
|
||||
/scylla-cloud/cloud-services/scylla_cloud_support_alerts_sla/index.html: /stable/scylla-cloud/cloud-services/scylla-cloud-support-alerts-sla
|
||||
|
||||
@@ -25,7 +25,7 @@ Getting Started
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
* `Install ScyllaDB (Binary Packages, Docker, or EC2) <https://www.scylladb.com/download/>`_ - Links to the ScyllaDB Download Center
|
||||
* `Install ScyllaDB (Binary Packages, Docker, or EC2) <https://www.scylladb.com/download/#core>`_ - Links to the ScyllaDB Download Center
|
||||
|
||||
* :doc:`Configure ScyllaDB </getting-started/system-configuration/>`
|
||||
* :doc:`Run ScyllaDB in a Shared Environment </getting-started/scylla-in-a-shared-environment>`
|
||||
|
||||
@@ -20,7 +20,7 @@ Install ScyllaDB
|
||||
|
||||
Keep your versions up-to-date. The two latest versions are supported. Also always install the latest patches for your version.
|
||||
|
||||
* Download and install ScyllaDB Server, Drivers and Tools in `Scylla Download Center <https://www.scylladb.com/download/#server/>`_
|
||||
* Download and install ScyllaDB Server, Drivers and Tools in `ScyllaDB Download Center <https://www.scylladb.com/download/#core>`_
|
||||
* :doc:`ScyllaDB Web Installer for Linux <scylla-web-installer>`
|
||||
* :doc:`ScyllaDB Unified Installer (relocatable executable) <unified-installer>`
|
||||
* :doc:`Air-gapped Server Installation <air-gapped-install>`
|
||||
|
||||
@@ -4,7 +4,7 @@ ScyllaDB Web Installer for Linux
|
||||
|
||||
ScyllaDB Web Installer is a platform-agnostic installation script you can run with ``curl`` to install ScyllaDB on Linux.
|
||||
|
||||
See `ScyllaDB Download Center <https://www.scylladb.com/download/#server>`_ for information on manually installing ScyllaDB with platform-specific installation packages.
|
||||
See `ScyllaDB Download Center <https://www.scylladb.com/download/#core>`_ for information on manually installing ScyllaDB with platform-specific installation packages.
|
||||
|
||||
Prerequisites
|
||||
--------------
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
* endpoint_snitch - ``grep endpoint_snitch /etc/scylla/scylla.yaml``
|
||||
* Scylla version - ``scylla --version``
|
||||
* Authenticator - ``grep authenticator /etc/scylla/scylla.yaml``
|
||||
* consistent_cluster_management - ``grep consistent_cluster_management /etc/scylla/scylla.yaml``
|
||||
|
||||
.. Note::
|
||||
|
||||
|
||||
@@ -119,6 +119,7 @@ Add New DC
|
||||
* **listen_address** - IP address that Scylla used to connect to the other Scylla nodes in the cluster.
|
||||
* **endpoint_snitch** - Set the selected snitch.
|
||||
* **rpc_address** - Address for client connections (Thrift, CQL).
|
||||
* **consistent_cluster_management** - set to the same value as used by your existing nodes.
|
||||
|
||||
The parameters ``seeds``, ``cluster_name`` and ``endpoint_snitch`` need to match the existing cluster.
|
||||
|
||||
|
||||
@@ -54,6 +54,8 @@ Procedure
|
||||
|
||||
* **seeds** - Specifies the IP address of an existing node in the cluster. The new node will use this IP to connect to the cluster and learn the cluster topology and state.
|
||||
|
||||
* **consistent_cluster_management** - set to the same value as used by your existing nodes.
|
||||
|
||||
.. note::
|
||||
|
||||
In earlier versions of ScyllaDB, seed nodes assisted in gossip. Starting with Scylla Open Source 4.3 and Scylla Enterprise 2021.1, the seed concept in gossip has been removed. If you are using an earlier version of ScyllaDB, you need to configure the seeds parameter in the following way:
|
||||
|
||||
@@ -70,6 +70,7 @@ the file can be found under ``/etc/scylla/``
|
||||
- **listen_address** - IP address that the Scylla use to connect to other Scylla nodes in the cluster
|
||||
- **endpoint_snitch** - Set the selected snitch
|
||||
- **rpc_address** - Address for client connection (Thrift, CQLSH)
|
||||
- **consistent_cluster_management** - ``true`` by default, can be set to ``false`` if you don't want to use Raft for consistent schema management in this cluster (will be mandatory in later versions). Check the :doc:`Raft in ScyllaDB document</architecture/raft/>` to learn more.
|
||||
|
||||
3. In the ``cassandra-rackdc.properties`` file, edit the rack and data center information.
|
||||
The file can be found under ``/etc/scylla/``.
|
||||
|
||||
@@ -26,6 +26,7 @@ The file can be found under ``/etc/scylla/``
|
||||
- **listen_address** - IP address that Scylla used to connect to other Scylla nodes in the cluster
|
||||
- **endpoint_snitch** - Set the selected snitch
|
||||
- **rpc_address** - Address for client connection (Thrift, CQL)
|
||||
- **consistent_cluster_management** - ``true`` by default, can be set to ``false`` if you don't want to use Raft for consistent schema management in this cluster (will be mandatory in later versions). Check the :doc:`Raft in ScyllaDB document</architecture/raft/>` to learn more.
|
||||
|
||||
3. This step needs to be done **only** if you are using the **GossipingPropertyFileSnitch**. If not, skip this step.
|
||||
In the ``cassandra-rackdc.properties`` file, edit the parameters listed below.
|
||||
|
||||
@@ -63,6 +63,7 @@ Perform the following steps for each node in the new cluster:
|
||||
* **rpc_address** - Address for client connection (Thrift, CQL).
|
||||
* **broadcast_address** - The IP address a node tells other nodes in the cluster to contact it by.
|
||||
* **broadcast_rpc_address** - Default: unset. The RPC address to broadcast to drivers and other Scylla nodes. It cannot be set to 0.0.0.0. If left blank, it will be set to the value of ``rpc_address``. If ``rpc_address`` is set to 0.0.0.0, ``broadcast_rpc_address`` must be explicitly configured.
|
||||
* **consistent_cluster_management** - ``true`` by default, can be set to ``false`` if you don't want to use Raft for consistent schema management in this cluster (will be mandatory in later versions). Check the :doc:`Raft in ScyllaDB document</architecture/raft/>` to learn more.
|
||||
|
||||
#. After you have installed and configured Scylla and edited ``scylla.yaml`` file on all the nodes, start the node specified with the ``seeds`` parameter. Then start the rest of the nodes in your cluster, one at a time, using
|
||||
``sudo systemctl start scylla-server``.
|
||||
|
||||
@@ -25,6 +25,7 @@ Login to one of the nodes in the cluster with (UN) status, collect the following
|
||||
* seeds - ``cat /etc/scylla/scylla.yaml | grep seeds:``
|
||||
* endpoint_snitch - ``cat /etc/scylla/scylla.yaml | grep endpoint_snitch``
|
||||
* Scylla version - ``scylla --version``
|
||||
* consistent_cluster_management - ``grep consistent_cluster_management /etc/scylla/scylla.yaml``
|
||||
|
||||
Procedure
|
||||
---------
|
||||
|
||||
@@ -66,6 +66,8 @@ Procedure
|
||||
|
||||
- **rpc_address** - Address for client connection (Thrift, CQL)
|
||||
|
||||
- **consistent_cluster_management** - set to the same value as used by your existing nodes.
|
||||
|
||||
#. Add the ``replace_node_first_boot`` parameter to the ``scylla.yaml`` config file on the new node. This line can be added to any place in the config file. After a successful node replacement, there is no need to remove it from the ``scylla.yaml`` file. (Note: The obsolete parameters "replace_address" and "replace_address_first_boot" are not supported and should not be used). The value of the ``replace_node_first_boot`` parameter should be the Host ID of the node to be replaced.
|
||||
|
||||
For example (using the Host ID of the failed node from above):
|
||||
|
||||
@@ -68,7 +68,7 @@ Gracefully stop the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
@@ -92,13 +92,13 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
1. Check cluster status with ``nodetool status`` and make sure **all** nodes, including the one you just upgraded, are in UN status.
|
||||
2. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"`` to check the ScyllaDB version.
|
||||
3. Check scylla-enterprise-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no errors.
|
||||
3. Check scylla-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no errors.
|
||||
4. Check again after 2 minutes to validate no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade is successful, move to the next node in the cluster.
|
||||
@@ -130,7 +130,7 @@ Gracefully shutdown ScyllaDB
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Downgrade to the previous release
|
||||
----------------------------------
|
||||
@@ -164,7 +164,7 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
@@ -66,7 +66,7 @@ Gracefully stop the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
|
||||
@@ -16,13 +16,13 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including the one you just upgraded, are in UN status.
|
||||
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"`` to check the ScyllaDB version.
|
||||
#. Check scylla-enterprise-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no errors.
|
||||
#. Check scylla-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no errors.
|
||||
#. Check again after 2 minutes to validate no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade is successful, move to the next node in the cluster.
|
||||
@@ -54,7 +54,7 @@ Gracefully shutdown ScyllaDB
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Downgrade to the previous release
|
||||
----------------------------------
|
||||
@@ -88,7 +88,7 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
@@ -69,7 +69,7 @@ Gracefully stop the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
|
||||
@@ -36,13 +36,13 @@ A new io.conf format was introduced in Scylla 2.3 and 2019.1. If your io.conf do
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including the one you just upgraded, are in UN status.
|
||||
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"`` to check the ScyllaDB version.
|
||||
#. Check scylla-enterprise-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no errors.
|
||||
#. Check scylla-server log (by ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no errors.
|
||||
#. Check again after two minutes to validate no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade is successful, move to the next node in the cluster.
|
||||
@@ -75,7 +75,7 @@ Gracefully shutdown ScyllaDB
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the old release
|
||||
------------------------------------
|
||||
@@ -120,7 +120,7 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
@@ -102,7 +102,7 @@ Gracefully stop the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
.. _upgrade-debian-ubuntu-enterprise-2022.2:
|
||||
|
||||
@@ -138,7 +138,7 @@ Download and install the new release
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla-enterprise-server
|
||||
sudo apt-get dist-upgrade scylla-enterprise
|
||||
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
@@ -213,13 +213,13 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including the one you just upgraded, are in ``UN`` status.
|
||||
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"`` to check the ScyllaDB version. Validate that the version matches the one you upgraded to.
|
||||
#. Check scylla-enterprise-server log (using ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no new errors in the log.
|
||||
#. Check scylla-server log (using ``journalctl _COMM=scylla``) and ``/var/log/syslog`` to validate there are no new errors in the log.
|
||||
#. Check again after two minutes, to validate no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade was successful, move to the next node in the cluster.
|
||||
@@ -260,7 +260,7 @@ Drain and gracefully stop the node
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-enterprise-server stop
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the old release
|
||||
------------------------------------
|
||||
@@ -359,7 +359,7 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-enterprise-server start
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
@@ -63,7 +63,7 @@ Stop ScyllaDB
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo systemctl stop scylla-enterprise-server
|
||||
sudo systemctl stop scylla-server
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
@@ -84,7 +84,7 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo systemctl start scylla-enterprise-server
|
||||
sudo systemctl start scylla-server
|
||||
|
||||
Validate
|
||||
--------
|
||||
@@ -125,7 +125,7 @@ Gracefully shutdown ScyllaDB
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo systemctl stop scylla-enterprise-server
|
||||
sudo systemctl stop scylla-server
|
||||
|
||||
Downgrade to the previous release
|
||||
-----------------------------------
|
||||
@@ -149,7 +149,7 @@ Start the node
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo systemctl start scylla-enterprise-server
|
||||
sudo systemctl start scylla-server
|
||||
|
||||
Validate
|
||||
--------
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
Scylla Metric Update - Scylla 5.1 to 5.2
|
||||
========================================
|
||||
ScyllaDB Metric Update - Scylla 5.1 to 5.2
|
||||
============================================
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
@@ -7,8 +7,8 @@ Scylla Metric Update - Scylla 5.1 to 5.2
|
||||
|
||||
Scylla 5.2 Dashboards are available as part of the latest |mon_root|.
|
||||
|
||||
The following metrics are new in Scylla 5.2
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
The following metrics are new in ScyllaDB 5.2
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
@@ -16,5 +16,42 @@ The following metrics are new in Scylla 5.2
|
||||
|
||||
* - Metric
|
||||
- Description
|
||||
* - TODO
|
||||
- TODO
|
||||
* - scylla_database_disk_reads
|
||||
- Holds the number of currently active disk read operations.
|
||||
* - scylla_database_sstables_read
|
||||
- Holds the number of currently read sstables.
|
||||
* - scylla_memory_malloc_failed
|
||||
- Total count of failed memory allocations
|
||||
* - scylla_raft_group0_status
|
||||
- status of the raft group, 0 - disabled, 1 - normal, 2 - aborted
|
||||
* - scylla_storage_proxy_coordinator_cas_read_latency_summary
|
||||
- CAS read latency summary
|
||||
* - scylla_storage_proxy_coordinator_cas_write_latency_summary
|
||||
- CAS write latency summary
|
||||
* - scylla_storage_proxy_coordinator_read_latency_summary
|
||||
- Read latency summary
|
||||
* - scylla_storage_proxy_coordinator_write_latency_summary
|
||||
- Write latency summary
|
||||
* - scylla_streaming_finished_percentage
|
||||
- Finished percentage of node operation on this shard
|
||||
* - scylla_view_update_generator_sstables_pending_work
|
||||
- Number of bytes remaining to be processed from SSTables for view updates
|
||||
|
||||
|
||||
The following metrics are renamed in ScyllaDB 5.2
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. list-table::
|
||||
:widths: 25 150
|
||||
:header-rows: 1
|
||||
|
||||
* - 5.1
|
||||
- 5.2
|
||||
* - scylla_database_active_reads_memory_consumption
|
||||
- scylla_database_reads_memory_consumption
|
||||
* - scylla_memory_regular_virtual_dirty_bytes
|
||||
- scylla_memory_regular_unspooled_dirty_bytes
|
||||
* - scylla_memory_system_virtual_dirty_bytes
|
||||
- scylla_memory_system_unspooled_dirty_bytes
|
||||
* - scylla_memory_virtual_dirty_bytes
|
||||
- scylla_memory_unspooled_dirty_bytes
|
||||
|
||||
@@ -67,7 +67,11 @@ Apply the following procedure **serially** on each node. Do not move to the next
|
||||
If you enabled consistent cluster management in each node's configuration file, then as soon as every node has been upgraded to the new version, the cluster will start a procedure which initializes the Raft algorithm for consistent cluster metadata management.
|
||||
You must then :ref:`verify <validate-raft-setup>` that this procedure successfully finishes.
|
||||
|
||||
.. note:: Before upgrading, make sure to use the latest `ScyllaDB Monitoring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
.. note::
|
||||
|
||||
If you use the `ScyllaDB Monitoring Stack <https://monitoring.docs.scylladb.com/>`_, we recommend upgrading the Monitoring Stack to the latest version **before** upgrading ScyllaDB.
|
||||
|
||||
For ScyllaDB 5.2, you MUST upgrade the Monitoring Stack to version 4.3 or later.
|
||||
|
||||
Upgrade Steps
|
||||
=============
|
||||
|
||||
2
main.cc
2
main.cc
@@ -476,7 +476,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// We need to have the entire app config to run the app, but we need to
|
||||
// run the app to read the config file with UDF specific options so that
|
||||
// we know whether we need to reserve additional memory for UDFs.
|
||||
app_cfg.reserve_additional_memory = 50 * 1024 * 1024;
|
||||
app_cfg.reserve_additional_memory = db::config::wasm_udf_reserved_memory;
|
||||
app_template app(std::move(app_cfg));
|
||||
|
||||
auto ext = std::make_shared<db::extensions>();
|
||||
|
||||
@@ -177,7 +177,6 @@ private:
|
||||
template <typename Consumer, typename GCConsumer>
|
||||
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
|
||||
stop_iteration do_consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) {
|
||||
_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone());
|
||||
stop_iteration gc_consumer_stop = stop_iteration::no;
|
||||
stop_iteration consumer_stop = stop_iteration::no;
|
||||
if (rtc.tombstone() <= _partition_tombstone) {
|
||||
@@ -199,6 +198,7 @@ private:
|
||||
partition_is_not_empty(consumer);
|
||||
_current_emitted_tombstone = rtc.tombstone();
|
||||
consumer_stop = consumer.consume(std::move(rtc));
|
||||
_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone());
|
||||
}
|
||||
return gc_consumer_stop || consumer_stop;
|
||||
}
|
||||
|
||||
@@ -1144,7 +1144,7 @@ future<> server_impl::applier_fiber() {
|
||||
co_await _state_machine->apply(std::move(commands));
|
||||
} catch (abort_requested_exception& e) {
|
||||
logger.info("[{}] applier fiber stopped because state machine was aborted: {}", _id, e);
|
||||
co_return;
|
||||
throw stop_apply_fiber{};
|
||||
} catch (...) {
|
||||
std::throw_with_nested(raft::state_machine_error{});
|
||||
}
|
||||
|
||||
@@ -1253,10 +1253,13 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
|
||||
cg.get_backlog_tracker().copy_ongoing_charges(new_bt, move_read_charges);
|
||||
|
||||
new_sstables = make_lw_shared<sstables::sstable_set>(new_cs.make_sstable_set(t._schema));
|
||||
cg.main_sstables()->for_each_sstable([this] (const sstables::shared_sstable& s) {
|
||||
add_sstable_to_backlog_tracker(new_bt, s);
|
||||
std::vector<sstables::shared_sstable> new_sstables_for_backlog_tracker;
|
||||
new_sstables_for_backlog_tracker.reserve(cg.main_sstables()->all()->size());
|
||||
cg.main_sstables()->for_each_sstable([this, &new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
|
||||
new_sstables->insert(s);
|
||||
new_sstables_for_backlog_tracker.push_back(s);
|
||||
});
|
||||
new_bt.replace_sstables({}, std::move(new_sstables_for_backlog_tracker));
|
||||
}
|
||||
|
||||
void execute() noexcept {
|
||||
|
||||
@@ -63,4 +63,15 @@ MemoryLimit=$MEMORY_LIMIT
|
||||
EOS
|
||||
fi
|
||||
|
||||
if [ -e /etc/systemd/system/systemd-coredump@.service.d/timeout.conf ]; then
|
||||
COREDUMP_RUNTIME_MAX=$(grep RuntimeMaxSec /etc/systemd/system/systemd-coredump@.service.d/timeout.conf)
|
||||
if [ -z $COREDUMP_RUNTIME_MAX ]; then
|
||||
cat << EOS > /etc/systemd/system/systemd-coredump@.service.d/timeout.conf
|
||||
[Service]
|
||||
RuntimeMaxSec=infinity
|
||||
TimeoutSec=infinity
|
||||
EOS
|
||||
fi
|
||||
fi
|
||||
|
||||
systemctl --system daemon-reload >/dev/null || true
|
||||
|
||||
@@ -103,7 +103,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, tracing::trace_
|
||||
auto ex = f2.get_exception();
|
||||
logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex));
|
||||
}
|
||||
auto upgrade_if_needed = [schema = std::move(schema)] (std::optional<proposal> p) mutable {
|
||||
auto upgrade_if_needed = [schema = std::move(schema)] (std::optional<proposal> p) {
|
||||
if (!p || p->update.schema_version() == schema->version()) {
|
||||
return make_ready_future<std::optional<proposal>>(std::move(p));
|
||||
}
|
||||
@@ -115,7 +115,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, tracing::trace_
|
||||
// for that version and upgrade the mutation with it.
|
||||
logger.debug("Stored mutation references outdated schema version. "
|
||||
"Trying to upgrade the accepted proposal mutation to the most recent schema version.");
|
||||
return service::get_column_mapping(p->update.column_family_id(), p->update.schema_version()).then([schema = std::move(schema), p = std::move(p)] (const column_mapping& cm) {
|
||||
return service::get_column_mapping(p->update.column_family_id(), p->update.schema_version()).then([schema, p = std::move(p)] (const column_mapping& cm) {
|
||||
return make_ready_future<std::optional<proposal>>(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm))));
|
||||
});
|
||||
};
|
||||
|
||||
@@ -969,7 +969,11 @@ with_timeout(abort_source& as, db::timeout_clock::duration d, F&& fun) {
|
||||
// FIXME: using lambda as workaround for clang bug #50345 (miscompiling coroutine templates).
|
||||
auto impl = [] (abort_source& as, db::timeout_clock::duration d, F&& fun) -> future_t {
|
||||
abort_source timeout_src;
|
||||
auto sub = as.subscribe([&timeout_src] () noexcept { timeout_src.request_abort(); });
|
||||
auto sub = as.subscribe([&timeout_src] () noexcept {
|
||||
if (!timeout_src.abort_requested()) {
|
||||
timeout_src.request_abort();
|
||||
}
|
||||
});
|
||||
if (!sub) {
|
||||
throw abort_requested_exception{};
|
||||
}
|
||||
|
||||
@@ -136,7 +136,9 @@ struct sstable_open_config {
|
||||
// fields respectively. Problematic sstables might fail to load. Set to
|
||||
// false if you want to disable this, to be able to read such sstables.
|
||||
// Should only be disabled for diagnostics purposes.
|
||||
bool load_first_and_last_position_metadata = true;
|
||||
// FIXME: Enable it by default once the root cause of large allocation when reading sstable in reverse is fixed.
|
||||
// Ref: https://github.com/scylladb/scylladb/issues/11642
|
||||
bool load_first_and_last_position_metadata = false;
|
||||
};
|
||||
|
||||
class sstable : public enable_lw_shared_from_this<sstable> {
|
||||
|
||||
60
test.py
60
test.py
@@ -343,7 +343,16 @@ class PythonTestSuite(TestSuite):
|
||||
pool_size = cfg.get("pool_size", 2)
|
||||
|
||||
self.create_cluster = self.get_cluster_factory(cluster_size)
|
||||
self.clusters = Pool(pool_size, self.create_cluster)
|
||||
async def recycle_cluster(cluster: ScyllaCluster) -> None:
|
||||
"""When a dirty cluster is returned to the cluster pool,
|
||||
stop it and release the used IPs. We don't necessarily uninstall() it yet,
|
||||
which would delete the log file and directory - we might want to preserve
|
||||
these if it came from a failed test.
|
||||
"""
|
||||
await cluster.stop()
|
||||
await cluster.release_ips()
|
||||
|
||||
self.clusters = Pool(pool_size, self.create_cluster, recycle_cluster)
|
||||
|
||||
def get_cluster_factory(self, cluster_size: int) -> Callable[..., Awaitable]:
|
||||
def create_server(create_cfg: ScyllaCluster.CreateServerParams):
|
||||
@@ -686,7 +695,8 @@ class CQLApprovalTest(Test):
|
||||
if self.server_log is not None:
|
||||
logger.info("Server log:\n%s", self.server_log)
|
||||
|
||||
async with self.suite.clusters.instance(logger) as cluster:
|
||||
# TODO: consider dirty_on_exception=True
|
||||
async with self.suite.clusters.instance(False, logger) as cluster:
|
||||
try:
|
||||
cluster.before_test(self.uname)
|
||||
logger.info("Leasing Scylla cluster %s for test %s", cluster, self.uname)
|
||||
@@ -842,26 +852,32 @@ class PythonTest(Test):
|
||||
|
||||
loggerPrefix = self.mode + '/' + self.uname
|
||||
logger = LogPrefixAdapter(logging.getLogger(loggerPrefix), {'prefix': loggerPrefix})
|
||||
async with self.suite.clusters.instance(logger) as cluster:
|
||||
try:
|
||||
cluster.before_test(self.uname)
|
||||
logger.info("Leasing Scylla cluster %s for test %s", cluster, self.uname)
|
||||
self.args.insert(0, "--host={}".format(cluster.endpoint()))
|
||||
self.is_before_test_ok = True
|
||||
cluster.take_log_savepoint()
|
||||
status = await run_test(self, options)
|
||||
cluster.after_test(self.uname)
|
||||
self.is_after_test_ok = True
|
||||
self.success = status
|
||||
except Exception as e:
|
||||
self.server_log = cluster.read_server_log()
|
||||
self.server_log_filename = cluster.server_log_filename()
|
||||
if self.is_before_test_ok is False:
|
||||
print("Test {} pre-check failed: {}".format(self.name, str(e)))
|
||||
print("Server log of the first server:\n{}".format(self.server_log))
|
||||
# Don't try to continue if the cluster is broken
|
||||
raise
|
||||
logger.info("Test %s %s", self.uname, "succeeded" if self.success else "failed ")
|
||||
cluster = await self.suite.clusters.get(logger)
|
||||
try:
|
||||
cluster.before_test(self.uname)
|
||||
logger.info("Leasing Scylla cluster %s for test %s", cluster, self.uname)
|
||||
self.args.insert(0, "--host={}".format(cluster.endpoint()))
|
||||
self.is_before_test_ok = True
|
||||
cluster.take_log_savepoint()
|
||||
status = await run_test(self, options)
|
||||
cluster.after_test(self.uname)
|
||||
self.is_after_test_ok = True
|
||||
self.success = status
|
||||
except Exception as e:
|
||||
self.server_log = cluster.read_server_log()
|
||||
self.server_log_filename = cluster.server_log_filename()
|
||||
if self.is_before_test_ok is False:
|
||||
print("Test {} pre-check failed: {}".format(self.name, str(e)))
|
||||
print("Server log of the first server:\n{}".format(self.server_log))
|
||||
logger.info(f"Discarding cluster after failed start for test %s...", self.name)
|
||||
elif self.is_after_test_ok is False:
|
||||
print("Test {} post-check failed: {}".format(self.name, str(e)))
|
||||
print("Server log of the first server:\n{}".format(self.server_log))
|
||||
logger.info(f"Discarding cluster after failed test %s...", self.name)
|
||||
await self.suite.clusters.put(cluster, is_dirty=True)
|
||||
else:
|
||||
await self.suite.clusters.put(cluster, is_dirty=False)
|
||||
logger.info("Test %s %s", self.uname, "succeeded" if self.success else "failed ")
|
||||
return self
|
||||
|
||||
def write_junit_failure_report(self, xml_res: ET.Element) -> None:
|
||||
|
||||
@@ -4926,6 +4926,7 @@ SEASTAR_TEST_CASE(test_large_partition_splitting_on_compaction) {
|
||||
position_in_partition::tri_compare pos_tri_cmp(*s);
|
||||
|
||||
for (auto& sst : ret.new_sstables) {
|
||||
sst = env.reusable_sst(s, tmp.path().string(), sst->generation().value()).get0();
|
||||
BOOST_REQUIRE(sst->may_have_partition_tombstones());
|
||||
|
||||
auto reader = sstable_reader(sst, s, env.make_reader_permit());
|
||||
|
||||
@@ -205,6 +205,7 @@ def run_scylla_cmd(pid, dir):
|
||||
'--max-networking-io-control-blocks', '100',
|
||||
'--unsafe-bypass-fsync', '1',
|
||||
'--kernel-page-cache', '1',
|
||||
'--commitlog-use-o-dsync', '0',
|
||||
'--flush-schema-tables-after-modification', 'false',
|
||||
'--api-address', ip,
|
||||
'--rpc-address', ip,
|
||||
|
||||
@@ -106,6 +106,7 @@ cql_test_config::cql_test_config(shared_ptr<db::config> cfg)
|
||||
db_config->add_per_partition_rate_limit_extension();
|
||||
|
||||
db_config->flush_schema_tables_after_modification.set(false);
|
||||
db_config->commitlog_use_o_dsync(false);
|
||||
}
|
||||
|
||||
cql_test_config::cql_test_config(const cql_test_config&) = default;
|
||||
|
||||
@@ -85,7 +85,8 @@ public:
|
||||
future<shared_sstable> reusable_sst(schema_ptr schema, sstring dir, unsigned long generation,
|
||||
sstable::version_types version, sstable::format_types f = sstable::format_types::big) {
|
||||
auto sst = make_sstable(std::move(schema), dir, generation, version, f);
|
||||
return sst->load().then([sst = std::move(sst)] {
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
return sst->load(default_priority_class(), cfg).then([sst = std::move(sst)] {
|
||||
return make_ready_future<shared_sstable>(std::move(sst));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -43,7 +43,8 @@ sstables::shared_sstable make_sstable_containing(std::function<sstables::shared_
|
||||
}
|
||||
}
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst->open_data().get();
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
sst->open_data(cfg).get();
|
||||
|
||||
std::set<mutation, mutation_decorated_key_less_comparator> merged;
|
||||
for (auto&& m : muts) {
|
||||
|
||||
@@ -72,7 +72,11 @@ class HostRegistry:
|
||||
self.next_host_id += 1
|
||||
return Host(self.subnet.format(self.next_host_id))
|
||||
|
||||
self.pool = Pool[Host](254, create_host)
|
||||
async def destroy_host(h: Host) -> None:
|
||||
# Doesn't matter, we never return hosts to the pool as 'dirty'.
|
||||
pass
|
||||
|
||||
self.pool = Pool[Host](254, create_host, destroy_host)
|
||||
|
||||
async def cleanup() -> None:
|
||||
if self.lock_filename:
|
||||
@@ -85,5 +89,5 @@ class HostRegistry:
|
||||
return await self.pool.get()
|
||||
|
||||
async def release_host(self, host: Host) -> None:
|
||||
return await self.pool.put(host)
|
||||
return await self.pool.put(host, is_dirty=False)
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import asyncio
|
||||
from typing import Generic, Callable, Awaitable, TypeVar, AsyncContextManager, Final
|
||||
from typing import Generic, Callable, Awaitable, TypeVar, AsyncContextManager, Final, Optional
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
@@ -10,12 +10,15 @@ class Pool(Generic[T]):
|
||||
on demand, so that if you use less, you don't create anything upfront.
|
||||
If there is no object in the pool and all N objects are in use, you want
|
||||
to wait until one of the object is returned to the pool. Expects a
|
||||
builder async function to build a new object.
|
||||
builder async function to build a new object and a destruction async
|
||||
function to clean up after a 'dirty' object (see below).
|
||||
|
||||
Usage example:
|
||||
async def start_server():
|
||||
return Server()
|
||||
pool = Pool(4, start_server)
|
||||
async def destroy_server(server):
|
||||
await server.free_resources()
|
||||
pool = Pool(4, start_server, destroy_server)
|
||||
|
||||
server = await pool.get()
|
||||
try:
|
||||
@@ -24,25 +27,51 @@ class Pool(Generic[T]):
|
||||
await pool.put(server)
|
||||
|
||||
Alternatively:
|
||||
async with pool.instance() as server:
|
||||
async with pool.instance(dirty_on_exception=False) as server:
|
||||
await run_test(test, server)
|
||||
|
||||
|
||||
If the object is considered no longer usable by other users of the pool
|
||||
you can 'steal' it, which frees up space in the pool.
|
||||
you can pass `is_dirty=True` flag to `put`, which will cause the object
|
||||
to be 'destroyed' (by calling the provided `destroy` function on it) and
|
||||
will free up space in the pool.
|
||||
server = await.pool.get()
|
||||
dirty = True
|
||||
try:
|
||||
dirty = await run_test(test, server)
|
||||
finally:
|
||||
if dirty:
|
||||
await pool.steal()
|
||||
else:
|
||||
await pool.put(server)
|
||||
await pool.put(server, is_dirty=dirty)
|
||||
|
||||
Alternatively:
|
||||
async with (cm := pool.instance(dirty_on_exception=True)) as server:
|
||||
cm.dirty = await run_test(test, server)
|
||||
# It will also be considered dirty if run_test throws an exception
|
||||
|
||||
|
||||
To atomically return a dirty object and use the freed space to obtain
|
||||
another object, you can use `replace_dirty`. This is different from a
|
||||
`put(is_dirty=True)` call followed by a `get` call, where a concurrent
|
||||
waiter might take the space freed up by `put`.
|
||||
server = await.pool.get()
|
||||
dirty = False
|
||||
try:
|
||||
for _ in range(num_runs):
|
||||
if dirty:
|
||||
srv = server
|
||||
server = None
|
||||
server = await pool.replace_dirty(srv)
|
||||
dirty = await run_test(test, server)
|
||||
finally:
|
||||
if server:
|
||||
await pool.put(is_dirty=dirty)
|
||||
"""
|
||||
def __init__(self, max_size: int, build: Callable[..., Awaitable[T]]):
|
||||
def __init__(self, max_size: int,
|
||||
build: Callable[..., Awaitable[T]],
|
||||
destroy: Callable[[T], Awaitable[None]]):
|
||||
assert(max_size >= 0)
|
||||
self.max_size: Final[int] = max_size
|
||||
self.build: Final[Callable[..., Awaitable[T]]] = build
|
||||
self.destroy: Final[Callable[[T], Awaitable]] = destroy
|
||||
self.cond: Final[asyncio.Condition] = asyncio.Condition()
|
||||
self.pool: list[T] = []
|
||||
self.total: int = 0 # len(self.pool) + leased objects
|
||||
@@ -64,6 +93,68 @@ class Pool(Generic[T]):
|
||||
# No object in pool, but total < max_size so we can construct one
|
||||
self.total += 1
|
||||
|
||||
return await self._build_and_get(*args, **kwargs)
|
||||
|
||||
async def put(self, obj: T, is_dirty: bool):
|
||||
"""Return a previously borrowed object to the pool
|
||||
if it's not dirty, otherwise destroy the object
|
||||
and free up space in the pool.
|
||||
"""
|
||||
if is_dirty:
|
||||
await self.destroy(obj)
|
||||
|
||||
async with self.cond:
|
||||
if is_dirty:
|
||||
self.total -= 1
|
||||
else:
|
||||
self.pool.append(obj)
|
||||
self.cond.notify()
|
||||
|
||||
async def replace_dirty(self, obj: T, *args, **kwargs) -> T:
|
||||
"""Atomically `put` a previously borrowed dirty object and `get` another one.
|
||||
The 'atomicity' guarantees that the space freed up by the returned object
|
||||
is used to return another object to the caller. The caller doesn't need
|
||||
to wait for space to be freed by another user of the pool.
|
||||
|
||||
Note: the returned object might have been constructed earlier or it might
|
||||
be built right now, as in `get`.
|
||||
*args and **kwargs are used as in `get`.
|
||||
"""
|
||||
await self.destroy(obj)
|
||||
|
||||
async with self.cond:
|
||||
if self.pool:
|
||||
self.total -= 1
|
||||
return self.pool.pop()
|
||||
|
||||
# Need to construct a new object.
|
||||
# The space for this object is already accounted for in self.total.
|
||||
|
||||
return await self._build_and_get(*args, **kwargs)
|
||||
|
||||
def instance(self, dirty_on_exception: bool, *args, **kwargs) -> AsyncContextManager[T]:
|
||||
class Instance:
|
||||
def __init__(self, pool: Pool[T], dirty_on_exception: bool):
|
||||
self.pool = pool
|
||||
self.dirty = False
|
||||
self.dirty_on_exception = dirty_on_exception
|
||||
|
||||
async def __aenter__(self):
|
||||
self.obj = await self.pool.get(*args, **kwargs)
|
||||
return self.obj
|
||||
|
||||
async def __aexit__(self, exc_type, exc, obj):
|
||||
if self.obj:
|
||||
self.dirty |= self.dirty_on_exception and exc is not None
|
||||
await self.pool.put(self.obj, is_dirty=self.dirty)
|
||||
self.obj = None
|
||||
|
||||
return Instance(self, dirty_on_exception)
|
||||
|
||||
async def _build_and_get(self, *args, **kwargs) -> T:
|
||||
"""Precondition: we allocated space for this object
|
||||
(it's included in self.total).
|
||||
"""
|
||||
try:
|
||||
obj = await self.build(*args, **kwargs)
|
||||
except:
|
||||
@@ -72,33 +163,3 @@ class Pool(Generic[T]):
|
||||
self.cond.notify()
|
||||
raise
|
||||
return obj
|
||||
|
||||
async def steal(self) -> None:
|
||||
"""Take ownership of a previously borrowed object.
|
||||
Frees up space in the pool.
|
||||
"""
|
||||
async with self.cond:
|
||||
self.total -= 1
|
||||
self.cond.notify()
|
||||
|
||||
async def put(self, obj: T):
|
||||
"""Return a previously borrowed object to the pool."""
|
||||
async with self.cond:
|
||||
self.pool.append(obj)
|
||||
self.cond.notify()
|
||||
|
||||
def instance(self, *args, **kwargs) -> AsyncContextManager[T]:
|
||||
class Instance:
|
||||
def __init__(self, pool):
|
||||
self.pool = pool
|
||||
|
||||
async def __aenter__(self):
|
||||
self.obj = await self.pool.get(*args, **kwargs)
|
||||
return self.obj
|
||||
|
||||
async def __aexit__(self, exc_type, exc, obj):
|
||||
if self.obj:
|
||||
await self.pool.put(self.obj)
|
||||
self.obj = None
|
||||
|
||||
return Instance(self)
|
||||
|
||||
@@ -21,14 +21,17 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HTTPError(Exception):
|
||||
def __init__(self, uri, code, message):
|
||||
def __init__(self, uri, code, params, json, message):
|
||||
super().__init__(message)
|
||||
self.uri = uri
|
||||
self.code = code
|
||||
self.params = params
|
||||
self.json = json
|
||||
self.message = message
|
||||
|
||||
def __str__(self):
|
||||
return f"HTTP error {self.code}: {self.message}, uri {self.uri}"
|
||||
return f"HTTP error {self.code}, uri: {self.uri}, " \
|
||||
f"params: {self.params}, json: {self.json}, body:\n{self.message}"
|
||||
|
||||
|
||||
# TODO: support ssl and verify_ssl
|
||||
@@ -63,7 +66,7 @@ class RESTClient(metaclass=ABCMeta):
|
||||
params = params, json = json, timeout = client_timeout) as resp:
|
||||
if resp.status != 200:
|
||||
text = await resp.text()
|
||||
raise HTTPError(uri, resp.status, f"{text}, params {params}, json {json}")
|
||||
raise HTTPError(uri, resp.status, params, json, text)
|
||||
if response_type is not None:
|
||||
# Return response.text() or response.json()
|
||||
return await getattr(resp, response_type)()
|
||||
|
||||
@@ -17,8 +17,10 @@ import pathlib
|
||||
import shutil
|
||||
import tempfile
|
||||
import time
|
||||
import traceback
|
||||
from typing import Optional, Dict, List, Set, Tuple, Callable, AsyncIterator, NamedTuple, Union
|
||||
import uuid
|
||||
from enum import Enum
|
||||
from io import BufferedWriter
|
||||
from test.pylib.host_registry import Host, HostRegistry
|
||||
from test.pylib.pool import Pool
|
||||
@@ -111,6 +113,7 @@ SCYLLA_CMDLINE_OPTIONS = [
|
||||
'--max-networking-io-control-blocks', '100',
|
||||
'--unsafe-bypass-fsync', '1',
|
||||
'--kernel-page-cache', '1',
|
||||
'--commitlog-use-o-dsync', '0',
|
||||
'--abort-on-lsa-bad-alloc', '1',
|
||||
'--abort-on-seastar-bad-alloc',
|
||||
'--abort-on-internal-error', '1',
|
||||
@@ -173,6 +176,11 @@ def merge_cmdline_options(base: List[str], override: List[str]) -> List[str]:
|
||||
|
||||
return run()
|
||||
|
||||
class CqlUpState(Enum):
|
||||
NOT_CONNECTED = 1,
|
||||
CONNECTED = 2,
|
||||
QUERIED = 3
|
||||
|
||||
class ScyllaServer:
|
||||
"""Starts and handles a single Scylla server, managing logs, checking if responsive,
|
||||
and cleanup when finished."""
|
||||
@@ -295,7 +303,7 @@ class ScyllaServer:
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
return f"Exception when reading server log {self.log_filename}: {exc}"
|
||||
|
||||
async def cql_is_up(self) -> bool:
|
||||
async def cql_is_up(self) -> CqlUpState:
|
||||
"""Test that CQL is serving (a check we use at start up)."""
|
||||
caslog = logging.getLogger('cassandra')
|
||||
oldlevel = caslog.getEffectiveLevel()
|
||||
@@ -310,6 +318,7 @@ class ScyllaServer:
|
||||
# work, so rely on this "side effect".
|
||||
profile = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy([self.ip_addr]),
|
||||
request_timeout=self.START_TIMEOUT)
|
||||
connected = False
|
||||
try:
|
||||
# In a cluster setup, it's possible that the CQL
|
||||
# here is directed to a node different from the initial contact
|
||||
@@ -321,16 +330,19 @@ class ScyllaServer:
|
||||
protocol_version=4,
|
||||
auth_provider=auth) as cluster:
|
||||
with cluster.connect() as session:
|
||||
session.execute("SELECT * FROM system.local")
|
||||
connected = True
|
||||
# See the comment above about `auth::standard_role_manager`. We execute
|
||||
# a 'real' query to ensure that the auth service has finished initializing.
|
||||
session.execute("SELECT key FROM system.local where key = 'local'")
|
||||
self.control_cluster = Cluster(execution_profiles=
|
||||
{EXEC_PROFILE_DEFAULT: profile},
|
||||
contact_points=[self.ip_addr],
|
||||
auth_provider=auth)
|
||||
self.control_connection = self.control_cluster.connect()
|
||||
return True
|
||||
return CqlUpState.QUERIED
|
||||
except (NoHostAvailable, InvalidRequest, OperationTimedOut) as exc:
|
||||
self.logger.debug("Exception when checking if CQL is up: %s", exc)
|
||||
return False
|
||||
return CqlUpState.CONNECTED if connected else CqlUpState.NOT_CONNECTED
|
||||
finally:
|
||||
caslog.setLevel(oldlevel)
|
||||
# Any other exception may indicate a problem, and is passed to the caller.
|
||||
@@ -363,6 +375,7 @@ class ScyllaServer:
|
||||
|
||||
self.start_time = time.time()
|
||||
sleep_interval = 0.1
|
||||
cql_up_state = CqlUpState.NOT_CONNECTED
|
||||
|
||||
while time.time() < self.start_time + self.START_TIMEOUT:
|
||||
if self.cmd.returncode:
|
||||
@@ -377,20 +390,30 @@ class ScyllaServer:
|
||||
logpath = log_handler.baseFilename # type: ignore
|
||||
else:
|
||||
logpath = "?"
|
||||
raise RuntimeError(f"Failed to start server at host {self.ip_addr}.\n"
|
||||
raise RuntimeError(f"Failed to start server with ID = {self.server_id}, IP = {self.ip_addr}.\n"
|
||||
"Check the log files:\n"
|
||||
f"{logpath}\n"
|
||||
f"{self.log_filename}")
|
||||
|
||||
if hasattr(self, "host_id") or await self.get_host_id(api):
|
||||
if await self.cql_is_up():
|
||||
cql_up_state = await self.cql_is_up()
|
||||
if cql_up_state == CqlUpState.QUERIED:
|
||||
return
|
||||
|
||||
# Sleep and retry
|
||||
await asyncio.sleep(sleep_interval)
|
||||
|
||||
raise RuntimeError(f"failed to start server {self.ip_addr}, "
|
||||
f"check server log at {self.log_filename}")
|
||||
err = f"Failed to start server with ID = {self.server_id}, IP = {self.ip_addr}."
|
||||
if hasattr(self, "host_id"):
|
||||
err += f" Managed to obtain the server's Host ID ({self.host_id})"
|
||||
if cql_up_state == CqlUpState.CONNECTED:
|
||||
err += " and to connect the CQL driver, but failed to execute a query."
|
||||
else:
|
||||
err += " but failed to connect the CQL driver."
|
||||
else:
|
||||
err += " Failed to obtain the server's Host ID."
|
||||
err += f"\nCheck server log at {self.log_filename}."
|
||||
raise RuntimeError(err)
|
||||
|
||||
async def force_schema_migration(self) -> None:
|
||||
"""This is a hack to change schema hash on an existing cluster node
|
||||
@@ -705,6 +728,8 @@ class ScyllaCluster:
|
||||
to any specific test, throwing it here would stop a specific
|
||||
test."""
|
||||
if self.start_exception:
|
||||
# Mark as dirty so further test cases don't try to reuse this cluster.
|
||||
self.is_dirty = True
|
||||
raise self.start_exception
|
||||
|
||||
for server in self.running.values():
|
||||
@@ -729,11 +754,14 @@ class ScyllaCluster:
|
||||
if server_id not in self.running:
|
||||
return ScyllaCluster.ActionReturn(success=False, msg=f"Server {server_id} unknown")
|
||||
self.is_dirty = True
|
||||
server = self.running.pop(server_id)
|
||||
server = self.running[server_id]
|
||||
# Remove the server from `running` only after we successfully stop it.
|
||||
# Stopping may fail and if we removed it from `running` now it might leak.
|
||||
if gracefully:
|
||||
await server.stop_gracefully()
|
||||
else:
|
||||
await server.stop()
|
||||
self.running.pop(server_id)
|
||||
self.stopped[server_id] = server
|
||||
return ScyllaCluster.ActionReturn(success=True, msg=f"{server} stopped")
|
||||
|
||||
@@ -753,8 +781,10 @@ class ScyllaCluster:
|
||||
self.is_dirty = True
|
||||
server = self.stopped.pop(server_id)
|
||||
server.seeds = self._seeds()
|
||||
await server.start(self.api)
|
||||
# Put the server in `running` before starting it.
|
||||
# Starting may fail and if we didn't add it now it might leak.
|
||||
self.running[server_id] = server
|
||||
await server.start(self.api)
|
||||
return ScyllaCluster.ActionReturn(success=True, msg=f"{server} started")
|
||||
|
||||
async def server_restart(self, server_id: ServerNum) -> ActionReturn:
|
||||
@@ -817,7 +847,9 @@ class ScyllaClusterManager:
|
||||
self.is_after_test_ok: bool = False
|
||||
# API
|
||||
# NOTE: need to make a safe temp dir as tempfile can't make a safe temp sock name
|
||||
self.manager_dir: str = tempfile.mkdtemp(prefix="manager-", dir=base_dir)
|
||||
# Put the socket in /tmp, not base_dir, to avoid going over the length
|
||||
# limit of UNIX-domain socket addresses (issue #12622).
|
||||
self.manager_dir: str = tempfile.mkdtemp(prefix="manager-", dir="/tmp")
|
||||
self.sock_path: str = f"{self.manager_dir}/api"
|
||||
app = aiohttp.web.Application()
|
||||
self._setup_routes(app)
|
||||
@@ -828,7 +860,8 @@ class ScyllaClusterManager:
|
||||
if self.is_running:
|
||||
self.logger.warning("ScyllaClusterManager already running")
|
||||
return
|
||||
await self._get_cluster()
|
||||
self.cluster = await self.clusters.get(self.logger)
|
||||
self.logger.info("First Scylla cluster: %s", self.cluster)
|
||||
self.cluster.setLogger(self.logger)
|
||||
await self.runner.setup()
|
||||
self.site = aiohttp.web.UnixSite(self.runner, path=self.sock_path)
|
||||
@@ -839,12 +872,10 @@ class ScyllaClusterManager:
|
||||
self.current_test_case_full_name = f'{self.test_uname}::{test_case_name}'
|
||||
self.logger.info("Setting up %s", self.current_test_case_full_name)
|
||||
if self.cluster.is_dirty:
|
||||
self.logger.info(f"Current cluster %s is dirty after last test, stopping...", self.cluster.name)
|
||||
await self.clusters.steal()
|
||||
await self.cluster.stop()
|
||||
await self.cluster.release_ips()
|
||||
self.logger.info(f"Waiting for new cluster for test %s...", self.current_test_case_full_name)
|
||||
await self._get_cluster()
|
||||
self.logger.info(f"Current cluster %s is dirty after test %s, replacing with a new one...",
|
||||
self.cluster.name, self.current_test_case_full_name)
|
||||
self.cluster = await self.clusters.replace_dirty(self.cluster, self.logger)
|
||||
self.logger.info("Got new Scylla cluster: %s", self.cluster.name)
|
||||
self.cluster.setLogger(self.logger)
|
||||
self.logger.info("Leasing Scylla cluster %s for test %s", self.cluster, self.current_test_case_full_name)
|
||||
self.cluster.before_test(self.current_test_case_full_name)
|
||||
@@ -860,44 +891,56 @@ class ScyllaClusterManager:
|
||||
del self.site
|
||||
if not self.cluster.is_dirty:
|
||||
self.logger.info("Returning Scylla cluster %s for test %s", self.cluster, self.test_uname)
|
||||
await self.clusters.put(self.cluster)
|
||||
await self.clusters.put(self.cluster, is_dirty=False)
|
||||
else:
|
||||
self.logger.info("ScyllaManager: Scylla cluster %s is dirty after %s, stopping it",
|
||||
self.cluster, self.test_uname)
|
||||
await self.clusters.steal()
|
||||
await self.cluster.stop()
|
||||
await self.clusters.put(self.cluster, is_dirty=True)
|
||||
del self.cluster
|
||||
if os.path.exists(self.manager_dir):
|
||||
shutil.rmtree(self.manager_dir)
|
||||
self.is_running = False
|
||||
|
||||
async def _get_cluster(self) -> None:
|
||||
self.cluster = await self.clusters.get(self.logger)
|
||||
self.logger.info("Got new Scylla cluster %s", self.cluster)
|
||||
|
||||
|
||||
def _setup_routes(self, app: aiohttp.web.Application) -> None:
|
||||
app.router.add_get('/up', self._manager_up)
|
||||
app.router.add_get('/cluster/up', self._cluster_up)
|
||||
app.router.add_get('/cluster/is-dirty', self._is_dirty)
|
||||
app.router.add_get('/cluster/replicas', self._cluster_replicas)
|
||||
app.router.add_get('/cluster/running-servers', self._cluster_running_servers)
|
||||
app.router.add_get('/cluster/host-ip/{server_id}', self._cluster_server_ip_addr)
|
||||
app.router.add_get('/cluster/host-id/{server_id}', self._cluster_host_id)
|
||||
app.router.add_get('/cluster/before-test/{test_case_name}', self._before_test_req)
|
||||
app.router.add_get('/cluster/after-test', self._after_test)
|
||||
app.router.add_get('/cluster/mark-dirty', self._mark_dirty)
|
||||
app.router.add_get('/cluster/server/{server_id}/stop', self._cluster_server_stop)
|
||||
app.router.add_get('/cluster/server/{server_id}/stop_gracefully',
|
||||
self._cluster_server_stop_gracefully)
|
||||
app.router.add_get('/cluster/server/{server_id}/start', self._cluster_server_start)
|
||||
app.router.add_get('/cluster/server/{server_id}/restart', self._cluster_server_restart)
|
||||
app.router.add_put('/cluster/addserver', self._cluster_server_add)
|
||||
app.router.add_put('/cluster/remove-node/{initiator}', self._cluster_remove_node)
|
||||
app.router.add_get('/cluster/decommission-node/{server_id}',
|
||||
self._cluster_decommission_node)
|
||||
app.router.add_get('/cluster/server/{server_id}/get_config', self._server_get_config)
|
||||
app.router.add_put('/cluster/server/{server_id}/update_config', self._server_update_config)
|
||||
def make_catching_handler(handler: Callable) -> Callable:
|
||||
async def catching_handler(request) -> aiohttp.web.Response:
|
||||
"""Catch all exceptions and return them to the client.
|
||||
Without this, the client would get an 'Internal server error' message
|
||||
without any details. Thanks to this the test log shows the actual error.
|
||||
"""
|
||||
try:
|
||||
return await handler(request)
|
||||
except Exception as e:
|
||||
tb = traceback.format_exc()
|
||||
self.logger.error(f'Exception when executing {handler.__name__}: {e}\n{tb}')
|
||||
return aiohttp.web.Response(status=500, text=str(e))
|
||||
return catching_handler
|
||||
|
||||
def add_get(route: str, handler: Callable):
|
||||
app.router.add_get(route, make_catching_handler(handler))
|
||||
|
||||
def add_put(route: str, handler: Callable):
|
||||
app.router.add_put(route, make_catching_handler(handler))
|
||||
|
||||
add_get('/up', self._manager_up)
|
||||
add_get('/cluster/up', self._cluster_up)
|
||||
add_get('/cluster/is-dirty', self._is_dirty)
|
||||
add_get('/cluster/replicas', self._cluster_replicas)
|
||||
add_get('/cluster/running-servers', self._cluster_running_servers)
|
||||
add_get('/cluster/host-ip/{server_id}', self._cluster_server_ip_addr)
|
||||
add_get('/cluster/host-id/{server_id}', self._cluster_host_id)
|
||||
add_get('/cluster/before-test/{test_case_name}', self._before_test_req)
|
||||
add_get('/cluster/after-test', self._after_test)
|
||||
add_get('/cluster/mark-dirty', self._mark_dirty)
|
||||
add_get('/cluster/server/{server_id}/stop', self._cluster_server_stop)
|
||||
add_get('/cluster/server/{server_id}/stop_gracefully', self._cluster_server_stop_gracefully)
|
||||
add_get('/cluster/server/{server_id}/start', self._cluster_server_start)
|
||||
add_get('/cluster/server/{server_id}/restart', self._cluster_server_restart)
|
||||
add_put('/cluster/addserver', self._cluster_server_add)
|
||||
add_put('/cluster/remove-node/{initiator}', self._cluster_remove_node)
|
||||
add_get('/cluster/decommission-node/{server_id}', self._cluster_decommission_node)
|
||||
add_get('/cluster/server/{server_id}/get_config', self._server_get_config)
|
||||
add_put('/cluster/server/{server_id}/update_config', self._server_update_config)
|
||||
|
||||
async def _manager_up(self, _request) -> aiohttp.web.Response:
|
||||
return aiohttp.web.Response(text=f"{self.is_running}")
|
||||
|
||||
@@ -143,7 +143,8 @@ private:
|
||||
throw std::bad_function_call();
|
||||
}
|
||||
virtual const std::vector<view_ptr>& get_table_views(data_dictionary::table t) const override {
|
||||
return {};
|
||||
static const std::vector<view_ptr> empty;
|
||||
return empty;
|
||||
}
|
||||
virtual sstring get_available_index_name(data_dictionary::database db, std::string_view ks_name, std::string_view table_name,
|
||||
std::optional<sstring> index_name_root) const override {
|
||||
|
||||
6
types.cc
6
types.cc
@@ -735,6 +735,7 @@ bool abstract_type::is_collection() const {
|
||||
bool abstract_type::is_tuple() const {
|
||||
struct visitor {
|
||||
bool operator()(const abstract_type&) { return false; }
|
||||
bool operator()(const reversed_type_impl& t) { return t.underlying_type()->is_tuple(); }
|
||||
bool operator()(const tuple_type_impl&) { return true; }
|
||||
};
|
||||
return visit(*this, visitor{});
|
||||
@@ -1956,6 +1957,10 @@ data_value deserialize_aux(const tuple_type_impl& t, View v) {
|
||||
|
||||
template<FragmentedView View>
|
||||
utils::multiprecision_int deserialize_value(const varint_type_impl&, View v) {
|
||||
if (v.empty()) {
|
||||
throw marshal_exception("cannot deserialize multiprecision int - empty buffer");
|
||||
}
|
||||
skip_empty_fragments(v);
|
||||
bool negative = v.current_fragment().front() < 0;
|
||||
utils::multiprecision_int num;
|
||||
while (v.size_bytes()) {
|
||||
@@ -2052,6 +2057,7 @@ bool deserialize_value(const boolean_type_impl&, View v) {
|
||||
if (v.size_bytes() != 1) {
|
||||
throw marshal_exception(format("cannot deserialize boolean, size mismatch ({:d})", v.size_bytes()));
|
||||
}
|
||||
skip_empty_fragments(v);
|
||||
return v.current_fragment().front() != 0;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user