Mirror of https://github.com/scylladb/scylladb.git (synced 2026-05-13 03:12:13 +00:00)

Comparing `fast-serve`...`next-2026.` — 36 commits
| SHA1 |
|---|
| 7acb040470 |
| 9563994298 |
| 714003ef2e |
| be2f0a8601 |
| 0ac15b7030 |
| aff9aa156b |
| 815260866c |
| 119df703b0 |
| 4f87c9c510 |
| f9aae8c2f1 |
| 104e9b3c32 |
| 4fc4f4e9f9 |
| ee34573bd1 |
| 851c605b1d |
| 57f9d9d581 |
| 15b2ed99f0 |
| 3aac93f49e |
| 44249c0a75 |
| e9240587f4 |
| b39c7fa034 |
| 3e3096d6df |
| 6195e08408 |
| 53caa6eca4 |
| fb6d5368bb |
| 9e0c86b7fd |
| 6d09897339 |
| 5c8662d606 |
| 74a58a6757 |
| 148e05820b |
| 1438830348 |
| c25f3eced8 |
| d264fea176 |
| 9d942a5408 |
| 9622291e07 |
| b98470a860 |
| 5231c77e8e |
**.gitmodules** (vendored): 2 changed lines
```diff
@@ -1,6 +1,6 @@
 [submodule "seastar"]
 	path = seastar
-	url = ../seastar
+	url = ../scylla-seastar
 	ignore = dirty
 [submodule "swagger-ui"]
 	path = swagger-ui
```
```diff
@@ -78,7 +78,7 @@ fi

 # Default scylla product/version tags
 PRODUCT=scylla
-VERSION=2026.2.0-dev
+VERSION=2026.2.1

 if test -f version
 then
```
@@ -185,24 +185,14 @@ future<lw_shared_ptr<cache::role_record>> cache::fetch_role(const role_name_t& r

```cpp
        static const sstring q = format("SELECT role, name, value FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_ATTRIBUTES_CF);
        auto rs = co_await fetch(q);
        for (const auto& r : *rs) {
            if (!r.has("value")) {
                continue;
            }
            rec->attributes[r.get_as<sstring>("name")] =
                r.get_as<sstring>("value");
            co_await coroutine::maybe_yield();
        }
    }
    // permissions
    {
        static const sstring q = format("SELECT role, resource, permissions FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, PERMISSIONS_CF);
        auto rs = co_await fetch(q);
        for (const auto& r : *rs) {
            auto resource = r.get_as<sstring>("resource");
            auto perms_strings = r.get_set<sstring>("permissions");
            std::unordered_set<sstring> perms_set(perms_strings.begin(), perms_strings.end());
            auto pset = permissions::from_strings(perms_set);
            rec->permissions[std::move(resource)] = std::move(pset);
            co_await coroutine::maybe_yield();
        }
    }
    co_return rec;
}
```
@@ -44,7 +44,6 @@ public:

```cpp
    std::unordered_set<role_name_t> members;
    sstring salted_hash;
    std::unordered_map<sstring, sstring, sstring_hash, sstring_eq> attributes;
    std::unordered_map<sstring, permission_set, sstring_hash, sstring_eq> permissions;
private:
    friend cache;
    // cached permissions include effects of role's inheritance
```
```diff
@@ -76,7 +76,11 @@ default_authorizer::authorize(const role_or_anonymous& maybe_role, const resourc
     if (results->empty()) {
         co_return permissions::NONE;
     }
-    co_return permissions::from_strings(results->one().get_set<sstring>(PERMISSIONS_NAME));
+    const auto& row = results->one();
+    if (!row.has(PERMISSIONS_NAME)) {
+        co_return permissions::NONE;
+    }
+    co_return permissions::from_strings(row.get_set<sstring>(PERMISSIONS_NAME));
 }

 future<>
```
```diff
@@ -258,13 +258,11 @@ future<> ldap_role_manager::start() {
             } catch (const seastar::sleep_aborted&) {
                 co_return; // ignore
             }
-            co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
-                try {
-                    co_await c.reload_all_permissions();
-                } catch (...) {
-                    mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
-                }
-            });
+            try {
+                co_await _cache.reload_all_permissions();
+            } catch (...) {
+                mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
+            }
         }
     });
     return _std_mgr.start();
```
```diff
@@ -157,15 +157,12 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
             return create_legacy_keyspace_if_missing(mm);
         });
     }
-    co_await _role_manager->start();
-    if (this_shard_id() == 0) {
-        // Role manager and password authenticator have this odd startup
-        // mechanism where they asynchronously create the superuser role
-        // in the background. Correct password creation depends on role
-        // creation therefore we need to wait here.
-        co_await _role_manager->ensure_superuser_is_created();
-    }
-    co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
+    // Authorizer must be started before the permission loader is set,
+    // because the loader calls _authorizer->authorize().
+    // The loader must be set before starting the role manager, because
+    // LDAP role manager starts a pruner fiber that calls
+    // reload_all_permissions() which asserts _permission_loader is set.
+    co_await _authorizer->start();
     if (!_used_by_maintenance_socket) {
         // Maintenance socket mode can't cache permissions because it has
         // different authorizer. We can't mix cached permissions, they could be
@@ -174,12 +171,27 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
                 &service::get_uncached_permissions,
                 this, std::placeholders::_1, std::placeholders::_2));
     }
+    co_await _role_manager->start();
+    if (this_shard_id() == 0) {
+        // Role manager and password authenticator have this odd startup
+        // mechanism where they asynchronously create the superuser role
+        // in the background. Correct password creation depends on role
+        // creation therefore we need to wait here.
+        co_await _role_manager->ensure_superuser_is_created();
+    }
+    // Authenticator must be started after ensure_superuser_is_created()
+    // because password_authenticator queries system.roles for the
+    // superuser entry created by the role manager.
+    co_await _authenticator->start();
 }

 future<> service::stop() {
     _as.request_abort();
+    // Reverse of start() order.
+    co_await _authenticator->stop();
+    co_await _role_manager->stop();
     _cache.set_permission_loader(nullptr);
-    return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
+    co_await _authorizer->stop();
 }

 future<> service::ensure_superuser_is_created() {
```
```diff
@@ -593,6 +593,7 @@ scylla_tests = set([
     'test/boost/linearizing_input_stream_test',
     'test/boost/lister_test',
     'test/boost/locator_topology_test',
+    'test/boost/lock_tables_metadata_test',
     'test/boost/log_heap_test',
     'test/boost/logalloc_standard_allocator_segment_pool_backend_test',
     'test/boost/logalloc_test',
```
```diff
@@ -1659,6 +1660,7 @@ deps['test/boost/combined_tests'] += [
     'test/boost/auth_cache_test.cc',
     'test/boost/auth_test.cc',
     'test/boost/batchlog_manager_test.cc',
+    'test/boost/table_helper_test.cc',
     'test/boost/cache_algorithm_test.cc',
     'test/boost/castas_fcts_test.cc',
     'test/boost/cdc_test.cc',
```
```diff
@@ -136,9 +136,9 @@ public:
     {}

     future<> insert(auth::authenticated_user user, cql3::prepared_cache_key_type prep_cache_key, value_type v) noexcept {
-        return _cache.get_ptr(key_type(std::move(user), std::move(prep_cache_key)), [v = std::move(v)] (const cache_key_type&) mutable {
+        return _cache.insert(key_type(std::move(user), std::move(prep_cache_key)), [v = std::move(v)] (const cache_key_type&) mutable {
             return make_ready_future<value_type>(std::move(v));
-        }).discard_result();
+        });
     }

     value_ptr find(const auth::authenticated_user& user, const cql3::prepared_cache_key_type& prep_cache_key) {
```
```diff
@@ -71,7 +71,7 @@ used. If it is used, the statement will be a no-op if the materialized view alre
 MV Select Statement
 ...................

-The select statement of a materialized view creation defines which of the base table is included in the view. That
+The select statement of a materialized view creation defines which of the base table columns are included in the view. That
 statement is limited in a number of ways:

 - The :ref:`selection <selection-clause>` is limited to those that only select columns of the base table. In other
```
```diff
@@ -16,7 +16,7 @@ Cluster and Node Limits
    * - Nodes per cluster
      - Low hundreds
    * - Node size
-     - 256 vcpu
+     - 4096 CPUs

 See :ref:`Hardware Requirements <system-requirements-hardware>` for storage
 and memory requirements and limits.
```
```diff
@@ -4,7 +4,7 @@ Upgrade ScyllaDB

 .. toctree::

-   ScyllaDB 2025.x to ScyllaDB 2026.1 <upgrade-guide-from-2025.x-to-2026.1/index>
+   ScyllaDB 2026.1 to ScyllaDB 2026.2 <upgrade-guide-from-2026.1-to-2026.2/index>
    ScyllaDB 2026.x Patch Upgrades <upgrade-guide-from-2026.x.y-to-2026.x.z>
    ScyllaDB Image <ami-upgrade>
```
```diff
@@ -1,13 +0,0 @@
-==========================================================
-Upgrade - ScyllaDB 2025.x to ScyllaDB 2026.1
-==========================================================
-
-.. toctree::
-   :maxdepth: 2
-   :hidden:
-
-   Upgrade ScyllaDB <upgrade-guide-from-2025.x-to-2026.1>
-   Metrics Update <metric-update-2025.x-to-2026.1>
-
-* :doc:`Upgrade from ScyllaDB 2025.x to ScyllaDB 2026.1 <upgrade-guide-from-2025.x-to-2026.1>`
-* :doc:`Metrics Update Between 2025.x and 2026.1 <metric-update-2025.x-to-2026.1>`
```
```diff
@@ -1,82 +0,0 @@
-.. |SRC_VERSION| replace:: 2025.x
-.. |NEW_VERSION| replace:: 2026.1
-.. |PRECEDING_VERSION| replace:: 2025.4
-
-================================================================
-Metrics Update Between |SRC_VERSION| and |NEW_VERSION|
-================================================================
-
-.. toctree::
-   :maxdepth: 2
-   :hidden:
-
-ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
-
-
-New Metrics in |NEW_VERSION|
---------------------------------------
-
-The following metrics are new in ScyllaDB |NEW_VERSION| compared to |PRECEDING_VERSION|.
-
-.. list-table::
-   :widths: 25 150
-   :header-rows: 1
-
-   * - Metric
-     - Description
-   * - scylla_alternator_operation_size_kb
-     - Histogram of item sizes involved in a request.
-   * - scylla_column_family_total_disk_space_before_compression
-     - Hypothetical total disk space used if data files weren't compressed
-   * - scylla_group_name_auto_repair_enabled_nr
-     - Number of tablets with auto repair enabled.
-   * - scylla_group_name_auto_repair_needs_repair_nr
-     - Number of tablets with auto repair enabled that currently need repair.
-   * - scylla_lsa_compact_time_ms
-     - Total time spent on segment compaction that was not accounted under ``reclaim_time_ms``.
-   * - scylla_lsa_evict_time_ms
-     - Total time spent on evicting objects that was not accounted under ``reclaim_time_ms``,
-   * - scylla_lsa_reclaim_time_ms
-     - Total time spent in reclaiming LSA memory back to std allocator.
-   * - scylla_object_storage_memory_usage
-     - Total number of bytes consumed by the object storage client.
-   * - scylla_tablet_ops_failed
-     - Number of failed tablet auto repair attempts.
-   * - scylla_tablet_ops_succeeded
-     - Number of successful tablet auto repair attempts.
-
-Renamed Metrics in |NEW_VERSION|
---------------------------------------
-
-The following metrics are renamed in ScyllaDB |NEW_VERSION| compared to |PRECEDING_VERSION|.
-
-.. list-table::
-   :widths: 25 150
-   :header-rows: 1
-
-   * - Metric Name in |PRECEDING_VERSION|
-     - Metric Name in |NEW_VERSION|
-   * - scylla_s3_memory_usage
-     - scylla_object_storage_memory_usage
-
-Removed Metrics in |NEW_VERSION|
---------------------------------------
-
-The following metrics are removed in ScyllaDB |NEW_VERSION|.
-
-* scylla_redis_current_connections
-* scylla_redis_op_latency
-* scylla_redis_operation
-* scylla_redis_operation
-* scylla_redis_requests_latency
-* scylla_redis_requests_served
-* scylla_redis_requests_serving
-
-New and Updated Metrics in Previous Releases
--------------------------------------------------------
-
-* `Metrics Update Between 2025.3 and 2025.4 <https://docs.scylladb.com/manual/branch-2025.4/upgrade/upgrade-guides/upgrade-guide-from-2025.x-to-2025.4/metric-update-2025.x-to-2025.4.html>`_
-* `Metrics Update Between 2025.2 and 2025.3 <https://docs.scylladb.com/manual/branch-2025.3/upgrade/upgrade-guides/upgrade-guide-from-2025.2-to-2025.3/metric-update-2025.2-to-2025.3.html>`_
-* `Metrics Update Between 2025.1 and 2025.2 <https://docs.scylladb.com/manual/branch-2025.2/upgrade/upgrade-guides/upgrade-guide-from-2025.1-to-2025.2/metric-update-2025.1-to-2025.2.html>`_
```
```diff
@@ -0,0 +1,13 @@
+==========================================================
+Upgrade - ScyllaDB 2026.1 to ScyllaDB 2026.2
+==========================================================
+
+.. toctree::
+   :maxdepth: 2
+   :hidden:
+
+   Upgrade ScyllaDB <upgrade-guide-from-2026.1-to-2026.2>
+   Metrics Update <metric-update-2026.1-to-2026.2>
+
+* :doc:`Upgrade from ScyllaDB 2026.1 to ScyllaDB 2026.2 <upgrade-guide-from-2026.1-to-2026.2>`
+* :doc:`Metrics Update Between 2026.1 and 2026.2 <metric-update-2026.1-to-2026.2>`
```
```diff
@@ -0,0 +1,126 @@
+.. |SRC_VERSION| replace:: 2026.1
+.. |NEW_VERSION| replace:: 2026.2
+.. |PRECEDING_VERSION| replace:: 2026.1
+
+================================================================
+Metrics Update Between |SRC_VERSION| and |NEW_VERSION|
+================================================================
+
+.. toctree::
+   :maxdepth: 2
+   :hidden:
+
+ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
+
+
+New Metrics in |NEW_VERSION|
+--------------------------------------
+
+The following metrics are new in ScyllaDB |NEW_VERSION| compared to |PRECEDING_VERSION|.
+
+.. list-table::
+   :widths: 25 150
+   :header-rows: 1
+
+   * - Metric
+     - Description
+   * - scylla_auth_cache_permissions
+     - Total number of permission sets currently cached across all roles.
+   * - scylla_auth_cache_roles
+     - Number of roles currently cached.
+   * - scylla_cql_forwarded_requests
+     - Counts the total number of attempts to forward CQL requests to other nodes.
+       One request may be forwarded multiple times, particularly when a write is
+       handled by a non-replica node.
+   * - scylla_cql_write_consistency_levels_disallowed_violations
+     - Counts the number of write_consistency_levels_disallowed guardrail violations,
+       i.e. attempts to write with a forbidden consistency level.
+   * - scylla_cql_write_consistency_levels_warned_violations
+     - Counts the number of write_consistency_levels_warned guardrail violations,
+       i.e. attempts to write with a discouraged consistency level.
+   * - scylla_cql_writes_per_consistency_level
+     - Counts the number of writes for each consistency level.
+   * - scylla_io_queue_integrated_disk_queue_length
+     - Length of the integrated disk queue.
+   * - scylla_io_queue_integrated_queue_length
+     - Length of the integrated queue.
+   * - scylla_logstor_sm_bytes_freed
+     - Counts the number of data bytes freed.
+   * - scylla_logstor_sm_bytes_read
+     - Counts the number of bytes read from the disk.
+   * - scylla_logstor_sm_bytes_written
+     - Counts the number of bytes written to the disk.
+   * - scylla_logstor_sm_compaction_bytes_written
+     - Counts the number of bytes written to the disk by compaction.
+   * - scylla_logstor_sm_compaction_data_bytes_written
+     - Counts the number of data bytes written to the disk by compaction.
+   * - scylla_logstor_sm_compaction_records_rewritten
+     - Counts the number of records rewritten during compaction.
+   * - scylla_logstor_sm_compaction_records_skipped
+     - Counts the number of records skipped during compaction.
+   * - scylla_logstor_sm_compaction_segments_freed
+     - Counts the number of data bytes written to the disk.
+   * - scylla_logstor_sm_disk_usage
+     - Total disk usage.
+   * - scylla_logstor_sm_free_segments
+     - Counts the number of free segments currently available.
+   * - scylla_logstor_sm_segment_pool_compaction_segments_get
+     - Counts the number of segments taken from the segment pool for compaction.
+   * - scylla_logstor_sm_segment_pool_normal_segments_get
+     - Counts the number of segments taken from the segment pool for normal writes.
+   * - scylla_logstor_sm_segment_pool_normal_segments_wait
+     - Counts the number of times normal writes had to wait for a segment to become
+       available in the segment pool.
+   * - scylla_logstor_sm_segment_pool_segments_put
+     - Counts the number of segments returned to the segment pool.
+   * - scylla_logstor_sm_segment_pool_separator_segments_get
+     - Counts the number of segments taken from the segment pool for separator writes.
+   * - scylla_logstor_sm_segment_pool_size
+     - Counts the number of segments in the segment pool.
+   * - scylla_logstor_sm_segments_allocated
+     - Counts the number of segments allocated.
+   * - scylla_logstor_sm_segments_compacted
+     - Counts the number of segments compacted.
+   * - scylla_logstor_sm_segments_freed
+     - Counts the number of segments freed.
+   * - scylla_logstor_sm_segments_in_use
+     - Counts the number of segments currently in use.
+   * - scylla_logstor_sm_separator_buffer_flushed
+     - Counts the number of times the separator buffer has been flushed.
+   * - scylla_logstor_sm_separator_bytes_written
+     - Counts the number of bytes written to the separator.
+   * - scylla_logstor_sm_separator_data_bytes_written
+     - Counts the number of data bytes written to the separator.
+   * - scylla_logstor_sm_separator_flow_control_delay
+     - Current delay applied to writes to control separator debt in microseconds.
+   * - scylla_logstor_sm_separator_segments_freed
+     - Counts the number of segments freed by the separator.
+   * - scylla_transport_cql_pending_response_memory
+     - Holds the total memory in bytes consumed by responses waiting to be sent.
+   * - scylla_transport_cql_request_histogram_bytes
+     - A histogram of received bytes in CQL messages of a specific kind and
+       specific scheduling group.
+   * - scylla_transport_cql_requests_serving
+     - Holds the number of requests that are being processed right now.
+   * - scylla_transport_cql_response_histogram_bytes
+     - A histogram of received bytes in CQL messages of a specific kind and
+       specific scheduling group.
+   * - scylla_transport_requests_forwarded_failed
+     - Counts the number of requests that were forwarded to another replica
+       but failed to execute there.
+   * - scylla_transport_requests_forwarded_prepared_not_found
+     - Counts the number of requests that were forwarded to another replica
+       but failed there because the statement was not prepared on the target.
+       When this happens, the coordinator performs an additional remote call
+       to prepare the statement on the replica and retries the EXECUTE request
+       afterwards.
+   * - scylla_transport_requests_forwarded_redirected
+     - Counts the number of requests that were forwarded to another replica
+       but that replica responded with a redirect to another node. This can
+       happen when replica has stale information about the cluster topology or
+       when the request is handled by a node that is not a replica for the data
+       being accessed by the request.
+   * - scylla_transport_requests_forwarded_successfully
+     - Counts the number of requests that were forwarded to another replica
+       and executed successfully there.
```
```diff
@@ -1,13 +1,13 @@
 .. |SCYLLA_NAME| replace:: ScyllaDB

-.. |SRC_VERSION| replace:: 2025.x
-.. |NEW_VERSION| replace:: 2026.1
+.. |SRC_VERSION| replace:: 2026.1
+.. |NEW_VERSION| replace:: 2026.2

 .. |ROLLBACK| replace:: rollback
 .. _ROLLBACK: ./#rollback-procedure

-.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 2025.x to 2026.1
-.. _SCYLLA_METRICS: ../metric-update-2025.x-to-2026.1
+.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 2026.1 to 2026.2
+.. _SCYLLA_METRICS: ../metric-update-2026.1-to-2026.2

 =======================================================================================
 Upgrade from |SCYLLA_NAME| |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION|
```
```diff
@@ -399,9 +399,10 @@ future<> gossiper::do_send_ack2_msg(locator::host_id from, utils::chunked_vector
         }
     }
     gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map));
-    logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
+    auto ack2_msg_str = fmt::format("{}", ack2_msg);
+    logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str);
     co_await ser::gossip_rpc_verbs::send_gossip_digest_ack2(&_messaging, from, std::move(ack2_msg));
-    logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
+    logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str);
 }

 // Depends on
```
```diff
@@ -16,7 +16,6 @@ Usage:
 import argparse, os, sys
 from typing import Sequence

-from test.pylib.driver_utils import safe_driver_shutdown

 def read_statements(path: str) -> list[tuple[int, str]]:
     stms: list[tuple[int, str]] = []
@@ -58,7 +57,7 @@ def exec_statements(statements: list[tuple[int, str]], socket_path: str, timeout
             print(f"ERROR executing statement from file line {lineno}: {s}\n{e}", file=sys.stderr)
             return 1
     finally:
-        safe_driver_shutdown(cluster)
+        cluster.shutdown()
     return 0

 def main(argv: Sequence[str]) -> int:
```
```diff
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:524c54493b72c5e1b783f14dfa49d733e21b24cc2ec776e9c6e578095073162d
-size 6646304
+oid sha256:0a39166e74aa95af9df0bfb9d521ae499cb71f0c31573fb73f396655797ea729
+size 6706020
```
```diff
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:fec2bb253d43139da954cee3441fc8bc74824246b080f23bf1f824714d0adc45
-size 6646576
+oid sha256:c18384f49f019a07445987f070fe8ffc100df38399650a52bc088df8f8de8efc
+size 6705336
```
```diff
@@ -1279,6 +1279,9 @@ future<int> repair_service::do_repair_start(gms::gossip_address_map& addr_map, s
     }

     if (!options.start_token.empty() || !options.end_token.empty()) {
+        if (!options.start_token.empty() && !options.end_token.empty() && options.start_token == options.end_token) {
+            throw std::invalid_argument("Start and end tokens must be different.");
+        }
         // Intersect the list of local ranges with the given token range,
         // dropping ranges with no intersection.
         std::optional<::wrapping_interval<dht::token>::bound> tok_start;
```
```diff
@@ -1142,7 +1142,7 @@ future<> database::create_local_system_table(
         cfg.memtable_scheduling_group = default_scheduling_group();
         cfg.memtable_to_cache_scheduling_group = default_scheduling_group();
     }
-    auto lock = get_tables_metadata().hold_write_lock();
+    auto lock = co_await get_tables_metadata().hold_write_lock();
     std::exception_ptr ex;
     try {
         add_column_family(ks, table, std::move(cfg), replica::database::is_new_cf::no);
```
```diff
@@ -1328,9 +1328,27 @@ future<global_table_ptr> get_table_on_all_shards(sharded<database>& sharded_db,

 future<tables_metadata_lock_on_all_shards> database::lock_tables_metadata(sharded<database>& sharded_db) {
     tables_metadata_lock_on_all_shards locks;
-    co_await sharded_db.invoke_on_all([&] (auto& db) -> future<> {
+    // Acquire write lock on shard 0 first, and then on the remaining shards.
+    //
+    // Parallel acquisition on all shards could deadlock when two
+    // fibers call lock_tables_metadata() concurrently: parallel_for_each
+    // sends SMP messages to all shards even when the local shard's lock
+    // attempt blocks. If task reordering (SEASTAR_SHUFFLE_TASK_QUEUE in
+    // debug/sanitize builds) causes fiber A to win on shard X while
+    // fiber B wins on shard Y, neither can make progress — classic
+    // cross-shard lock-ordering deadlock.
+    //
+    // Acquiring the write lock on shard 0 first, and then on the remaining
+    // shards, eliminates this: whichever fiber acquires shard 0 first is
+    // guaranteed to acquire locks on all other shards before the other fiber
+    // can acquire the lock on shard 0.
+    co_await sharded_db.invoke_on(0, [&locks, &sharded_db] (auto& db) -> future<> {
         locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock());
+        co_await sharded_db.invoke_on_others([&locks] (auto& db) -> future<> {
+            locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock());
+        });
     });

     co_return locks;
 }
```
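The comment in this hunk carries the whole correctness argument, so a standalone illustration may help. The sketch below is hypothetical (plain `std::mutex` instead of the per-shard write locks above), but it shows why taking lock 0 first removes the cross-ordering deadlock: lock 0 acts as a gate, and whichever fiber wins it can take every remaining lock without contention.

```cpp
#include <mutex>
#include <vector>

// Hypothetical standalone illustration of the shard-0-first rule: every
// caller takes lock 0 before any other lock, so lock 0 serializes the
// contenders. The winner acquires the rest uncontended; the loser blocks
// at lock 0 and never holds a later lock that the winner needs.
std::vector<std::mutex> shard_locks(8);

void lock_all_shards_ordered() {
    shard_locks[0].lock();                        // the serializing gate
    for (size_t i = 1; i < shard_locks.size(); ++i) {
        shard_locks[i].lock();                    // safe: one total order
    }
}

void unlock_all_shards() {
    for (size_t i = shard_locks.size(); i-- > 0;) {
        shard_locks[i].unlock();                  // release in reverse order
    }
}
```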
```diff
@@ -438,9 +438,10 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,

     const auto cache_key = qp.compute_id(req, "", cql3::internal_dialect());
     auto ps_ptr = qp.get_prepared(cache_key);
+    shared_ptr<cql_transport::messages::result_message::prepared> prepared_msg;
     if (!ps_ptr) {
-        const auto msg_ptr = co_await qp.prepare(req, qs, cql3::internal_dialect());
-        ps_ptr = msg_ptr->get_prepared();
+        prepared_msg = co_await qp.prepare(req, qs, cql3::internal_dialect());
+        ps_ptr = prepared_msg->get_prepared();
         if (!ps_ptr) {
             on_internal_error(paxos_state::logger, "prepared statement is null");
         }
@@ -449,8 +450,8 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
             -1, service::node_local_only::yes);
     const auto st = ps_ptr->statement;

-    const auto msg_ptr = co_await st->execute(qp, qs, qo, std::nullopt);
-    co_return cql3::untyped_result_set(msg_ptr);
+    const auto result_ptr = co_await st->execute(qp, qs, qo, std::nullopt);
+    co_return cql3::untyped_result_set(result_ptr);
 }

 template <typename... Args>
```
```diff
@@ -434,6 +434,8 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
 }

 future<> group0_state_machine::enable_in_memory_state_machine() {
+    co_await utils::get_local_injector().inject("group0_state_machine_enable_in_memory_fail",
+        [] { return std::make_exception_ptr(std::runtime_error("injected failure in enable_in_memory_state_machine")); });
     auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex(_abort_source);
     if (!_in_memory_state_machine_enabled) {
         _in_memory_state_machine_enabled = true;
```
```diff
@@ -452,14 +452,16 @@ future<> raft_group0::start_server_for_group0(raft::group_id group0_id, service:
     auto srv_for_group0 = create_server_for_group0(group0_id, my_id, ss, qp, mm);
     auto& persistence = srv_for_group0.persistence;
     auto& server = *srv_for_group0.server;
-    co_await with_scheduling_group(_sg, [this, &srv_for_group0] (this auto self) -> future<> {
+    co_await with_scheduling_group(_sg, [this, &srv_for_group0, group0_id] (this auto self) -> future<> {
         auto& state_machine = dynamic_cast<group0_state_machine&>(srv_for_group0.state_machine);
         co_await _raft_gr.start_server_for_group(std::move(srv_for_group0));
+        // Set _group0 immediately after the server is registered in _raft_gr._servers.
+        // This ensures abort_and_drain()/destroy() can find and clean up the server
+        // even if enable_in_memory_state_machine() or later steps throw.
+        _group0.emplace<raft::group_id>(group0_id);
         co_await state_machine.enable_in_memory_state_machine();
     });

-    _group0.emplace<raft::group_id>(group0_id);
-
     // Fix for scylladb/scylladb#16683:
     // If the snapshot index is 0, trigger creation of a new snapshot
     // so bootstrapping nodes will receive a snapshot transfer.
```
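The ordering change above (publish `_group0` right after registration, before the fallible enable step) is an instance of a general pattern: make a resource discoverable by the cleanup path before running anything that can throw. A hypothetical, self-contained sketch of that pattern, with invented names throughout:

```cpp
#include <map>
#include <optional>
#include <stdexcept>

// Hypothetical stand-ins: `registry` plays the role of _raft_gr._servers,
// `published_group` plays the role of _group0.
struct server {
    bool fail_enable = false;
    void enable() { if (fail_enable) throw std::runtime_error("enable failed"); }
};

std::map<int, server> registry;
std::optional<int> published_group;

void start_server(int id, bool fail) {
    registry.emplace(id, server{fail});
    published_group = id;         // publish before the fallible step
    registry.at(id).enable();     // if this throws, cleanup() still sees id
}

void cleanup() {
    if (published_group) {
        registry.erase(*published_group);  // always finds the registration
        published_group.reset();
    }
}
```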
```diff
@@ -9,6 +9,7 @@
 #include "service/session.hh"
 #include "utils/log.hh"
 #include <seastar/core/coroutine.hh>
+#include <seastar/core/timer.hh>

 namespace service {

```
```diff
@@ -58,18 +59,35 @@ void session_manager::initiate_close_of_sessions_except(const std::unordered_set
 }

 future<> session_manager::drain_closing_sessions() {
+    slogger.info("drain_closing_sessions: waiting for lock");
+    seastar::timer<lowres_clock> lock_timer([this] {
+        slogger.warn("drain_closing_sessions: still waiting for lock, available units {}",
+                _session_drain_sem.available_units());
+    });
+    lock_timer.arm_periodic(std::chrono::minutes(5));
     auto lock = co_await get_units(_session_drain_sem, 1);
+    lock_timer.cancel();
+    auto n = std::distance(_closing_sessions.begin(), _closing_sessions.end());
+    slogger.info("drain_closing_sessions: acquired lock, {} sessions to drain", n);
     auto i = _closing_sessions.begin();
     while (i != _closing_sessions.end()) {
         session& s = *i;
         ++i;
         auto id = s.id();
-        slogger.debug("draining session {}", id);
+        slogger.info("drain_closing_sessions: waiting for session {} to close, gate count {}", id, s.gate_count());
+        std::optional<seastar::timer<lowres_clock>> warn_timer;
+        warn_timer.emplace([&s, id] {
+            slogger.warn("drain_closing_sessions: session {} still not closed, gate count {}",
+                    id, s.gate_count());
+        });
+        warn_timer->arm_periodic(std::chrono::minutes(5));
         co_await s.close();
+        warn_timer.reset();
         if (_sessions.erase(id)) {
-            slogger.debug("session {} closed", id);
+            slogger.info("drain_closing_sessions: session {} closed", id);
         }
     }
+    slogger.info("drain_closing_sessions: done");
 }

 } // namespace service
```
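Both the lock wait and the per-session close above wrap a potentially unbounded `co_await` in a periodic warning timer. A hedged sketch of that idiom as a reusable helper (hypothetical name and signature; the Seastar `timer`, `lowres_clock`, and `noncopyable_function` APIs are real):

```cpp
#include <chrono>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/timer.hh>
#include <seastar/util/noncopyable_function.hh>

// Hypothetical helper: while `f` is still pending, invoke `warn` every
// `period` so a stuck wait shows up in the logs; cancel on completion.
seastar::future<> await_with_periodic_warning(seastar::future<> f,
                                              std::chrono::minutes period,
                                              seastar::noncopyable_function<void()> warn) {
    seastar::timer<seastar::lowres_clock> t(std::move(warn));
    t.arm_periodic(period);
    co_await std::move(f);
    t.cancel();  // the timer destructor would also cancel on an exception path
}
```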
```diff
@@ -95,6 +95,10 @@ public:
         return _id;
     }

+    size_t gate_count() const {
+        return _gate.get_count();
+    }
+
     /// Post-condition of successfully resolved future: There are no guards alive for this session, and
     /// and it's impossible to create more such guards later.
     /// Can be called concurrently.
```
```diff
@@ -2732,13 +2732,23 @@ future<> storage_service::decommission(sharded<db::snapshot_ctl>& snapshot_ctl)
                 throw std::runtime_error(::format("Node in {} state; wait for status to become normal or restart", ss._operation_mode));
             }

-            ss.raft_decommission().get();
-
+            // SCYLLADB-1693. In case we abort, the snapshot/backup mechanism need
+            // to remain open. Move it to after raft_decommission.
+            // In the case of a cluster snapshot, our nodes ownership
+            // or not of tables will be serialized by raft anyway, so
+            // should remain consistent. In that case we at worst coordinate
+            // from a node in "leave" status
+            // In the case of a local snapshot, ownership matters less,
+            // only sstables on disk, which should not change.
+            // In the case of backup, this operates on a snapshot, state of which
+            // is not affected.
             snapshot_ctl.invoke_on_all([](auto& sctl) {
                 return sctl.disable_all_operations();
             }).get();
             slogger.info("DECOMMISSIONING: disabled backup and snapshots");

+            ss.raft_decommission().get();
+
             ss.stop_transport().get();
             slogger.info("DECOMMISSIONING: stopped transport");
@@ -4494,10 +4504,20 @@ future<> storage_service::local_topology_barrier() {
                     version, current_version)));
         }

-        co_await ss._shared_token_metadata.stale_versions_in_use();
+        rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: waiting for stale token metadata versions to be released", version);
+        {
+            seastar::timer<lowres_clock> warn_timer([&ss, version] {
+                rtlogger.warn("raft_topology_cmd::barrier_and_drain version {}: still waiting for stale versions, "
+                        "stale versions (version: use_count): {}",
+                        version, ss._shared_token_metadata.describe_stale_versions());
+            });
+            warn_timer.arm_periodic(std::chrono::minutes(5));
+            co_await ss._shared_token_metadata.stale_versions_in_use();
+        }
+        rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: stale versions released, draining closing sessions", version);
         co_await get_topology_session_manager().drain_closing_sessions();

-        rtlogger.info("raft_topology_cmd::barrier_and_drain done");
+        rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: done", version);
     });
 }
@@ -4509,7 +4529,9 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
     auto& raft_server = _group0->group0_server();
     auto group0_holder = _group0->hold_group0_gate();
     // do barrier to make sure we always see the latest topology
+    rtlogger.info("topology cmd rpc {} index={}: starting read_barrier, term={}", cmd.cmd, cmd_index, term);
     co_await raft_server.read_barrier(&_group0_as);
+    rtlogger.info("topology cmd rpc {} index={}: read_barrier completed", cmd.cmd, cmd_index);
     if (raft_server.get_current_term() != term) {
         // Return an error since the command is from outdated leader
         co_return result;
```
```diff
@@ -3811,6 +3811,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
             on_internal_error(rtlogger, ::format("Leaving node {} doesn't own tokens", node.id));
         }

+        // Leave break point. For testing decommission
+        co_await utils::get_local_injector().inject("topology_coordinator_before_leave", utils::wait_for_message(std::chrono::minutes(2)));
+
         auto validation_result = validate_removing_node(_db, to_host_id(node.id));
         if (std::holds_alternative<node_validation_failure>(validation_result)) {
             builder.with_node(node.id)
@@ -4237,6 +4240,7 @@ public:
         , _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
         , _async_gate("topology_coordinator")
     {
+        _lifecycle_notifier.register_subscriber(this);
         _db.get_notifier().register_listener(this);
         // When the delay_cdc_stream_finalization error injection is disabled
         // (test releases it), wake the topology coordinator so it retries
@@ -4400,6 +4404,7 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
 }

 future<> topology_coordinator::refresh_tablet_load_stats() {
+    co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
     auto tm = get_token_metadata_ptr();

     locator::load_stats stats;
@@ -4723,7 +4728,6 @@ future<> topology_coordinator::run() {

     co_await _async_gate.close();
     co_await std::move(tablet_load_stats_refresher);
-    co_await _tablet_load_stats_refresh.join();
     co_await std::move(cdc_generation_publisher);
     co_await std::move(cdc_streams_gc);
     co_await std::move(gossiper_orphan_remover);
@@ -4736,6 +4740,8 @@ future<> topology_coordinator::stop() {
     co_await _db.get_notifier().unregister_listener(this);
     utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
     _topo_sm.on_tablet_split_ready = nullptr;
+    co_await _lifecycle_notifier.unregister_subscriber(this);
+    co_await _tablet_load_stats_refresh.join();

     // if topology_coordinator::run() is aborted either because we are not a
     // leader anymore, or we are shutting down as a leader, we have to handle
@@ -4797,7 +4803,6 @@ future<> run_topology_coordinator(
             topology_cmd_rpc_tracker};

     std::exception_ptr ex;
-    lifecycle_notifier.register_subscriber(&coordinator);
     try {
         rtlogger.info("start topology coordinator fiber");
         co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
@@ -4818,7 +4823,7 @@
         }
         on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
     }
-    co_await lifecycle_notifier.unregister_subscriber(&coordinator);
+    co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min));
     co_await coordinator.stop();
 }
```
```diff
@@ -543,11 +543,16 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
     // during SSTable writing and removed before sealing. If the write
     // failed before sealing, the file may still be on disk and must be
     // cleaned up explicitly.
+    // The component is only defined for the `ms` sstable format; for
+    // older formats it is absent from the component map and looking up
+    // its filename would throw std::out_of_range.
     // Use file_exists() to avoid a C++ exception on the common path
     // where the file was already removed before sealing.
-    auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
-    if (co_await file_exists(temp_hashes)) {
-        co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
+    if (sstable_version_constants::get_component_map(sst.get_version()).contains(component_type::TemporaryHashes)) {
+        auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
+        if (co_await file_exists(temp_hashes)) {
+            co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
+        }
     }
     if (sync) {
         co_await sst.sstable_write_io_check(sync_directory, dir_name.native());
```
```diff
@@ -32,6 +32,7 @@

 #pragma once

+#include <seastar/core/thread.hh>
 #include <seastar/util/log.hh>
 #include <map>
 #include <set>
@@ -254,6 +255,7 @@ inline void trie_writer<Output>::lay_out_children(ptr<writer_node> x) {
     }

     while (unwritten_children.size()) {
+        seastar::thread::maybe_yield();
         // Find the smallest child which doesn't fit.
         // (If all fit, then this will be the past-the-end iterator).
         // Its predecessor will be the biggest child which does fit.
@@ -350,6 +352,7 @@ template <trie_writer_sink Output>
 inline void trie_writer<Output>::complete_until_depth(size_t depth) {
     expensive_log("writer_node::complete_until_depth: start,_stack={}, depth={}, _current_depth={}", _stack.size(), depth, _current_depth);
     while (_current_depth > depth) {
+        seastar::thread::maybe_yield();
         // Every node must be smaller than a page, and the transition chain
         // must be short enough to ensure that.
         //
```
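Both hunks insert the same call, so a minimal sketch of the underlying rule may be useful: code running inside a `seastar::thread` must yield in long loops, or it monopolizes the reactor for the whole iteration. The loop below is hypothetical; `seastar::thread::maybe_yield()` is the real API being added above, and it is cheap because it only actually yields once the task quota is exceeded.

```cpp
#include <vector>
#include <seastar/core/thread.hh>

// Hypothetical long loop running inside a seastar::thread.
void process_all(const std::vector<int>& items) {
    for (int item : items) {
        seastar::thread::maybe_yield();  // let the reactor run other tasks
        // ... per-item work would go here (hypothetical) ...
        (void)item;
    }
}
```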
```diff
@@ -9,6 +9,7 @@

 #include "cql3/statements/property_definitions.hh"
 #include "utils/assert.hh"
+#include "utils/error_injection.hh"
 #include <seastar/core/coroutine.hh>
 #include <seastar/coroutine/parallel_for_each.hh>
 #include "table_helper.hh"
@@ -135,10 +136,32 @@ future<> table_helper::cache_table_info(cql3::query_processor& qp, service::migr
 }

 future<> table_helper::insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) {
-    co_await cache_table_info(qp, mm, qs);
+    // _prepared_stmt is a checked_weak_ptr into the prepared statements
+    // cache and can be invalidated by a concurrent purge (e.g. on a schema
+    // change). cache_table_info() (re-)prepares and assigns _prepared_stmt,
+    // but the pin protecting the entry is dropped when try_prepare()
+    // returns. In release the chain of ready-future co_awaits back to here
+    // resumes synchronously, but debug builds preempt on every co_await
+    // even for ready futures, opening a window for a purge to drop the
+    // entry and leave _prepared_stmt null. Loop until a synchronous
+    // post-resume check finds _prepared_stmt valid; nothing can run between
+    // that check and the dereference below. _insert_stmt is a strong
+    // shared_ptr and is not affected by cache invalidation.
+    while (true) {
+        co_await cache_table_info(qp, mm, qs);
+        if (_prepared_stmt) {
+            break;
+        }
+    }
+    // Pin a strong ref locally: while we suspend in execute(), a concurrent
+    // insert() on this shard may reset _insert_stmt to nullptr if the
+    // prepared_statements_cache entry gets invalidated, freeing the object.
+    auto stmt = _insert_stmt;
     auto opts = opt_maker();
     opts.prepare(_prepared_stmt->bound_names);
-    co_await _insert_stmt->execute(qp, qs, opts, std::nullopt);
+    co_await utils::get_local_injector().inject("table_helper_insert_before_execute",
+            utils::wait_for_message(std::chrono::seconds{30}));
+    co_await stmt->execute(qp, qs, opts, std::nullopt);
 }

 future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migration_manager& mm, std::string_view keyspace_name, sstring replication_strategy_name,
```
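The retry loop and the local strong reference above are two halves of one discipline for weak cache references. A hypothetical distillation using `std::weak_ptr` in place of the cache's `checked_weak_ptr` (all names below are invented for illustration):

```cpp
#include <memory>

struct statement { /* stand-in for the prepared statement */ };

// (1) Re-prepare until the weak reference is observed valid, with no
//     suspension point between the check and the use.
// (2) Return a strong reference, pinning the object so that a concurrent
//     cache invalidation cannot free it while the caller later suspends.
std::shared_ptr<statement> pin_prepared(std::weak_ptr<statement> (*prepare)()) {
    for (;;) {
        std::weak_ptr<statement> w = prepare();  // entry may be purged concurrently
        if (auto strong = w.lock()) {            // synchronous validity check
            return strong;                       // pinned across later awaits
        }
        // purged between prepare() and lock(); retry
    }
}
```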
```diff
@@ -150,6 +150,8 @@ add_scylla_test(lister_test
   KIND SEASTAR)
 add_scylla_test(locator_topology_test
   KIND SEASTAR)
+add_scylla_test(lock_tables_metadata_test
+  KIND SEASTAR)
 add_scylla_test(log_heap_test
   KIND BOOST)
 add_scylla_test(logalloc_standard_allocator_segment_pool_backend_test
@@ -323,6 +325,7 @@ add_scylla_test(combined_tests
   auth_cache_test.cc
   auth_test.cc
   batchlog_manager_test.cc
+  table_helper_test.cc
   cache_algorithm_test.cc
   castas_fcts_test.cc
   cdc_test.cc
```
```diff
@@ -823,4 +823,42 @@ SEASTAR_TEST_CASE(test_prepared_statement_small_cache) {
     }, small_cache_config);
 }

+SEASTAR_THREAD_TEST_CASE(test_loading_cache_insert) {
+    using namespace std::chrono;
+    loader loader;
+    loading_cache_for_test<int, sstring> loading_cache(num_loaders, 1h, testlog);
+    auto stop_cache = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
+
+    // insert() must populate the cache and invoke the loader exactly once.
+    loading_cache.insert(0, loader.get()).get();
+    BOOST_REQUIRE_EQUAL(loader.load_count(), 1);
+    BOOST_REQUIRE_EQUAL(loading_cache.size(), 1);
+    auto vp = loading_cache.find(0);
+    BOOST_REQUIRE(vp != nullptr);
+    BOOST_REQUIRE_EQUAL(*vp, test_string);
+
+    // A second insert() for the same key must not re-invoke the loader.
+    loading_cache.insert(0, loader.get()).get();
+    BOOST_REQUIRE_EQUAL(loader.load_count(), 1);
+    BOOST_REQUIRE_EQUAL(loading_cache.size(), 1);
+}
+
+// Regression test for SCYLLADB-1699: insert() on a cache constructed with
+// expiry == 0 (caching disabled) must be a no-op rather than asserting in
+// loading_cache::get_ptr().
+SEASTAR_THREAD_TEST_CASE(test_loading_cache_insert_caching_disabled) {
+    using namespace std::chrono;
+    loader loader;
+    loading_cache_for_test<int, sstring> loading_cache(num_loaders, 0ms, testlog);
+    auto stop_cache = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
+
+    auto f = loading_cache.insert(0, loader.get());
+    loading_cache.insert(0, loader.get()).get();
+
+    // The loader must not have been invoked and the cache must remain empty.
+    BOOST_REQUIRE_EQUAL(loader.load_count(), 0);
+    BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
+    BOOST_REQUIRE(loading_cache.find(0) == nullptr);
+}
+
 BOOST_AUTO_TEST_SUITE_END()
```
**test/boost/lock_tables_metadata_test.cc** (new file): 36 lines
```diff
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2026-present ScyllaDB
+ */
+
+/*
+ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
+ */
+
+#include <fmt/format.h>
+#include <seastar/core/with_timeout.hh>
+#include <seastar/testing/test_case.hh>
+#include "test/lib/cql_test_env.hh"
+
+using namespace std::chrono_literals;
+
+// Test that two lock_tables_metadata calls don't deadlock
+SEASTAR_TEST_CASE(test_lock_tables_metadata_deadlock) {
+    return do_with_cql_env_thread([](cql_test_env& e) {
+        try {
+            // Repeat the test scenario to increase the chance of hitting the deadlock.
+            // If no deadlock occurs, each repetition should complete within a fraction of a second,
+            // so even with 100 repetitions, the total test time should be reasonable.
+            for (int i = 0; i < 100; ++i) {
+                with_timeout(lowres_clock::now() + 30s,
+                    when_all_succeed(
+                        e.local_db().lock_tables_metadata(e.db()).discard_result(),
+                        e.local_db().lock_tables_metadata(e.db()).discard_result()
+                    )).get();
+            }
+        } catch (seastar::timed_out_error&) {
+            fmt::print(stderr, "FAIL: lock_tables_metadata deadlocked (timed out after 30s)\n");
+            _exit(1);
+        }
+    });
+}
```
```diff
@@ -246,6 +246,33 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_extra_temporary_toc) {
     });
 }

+// Reproducer for SCYLLADB-1697
+SEASTAR_TEST_CASE(sstable_directory_test_unlink_sstable_leaves_no_orphans) {
+    return sstables::test_env::do_with_async([] (test_env& env) {
+        for (const auto version : {sstable_version_types::me, sstable_version_types::ms}) {
+            testlog.info("Testing sstable version: {}", version);
+            auto sst = make_sstable_for_this_shard([&env, version] {
+                return env.make_sstable(test_table_schema(), version);
+            });
+
+            // Sanity: the TOC was written, otherwise the assertion below would be vacuous.
+            BOOST_REQUIRE(file_exists(test(sst).filename(sstables::component_type::TOC).native()).get());
+
+            sst->unlink().get();
+
+            std::vector<sstring> remaining;
+            lister::scan_dir(env.tempdir().path(), lister::dir_entry_types::of<directory_entry_type::regular>(),
+                    [&remaining] (fs::path, directory_entry de) {
+                remaining.push_back(de.name);
+                return make_ready_future<>();
+            }).get();
+
+            BOOST_REQUIRE_MESSAGE(remaining.empty(),
+                fmt::format("Expected empty sstable dir after unlink for version {}, found: {}", version, remaining));
+        }
+    });
+}
+
 // Test the absence of TOC. Behavior is controllable by a flag
 SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) {
     return sstables::test_env::do_with_async([] (test_env& env) {
```
**test/boost/table_helper_test.cc** (new file): 111 lines
```diff
@@ -0,0 +1,111 @@
+/*
+ * Copyright (C) 2026-present ScyllaDB
+ */
+
+/*
+ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
+ */
+
+#include <boost/test/unit_test.hpp>
+
+#undef SEASTAR_TESTING_MAIN
+#include <seastar/testing/test_case.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <vector>
+
+#include "test/lib/cql_test_env.hh"
+#include "test/lib/log.hh"
+
+#include "table_helper.hh"
+#include "cql3/query_processor.hh"
+#include "cql3/query_options.hh"
+#include "cql3/cql_config.hh"
+#include "service/client_state.hh"
+#include "service/migration_manager.hh"
+#include "service/query_state.hh"
+#include "types/types.hh"
+#include "utils/error_injection.hh"
+
+// Regression test for use-after-free in table_helper::insert() when the
+// prepared_statements_cache entry is invalidated (e.g. DROP TABLE) while a
+// concurrent insert() is suspended in execute(). The injection point inside
+// insert() is used to park fiber A deterministically, then fiber B drops the
+// last strong ref; without the fix, resuming A crashes.
+
+BOOST_AUTO_TEST_SUITE(table_helper_test)
+
+#ifdef SCYLLA_ENABLE_ERROR_INJECTION
+
+SEASTAR_TEST_CASE(test_concurrent_invalidation) {
+    return do_with_cql_env_thread([] (cql_test_env& env) {
+        auto& qp = env.local_qp();
+        auto& mm = env.migration_manager().local();
+
+        env.execute_cql("CREATE KEYSPACE th_ks WITH replication = "
+                "{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}").get();
+        env.execute_cql("CREATE TABLE th_ks.t (id int PRIMARY KEY, v int)").get();
+
+        const sstring create_cql = "CREATE TABLE IF NOT EXISTS th_ks.t (id int PRIMARY KEY, v int)";
+        const sstring insert_cql = "INSERT INTO th_ks.t (id, v) VALUES (?, ?)";
+
+        table_helper helper("th_ks", "t", create_cql, insert_cql);
+
+        service::query_state qs(service::client_state::for_internal_calls(), empty_service_permit());
+
+        auto make_opts = [] {
+            std::vector<cql3::raw_value> vals {
+                cql3::raw_value::make_value(int32_type->decompose(0)),
+                cql3::raw_value::make_value(int32_type->decompose(0)),
+            };
+            return cql3::query_options(cql3::default_cql_config, db::consistency_level::ONE,
+                    std::nullopt, std::move(vals), false,
+                    cql3::query_options::specific_options::DEFAULT);
+        };
+
+        // Prime the prepared cache.
+        helper.insert(qp, mm, qs, make_opts).get();
+
+        utils::get_local_injector().enable("table_helper_insert_before_execute", true /*one_shot*/);
+
+        // Fiber A: suspends at the injection, between cache_table_info() and execute().
+        auto fiber_a = helper.insert(qp, mm, qs, make_opts);
+
+        // Wait until fiber A is actually parked in wait_for_message.
+        while (utils::get_local_injector().waiters("table_helper_insert_before_execute") == 0) {
+            seastar::yield().get();
+        }
+
+        // Evict the prepared cache entry - drops its strong ref to the
+        // modification_statement. helper._insert_stmt is the only ref left.
+        env.execute_cql("DROP TABLE th_ks.t").discard_result().get();
+
+        // Fiber B: cache_table_info() sees the weak ref invalidated and sets
+        // _insert_stmt = nullptr; the re-prepare then throws (table is gone).
+        helper.insert(qp, mm, qs, make_opts)
+                .handle_exception([] (std::exception_ptr) {}).get();
+
+        // Release fiber A. Unfixed: re-reads null _insert_stmt and crashes.
+        utils::get_local_injector().receive_message("table_helper_insert_before_execute");
+
+        try {
+            fiber_a.get();
+        } catch (...) {
+            // execute() may fail (table is gone); only the crash matters.
+        }
+    });
+}
+
+#endif // SCYLLA_ENABLE_ERROR_INJECTION
+
+#ifndef SCYLLA_ENABLE_ERROR_INJECTION
+// The only test in this suite requires error injection support. Without this
+// dummy case the suite would be empty, which causes boost to report
+// "test tree is empty" and pytest to exit with code 5 ("no tests collected"),
+// failing CI in modes (e.g. release) where error injection is disabled.
+BOOST_AUTO_TEST_CASE(test_skipped_no_error_injection) {
+    BOOST_TEST_MESSAGE("table_helper_test requires SCYLLA_ENABLE_ERROR_INJECTION; skipping");
+}
+#endif
+
+BOOST_AUTO_TEST_SUITE_END()
```
```diff
@@ -0,0 +1,65 @@
+#
+# Copyright (C) 2026-present ScyllaDB
+#
+# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
+#
+
+import pytest
+import logging
+
+from test.pylib.manager_client import ManagerClient
+from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
+
+logger = logging.getLogger(__name__)
+
+
+@pytest.mark.asyncio
+async def test_permissions_removal_and_restart(manager: ManagerClient) -> None:
+    """Test that a node boots successfully when role_permissions contains a
+    ghost row with role and resource set but the permissions column missing.
+
+    The auth v2 migration (now removed) used INSERT to copy permission rows
+    from the legacy table, which created CQL row markers. Normal GRANT uses
+    UPDATE, which only writes collection cells without row markers. When
+    permissions were later revoked, the collection cells were tombstoned but
+    the row marker from the migration INSERT persisted. That leaves a row
+    with role and resource but no permissions column.
+
+    This test simulates that scenario:
+    1. INSERT permissions with row marker (simulating auth v2 migration)
+    2. REVOKE ALL permissions (tombstones the cells, marker survives)
+    3. Restart and verify the node boots successfully
+    """
+    servers = await manager.servers_add(1, config=auth_config)
+    cql, _ = await manager.get_ready_cql(servers)
+    server = servers[0]
+
+    await cql.run_async("CREATE ROLE scylla_admin WITH PASSWORD = 'x' AND LOGIN = true")
+    await cql.run_async("CREATE ROLE scylla_manager WITH PASSWORD = 'x' AND LOGIN = true")
+
+    # Simulate auth v2 migration: INSERT creates a row marker alongside the
+    # permission cells, unlike GRANT which uses UPDATE (no row marker).
+    await cql.run_async(
+        "INSERT INTO system.role_permissions (role, resource, permissions) "
+        "VALUES ('scylla_admin', 'roles/scylla_manager', {'ALTER', 'AUTHORIZE', 'DROP'})")
+
+    # Revoke all permissions — tombstones the collection cells, but the
+    # row marker from the INSERT survives, creating a ghost row.
+    await cql.run_async("REVOKE ALL ON ROLE scylla_manager FROM scylla_admin")
+
+    # Additional check: a row with an explicitly empty permissions set.
+    await cql.run_async("CREATE ROLE test_empty_perms WITH PASSWORD = 'x' AND LOGIN = true")
+    await cql.run_async(
+        "INSERT INTO system.role_permissions (role, resource) "
+        "VALUES ('test_empty_perms', 'roles/scylla_manager')")
+
+    # Restart — the auth cache loads the ghost row and must not crash
+    logger.info("Restarting node")
+    await manager.server_stop_gracefully(server.server_id)
+    await manager.server_start(server.server_id)
+
+    await manager.driver_connect()
+    cql, _ = await manager.get_ready_cql(servers)
+    rows = await cql.run_async("SELECT * FROM system.local")
+    assert len(rows) == 1, "Node should be functional after restart"
+    logger.info("Node restarted successfully")
```
```diff
@@ -60,8 +60,8 @@ async def insert_with_concurrency(cql, table, value_count, concurrency):
 @pytest.mark.skip_mode(mode='release', reason="error injections aren't enabled in release mode")
 async def test_delete_partition_rows_from_table_with_mv(manager: ManagerClient) -> None:
     node_count = 2
-    await manager.servers_add(node_count, config={'error_injections_at_startup': ['view_update_limit', 'delay_before_remote_view_update']})
-    cql = manager.get_cql()
+    servers = await manager.servers_add(node_count, config={'error_injections_at_startup': ['view_update_limit', 'delay_before_remote_view_update', 'update_backlog_immediately']})
+    cql, hosts = await manager.get_ready_cql(servers)
     async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
         await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, PRIMARY KEY (key, c))")
         await insert_with_concurrency(cql, f"{ks}.tab", 200, 100)
@@ -71,8 +71,13 @@ async def test_delete_partition_rows_from_table_with_mv(manager: ManagerClient)

         await wait_for_view(cql, "mv_cf_view", node_count)

+        # The view building process elevates the view update backlog, potentially above the limit.
+        # When the view is build it should drop back down to 0 but this information may not reach
+        # the coordinator before the delete, so we perform an additional write on the same host before
+        # the delete - the current view update backlog will be propagated along the write response.
+        await cql.run_async(f"INSERT INTO {ks}.tab (key, c) VALUES (0, 999)", host=hosts[0], timeout=300)
         logger.info(f"Deleting all rows from partition with key 0")
-        await cql.run_async(f"DELETE FROM {ks}.tab WHERE key = 0", timeout=300)
+        await cql.run_async(f"DELETE FROM {ks}.tab WHERE key = 0", host=hosts[0], timeout=300)

 # Test deleting a large partition when there is a view with the same partition
 # key, and verify that view updates metrics is increased by exactly 1. Deleting
```
@@ -20,6 +20,8 @@ from cassandra.cluster import ConsistencyLevel
from collections import defaultdict
from test.pylib.util import wait_for
from test.pylib.rest_client import HTTPError
from test.cluster.tasks.task_manager_client import TaskManagerClient
from test.cluster.util import wait_for_token_ring_and_group0_consistency
import statistics

logger = logging.getLogger(__name__)
@@ -964,3 +966,64 @@ async def test_decommision_waits_for_backup(manager: ManagerClient, object_storage

    await do_test_backup_helper(manager, object_storage, "backup_task_pre_upload", decommission_and_check, 2)

async def test_aborted_decommision_reenables_snapshot(manager: ManagerClient, object_storage):
    """
    Tests that an aborted decommission will still allow snapshots
    """
    num_servers = 2
    objconf = object_storage.create_endpoint_conf()
    cfg = {'enable_user_defined_functions': False,
           'object_storage_endpoints': objconf,
           'experimental_features': ['keyspace-storage-options'],
           'task_ttl_in_seconds': 300
           }
    cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
    servers = (await manager.servers_add(num_servers, config=cfg, cmdline=cmd))
    cql = manager.get_cql()
    cf = 'test_cf'
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.{cf} ( name text primary key, value text );")
        await asyncio.gather(*(cql.run_async(f"INSERT INTO {ks}.{cf} ( name, value ) VALUES ('{name}', '{value}');") for name, value in [('0', 'zero'), ('1', 'one'), ('2', 'two')]))

        await manager.server_sees_others(servers[1].server_id, 1)

        async def abort_decommission():
            tm = TaskManagerClient(manager.api)
            while True:
                logger.info("Listing tasks in %s", servers[1])
                tasks = await tm.list_tasks(servers[1].ip_addr, "node_ops")
                for t in tasks:
                    if t.type == 'decommission':
                        logger.debug("Found decommission task. Aborting...")
                        await tm.abort_task(servers[1].ip_addr, t.task_id)

                        for s in servers:
                            await manager.api.message_injection(s.ip_addr, "topology_coordinator_before_leave")

                        try:
                            logger.debug("Checking decommission task status")
                            status = await tm.wait_for_task(servers[1].ip_addr, t.task_id)
                            logger.debug("Task status %s", status)
                            return status.state != "done"
                        except Exception:
                            return False
                await asyncio.sleep(.1)

        async def decommission():
            try:
                logger.info("Decommissioning %s", servers[0])
                await manager.api.decommission_node(servers[0].ip_addr, 1000)
            except Exception as e:
                logger.error("Exception in decommission %s", e)
                pass

        for s in servers:
            await manager.api.enable_injection(s.ip_addr, "topology_coordinator_before_leave", one_shot=True)

        _, aborted = await asyncio.gather(decommission(), abort_decommission())

        assert aborted, "Injection point sync should ensure we abort decommission"

        logger.info("Decommission was aborted. Creating snapshot")
        await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
        await take_snapshot_on_one_server(ks, servers[0], manager, logger)

@@ -1379,7 +1379,7 @@ async def test_alternator_invalid_shard_for_lwt(manager: ManagerClient):
    # The next barrier must be for the write_both_read_new; we need a guarantee
    # that the src_shard observed it
    logger.info("Waiting for the next barrier")
    await log.wait_for(re.escape(f"[shard {src_shard}: gms] raft_topology - raft_topology_cmd::barrier_and_drain done"),
    await log.wait_for(f"\\[shard {src_shard}: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain.*done",
                       from_mark=m)

    # Now we have a guarantee that a new barrier succeeded on the src_shard,

@@ -11,7 +11,8 @@ import pytest
from test.pylib.manager_client import ManagerClient
from test.pylib.scylla_cluster import ReplaceConfig
from test.cluster.util import (check_token_ring_and_group0_consistency, wait_for_token_ring_and_group0_consistency,
                               get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected)
                               get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected,
                               wait_for_no_pending_topology_transition)


logger = logging.getLogger(__name__)
@@ -19,7 +20,7 @@ logger = logging.getLogger(__name__)

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout: int, scale_timeout: callable) -> None:
    """ Kill coordinator with error injection while topology operation is running for cluster: decommission,
    bootstrap, removenode, replace.

@@ -57,9 +58,11 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
    logger.debug("Kill coordinator during decommission")
    coordinator_host = await get_coordinator_host(manager)
    other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
    num_elections = len(await get_coordinator_host_ids(manager))
    await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
    await manager.decommission_node(server_id=other_nodes[-1].server_id, expected_error="Decommission failed. See earlier errors")
    await wait_new_coordinator_elected(manager, 2, time.time() + 60)
    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
    await manager.server_restart(coordinator_host.server_id, wait_others=1)
    await manager.servers_see_each_other(await manager.running_servers())
    await check_token_ring_and_group0_consistency(manager)
@@ -73,33 +76,40 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
    node_to_remove_srv_id = other_nodes[-1].server_id
    logger.debug("Stop node with srv_id %s", node_to_remove_srv_id)
    await manager.server_stop_gracefully(node_to_remove_srv_id)
    num_elections = len(await get_coordinator_host_ids(manager))
    await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
    logger.debug("Start removenode with srv_id %s from node with srv_id %s", node_to_remove_srv_id, working_srv_id)
    await manager.remove_node(working_srv_id,
                              node_to_remove_srv_id,
                              expected_error="Removenode failed. See earlier errors")

    await wait_new_coordinator_elected(manager, 3, time.time() + 60)
    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))

    await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
    logger.debug("Start old coordinator node with srv_id %s", coordinator_host.server_id)
    await manager.server_restart(coordinator_host.server_id, wait_others=1)
    await manager.servers_see_each_other(await manager.running_servers())
    logger.debug("Remove node with srv_id %s from node with srv_id %s because it was banned in a previous attempt", node_to_remove_srv_id, working_srv_id)
    await manager.remove_node(working_srv_id, node_to_remove_srv_id)
    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
    await manager.servers_see_each_other(await manager.running_servers())
    await check_token_ring_and_group0_consistency(manager)
    logger.debug("Restore number of nodes in cluster")
    await manager.server_add(cmdline=cmdline)
    await manager.server_add(config=config, cmdline=cmdline)

    # kill coordinator during bootstrap
    logger.debug("Kill coordinator during bootstrap")
    nodes = await manager.running_servers()
    coordinator_host = await get_coordinator_host(manager)
    other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
    new_node = await manager.server_add(start=False, cmdline=cmdline)
    new_node = await manager.server_add(start=False, config=config, cmdline=cmdline)
    num_elections = len(await get_coordinator_host_ids(manager))
    await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
    await manager.server_start(new_node.server_id,
                               expected_error="Startup failed: std::runtime_error")
    await wait_new_coordinator_elected(manager, 4, time.time() + 60)
    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
    await manager.server_restart(coordinator_host.server_id, wait_others=1)
    await manager.servers_see_each_other(await manager.running_servers())
    await check_token_ring_and_group0_consistency(manager)
@@ -111,11 +121,13 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
    other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
    node_to_replace_srv_id = other_nodes[-1].server_id
    await manager.server_stop_gracefully(node_to_replace_srv_id)
    num_elections = len(await get_coordinator_host_ids(manager))
    await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
    replace_cfg = ReplaceConfig(replaced_id = node_to_replace_srv_id, reuse_ip_addr = False, use_host_id = True)
    new_node = await manager.server_add(start=False, replace_cfg=replace_cfg, cmdline=cmdline)
    new_node = await manager.server_add(start=False, config=config, replace_cfg=replace_cfg, cmdline=cmdline)
    await manager.server_start(new_node.server_id, expected_error="Replace failed. See earlier errors")
    await wait_new_coordinator_elected(manager, 5, time.time() + 60)
    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
    logger.debug("Start old coordinator node")
    await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
    await manager.server_restart(coordinator_host.server_id, wait_others=1)
@@ -123,5 +135,5 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
    logger.debug("Replaced node is already non-voter and will be banned after restart. Remove it")
    coordinator_host = await get_coordinator_host(manager)
    await manager.remove_node(coordinator_host.server_id, node_to_replace_srv_id)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + scale_timeout(60))
    await check_token_ring_and_group0_consistency(manager)

@@ -0,0 +1,61 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#

import logging

import pytest
from test.pylib.manager_client import ManagerClient

logger = logging.getLogger(__name__)


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_failure_after_group0_server_registration(manager: ManagerClient) -> None:
    """Test that a node shuts down cleanly when group0 startup fails after server registration.

    Reproducer for: CUSTOMER-340, CUSTOMER-335, SCYLLADB-1217

    On restart, setup_group0_if_exist() calls start_server_for_group0() which
    registers the raft server in raft_group_registry._servers, then calls
    enable_in_memory_state_machine(). If enable_in_memory_state_machine() throws
    (e.g., because reload_state() -> auth_cache().load_all() fails due to topology
    being in a transitional state), the exception propagates and stack unwinding
    calls raft_group_registry::stop().

    Previously, _group0 was set AFTER the with_scheduling_group lambda returned,
    so a throw inside the lambda left _group0 as monostate. abort_and_drain() and
    destroy() would be no-ops, leaving the server orphaned in _servers.
    raft_group_registry::stop() would then hit on_internal_error
    ("server for group ... is not destroyed") and abort.

    The fix moves _group0.emplace() inside the lambda, immediately after
    start_server_for_group(), so destroy() can always find and clean up the server.

    This test:
    1. Starts a node normally (group0 established)
    2. Stops the node
    3. Restarts with an injection that fails enable_in_memory_state_machine()
    4. Verifies the node fails startup cleanly (no abort)
    """
    # Start a node normally so group0 is established
    srv = await manager.server_add()
    logger.info("Server %s started successfully with group0", srv.server_id)

    logger.info("Stopping server %s", srv.server_id)
    await manager.server_stop_gracefully(srv.server_id)

    logger.info("Restarting server %s with injection to fail enable_in_memory_state_machine", srv.server_id)
    await manager.server_update_config(srv.server_id,
                                       key='error_injections_at_startup',
                                       value=['group0_state_machine_enable_in_memory_fail'])
    await manager.server_start(srv.server_id,
                               expected_error="injected failure in enable_in_memory_state_machine")

    # If we get here without the test framework detecting a crash/abort,
    # the node shut down cleanly. The fix ensures abort_and_drain()/destroy()
    # can find and clean up the raft server even when startup fails.
    logger.info("Server failed startup and shut down cleanly (no abort)")
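The publish-too-late ordering bug the docstring describes is easy to state in miniature. The sketch below is not part of the patch and every name in it is invented; the real fix lives in the C++ startup path named in the docstring. It only mirrors the shape of the problem: if the handle is published after the risky step, cleanup cannot find the resource when that step throws.

# Purely illustrative sketch of the ordering bug described above.
class Registry:
    def __init__(self):
        self.handle = None          # analogous to _group0

    def stop(self):
        # Cleanup can only destroy what was published.
        if self.handle is not None:
            self.handle.destroyed = True

class Server:
    def __init__(self):
        self.destroyed = False

def start_buggy(reg, enable):
    s = Server()                    # registered as a side effect
    enable(s)                       # may raise...
    reg.handle = s                  # ...so the handle is never published

def start_fixed(reg, enable):
    s = Server()
    reg.handle = s                  # publish immediately after registration
    enable(s)                       # a throw here no longer orphans `s`

def failing_enable(server):
    raise RuntimeError("injected failure in enable_in_memory_state_machine")

reg = Registry()
try:
    start_fixed(reg, failing_enable)
except RuntimeError:
    reg.stop()                      # finds the server and cleans it up
assert reg.handle.destroyed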
@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.tasks.task_manager_client import TaskManagerClient
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown, create_new_test_keyspace
from test.pylib.util import wait_for_cql_and_get_hosts

from cassandra.query import ConsistencyLevel, SimpleStatement
@@ -880,12 +880,25 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
# affected replica but process the UNREPAIRED sstable on the others, so the classification
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
# on the affected replica leading to data resurrection.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
    cmdline = ['--hinted-handoff-enabled', '0']
    servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
        await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)

class _LeadershipTransferred(Exception):
    """Raised when leadership transferred to servers[1] during the test, requiring a retry."""
    pass

async def _setup_table_for_race_window(manager, servers, cql):
    """Create a fresh keyspace+table with incremental repair setup for the race window test.

    Creates a new keyspace (unique name each call), creates the table with
    tombstone_gc=repair and STCS min_threshold=2, inserts keys 0-9 as baseline,
    runs repair 1 (sstables_repaired_at=1), then inserts keys 10-19 (subject
    of repair 2) and flushes all nodes.

    Returns (ks, current_key) where current_key is 20.
    """
    ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', "
                                             "'replication_factor': 3} AND tablets = {'initial': 2};")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) "
                        f"WITH tombstone_gc = {{'mode':'repair'}};")

    # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
    # UNREPAIRED compaction view, making the race easy to trigger deterministically.
@@ -894,27 +907,47 @@
        f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
    )

    # Insert keys 0-9 (baseline for repair 1).
    keys = list(range(0, 10))
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in keys])

    # Disable autocompaction everywhere so we control exactly when compaction runs.
    for s in servers:
        await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')

    scylla_path = await manager.server_get_exe(servers[0].server_id)

    # Repair 1: establishes sstables_repaired_at=1 on all nodes.
    # Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
    # S0'(repaired_at=1) on all nodes.
    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
    # Keys 0-9 end up in S0'(repaired_at=1) on all nodes.
    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", 'all', incremental_mode='incremental')

    # Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
    # Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
    # These will be the subject of repair 2.
    repair2_keys = list(range(current_key, current_key + 10))
    repair2_keys = list(range(10, 20))
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
    for s in servers:
        await manager.api.flush_keyspace(s.ip_addr, ks)
    current_key += 10

    return ks, 20

async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
    """Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.

    Returns the next current_key value.
    Raises _LeadershipTransferred if the topology coordinator changes or if a
    residual re-repair is detected, signalling the caller to retry with a fresh
    keyspace.
    """
    # Ensure servers[1] is not the topology coordinator. If the coordinator is
    # restarted, the Raft leader dies, a new election occurs, and the new
    # coordinator re-initiates tablet repair -- flushing memtables on all replicas
    # and marking post-repair data as repaired. That legitimate re-repair masks
    # the compaction-merge bug this test detects.
    coord = await get_topology_coordinator(manager)
    coord_serv = await find_server_by_host_id(manager, servers, coord)
    if coord_serv == servers[1]:
        other = next(s for s in servers if s != servers[1])
        await ensure_group0_leader_on(manager, other)
        coord = await get_topology_coordinator(manager)
        coord_serv = await find_server_by_host_id(manager, servers, coord)
    coord_log = await manager.server_open_log(coord_serv.server_id)
    coord_mark = await coord_log.mark()

@@ -948,7 +981,7 @@
    # still 1, so is_repaired(1, S1'{repaired_at=2}) == false and S1' lands in the
    # UNREPAIRED compaction view on every replica. The race window is now open.
    pos, _ = await coord_log.wait_for("Finished tablet repair host=", from_mark=coord_mark)
    await coord_log.wait_for("Finished tablet repair host=", from_mark=pos)
    post_marks_pos, _ = await coord_log.wait_for("Finished tablet repair host=", from_mark=pos)

    # --- Race window is open ---
    # Write post-repair keys 20-29. All nodes receive the writes into their memtables
@@ -978,6 +1011,29 @@
    await manager.server_start(target.server_id)
    await manager.servers_see_each_other(servers)

    # Check if leadership transferred during the restart. Any coordinator
    # change (not just to servers[1]) can trigger a residual re-repair that
    # flushes memtables on all replicas and marks post-repair data as repaired,
    # masking the bug this test detects.
    new_coord = await get_topology_coordinator(manager)
    if new_coord != coord:
        await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
        await manager.api.wait_task(servers[0].ip_addr, task_id)
        raise _LeadershipTransferred(
            f"topology coordinator changed from {coord} to {new_coord} after restart")

    # Even without a coordinator change, check if the coordinator initiated a
    # residual re-repair (e.g. after seeing tablets stuck in the repair stage
    # following the topology restart). Such a re-repair flushes memtables on
    # all replicas and contaminates the repaired set with post-repair data.
    rerepair_matches = await coord_log.grep("Initiating tablet repair host=", from_mark=post_marks_pos)
    if rerepair_matches:
        logger.warning(f"Coordinator initiated residual re-repair post-restart: {rerepair_matches[0][1]}")
        await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
        await manager.api.wait_task(servers[0].ip_addr, task_id)
        raise _LeadershipTransferred(
            "coordinator initiated residual re-repair after restart")

    # Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
    # confirming that the bug was triggered (S1' and E merged during the race window).
    deadline = time.time() + 60
@@ -991,16 +1047,32 @@
                break
        if compaction_ran:
            break
        # Check for residual re-repair during the polling window.
        rerepair_matches = await coord_log.grep("Initiating tablet repair host=", from_mark=post_marks_pos)
        if rerepair_matches:
            logger.warning(f"Coordinator initiated residual re-repair during poll: {rerepair_matches[0][1]}")
            await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
            await manager.api.wait_task(servers[0].ip_addr, task_id)
            raise _LeadershipTransferred(
                "coordinator initiated residual re-repair during compaction poll")
        await asyncio.sleep(1)

    # --- Release the race window ---
    await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
    await manager.api.wait_task(servers[0].ip_addr, task_id)

    # Final re-repair check after injection release: the coordinator may have
    # queued a re-repair that only executes once the injection is lifted.
    rerepair_matches = await coord_log.grep("Initiating tablet repair host=", from_mark=post_marks_pos)
    if rerepair_matches:
        logger.warning(f"Coordinator initiated residual re-repair after injection release: {rerepair_matches[0][1]}")
        raise _LeadershipTransferred(
            "coordinator initiated residual re-repair after injection release")

    if not compaction_ran:
        logger.warning("Compaction did not merge S1' and E after restart during the race window; "
                       "the bug was not triggered. Skipping assertion.")
        return
        return current_key

    # Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
    # keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
@@ -1031,8 +1103,9 @@
                 f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
                 f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")

    # servers[0] and servers[2] flushed post-repair keys after the race window closed,
    # so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
    # servers[0] and servers[2] were never restarted and the coordinator stayed
    # alive throughout, so no re-repair could have flushed their memtables.
    # Post-repair keys must NOT appear in repaired sstables on these servers.
    assert not (repaired_keys_0 & post_repair_key_set), \
        f"servers[0] should not have post-repair keys in repaired sstables, " \
        f"got: {repaired_keys_0 & post_repair_key_set}"
@@ -1053,6 +1126,34 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
        f"on servers[1] after restart lost the being_repaired markers during the race window. " \
        f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
        f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
    return current_key

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
    cmdline = ['--hinted-handoff-enabled', '0']
    servers, cql, hosts, _, _, _, _, _, _, _ = \
        await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)

    scylla_path = await manager.server_get_exe(servers[0].server_id)

    ks, current_key = await _setup_table_for_race_window(manager, servers, cql)

    # If leadership transfers or a residual re-repair is triggered between our
    # coordinator check and the restart, the coordinator change masks the bug.
    # Detect and retry with a fresh keyspace.
    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            current_key = await _do_race_window_promotes_unrepaired_data(
                manager, servers, cql, ks, 'all', scylla_path, current_key)
            return
        except _LeadershipTransferred as e:
            logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
            ks, current_key = await _setup_table_for_race_window(manager, servers, cql)

    pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
                "could not run the test without coordinator interference.")

# ----------------------------------------------------------------------------
# Tombstone GC safety tests

@@ -16,6 +16,7 @@ from cassandra.cluster import ConsistencyLevel
from cassandra.query import SimpleStatement

from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import HTTPError
from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.util import new_test_keyspace

@@ -354,3 +355,28 @@ async def test_small_table_optimization_repair(manager):

    rows = await cql.run_async(f"SELECT * from system.repair_history")
    assert len(rows) == 1


@pytest.mark.asyncio
async def test_repair_rejects_equal_start_and_end_token(manager):
    """Verify that repair rejects a request where startToken == endToken.
    When start == end, the wrapping range (T, T] covers the full token ring,
    causing an unintended full repair instead of a no-op.
    Reproduces https://scylladb.atlassian.net/browse/CUSTOMER-358
    """
    servers = await manager.servers_add(2, auto_rack_dc="dc1")

    cql = manager.get_cql()

    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND TABLETS = {'enabled': false}")
    cql.execute("CREATE TABLE ks.tbl (pk int PRIMARY KEY)")

    token = "1558831538804957103"
    params = {
        "columnFamilies": "tbl",
        "startToken": token,
        "endToken": token,
    }
    with pytest.raises(HTTPError, match="Start and end tokens must be different"):
        await manager.api.client.post_json(f"/storage_service/repair_async/ks",
                                           host=servers[0].ip_addr, params=params)

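The docstring's claim about the wrapping range can be made concrete. The sketch below is not part of the patch: in_wrapping_range is an invented helper that mimics ring-order membership for a half-open (start, end] range, not Scylla's actual implementation.

# Illustrative sketch: with ring-order semantics, (T, T] matches every token,
# so startToken == endToken silently requests a full-ring repair.
def in_wrapping_range(token: int, start: int, end: int) -> bool:
    if start < end:
        return start < token <= end           # ordinary, non-wrapping range
    return token > start or token <= end      # wraps around the ring

T = 1558831538804957103
assert in_wrapping_range(0, T, T)       # an arbitrary token is covered
assert in_wrapping_range(T, T, T)       # even the boundary token itself
# Hence the API must reject start == end rather than treat it as a no-op.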
@@ -4,7 +4,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
from test.pylib.manager_client import ManagerClient
from test.cluster.util import get_topology_coordinator, trigger_stepdown
from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table

import pytest
import logging
@@ -83,3 +83,78 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
        coord3 = await get_topology_coordinator(manager)
        if coord3:
            break


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
    """Verify that _tablet_load_stats_refresh is properly joined during
    topology coordinator shutdown, even when a schema change notification
    triggers a refresh between run() completing and stop() being called.

    Reproduces the scenario using two injection points:
    - topology_coordinator_pause_before_stop: pauses after run() finishes
      but before stop() is called
    - refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
      so it's still in-flight during shutdown

    Without the join in stop(), the refresh task outlives the coordinator
    and accesses freed memory.
    """
    servers = await manager.servers_add(3)
    await manager.get_ready_cql(servers)

    async with new_test_keyspace(manager,
            "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        coord = await get_topology_coordinator(manager)
        host_ids = [await manager.get_host_id(s.server_id) for s in servers]
        coord_idx = host_ids.index(coord)
        coord_server = servers[coord_idx]

        log = await manager.server_open_log(coord_server.server_id)
        mark = await log.mark()

        # Injection B: pause between run() returning and stop() being called.
        await manager.api.enable_injection(
            coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)

        # Stepdown causes the topology coordinator to abort and shut down.
        logger.info("Triggering stepdown on coordinator")
        await trigger_stepdown(manager, coord_server)

        # Wait for injection B to fire. The coordinator has finished run() but
        # the schema change listener is still registered.
        mark, _ = await log.wait_for(
            "topology_coordinator_pause_before_stop: waiting", from_mark=mark)

        # Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
        # Enable it now so it only catches the notification-triggered call.
        await manager.api.enable_injection(
            coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)

        # CREATE TABLE fires on_create_column_family on the old coordinator, which
        # fire-and-forgets _tablet_load_stats_refresh.trigger(), scheduling a task
        # via with_scheduling_group on the gossip scheduling group.
        logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
        async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
            # Wait for injection A: refresh_tablet_load_stats() is now blocked before
            # accessing _shared_tm. The topology_coordinator is still alive (paused at B).
            await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)

            # Release injection B: coordinator proceeds through stop().
            # Without the fix, stop() returns quickly and run_topology_coordinator
            # frees the topology_coordinator frame. With the fix, stop() blocks at
            # _tablet_load_stats_refresh.join() until injection A is released.
            logger.info("Releasing injection B: coordinator will stop")
            await manager.api.message_injection(
                coord_server.ip_addr, "topology_coordinator_pause_before_stop")

            # Release injection A: refresh_tablet_load_stats() resumes and accesses
            # this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
            # points to freed memory and ASan detects heap-use-after-free.
            logger.info("Releasing injection A: refresh resumes")
            await manager.api.message_injection(
                coord_server.ip_addr, "refresh_tablet_load_stats_pause")

            # If the bug is present, the node has crashed and read_barrier will fail.
            await read_barrier(manager.api, coord_server.ip_addr)

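The invariant this test pins down, that stop() must join any refresh it may have fired, has a compact asyncio analogue. The sketch below is not part of the patch and every name in it is invented; it only mirrors the shape of the fix described in the docstring.

import asyncio

class Coordinator:
    def __init__(self):
        self._refresh_task = None

    def trigger_refresh(self):
        # Fire-and-forget, like _tablet_load_stats_refresh.trigger().
        self._refresh_task = asyncio.create_task(self._refresh())

    async def _refresh(self):
        await asyncio.sleep(0.01)   # stand-in for the real reload work

    async def stop(self):
        # The fix under test: join the in-flight refresh so it cannot
        # outlive the coordinator and touch freed state.
        if self._refresh_task is not None:
            await self._refresh_task

async def main():
    c = Coordinator()
    c.trigger_refresh()
    await c.stop()                  # returns only after the refresh finishes

asyncio.run(main())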
@@ -961,7 +961,7 @@ async def test_tablets_merge_waits_for_lwt(manager: ManagerClient, scale_timeout
    logger.info("Wait for the global barrier to start draining on shard0")
    await log0.wait_for("\\[shard 0: gms\\] raft_topology - Got raft_topology_cmd::barrier_and_drain", from_mark=m)
    # Just to confirm that the guard still holds the erm
    matches = await log0.grep("\\[shard 0: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain done", from_mark=m)
    matches = await log0.grep("\\[shard 0: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain.*done", from_mark=m)
    assert len(matches) == 0

    # Before the fix, the tablet migration global barrier did not wait for the LWT operation.

@@ -18,7 +18,7 @@ from cassandra.cluster import ConnectionException, ConsistencyLevel, NoHostAvailable
from cassandra.pool import Host  # type: ignore # pylint: disable=no-name-in-module
from test.pylib.internal_types import ServerInfo, HostID
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import get_host_api_address, read_barrier
from test.pylib.rest_client import HTTPError, get_host_api_address, read_barrier
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, get_available_host, unique_name
from typing import Optional, List, Union

@@ -119,6 +119,42 @@ async def check_token_ring_and_group0_consistency(manager: ManagerClient) -> Non
    assert token_ring_ids == group0_ids


async def wait_for_no_pending_topology_transition(manager: ManagerClient, deadline: float) -> None:
    """Wait until there is no pending topology transition.
    Polls system.topology until the transition_state column is null,
    indicating that the topology coordinator has finished processing the
    current operation (whether it completed successfully or was rolled back).
    """
    cql = manager.get_cql()

    async def no_transition():
        try:
            host = await get_available_host(cql, deadline)
            await read_barrier(manager.api, get_host_api_address(host))
            rs = await cql.run_async(
                "select transition_state from system.topology where key = 'topology'",
                host=host)
        except NoHostAvailable as e:
            logger.info(f"Topology transition check failed, retrying: {e}")
            return None
        except ConnectionException as e:
            logger.info(f"Topology transition check failed, retrying: {e}")
            return None
        except HTTPError as e:
            logger.info(f"Read barrier failed, retrying: {e}")
            return None

        if not rs:
            logger.warning(f"Topology transition not visible: system.topology row not found, retrying")
            return None
        if rs[0].transition_state is not None:
            logger.warning(f"Topology transition still in progress: {rs[0].transition_state}")
            return None
        return True

    await wait_for(no_transition, deadline, period=.5)


async def wait_for_token_ring_and_group0_consistency(manager: ManagerClient, deadline: float) -> None:
    """
    Weaker version of the above check.
@@ -398,13 +434,14 @@ def get_uuid_from_str(string: str) -> str:
async def wait_new_coordinator_elected(manager: ManagerClient, expected_num_of_elections: int, deadline: float) -> None:
    """Wait new coordinator to be elected

    Wait while the table 'system.group0_history' will have a number of lines
    with the 'new topology coordinator' equal to the expected_num_of_elections number,
    Wait while the table 'system.group0_history' will have at least
    expected_num_of_elections lines with 'new topology coordinator',
    and the latest host_id coordinator differs from the previous one.
    """
    async def new_coordinator_elected():
        coordinators_ids = await get_coordinator_host_ids(manager)
        if len(coordinators_ids) == expected_num_of_elections \
        logger.debug(f"Coordinators ids in history: {coordinators_ids}")
        if len(coordinators_ids) >= expected_num_of_elections \
                and coordinators_ids[0] != coordinators_ids[1]:
            return True
        logger.warning("New coordinator was not elected %s", coordinators_ids)

@@ -18,6 +18,7 @@
#include <seastar/testing/test_case.hh>

#include "test/lib/exception_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh"
#include "ldap_common.hh"
#include "service/migration_manager.hh"
@@ -681,3 +682,41 @@ SEASTAR_TEST_CASE(ldap_config) {
        },
        make_ldap_config());
}

// Reproduces the race between the cache pruner and the permission
// loader lifecycle during shutdown. Refs SCYLLADB-1679.
SEASTAR_TEST_CASE(ldap_pruner_no_crash_after_loader_cleared) {
    auto cfg = make_ldap_config();
    cfg->permissions_update_interval_in_ms.set(1);

    auto call_count = seastar::make_lw_shared<int>(0);

    co_await do_with_cql_env_thread([call_count](cql_test_env& env) {
        auto& cache = env.auth_cache().local();

        testlog.info("Populating 50 cache entries");
        for (int i = 0; i < 50; i++) {
            auto r = auth::make_data_resource("system", fmt::format("t{}", i));
            cache.get_permissions(auth::role_or_anonymous(), r).get();
        }

        testlog.info("Installing slow permission loader (10ms per call)");
        cache.set_permission_loader(
            [call_count] (const auth::role_or_anonymous&, const auth::resource&)
                -> seastar::future<auth::permission_set> {
                ++(*call_count);
                co_await seastar::sleep(std::chrono::milliseconds(10));
                co_return auth::permission_set();
            });

        testlog.info("Waiting for pruner to start reloading");
        while (*call_count == 0) {
            seastar::sleep(std::chrono::milliseconds(1)).get();
        }

        testlog.info("Pruner started, letting teardown run");
    }, cfg);

    testlog.info("Loader called {} times", *call_count);
}


@@ -176,7 +176,7 @@ SEASTAR_TEST_CASE(vector_store_client_test_dns_resolving_repeated) {
    vs.start_background_tasks();

    // Wait for the DNS resolution to fail
    BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
    BOOST_CHECK(co_await repeat_until([&vs, &as]() -> future<bool> {
        auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
        co_return addrs.empty();
    }));
@@ -184,7 +184,7 @@
    fail_dns_resolution = false;

    // Wait for the DNS resolution to succeed
    BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
    BOOST_CHECK(co_await repeat_until([&vs, &as]() -> future<bool> {
        auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
        co_return addrs.size() == 1;
    }));
@@ -193,12 +193,11 @@
    BOOST_CHECK_EQUAL(print_addr(addrs1[0]), "127.0.0.1");

    fail_dns_resolution = true;
    // Trigger DNS resolver to check for address changes
    // Resolver will not re-check automatically after successful resolution
    vector_store_client_tester::trigger_dns_resolver(vs);

    // Wait for the DNS resolution to fail again
    BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
    // Wait for the DNS resolution to fail again.
    // Trigger is called inside the loop to mitigate SCYLLADB-1794.
    BOOST_CHECK(co_await repeat_until([&vs, &as]() -> future<bool> {
        vector_store_client_tester::trigger_dns_resolver(vs);
        auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
        co_return addrs.empty();
    }));
@@ -208,7 +207,7 @@
    fail_dns_resolution = false;

    // Wait for the DNS resolution to succeed
    BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
    BOOST_CHECK(co_await repeat_until([&vs, &as]() -> future<bool> {
        auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
        co_return addrs.size() == 1;
    }));

@@ -143,6 +143,7 @@ class error_injection {
    struct injection_shared_data {
        size_t received_message_count{0};
        size_t shared_read_message_count{0};
        size_t waiter_count{0};
        condition_variable received_message_cv;
        error_injection_parameters parameters;
        sstring injection_name;
@@ -216,6 +217,8 @@ public:
        }) : optimized_optional<abort_source::subscription>{};

        try {
            ++_shared_data->waiter_count;
            auto dec = defer([this] () noexcept { --_shared_data->waiter_count; });
            co_await _shared_data->received_message_cv.wait(timeout, [&] {
                if (as) {
                    as->check();
@@ -365,6 +368,17 @@ public:
        return data && !data->is_ongoing_oneshot();
    }

    // \brief Returns the number of handlers of the named injection that are
    // currently suspended in wait_for_message().
    //
    // Intended for tests that need to synchronize with one or more fibers
    // parked on an injection.
    // \param injection_name error injection name to check
    size_t waiters(const std::string_view& injection_name) const {
        auto data = get_data(injection_name);
        return data ? data->shared_data->waiter_count : 0;
    }

    // \brief Enter into error injection if it's enabled
    // \param name error injection name to check
    bool enter(const std::string_view& name) {
@@ -623,6 +637,10 @@ public:
        return false;
    }

    size_t waiters(const std::string_view& name) const {
        return 0;
    }

    bool enter(const std::string_view& name) const {
        return false;
    }

@@ -65,8 +65,9 @@ struct do_nothing_loading_cache_stats {
/// The values are going to be evicted from the cache if they are not accessed during the "expiration" period or haven't
/// been reloaded even once during the same period.
///
/// If "expiration" is set to zero - the caching is going to be disabled and get_XXX(...) is going to call the "loader" callback
/// every time in order to get the requested value.
/// If "expiration" is set to zero - the caching is going to be disabled and get(...) is going to call the "loader" callback
/// every time in order to get the requested value. insert(...) is going to be a no-op in this mode. get_ptr(...) is not
/// safe to call when caching is disabled (it asserts) since it returns a handle into the cache.
///
/// \note In order to avoid the eviction of cached entries due to "aging" of the contained value the user has to choose
/// the "expiration" to be at least ("refresh" + "max load latency"). This way the value is going to stay in the cache and is going to be
@@ -353,6 +354,24 @@ public:
        return get_ptr(k, _load);
    }

    /// \brief Insert a value into the cache, loading it via \p load if not already present.
    ///
    /// Equivalent to get_ptr(k, load).discard_result() when caching is enabled,
    /// but is a no-op when caching is disabled (i.e. the cache was constructed
    /// with expiry == 0). Use this when you only want the side effect of
    /// populating the cache and don't need a handle to the cached value.
    ///
    /// Unlike get_ptr(), it is safe to call this on a cache configured with
    /// caching disabled.
    template <typename LoadFunc>
    requires std::is_invocable_r_v<future<value_type>, LoadFunc, const key_type&>
    future<> insert(const Key& k, LoadFunc&& load) {
        if (!caching_enabled()) {
            return make_ready_future<>();
        }
        return get_ptr(k, std::forward<LoadFunc>(load)).discard_result();
    }

    future<Tp> get(const Key& k) {
        static_assert(ReloadEnabled == loading_cache_reload_enabled::yes, "");