Compare commits

..

3 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
6851de4899 Fix token kind comparison in decorated_key::tri_compare
When comparing decorated_key with ring_position, we need to account for
the token kind. decorated_key tokens are always token_kind::key, but
ring_position tokens can be before_all_keys or after_all_keys.

The previous version incorrectly compared only the _data fields, which would
produce wrong results whenever the ring_position token's kind was not token_kind::key.

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-29 13:58:03 +00:00
copilot-swe-agent[bot]
79aa26becd Replace dht::token with int64_t in decorated_key
- Change decorated_key._token from dht::token to int64_t _token_data
- Update token() method to return constructed dht::token
- Update all direct field accesses to use token() method
- Update comparisons to use _token_data directly for efficiency
- Remove token's external_memory_usage contribution (was 0)
- Update formatters and hash functions

This eliminates 8 bytes of bloat per decorated_key instance by removing
the redundant token_kind field, which is always token_kind::key for a decorated_key.

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-29 13:54:46 +00:00
copilot-swe-agent[bot]
eb9852499c Initial plan 2026-01-29 13:49:15 +00:00
55 changed files with 690 additions and 898 deletions

View File

@@ -35,6 +35,8 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- run: |
sudo dnf -y install clang-tools-extra
- name: Generate compilation database
run: |
cmake \

View File

@@ -244,10 +244,7 @@ static bool is_set_of(const rjson::value& type1, const rjson::value& type2) {
// Check if two JSON-encoded values match with the CONTAINS relation
bool check_CONTAINS(const rjson::value* v1, const rjson::value& v2, bool v1_from_query, bool v2_from_query) {
if (!v1 || !v1->IsObject() || v1->MemberCount() == 0) {
return false;
}
if (!v2.IsObject() || v2.MemberCount() == 0) {
if (!v1) {
return false;
}
const auto& kv1 = *v1->MemberBegin();

View File

@@ -45,7 +45,7 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
}
void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjson::value& response) const noexcept {
if (_should_add_to_response) {
if (_should_add_to_reponse) {
auto consumption = rjson::empty_object();
rjson::add(consumption, "CapacityUnits", get_consumed_capacity_units());
rjson::add(response, "ConsumedCapacity", std::move(consumption));
@@ -53,9 +53,7 @@ void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjso
}
static uint64_t calculate_half_units(uint64_t unit_block_size, uint64_t total_bytes, bool is_quorum) {
// Avoid potential integer overflow when total_bytes is close to UINT64_MAX
// by using division with modulo instead of addition before division
uint64_t half_units = total_bytes / unit_block_size + (total_bytes % unit_block_size != 0 ? 1 : 0);
uint64_t half_units = (total_bytes + unit_block_size -1) / unit_block_size; //divide by unit_block_size and round up
if (is_quorum) {
half_units *= 2;

View File

@@ -28,9 +28,9 @@ namespace alternator {
class consumed_capacity_counter {
public:
consumed_capacity_counter() = default;
consumed_capacity_counter(bool should_add_to_response) : _should_add_to_response(should_add_to_response){}
consumed_capacity_counter(bool should_add_to_reponse) : _should_add_to_reponse(should_add_to_reponse){}
bool operator()() const noexcept {
return _should_add_to_response;
return _should_add_to_reponse;
}
consumed_capacity_counter& operator +=(uint64_t bytes);
@@ -44,7 +44,7 @@ public:
uint64_t _total_bytes = 0;
static bool should_add_capacity(const rjson::value& request);
protected:
bool _should_add_to_response = false;
bool _should_add_to_reponse = false;
};
class rcu_consumed_capacity_counter : public consumed_capacity_counter {

View File

@@ -834,13 +834,11 @@ future<> executor::fill_table_size(rjson::value &table_description, schema_ptr s
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
// Note: we don't care when the notification of other shards will finish, as long as it will be done
// A race condition is possible: if a DescribeTable request arrives on a different shard before
// that shard receives the cached size, it will recalculate independently. This is acceptable because:
// 1. Both calculations will cache their results with an expiry time
// 2. Expiry times are unlikely to be identical, so eventually all shards converge to the most recent value
// 3. Even if expiry times match, different shards may briefly return different table sizes
// 4. This temporary inconsistency is acceptable per DynamoDB specification, which doesn't guarantee
// exact precision for DescribeTable size information
// it's possible to get into race condition (next DescribeTable comes to other shard, that new shard doesn't have
// the size yet, so it will calculate it again) - this is not a problem, because it will call cache_newly_calculated_size_on_all_shards
// with expiry, which is extremely unlikely to be exactly the same as the previous one, all shards will keep the size coming with expiry that is further into the future.
// In case of the same expiry, some shards will have different size, which means DescribeTable will return different values depending on the shard
// which is also fine, as the specification doesn't give precision guarantees of any kind.
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
}
}

View File

@@ -52,6 +52,13 @@ static const class_registrator<
::service::migration_manager&,
cache&> registration("org.apache.cassandra.auth.CassandraRoleManager");
struct record final {
sstring name;
bool is_superuser;
bool can_login;
role_set member_of;
};
static db::consistency_level consistency_for_role(std::string_view role_name) noexcept {
if (role_name == meta::DEFAULT_SUPERUSER_NAME) {
return db::consistency_level::QUORUM;
@@ -60,13 +67,13 @@ static db::consistency_level consistency_for_role(std::string_view role_name) no
return db::consistency_level::LOCAL_ONE;
}
future<std::optional<standard_role_manager::record>> standard_role_manager::legacy_find_record(std::string_view role_name) {
static future<std::optional<record>> find_record(cql3::query_processor& qp, std::string_view role_name) {
const sstring query = seastar::format("SELECT * FROM {}.{} WHERE {} = ?",
get_auth_ks_name(_qp),
get_auth_ks_name(qp),
meta::roles_table::name,
meta::roles_table::role_col_name);
const auto results = co_await _qp.execute_internal(
const auto results = co_await qp.execute_internal(
query,
consistency_for_role(role_name),
internal_distributed_query_state(),
@@ -86,25 +93,8 @@ future<std::optional<standard_role_manager::record>> standard_role_manager::lega
: role_set())});
}
future<std::optional<standard_role_manager::record>> standard_role_manager::find_record(std::string_view role_name) {
if (legacy_mode(_qp)) {
return legacy_find_record(role_name);
}
auto name = sstring(role_name);
auto role = _cache.get(name);
if (!role) {
return make_ready_future<std::optional<record>>(std::nullopt);
}
return make_ready_future<std::optional<record>>(std::make_optional(record{
.name = std::move(name),
.is_superuser = role->is_superuser,
.can_login = role->can_login,
.member_of = role->member_of
}));
}
future<standard_role_manager::record> standard_role_manager::require_record(std::string_view role_name) {
return find_record(role_name).then([role_name](std::optional<record> mr) {
static future<record> require_record(cql3::query_processor& qp, std::string_view role_name) {
return find_record(qp, role_name).then([role_name](std::optional<record> mr) {
if (!mr) {
throw nonexistant_role(role_name);
}
@@ -396,7 +386,7 @@ standard_role_manager::alter(std::string_view role_name, const role_config_updat
return fmt::to_string(fmt::join(assignments, ", "));
};
return require_record(role_name).then([this, role_name, &u, &mc](record) {
return require_record(_qp, role_name).then([this, role_name, &u, &mc](record) {
if (!u.is_superuser && !u.can_login) {
return make_ready_future<>();
}
@@ -630,17 +620,18 @@ standard_role_manager::revoke(std::string_view revokee_name, std::string_view ro
});
}
future<> standard_role_manager::collect_roles(
static future<> collect_roles(
cql3::query_processor& qp,
std::string_view grantee_name,
bool recurse,
role_set& roles) {
return require_record(grantee_name).then([this, &roles, recurse](standard_role_manager::record r) {
return do_with(std::move(r.member_of), [this, &roles, recurse](const role_set& memberships) {
return do_for_each(memberships.begin(), memberships.end(), [this, &roles, recurse](const sstring& role_name) {
return require_record(qp, grantee_name).then([&qp, &roles, recurse](record r) {
return do_with(std::move(r.member_of), [&qp, &roles, recurse](const role_set& memberships) {
return do_for_each(memberships.begin(), memberships.end(), [&qp, &roles, recurse](const sstring& role_name) {
roles.insert(role_name);
if (recurse) {
return collect_roles(role_name, true, roles);
return collect_roles(qp, role_name, true, roles);
}
return make_ready_future<>();
@@ -655,7 +646,7 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
return do_with(
role_set{sstring(grantee_name)},
[this, grantee_name, recurse](role_set& roles) {
return collect_roles(grantee_name, recurse, roles).then([&roles] { return roles; });
return collect_roles(_qp, grantee_name, recurse, roles).then([&roles] { return roles; });
});
}
@@ -715,21 +706,27 @@ future<role_set> standard_role_manager::query_all(::service::query_state& qs) {
}
future<bool> standard_role_manager::exists(std::string_view role_name) {
return find_record(role_name).then([](std::optional<record> mr) {
return find_record(_qp, role_name).then([](std::optional<record> mr) {
return static_cast<bool>(mr);
});
}
future<bool> standard_role_manager::is_superuser(std::string_view role_name) {
return require_record(role_name).then([](record r) {
return require_record(_qp, role_name).then([](record r) {
return r.is_superuser;
});
}
future<bool> standard_role_manager::can_login(std::string_view role_name) {
return require_record(role_name).then([](record r) {
return r.can_login;
});
if (legacy_mode(_qp)) {
const auto r = co_await require_record(_qp, role_name);
co_return r.can_login;
}
auto role = _cache.get(sstring(role_name));
if (!role) {
throw nonexistant_role(role_name);
}
co_return role->can_login;
}
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {

View File

@@ -90,12 +90,6 @@ public:
private:
enum class membership_change { add, remove };
struct record final {
sstring name;
bool is_superuser;
bool can_login;
role_set member_of;
};
future<> create_legacy_metadata_tables_if_missing() const;
@@ -113,14 +107,6 @@ private:
future<> legacy_modify_membership(std::string_view role_name, std::string_view grantee_name, membership_change);
future<> modify_membership(std::string_view role_name, std::string_view grantee_name, membership_change, ::service::group0_batch& mc);
future<std::optional<record>> legacy_find_record(std::string_view role_name);
future<std::optional<record>> find_record(std::string_view role_name);
future<record> require_record(std::string_view role_name);
future<> collect_roles(
std::string_view grantee_name,
bool recurse,
role_set& roles);
};
} // namespace auth

View File

@@ -329,13 +329,13 @@ public:
auto it = candidates.begin();
auto& first_sstable = *it;
it++;
dht::token first = first_sstable->get_first_decorated_key()._token;
dht::token last = first_sstable->get_last_decorated_key()._token;
dht::token first = first_sstable->get_first_decorated_key().token();
dht::token last = first_sstable->get_last_decorated_key().token();
while (it != candidates.end()) {
auto& candidate_sstable = *it;
it++;
dht::token first_candidate = candidate_sstable->get_first_decorated_key()._token;
dht::token last_candidate = candidate_sstable->get_last_decorated_key()._token;
dht::token first_candidate = candidate_sstable->get_first_decorated_key().token();
dht::token last_candidate = candidate_sstable->get_last_decorated_key().token();
first = first <= first_candidate? first : first_candidate;
last = last >= last_candidate ? last : last_candidate;
@@ -345,7 +345,7 @@ public:
template <typename T>
static std::vector<sstables::shared_sstable> overlapping(const schema& s, const sstables::shared_sstable& sstable, const T& others) {
return overlapping(s, sstable->get_first_decorated_key()._token, sstable->get_last_decorated_key()._token, others);
return overlapping(s, sstable->get_first_decorated_key().token(), sstable->get_last_decorated_key().token(), others);
}
/**
@@ -359,7 +359,7 @@ public:
auto range = ::wrapping_interval<dht::token>::make(start, end);
for (auto& candidate : sstables) {
auto candidate_range = ::wrapping_interval<dht::token>::make(candidate->get_first_decorated_key()._token, candidate->get_last_decorated_key()._token);
auto candidate_range = ::wrapping_interval<dht::token>::make(candidate->get_first_decorated_key().token(), candidate->get_last_decorated_key().token());
if (range.overlaps(candidate_range, dht::token_comparator())) {
overlapped.push_back(candidate);

View File

@@ -621,6 +621,25 @@ db::config::config(std::shared_ptr<db::extensions> exts)
* @GroupDescription: Provides an overview of the group.
*/
/**
* @Group Ungrouped properties
*/
, background_writer_scheduling_quota(this, "background_writer_scheduling_quota", value_status::Deprecated, 1.0,
"max cpu usage ratio (between 0 and 1) for compaction process. Not intended for setting in normal operations. Setting it to 1 or higher will disable it, recommended operational setting is 0.5.")
, auto_adjust_flush_quota(this, "auto_adjust_flush_quota", value_status::Deprecated, false,
"true: auto-adjust memtable shares for flush processes")
, memtable_flush_static_shares(this, "memtable_flush_static_shares", liveness::LiveUpdate, value_status::Used, 0,
"If set to higher than 0, ignore the controller's output and set the memtable shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_static_shares(this, "compaction_static_shares", liveness::LiveUpdate, value_status::Used, 0,
"If set to higher than 0, ignore the controller's output and set the compaction shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_max_shares(this, "compaction_max_shares", liveness::LiveUpdate, value_status::Used, default_compaction_maximum_shares,
"Set the maximum shares of regular compaction to the specific value. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_enforce_min_threshold(this, "compaction_enforce_min_threshold", liveness::LiveUpdate, value_status::Used, false,
"If set to true, enforce the min_threshold option for compactions strictly. If false (default), Scylla may decide to compact even if below min_threshold.")
, compaction_flush_all_tables_before_major_seconds(this, "compaction_flush_all_tables_before_major_seconds", value_status::Used, 86400,
"Set the minimum interval in seconds between flushing all tables before each major compaction (default is 86400)."
"This option is useful for maximizing tombstone garbage collection by releasing all active commitlog segments."
"Set to 0 to disable automatic flushing all tables before major compaction.")
/**
* @Group Initialization properties
* @GroupDescription The minimal properties needed for configuring a cluster.
*/
@@ -1375,10 +1394,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Start killing reads after their collective memory consumption goes above $normal_limit * $multiplier.")
, reader_concurrency_semaphore_cpu_concurrency(this, "reader_concurrency_semaphore_cpu_concurrency", liveness::LiveUpdate, value_status::Used, 2,
"Admit new reads while there are less than this number of requests that need CPU.")
, reader_concurrency_semaphore_preemptive_abort_factor(this, "reader_concurrency_semaphore_preemptive_abort_factor", liveness::LiveUpdate, value_status::Used, 0.3,
"Admit new reads while their remaining time is more than this factor times their timeout times when arrived to a semaphore. Its vale means\n"
"* <= 0.0 means new reads will never get rejected during admission\n"
"* >= 1.0 means new reads will always get rejected during admission\n")
, view_update_reader_concurrency_semaphore_serialize_limit_multiplier(this, "view_update_reader_concurrency_semaphore_serialize_limit_multiplier", liveness::LiveUpdate, value_status::Used, 2,
"Start serializing view update reads after their collective memory consumption goes above $normal_limit * $multiplier.")
, view_update_reader_concurrency_semaphore_kill_limit_multiplier(this, "view_update_reader_concurrency_semaphore_kill_limit_multiplier", liveness::LiveUpdate, value_status::Used, 4,
@@ -1587,25 +1602,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Sets the maximum difference in percentages between the most loaded and least loaded nodes, below which the load balancer considers nodes balanced.")
, minimal_tablet_size_for_balancing(this, "minimal_tablet_size_for_balancing", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size / 100,
"Sets the minimal tablet size for the load balancer. For any tablet smaller than this, the balancer will use this size instead of the actual tablet size.")
/**
* @Group Ungrouped properties
*/
, background_writer_scheduling_quota(this, "background_writer_scheduling_quota", value_status::Deprecated, 1.0,
"max cpu usage ratio (between 0 and 1) for compaction process. Not intended for setting in normal operations. Setting it to 1 or higher will disable it, recommended operational setting is 0.5.")
, auto_adjust_flush_quota(this, "auto_adjust_flush_quota", value_status::Deprecated, false,
"true: auto-adjust memtable shares for flush processes")
, memtable_flush_static_shares(this, "memtable_flush_static_shares", liveness::LiveUpdate, value_status::Used, 0,
"If set to higher than 0, ignore the controller's output and set the memtable shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_static_shares(this, "compaction_static_shares", liveness::LiveUpdate, value_status::Used, 0,
"If set to higher than 0, ignore the controller's output and set the compaction shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_max_shares(this, "compaction_max_shares", liveness::LiveUpdate, value_status::Used, default_compaction_maximum_shares,
"Set the maximum shares of regular compaction to the specific value. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_enforce_min_threshold(this, "compaction_enforce_min_threshold", liveness::LiveUpdate, value_status::Used, false,
"If set to true, enforce the min_threshold option for compactions strictly. If false (default), Scylla may decide to compact even if below min_threshold.")
, compaction_flush_all_tables_before_major_seconds(this, "compaction_flush_all_tables_before_major_seconds", value_status::Used, 86400,
"Set the minimum interval in seconds between flushing all tables before each major compaction (default is 86400)."
"This option is useful for maximizing tombstone garbage collection by releasing all active commitlog segments."
"Set to 0 to disable automatic flushing all tables before major compaction.")
, default_log_level(this, "default_log_level", value_status::Used, seastar::log_level::info, "Default log level for log messages")
, logger_log_level(this, "logger_log_level", value_status::Used, {}, "Map of logger name to log level. Valid log levels are 'error', 'warn', 'info', 'debug' and 'trace'")
, log_to_stdout(this, "log_to_stdout", value_status::Used, true, "Send log output to stdout")

View File

@@ -185,6 +185,13 @@ public:
* All values and documentation taken from
* http://docs.datastax.com/en/cassandra/2.1/cassandra/configuration/configCassandra_yaml_r.html
*/
named_value<double> background_writer_scheduling_quota;
named_value<bool> auto_adjust_flush_quota;
named_value<float> memtable_flush_static_shares;
named_value<float> compaction_static_shares;
named_value<float> compaction_max_shares;
named_value<bool> compaction_enforce_min_threshold;
named_value<uint32_t> compaction_flush_all_tables_before_major_seconds;
named_value<sstring> cluster_name;
named_value<sstring> listen_address;
named_value<sstring> listen_interface;
@@ -439,7 +446,6 @@ public:
named_value<uint32_t> reader_concurrency_semaphore_serialize_limit_multiplier;
named_value<uint32_t> reader_concurrency_semaphore_kill_limit_multiplier;
named_value<uint32_t> reader_concurrency_semaphore_cpu_concurrency;
named_value<float> reader_concurrency_semaphore_preemptive_abort_factor;
named_value<uint32_t> view_update_reader_concurrency_semaphore_serialize_limit_multiplier;
named_value<uint32_t> view_update_reader_concurrency_semaphore_kill_limit_multiplier;
named_value<uint32_t> view_update_reader_concurrency_semaphore_cpu_concurrency;
@@ -606,14 +612,6 @@ public:
named_value<float> size_based_balance_threshold_percentage;
named_value<uint64_t> minimal_tablet_size_for_balancing;
named_value<double> background_writer_scheduling_quota;
named_value<bool> auto_adjust_flush_quota;
named_value<float> memtable_flush_static_shares;
named_value<float> compaction_static_shares;
named_value<float> compaction_max_shares;
named_value<bool> compaction_enforce_min_threshold;
named_value<uint32_t> compaction_flush_all_tables_before_major_seconds;
static const sstring default_tls_priority;
private:
template<typename T>

View File

@@ -24,7 +24,7 @@
#include "readers/forwardable.hh"
#include "readers/nonforwardable.hh"
#include "cache_mutation_reader.hh"
#include "replica/partition_snapshot_reader.hh"
#include "partition_snapshot_reader.hh"
#include "keys/clustering_key_filter.hh"
#include "utils/assert.hh"
#include "utils/updateable_value.hh"
@@ -845,7 +845,7 @@ mutation_reader row_cache::make_nonpopulating_reader(schema_ptr schema, reader_p
cache_entry& e = *i;
upgrade_entry(e);
tracing::trace(ts, "Reading partition {} from cache", pos);
return replica::make_partition_snapshot_reader<false, dummy_accounter>(
return make_partition_snapshot_flat_reader<false, dummy_accounter>(
schema,
std::move(permit),
e.key(),

View File

@@ -30,11 +30,13 @@ namespace dht {
// Total ordering defined by comparators is compatible with Origin's ordering.
class decorated_key {
public:
dht::token _token;
// Store only the token data as int64_t to avoid the bloat of storing
// token_kind, which is always token_kind::key for decorated_key.
int64_t _token_data;
partition_key _key;
decorated_key(dht::token t, partition_key k)
: _token(std::move(t))
: _token_data(t._data)
, _key(std::move(k)) {
}
@@ -56,8 +58,8 @@ public:
std::strong_ordering tri_compare(const schema& s, const decorated_key& other) const;
std::strong_ordering tri_compare(const schema& s, const ring_position& other) const;
const dht::token& token() const noexcept {
return _token;
dht::token token() const noexcept {
return dht::token(_token_data);
}
const partition_key& key() const {
@@ -65,7 +67,7 @@ public:
}
size_t external_memory_usage() const {
return _key.external_memory_usage() + _token.external_memory_usage();
return _key.external_memory_usage();
}
size_t memory_usage() const {
@@ -102,6 +104,6 @@ template <> struct fmt::formatter<dht::decorated_key> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
template <typename FormatContext>
auto format(const dht::decorated_key& dk, FormatContext& ctx) const {
return fmt::format_to(ctx.out(), "{{key: {}, token: {}}}", dk._key, dk._token);
return fmt::format_to(ctx.out(), "{{key: {}, token: {}}}", dk._key, dk.token());
}
};

View File

@@ -95,7 +95,7 @@ std::unique_ptr<dht::i_partitioner> make_partitioner(sstring partitioner_name) {
bool
decorated_key::equal(const schema& s, const decorated_key& other) const {
if (_token == other._token) {
if (_token_data == other._token_data) {
return _key.legacy_equal(s, other._key);
}
return false;
@@ -103,7 +103,7 @@ decorated_key::equal(const schema& s, const decorated_key& other) const {
std::strong_ordering
decorated_key::tri_compare(const schema& s, const decorated_key& other) const {
auto r = _token <=> other._token;
auto r = _token_data <=> other._token_data;
if (r != 0) {
return r;
} else {
@@ -113,13 +113,24 @@ decorated_key::tri_compare(const schema& s, const decorated_key& other) const {
std::strong_ordering
decorated_key::tri_compare(const schema& s, const ring_position& other) const {
auto r = _token <=> other.token();
if (r != 0) {
return r;
} else if (other.has_key()) {
return _key.legacy_tri_compare(s, *other.key());
// decorated_key tokens are always of token_kind::key, so we need to
// account for ring_position tokens that might be before_all_keys or after_all_keys
const auto& other_token = other.token();
if (other_token._kind == token_kind::key) [[likely]] {
auto r = _token_data <=> other_token._data;
if (r != 0) {
return r;
} else if (other.has_key()) {
return _key.legacy_tri_compare(s, *other.key());
}
return 0 <=> other.relation_to_keys();
} else if (other_token._kind == token_kind::before_all_keys) {
// decorated_key (token_kind::key) > before_all_keys
return std::strong_ordering::greater;
} else {
// decorated_key (token_kind::key) < after_all_keys
return std::strong_ordering::less;
}
return 0 <=> other.relation_to_keys();
}
bool

View File

@@ -93,12 +93,12 @@ public:
{ }
ring_position(const dht::decorated_key& dk)
: _token(dk._token)
: _token(dk.token())
, _key(std::make_optional(dk._key))
{ }
ring_position(dht::decorated_key&& dk)
: _token(std::move(dk._token))
: _token(dk.token())
, _key(std::make_optional(std::move(dk._key)))
{ }

View File

@@ -78,7 +78,6 @@ Permits are in one of the following states:
* `active/await` - a previously `active/need_cpu` permit, which needs something other than CPU to proceed, it is waiting on I/O or a remote shards, other permits can be admitted while the permit is in this state, pending resource availability;
* `inactive` - the permit was marked inactive, it can be evicted to make room for admitting more permits if needed;
* `evicted` - a former inactive permit which was evicted, the permit has to undergo admission again for the read to resume;
* `preemptive_aborted` - the permit timed out or was rejected during admission as it was detected the read might time out later during execution;
Note that some older releases will have different names for some of these states or lack some of the states altogether:

View File

@@ -124,7 +124,6 @@ There are several test directories that are excluded from orchestration by `test
- test/cql
- test/cqlpy
- test/rest_api
- test/scylla_gdb
This means that `test.py` will not run tests directly, but will delegate all work to `pytest`.
That's why all these directories do not have `suite.yaml` files.

View File

@@ -31,7 +31,6 @@ fi
debian_base_packages=(
clang
clang-tools
gdb
cargo
wabt
@@ -73,7 +72,6 @@ debian_base_packages=(
fedora_packages=(
clang
clang-tools-extra
compiler-rt
libasan
libubsan

View File

@@ -316,7 +316,7 @@ auto fmt::formatter<mutation>::format(const mutation& m, fmt::format_context& ct
++column_iterator;
}
return fmt::format_to(out, "token: {}}}, {}\n}}", dk._token, mutation_partition::printer(s, m.partition()));
return fmt::format_to(out, "token: {}}}, {}\n}}", dk.token(), mutation_partition::printer(s, m.partition()));
}
namespace mutation_json {

View File

@@ -126,7 +126,7 @@ public:
const partition_key& key() const { return _ptr->_dk._key; };
const dht::decorated_key& decorated_key() const { return _ptr->_dk; };
dht::ring_position ring_position() const { return { decorated_key() }; }
const dht::token& token() const { return _ptr->_dk._token; }
dht::token token() const { return _ptr->_dk.token(); }
const schema_ptr& schema() const { return _ptr->_schema; }
const mutation_partition& partition() const { return _ptr->_p; }
mutation_partition& partition() { return _ptr->_p; }

View File

@@ -9,7 +9,9 @@
#pragma once
#include "mutation/partition_version.hh"
#include "readers/mutation_reader_fwd.hh"
#include "readers/mutation_reader.hh"
#include "readers/range_tombstone_change_merger.hh"
#include "keys/clustering_key_filter.hh"
#include "query/query-request.hh"
#include "db/partition_snapshot_row_cursor.hh"
@@ -17,10 +19,8 @@
extern seastar::logger mplog;
namespace replica {
template <bool Reversing, typename Accounter>
class partition_snapshot_reader : public mutation_reader::impl, public Accounter {
class partition_snapshot_flat_reader : public mutation_reader::impl, public Accounter {
struct row_info {
mutation_fragment_v2 row;
tombstone rt_for_row;
@@ -232,7 +232,7 @@ private:
}
public:
template <typename... Args>
partition_snapshot_reader(schema_ptr s, reader_permit permit, dht::decorated_key dk, partition_snapshot_ptr snp,
partition_snapshot_flat_reader(schema_ptr s, reader_permit permit, dht::decorated_key dk, partition_snapshot_ptr snp,
query::clustering_key_filter_ranges crr, bool digest_requested,
logalloc::region& region, logalloc::allocating_section& read_section,
std::any pointer_to_container, Args&&... args)
@@ -285,7 +285,7 @@ public:
template <bool Reversing, typename Accounter, typename... Args>
inline mutation_reader
make_partition_snapshot_reader(schema_ptr s,
make_partition_snapshot_flat_reader(schema_ptr s,
reader_permit permit,
dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
@@ -297,7 +297,7 @@ make_partition_snapshot_reader(schema_ptr s,
streamed_mutation::forwarding fwd,
Args&&... args)
{
auto res = make_mutation_reader<partition_snapshot_reader<Reversing, Accounter>>(std::move(s), std::move(permit), std::move(dk),
auto res = make_mutation_reader<partition_snapshot_flat_reader<Reversing, Accounter>>(std::move(s), std::move(permit), std::move(dk),
snp, std::move(crr), digest_requested, region, read_section, std::move(pointer_to_container), std::forward<Args>(args)...);
if (fwd) {
return make_forwardable(std::move(res)); // FIXME: optimize
@@ -305,5 +305,3 @@ make_partition_snapshot_reader(schema_ptr s,
return res;
}
}
} // namespace replica

View File

@@ -148,7 +148,6 @@ public:
};
private:
const db::timeout_clock::time_point _created;
reader_concurrency_semaphore& _semaphore;
schema_ptr _schema;
@@ -238,25 +237,17 @@ private:
break;
case state::inactive:
_semaphore.evict(*this, reader_concurrency_semaphore::evict_reason::time);
// Return here on purpose. The evicted permit is destroyed when closing a reader.
// As a consequence, any member access beyond this point is invalid.
return;
break;
case state::evicted:
case state::preemptive_aborted:
break;
}
// The function call not only sets state to reader_permit::state::preemptive_aborted
// but also correctly decreases the statistics i.e. need_cpu_permits and awaits_permits.
on_permit_inactive(reader_permit::state::preemptive_aborted);
}
public:
struct value_tag {};
impl(reader_concurrency_semaphore& semaphore, schema_ptr schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
: _created(db::timeout_clock::now())
, _semaphore(semaphore)
: _semaphore(semaphore)
, _schema(std::move(schema))
, _op_name_view(op_name)
, _base_resources(base_resources)
@@ -267,8 +258,7 @@ public:
_semaphore.on_permit_created(*this);
}
impl(reader_concurrency_semaphore& semaphore, schema_ptr schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
: _created(db::timeout_clock::now())
, _semaphore(semaphore)
: _semaphore(semaphore)
, _schema(std::move(schema))
, _op_name(std::move(op_name))
, _op_name_view(_op_name)
@@ -370,17 +360,6 @@ public:
on_permit_active();
}
void on_preemptive_aborted() {
if (_state != reader_permit::state::waiting_for_admission && _state != reader_permit::state::waiting_for_memory) {
on_internal_error(rcslog, format("on_preemptive_aborted(): permit in invalid state {}", _state));
}
_ttl_timer.cancel();
_state = reader_permit::state::preemptive_aborted;
_aux_data.pr.set_exception(named_semaphore_aborted(_semaphore._name));
_semaphore.on_permit_preemptive_aborted();
}
void on_register_as_inactive() {
SCYLLA_ASSERT(_state == reader_permit::state::active || _state == reader_permit::state::active_need_cpu || _state == reader_permit::state::waiting_for_memory);
on_permit_inactive(reader_permit::state::inactive);
@@ -488,10 +467,6 @@ public:
return _semaphore.do_wait_admission(*this);
}
db::timeout_clock::time_point created() const noexcept {
return _created;
}
db::timeout_clock::time_point timeout() const noexcept {
return _ttl_timer.armed() ? _ttl_timer.get_timeout() : db::no_timeout;
}
@@ -714,9 +689,6 @@ auto fmt::formatter<reader_permit::state>::format(reader_permit::state s, fmt::f
case reader_permit::state::evicted:
name = "evicted";
break;
case reader_permit::state::preemptive_aborted:
name = "preemptive_aborted";
break;
}
return formatter<string_view>::format(name, ctx);
}
@@ -1066,7 +1038,6 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier,
utils::updateable_value<uint32_t> cpu_concurrency,
utils::updateable_value<float> preemptive_abort_factor,
register_metrics metrics)
: _initial_resources(count, memory)
, _resources(count, memory)
@@ -1076,7 +1047,6 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(
, _serialize_limit_multiplier(std::move(serialize_limit_multiplier))
, _kill_limit_multiplier(std::move(kill_limit_multiplier))
, _cpu_concurrency(cpu_concurrency)
, _preemptive_abort_factor(preemptive_abort_factor)
, _close_readers_gate(format("[reader_concurrency_semaphore {}] close_readers", _name))
, _permit_gate(format("[reader_concurrency_semaphore {}] permit", _name))
{
@@ -1144,7 +1114,6 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(uint32_t(1)),
utils::updateable_value(float(0.0)),
metrics) {}
reader_concurrency_semaphore::~reader_concurrency_semaphore() {
@@ -1520,25 +1489,6 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
auto& permit = _wait_list.front();
dequeue_permit(permit);
try {
// Do not admit the read as it is unlikely to finish before its timeout. The condition is:
// permit's remaining time <= preemptive_abort_factor * permit's time budget
//
// The additional check for remaining_time > 0 is to avoid preemptive aborting reads
// that already timed out but are still in the wait list due to scheduling delays.
// It also effectively disables preemptive aborting when the factor is set to 0.
const auto time_budget = permit.timeout() - permit.created();
const auto remaining_time = permit.timeout() - db::timeout_clock::now();
if (remaining_time > db::timeout_clock::duration::zero() && remaining_time <= _preemptive_abort_factor() * time_budget) {
permit.on_preemptive_aborted();
using ms = std::chrono::milliseconds;
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})",
_name,
std::chrono::duration_cast<ms>(time_budget - remaining_time),
std::chrono::duration_cast<ms>(time_budget),
_preemptive_abort_factor());
continue;
}
if (permit.get_state() == reader_permit::state::waiting_for_memory) {
_blessed_permit = &permit;
permit.on_granted_memory();
@@ -1599,11 +1549,7 @@ void reader_concurrency_semaphore::dequeue_permit(reader_permit::impl& permit) {
case reader_permit::state::waiting_for_admission:
case reader_permit::state::waiting_for_memory:
case reader_permit::state::waiting_for_execution:
if (_stats.waiters > 0) {
--_stats.waiters;
} else {
on_internal_error_noexcept(rcslog, "reader_concurrency_semaphore::dequeue_permit(): invalid state: no waiters yet dequeueing a waiting permit");
}
--_stats.waiters;
break;
case reader_permit::state::inactive:
case reader_permit::state::evicted:
@@ -1612,17 +1558,12 @@ void reader_concurrency_semaphore::dequeue_permit(reader_permit::impl& permit) {
case reader_permit::state::active:
case reader_permit::state::active_need_cpu:
case reader_permit::state::active_await:
case reader_permit::state::preemptive_aborted:
on_internal_error_noexcept(rcslog, format("reader_concurrency_semaphore::dequeue_permit(): unrecognized queued state: {}", permit.get_state()));
}
permit.unlink();
_permit_list.push_back(permit);
}
void reader_concurrency_semaphore::on_permit_preemptive_aborted() noexcept {
++_stats.total_reads_shed_due_to_overload;
}
void reader_concurrency_semaphore::on_permit_created(reader_permit::impl& permit) {
_permit_gate.enter();
_permit_list.push_back(permit);

View File

@@ -42,7 +42,7 @@ using mutation_reader_opt = optimized_optional<mutation_reader>;
/// number of waiting readers becomes equal to or greater than
/// `max_queue_length` (upon calling `obtain_permit()`) an exception of
/// type `std::runtime_error` is thrown. Optionally, some additional
/// code can be executed just before throwing (`prethrow_action`
/// code can be executed just before throwing (`prethrow_action`
/// constructor parameter).
///
/// The semaphore has 3 layers of defense against consuming more memory
@@ -89,7 +89,6 @@ public:
// Total number of failed reads executed through this semaphore.
uint64_t total_failed_reads = 0;
// Total number of reads rejected because the admission queue reached its max capacity
// or rejected due to a high probability of not getting finalized on time.
uint64_t total_reads_shed_due_to_overload = 0;
// Total number of reads killed due to the memory consumption reaching the kill limit.
uint64_t total_reads_killed_due_to_kill_limit = 0;
@@ -193,8 +192,6 @@ private:
utils::updateable_value<uint32_t> _serialize_limit_multiplier;
utils::updateable_value<uint32_t> _kill_limit_multiplier;
utils::updateable_value<uint32_t> _cpu_concurrency;
utils::updateable_value<float> _preemptive_abort_factor;
stats _stats;
std::optional<seastar::metrics::metric_groups> _metrics;
bool _stopped = false;
@@ -253,8 +250,6 @@ private:
void on_permit_created(reader_permit::impl&);
void on_permit_destroyed(reader_permit::impl&) noexcept;
void on_permit_preemptive_aborted() noexcept;
void on_permit_need_cpu() noexcept;
void on_permit_not_need_cpu() noexcept;
@@ -292,7 +287,6 @@ public:
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier,
utils::updateable_value<uint32_t> cpu_concurrency,
utils::updateable_value<float> preemptive_abort_factor,
register_metrics metrics);
reader_concurrency_semaphore(
@@ -302,12 +296,9 @@ public:
size_t max_queue_length,
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier,
utils::updateable_value<uint32_t> cpu_concurrency,
utils::updateable_value<float> preemptive_abort_factor,
register_metrics metrics)
: reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length,
std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), std::move(cpu_concurrency),
std::move(preemptive_abort_factor), metrics)
std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), utils::updateable_value<uint32_t>(1), metrics)
{ }
/// Create a semaphore with practically unlimited count and memory.
@@ -327,10 +318,9 @@ public:
utils::updateable_value<uint32_t> serialize_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value<uint32_t> kill_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value<uint32_t> cpu_concurrency = utils::updateable_value<uint32_t>(1),
utils::updateable_value<float> preemptive_abort_factor = utils::updateable_value<float>(0.0f),
register_metrics metrics = register_metrics::no)
: reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler),
std::move(kill_limit_multipler), std::move(cpu_concurrency), std::move(preemptive_abort_factor), metrics)
std::move(kill_limit_multipler), std::move(cpu_concurrency), metrics)
{}
virtual ~reader_concurrency_semaphore();

View File

@@ -70,8 +70,7 @@ reader_concurrency_semaphore& reader_concurrency_semaphore_group::add_or_update(
_max_queue_length,
_serialize_limit_multiplier,
_kill_limit_multiplier,
_cpu_concurrency,
_preemptive_abort_factor
_cpu_concurrency
);
auto&& it = result.first;
// since we serialize all group changes this change wait will be queues and no further operations

View File

@@ -26,7 +26,6 @@ class reader_concurrency_semaphore_group {
utils::updateable_value<uint32_t> _serialize_limit_multiplier;
utils::updateable_value<uint32_t> _kill_limit_multiplier;
utils::updateable_value<uint32_t> _cpu_concurrency;
utils::updateable_value<float> _preemptive_abort_factor;
friend class database_test_wrapper;
@@ -37,12 +36,11 @@ class reader_concurrency_semaphore_group {
weighted_reader_concurrency_semaphore(size_t shares, int count, sstring name, size_t max_queue_length,
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier,
utils::updateable_value<uint32_t> cpu_concurrency,
utils::updateable_value<float> preemptive_abort_factor)
utils::updateable_value<uint32_t> cpu_concurrency)
: weight(shares)
, memory_share(0)
, sem(utils::updateable_value(count), 0, name, max_queue_length, std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier),
std::move(cpu_concurrency), std::move(preemptive_abort_factor), reader_concurrency_semaphore::register_metrics::yes) {}
std::move(cpu_concurrency), reader_concurrency_semaphore::register_metrics::yes) {}
};
std::unordered_map<scheduling_group, weighted_reader_concurrency_semaphore> _semaphores;
@@ -56,7 +54,6 @@ public:
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier,
utils::updateable_value<uint32_t> cpu_concurrency,
utils::updateable_value<float> preemptive_abort_factor,
std::optional<sstring> name_prefix = std::nullopt)
: _total_memory(memory)
, _total_weight(0)
@@ -65,7 +62,6 @@ public:
, _serialize_limit_multiplier(std::move(serialize_limit_multiplier))
, _kill_limit_multiplier(std::move(kill_limit_multiplier))
, _cpu_concurrency(std::move(cpu_concurrency))
, _preemptive_abort_factor(std::move(preemptive_abort_factor))
, _operations_serializer(1)
, _name_prefix(std::move(name_prefix)) { }

View File

@@ -92,7 +92,6 @@ public:
active_await,
inactive,
evicted,
preemptive_aborted,
};
class impl;

View File

@@ -412,7 +412,6 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(uint32_t(1)),
utils::updateable_value(0.0f),
reader_concurrency_semaphore::register_metrics::yes)
// No limits, just for accounting.
, _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction", reader_concurrency_semaphore::register_metrics::no)
@@ -424,8 +423,6 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
std::numeric_limits<size_t>::max(),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(uint32_t(1)),
utils::updateable_value(0.0f),
reader_concurrency_semaphore::register_metrics::yes)
, _view_update_read_concurrency_semaphores_group(
max_memory_concurrent_view_update_reads(),
@@ -434,7 +431,6 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
_cfg.view_update_reader_concurrency_semaphore_serialize_limit_multiplier,
_cfg.view_update_reader_concurrency_semaphore_kill_limit_multiplier,
_cfg.view_update_reader_concurrency_semaphore_cpu_concurrency,
utils::updateable_value(0.0f),
"view_update")
, _row_cache_tracker(_cfg.index_cache_fraction.operator utils::updateable_value<double>(), cache_tracker::register_metrics::yes)
, _apply_stage("db_apply", &database::do_apply)
@@ -464,8 +460,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
, _reader_concurrency_semaphores_group(max_memory_concurrent_reads(), max_count_concurrent_reads, max_inactive_queue_length(),
_cfg.reader_concurrency_semaphore_serialize_limit_multiplier,
_cfg.reader_concurrency_semaphore_kill_limit_multiplier,
_cfg.reader_concurrency_semaphore_cpu_concurrency,
_cfg.reader_concurrency_semaphore_preemptive_abort_factor)
_cfg.reader_concurrency_semaphore_cpu_concurrency)
, _stop_barrier(std::move(barrier))
, _update_memtable_flush_static_shares_action([this, &cfg] { return _memtable_controller.update_static_shares(cfg.memtable_flush_static_shares()); })
, _memtable_flush_static_shares_observer(cfg.memtable_flush_static_shares.observe(_update_memtable_flush_static_shares_action.make_observer()))

View File

@@ -10,7 +10,7 @@
#include "memtable.hh"
#include "replica/database.hh"
#include "mutation/frozen_mutation.hh"
#include "replica/partition_snapshot_reader.hh"
#include "partition_snapshot_reader.hh"
#include "partition_builder.hh"
#include "mutation/mutation_partition_view.hh"
#include "readers/empty.hh"
@@ -19,7 +19,7 @@
namespace replica {
static mutation_reader make_partition_snapshot_reader_from_snp_schema(
static mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
bool is_reversed,
reader_permit permit,
dht::decorated_key dk,
@@ -482,7 +482,7 @@ public:
auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), _slice, key_and_snp->first.key());
bool digest_requested = _slice.options.contains<query::partition_slice::option::with_digest>();
bool is_reversed = _slice.is_reversed();
_delegate = make_partition_snapshot_reader_from_snp_schema(is_reversed, _permit, std::move(key_and_snp->first), std::move(cr), std::move(key_and_snp->second), digest_requested, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *mtbl());
_delegate = make_partition_snapshot_flat_reader_from_snp_schema(is_reversed, _permit, std::move(key_and_snp->first), std::move(cr), std::move(key_and_snp->second), digest_requested, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *mtbl());
_delegate->upgrade_schema(schema());
} else {
_end_of_stream = true;
@@ -604,7 +604,7 @@ public:
}
};
static mutation_reader make_partition_snapshot_reader_from_snp_schema(
static mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
bool is_reversed,
reader_permit permit,
dht::decorated_key dk,
@@ -617,10 +617,10 @@ static mutation_reader make_partition_snapshot_reader_from_snp_schema(
streamed_mutation::forwarding fwd, memtable& memtable) {
if (is_reversed) {
schema_ptr rev_snp_schema = snp->schema()->make_reversed();
return make_partition_snapshot_reader<true, partition_snapshot_read_accounter>(std::move(rev_snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
return make_partition_snapshot_flat_reader<true, partition_snapshot_read_accounter>(std::move(rev_snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
} else {
schema_ptr snp_schema = snp->schema();
return make_partition_snapshot_reader<false, partition_snapshot_read_accounter>(std::move(snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(std::move(snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
}
}
@@ -660,7 +660,7 @@ private:
update_last(key_and_snp->first);
auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), schema()->full_slice(), key_and_snp->first.key());
auto snp_schema = key_and_snp->second->schema();
_partition_reader = make_partition_snapshot_reader<false, partition_snapshot_flush_accounter>(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr),
_partition_reader = make_partition_snapshot_flat_reader<false, partition_snapshot_flush_accounter>(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr),
std::move(key_and_snp->second), false, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *snp_schema, _flushed_memory);
_partition_reader->upgrade_schema(schema());
}
@@ -737,7 +737,7 @@ memtable::make_mutation_reader_opt(schema_ptr query_schema,
auto dk = pos.as_decorated_key();
auto cr = query::clustering_key_filter_ranges::get_ranges(*query_schema, slice, dk.key());
bool digest_requested = slice.options.contains<query::partition_slice::option::with_digest>();
auto rd = make_partition_snapshot_reader_from_snp_schema(is_reversed, std::move(permit), std::move(dk), std::move(cr), std::move(snp), digest_requested, *this, _table_shared_data.read_section, shared_from_this(), fwd, *this);
auto rd = make_partition_snapshot_flat_reader_from_snp_schema(is_reversed, std::move(permit), std::move(dk), std::move(cr), std::move(snp), digest_requested, *this, _table_shared_data.read_section, shared_from_this(), fwd, *this);
rd.upgrade_schema(query_schema);
return rd;
} else {

View File

@@ -1746,97 +1746,100 @@ table::seal_active_memtable(compaction_group& cg, flush_permit&& flush_permit) n
}
future<>
table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr<memtable> old, sstable_write_permit&& permit_) {
table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr<memtable> old, sstable_write_permit&& permit) {
co_await utils::get_local_injector().inject("flush_memtable_to_sstable_wait", utils::wait_for_message(60s));
auto permit = make_lw_shared(std::move(permit_));
co_await coroutine::switch_to(_config.memtable_scheduling_group);
// Note that due to our sharded architecture, it is possible that
// in the face of a value change some shards will back up sstables
// while others won't.
//
// This is, in theory, possible to mitigate through a rwlock.
// However, this doesn't differ from the situation where all tables
// are coming from a single shard and the toggle happens in the
// middle of them.
//
// The code as is guarantees that we'll never partially backup a
// single sstable, so that is enough of a guarantee.
auto try_flush = [this, old = std::move(old), permit = make_lw_shared(std::move(permit)), &cg] () mutable -> future<> {
// Note that due to our sharded architecture, it is possible that
// in the face of a value change some shards will back up sstables
// while others won't.
//
// This is, in theory, possible to mitigate through a rwlock.
// However, this doesn't differ from the situation where all tables
// are coming from a single shard and the toggle happens in the
// middle of them.
//
// The code as is guarantees that we'll never partially backup a
// single sstable, so that is enough of a guarantee.
auto newtabs = std::vector<sstables::shared_sstable>();
auto metadata = mutation_source_metadata{};
metadata.min_timestamp = old->get_min_timestamp();
metadata.max_timestamp = old->get_max_timestamp();
auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count(), _schema);
auto newtabs = std::vector<sstables::shared_sstable>();
auto metadata = mutation_source_metadata{};
metadata.min_timestamp = old->get_min_timestamp();
metadata.max_timestamp = old->get_max_timestamp();
auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count(), _schema);
if (!cg.async_gate().is_closed()) {
co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(cg.view_for_unrepaired_data());
}
auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, estimated_partitions, &cg] (mutation_reader reader) mutable -> future<> {
std::exception_ptr ex;
try {
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer("memtable");
cfg.backup = incremental_backups_enabled();
auto newtab = make_sstable();
newtabs.push_back(newtab);
tlogger.debug("Flushing to {}", newtab->get_filename());
auto monitor = database_sstable_write_monitor(permit, newtab, cg,
old->get_max_timestamp());
co_return co_await write_memtable_to_sstable(std::move(reader), *old, newtab, estimated_partitions, monitor, cfg);
} catch (...) {
ex = std::current_exception();
}
co_await reader.close();
co_await coroutine::return_exception_ptr(std::move(ex));
});
auto f = consumer(old->make_flush_reader(
old->schema(),
compaction_concurrency_semaphore().make_tracking_only_permit(old->schema(), "try_flush_memtable_to_sstable()", db::no_timeout, {})));
// Switch back to default scheduling group for post-flush actions, to avoid them being starved by the memtable flush
// controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to
// priority inversion.
co_await coroutine::switch_to(default_scheduling_group());
try {
co_await std::move(f);
co_await coroutine::parallel_for_each(newtabs, [] (auto& newtab) -> future<> {
co_await newtab->open_data();
tlogger.debug("Flushing to {} done", newtab->get_filename());
});
co_await with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, &newtabs, &cg] {
return update_cache(cg, old, newtabs);
});
co_await utils::get_local_injector().inject("replica_post_flush_after_update_cache", [this] (auto& handler) -> future<> {
const auto this_table_name = format("{}.{}", _schema->ks_name(), _schema->cf_name());
if (this_table_name == handler.get("table_name")) {
tlogger.info("error injection handler replica_post_flush_after_update_cache: suspending flush for table {}", this_table_name);
handler.set("suspended", true);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
tlogger.info("error injection handler replica_post_flush_after_update_cache: resuming flush for table {}", this_table_name);
}
});
cg.memtables()->erase(old);
tlogger.debug("Memtable for {}.{} replaced, into {} sstables", old->schema()->ks_name(), old->schema()->cf_name(), newtabs.size());
co_return;
} catch (const std::exception& e) {
for (auto& newtab : newtabs) {
newtab->mark_for_deletion();
tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e);
if (!cg.async_gate().is_closed()) {
co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(cg.view_for_unrepaired_data());
}
_config.cf_stats->failed_memtables_flushes_count++;
// If we failed this write we will try the write again and that will create a new flush reader
// that will decrease dirty memory again. So we need to reset the accounting.
old->revert_flushed_memory();
throw;
}
auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, estimated_partitions, &cg] (mutation_reader reader) mutable -> future<> {
std::exception_ptr ex;
try {
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer("memtable");
cfg.backup = incremental_backups_enabled();
auto newtab = make_sstable();
newtabs.push_back(newtab);
tlogger.debug("Flushing to {}", newtab->get_filename());
auto monitor = database_sstable_write_monitor(permit, newtab, cg,
old->get_max_timestamp());
co_return co_await write_memtable_to_sstable(std::move(reader), *old, newtab, estimated_partitions, monitor, cfg);
} catch (...) {
ex = std::current_exception();
}
co_await reader.close();
co_await coroutine::return_exception_ptr(std::move(ex));
});
auto f = consumer(old->make_flush_reader(
old->schema(),
compaction_concurrency_semaphore().make_tracking_only_permit(old->schema(), "try_flush_memtable_to_sstable()", db::no_timeout, {})));
// Switch back to default scheduling group for post-flush actions, to avoid them being starved by the memtable flush
// controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to
// priority inversion.
auto post_flush = [this, old = std::move(old), &newtabs, f = std::move(f), &cg] () mutable -> future<> {
try {
co_await std::move(f);
co_await coroutine::parallel_for_each(newtabs, [] (auto& newtab) -> future<> {
co_await newtab->open_data();
tlogger.debug("Flushing to {} done", newtab->get_filename());
});
co_await with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, &newtabs, &cg] {
return update_cache(cg, old, newtabs);
});
co_await utils::get_local_injector().inject("replica_post_flush_after_update_cache", [this] (auto& handler) -> future<> {
const auto this_table_name = format("{}.{}", _schema->ks_name(), _schema->cf_name());
if (this_table_name == handler.get("table_name")) {
tlogger.info("error injection handler replica_post_flush_after_update_cache: suspending flush for table {}", this_table_name);
handler.set("suspended", true);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
tlogger.info("error injection handler replica_post_flush_after_update_cache: resuming flush for table {}", this_table_name);
}
});
cg.memtables()->erase(old);
tlogger.debug("Memtable for {}.{} replaced, into {} sstables", old->schema()->ks_name(), old->schema()->cf_name(), newtabs.size());
co_return;
} catch (const std::exception& e) {
for (auto& newtab : newtabs) {
newtab->mark_for_deletion();
tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e);
}
_config.cf_stats->failed_memtables_flushes_count++;
// If we failed this write we will try the write again and that will create a new flush reader
// that will decrease dirty memory again. So we need to reset the accounting.
old->revert_flushed_memory();
throw;
}
};
co_return co_await with_scheduling_group(default_scheduling_group(), std::ref(post_flush));
};
co_return co_await with_scheduling_group(_config.memtable_scheduling_group, std::ref(try_flush));
}
void

View File

@@ -45,8 +45,6 @@ sstables_manager::sstables_manager(
std::numeric_limits<size_t>::max(),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(uint32_t(1)),
utils::updateable_value(0.0f),
reader_concurrency_semaphore::register_metrics::no)
, _dir_semaphore(dir_sem)
, _resolve_host_id(std::move(resolve_host_id))

View File

@@ -43,7 +43,7 @@ lazy_comparable_bytes_from_ring_position::lazy_comparable_bytes_from_ring_positi
, _weight(bound_weight::equal)
, _pk(std::move(dk._key))
{
init_first_fragment(dk._token);
init_first_fragment(dk.token());
}
void lazy_comparable_bytes_from_ring_position::init_first_fragment(dht::token dht_token) {

View File

@@ -68,7 +68,6 @@ PYTEST_RUNNER_DIRECTORIES = [
TEST_DIR / 'cqlpy',
TEST_DIR / 'rest_api',
TEST_DIR / 'nodetool',
TEST_DIR / 'scylla_gdb',
]
launch_time = time.monotonic()

View File

@@ -201,7 +201,7 @@ SEASTAR_THREAD_TEST_CASE(test_conversion_to_legacy_form_same_token_singular) {
auto key1 = partition_key::from_single_value(*s1, b);
auto dk1 = partitioner.decorate_key(*s1, key1);
BOOST_REQUIRE_EQUAL(dk._token, dk1._token);
BOOST_REQUIRE_EQUAL(dk.token(), dk1.token());
}
SEASTAR_THREAD_TEST_CASE(test_conversion_to_legacy_form_same_token_two_components) {
@@ -223,7 +223,7 @@ SEASTAR_THREAD_TEST_CASE(test_conversion_to_legacy_form_same_token_two_component
auto key1 = partition_key::from_single_value(*s1, b);
auto dk1 = partitioner.decorate_key(*s1, key1);
BOOST_REQUIRE_EQUAL(dk._token, dk1._token);
BOOST_REQUIRE_EQUAL(dk.token(), dk1.token());
}
SEASTAR_THREAD_TEST_CASE(test_legacy_ordering_of_singular) {

View File

@@ -2152,7 +2152,6 @@ struct scoped_execption_log_level {
SEASTAR_TEST_CASE(replica_read_timeout_no_exception) {
cql_test_config cfg;
cfg.db_config->reader_concurrency_semaphore_preemptive_abort_factor.set(0.0);
const auto read_timeout = 10ms;
const auto write_timeout = 10s;
cfg.query_timeout.emplace(timeout_config{

View File

@@ -1185,13 +1185,6 @@ SEASTAR_TEST_CASE(failed_flush_prevents_writes) {
}
SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) {
#ifdef DEBUG
// This test was observed to take multiple minutes to run in debug mode on CI machines.
// This test checks that a certain behaviour is triggered when compaction falls behind.
// Not critical to run in debug mode. Both compaction and memtable have their own
// correctness tests, which do run in debug mode.
return make_ready_future<>();
#else
BOOST_ASSERT(smp::count == 2);
// The test simulates a situation where 2 threads issue flushes to 2
// tables. Both issue small flushes, but one has injected reactor stalls.
@@ -1266,7 +1259,6 @@ SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) {
sleep_ms *= 2;
}
});
#endif
}
static future<> exceptions_in_flush_helper(std::unique_ptr<sstables::file_io_extension> mep, bool& should_fail, const bool& did_fail, const schema*& schema_filter, bool expect_isolate) {

View File

@@ -14,6 +14,7 @@
#include "mutation/partition_version.hh"
#include "db/partition_snapshot_row_cursor.hh"
#include "partition_snapshot_reader.hh"
#include "keys/clustering_interval_set.hh"
#include "test/lib/scylla_test_case.hh"

View File

@@ -120,7 +120,7 @@ SEASTAR_THREAD_TEST_CASE(test_decorated_key_is_compatible_with_origin) {
auto dk = partitioner.decorate_key(*s, key);
// Expected value was taken from Origin
BOOST_REQUIRE_EQUAL(dk._token, token_from_long(4958784316840156970));
BOOST_REQUIRE_EQUAL(dk.token(), token_from_long(4958784316840156970));
BOOST_REQUIRE(dk._key.equal(*s, key));
}

View File

@@ -517,38 +517,6 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) {
});
}
SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_abort) {
const auto preemptive_abort_factor = 0.5f;
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost,
100, utils::updateable_value(std::numeric_limits<uint32_t>::max()), utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value<uint32_t>(1), utils::updateable_value<float>(preemptive_abort_factor));
auto stop_sem = deferred_stop(semaphore);
{
BOOST_REQUIRE(semaphore.get_stats().total_reads_shed_due_to_overload == 0);
auto timeout = db::timeout_clock::now() + 500ms;
reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, timeout, {}).get();
auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, timeout, {});
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 1);
// The permits are rejected when the remaining time is less than half of its timeout when arrived to the semaphore.
// Hence, sleep 300ms to reject the permits in the waitlist during admission.
seastar::sleep(300ms).get();
permit1 = {};
const auto futures_failed = eventually_true([&] { return permit2_fut.failed(); });
BOOST_CHECK(futures_failed);
BOOST_CHECK_THROW(std::rethrow_exception(permit2_fut.get_exception()), semaphore_aborted);
BOOST_CHECK(semaphore.get_stats().total_reads_shed_due_to_overload > 0);
}
// All units should have been deposited back.
REQUIRE_EVENTUALLY_EQUAL<ssize_t>([&] { return semaphore.available_resources().memory; }, replica::new_reader_base_cost);
}
SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) {
return async([&] () {
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost, 2);
@@ -629,8 +597,7 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) {
permit.resources = permit.permit->consume_resources(reader_resources(tests::random::get_int<unsigned>(0, 1), tests::random::get_int<unsigned>(1024, 16 * 1024 * 1024)));
} else {
//Ensure timeout_seconds > 0 to avoid permits being rejected during admission. The test will become flaky.
const auto timeout_seconds = tests::random::get_int<unsigned>(1, 4);
const auto timeout_seconds = tests::random::get_int<unsigned>(0, 3);
permit.permit_fut = semaphore.obtain_permit(
schema,
@@ -1259,14 +1226,11 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_group) {
auto serialize_multiplier = utils::updateable_value_source<uint32_t>(2);
auto kill_multiplier = utils::updateable_value_source<uint32_t>(3);
auto cpu_concurrency = utils::updateable_value_source<uint32_t>(1);
auto preemptive_abort_factor = utils::updateable_value_source<float>(0.0f);
reader_concurrency_semaphore_group sem_group(initial_resources.memory, initial_resources.count, 1000,
utils::updateable_value(serialize_multiplier),
utils::updateable_value(kill_multiplier),
utils::updateable_value(cpu_concurrency),
utils::updateable_value(preemptive_abort_factor));
utils::updateable_value(cpu_concurrency));
auto stop_sem = deferred_stop(sem_group);
circular_buffer<scheduling_group> recycle_bin;
@@ -1508,8 +1472,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_no_leaks
const auto initial_resources = reader_concurrency_semaphore::resources{4, 4 * 1024};
const auto serialize_multiplier = 2;
const auto kill_multiplier = 3;
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
const size_t reader_count_target = 6;
@@ -1762,8 +1726,9 @@ SEASTAR_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_engages) {
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preserves_state) {
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
const auto serialize_multiplier = 2;
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count,
initial_resources.memory, 100, utils::updateable_value<uint32_t>(serialize_multiplier));
const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
auto sponge_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();
@@ -1824,8 +1789,9 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preser
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_blessed_read_goes_inactive) {
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
const auto serialize_multiplier = 2;
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count,
initial_resources.memory, 100, utils::updateable_value<uint32_t>(serialize_multiplier));
const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
simple_schema ss;
@@ -1885,8 +1851,9 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_permit_waiting_for_memory_goes_inactive) {
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
const auto serialize_multiplier = 2;
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count,
initial_resources.memory, 100, utils::updateable_value<uint32_t>(serialize_multiplier));
const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();
@@ -1930,7 +1897,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_permit_waiting_for_me
// This test covers all the cases where eviction should **not** happen.
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_no_unnecessary_evicting) {
const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024};
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100);
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
simple_schema ss;
@@ -2020,7 +1990,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_no_unnecessary_evicti
// Check that inactive reads are evicted when they are blocking admission
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) {
const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024};
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100);
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
simple_schema ss;
@@ -2174,7 +2147,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) {
// resources.
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_execution_stage_wakeup) {
const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024};
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100);
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();
@@ -2210,8 +2186,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_count) {
const uint32_t initial_memory = 4 * 1024;
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
const auto cpu_concurrency = 1;
const auto preemptive_abort_factor = 0.0f;
reader_concurrency_semaphore semaphore(
utils::updateable_value(count),
@@ -2220,8 +2194,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_count) {
100,
utils::updateable_value<uint32_t>(serialize_multiplier),
utils::updateable_value<uint32_t>(kill_multiplier),
utils::updateable_value<uint32_t>(cpu_concurrency),
utils::updateable_value<float>(preemptive_abort_factor),
utils::updateable_value<uint32_t>(1),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
@@ -2241,7 +2214,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_cpu_concu
const uint32_t initial_memory = 4 * 1024;
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
const auto preemptive_abort_factor = 0.0f;
reader_concurrency_semaphore semaphore(
utils::updateable_value<int>(initial_count),
@@ -2251,7 +2223,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_cpu_concu
utils::updateable_value<uint32_t>(serialize_multiplier),
utils::updateable_value<uint32_t>(kill_multiplier),
utils::updateable_value(cpu_concurrency),
utils::updateable_value<float>(preemptive_abort_factor),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
@@ -2304,7 +2275,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_wait_queue_overload_c
utils::updateable_value<uint32_t>(2),
utils::updateable_value<uint32_t>(4),
utils::updateable_value<uint32_t>(1),
utils::updateable_value<float>(0.0f),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
@@ -2358,7 +2328,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_double_permit_abort)
utils::updateable_value<uint32_t>(2),
utils::updateable_value<uint32_t>(400),
utils::updateable_value<uint32_t>(2),
utils::updateable_value<float>(0.0f),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
@@ -2423,7 +2392,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_always_admit_one_perm
utils::updateable_value<uint32_t>(200),
utils::updateable_value<uint32_t>(400),
utils::updateable_value<uint32_t>(1),
utils::updateable_value<float>(0.0f),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
@@ -2465,7 +2433,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_release_base_resource
utils::updateable_value<uint32_t>(200),
utils::updateable_value<uint32_t>(400),
utils::updateable_value<uint32_t>(1),
utils::updateable_value<float>(0.0f),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);

View File

@@ -4837,8 +4837,8 @@ SEASTAR_TEST_CASE(test_compact_range_tombstones_on_read) {
// of course doesn't necessarily help release pressure on the semaphore.
SEASTAR_THREAD_TEST_CASE(test_cache_reader_semaphore_oom_kill) {
simple_schema s;
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 100, 1, std::numeric_limits<size_t>::max(),
utils::updateable_value<uint32_t>(1), utils::updateable_value<uint32_t>(1));
reader_concurrency_semaphore semaphore(100, 1, get_name(), std::numeric_limits<size_t>::max(), utils::updateable_value<uint32_t>(1),
utils::updateable_value<uint32_t>(1), reader_concurrency_semaphore::register_metrics::no);
auto stop_semaphore = deferred_stop(semaphore);
cache_tracker tracker;

View File

@@ -653,8 +653,8 @@ static bool key_range_overlaps(table_for_tests& cf, const dht::decorated_key& a,
}
static bool sstable_overlaps(const lw_shared_ptr<replica::column_family>& cf, sstables::shared_sstable candidate1, sstables::shared_sstable candidate2) {
auto range1 = wrapping_interval<dht::token>::make(candidate1->get_first_decorated_key()._token, candidate1->get_last_decorated_key()._token);
auto range2 = wrapping_interval<dht::token>::make(candidate2->get_first_decorated_key()._token, candidate2->get_last_decorated_key()._token);
auto range1 = wrapping_interval<dht::token>::make(candidate1->get_first_decorated_key().token(), candidate1->get_last_decorated_key().token());
auto range2 = wrapping_interval<dht::token>::make(candidate2->get_first_decorated_key().token(), candidate2->get_last_decorated_key().token());
return range1.overlaps(range2, dht::token_comparator());
}

View File

@@ -2986,8 +2986,8 @@ SEASTAR_TEST_CASE(test_index_fast_forwarding_after_eof) {
sst->load(sst->get_schema()->get_sharder()).get();
}
const auto t1 = muts.front().decorated_key()._token;
const auto t2 = muts.back().decorated_key()._token;
const auto t1 = muts.front().decorated_key().token();
const auto t2 = muts.back().decorated_key().token();
dht::partition_range_vector prs;
prs.emplace_back(dht::ring_position::starting_at(dht::token{t1.raw() - 200}), dht::ring_position::ending_at(dht::token{t1.raw() - 100}));

View File

@@ -25,7 +25,6 @@ import pytest
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.pylib.internal_types import ServerUpState
logger = logging.getLogger(__name__)
@@ -198,14 +197,8 @@ ALTERNATOR_PROXY_SERVER_CONFIG = {
@pytest.fixture(scope="function")
async def alternator_proxy_server(manager: ManagerClient):
"""Fixture that creates a server with Alternator proxy protocol ports enabled.
Waits for SERVING state to ensure Alternator ports are ready.
"""
server = await manager.server_add(
config=ALTERNATOR_PROXY_SERVER_CONFIG,
expected_server_up_state=ServerUpState.SERVING
)
"""Fixture that creates a server with Alternator proxy protocol ports enabled."""
server = await manager.server_add(config=ALTERNATOR_PROXY_SERVER_CONFIG)
yield (server, manager)

View File

@@ -322,7 +322,7 @@ future<> require_column_has_value(cql_test_env& e, const sstring& table_name,
auto ckey = clustering_key::from_deeply_exploded(*schema, ck);
auto exp = expected.type()->decompose(expected);
auto dk = dht::decorate_key(*schema, pkey);
auto shard = cf.get_effective_replication_map()->shard_for_reads(*schema, dk._token);
auto shard = cf.get_effective_replication_map()->shard_for_reads(*schema, dk.token());
return e.db().invoke_on(shard, [&e, dk = std::move(dk),
ckey = std::move(ckey),
column_name = std::move(column_name),

View File

@@ -39,4 +39,3 @@ class ServerUpState(IntEnum):
HOST_ID_QUERIED = auto()
CQL_CONNECTED = auto()
CQL_QUERIED = auto()
SERVING = auto() # Scylla sent sd_notify("serving")

View File

@@ -44,7 +44,6 @@ import platform
import contextlib
import fcntl
import urllib
import socket
import psutil
@@ -386,10 +385,6 @@ class ScyllaServer:
prefix=f"scylladb-{f'{xdist_worker_id}-' if xdist_worker_id else ''}{self.server_id}-test.py-"
)
self.maintenance_socket_path = f"{self.maintenance_socket_dir.name}/cql.m"
# Unix socket for receiving sd_notify messages from Scylla
self.notify_socket_path = pathlib.Path(self.maintenance_socket_dir.name) / "notify.sock"
self.notify_socket: Optional[socket.socket] = None
self._received_serving = False
self.exe = pathlib.Path(version.path).resolve()
self.vardir = pathlib.Path(vardir)
self.logger = logger
@@ -717,50 +712,6 @@ class ScyllaServer:
caslog.setLevel(oldlevel)
# Any other exception may indicate a problem, and is passed to the caller.
def _setup_notify_socket(self) -> None:
"""Create a Unix datagram socket for receiving sd_notify messages from Scylla."""
if self.notify_socket is not None:
return
# Remove existing socket file if present
self.notify_socket_path.unlink(missing_ok=True)
self.notify_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_NONBLOCK | socket.SOCK_CLOEXEC)
self.notify_socket.bind(str(self.notify_socket_path))
self._received_serving = False
def _cleanup_notify_socket(self) -> None:
"""Clean up the sd_notify socket."""
if self.notify_socket is not None:
self.notify_socket.close()
self.notify_socket = None
self.notify_socket_path.unlink(missing_ok=True)
def check_serving_notification(self) -> bool:
"""Check if Scylla has sent the 'serving' sd_notify message.
Returns True if the SERVING state has been reached.
"""
if self._received_serving:
return True
if self.notify_socket is None:
return False
# Try to read all available messages from the socket
while True:
try:
data = self.notify_socket.recv(4096)
# sd_notify message format: "STATUS=serving\n" or "READY=1\nSTATUS=serving\n"
message = data.decode('utf-8', errors='replace')
if 'STATUS=serving' in message:
self._received_serving = True
self.logger.debug("Received sd_notify 'serving' message")
return True
except BlockingIOError:
# No more messages available
break
except Exception as e:
self.logger.debug("Error reading from notify socket: %s", e)
break
return False
async def try_get_host_id(self, api: ScyllaRESTAPIClient) -> Optional[HostID]:
"""Try to get the host id (also tests Scylla REST API is serving)"""
@@ -803,10 +754,6 @@ class ScyllaServer:
env['UBSAN_OPTIONS'] = f'halt_on_error=1:abort_on_error=1:suppressions={TOP_SRC_DIR / "ubsan-suppressions.supp"}'
env['ASAN_OPTIONS'] = f'disable_coredump=0:abort_on_error=1:detect_stack_use_after_return=1'
# Set up socket for receiving sd_notify messages from Scylla
self._setup_notify_socket()
env['NOTIFY_SOCKET'] = self.notify_socket_path
# Reopen log file if it was closed (e.g., after a previous stop)
if self.log_file is None or self.log_file.closed:
self.log_file = self.log_filename.open("ab") # append mode to preserve previous logs
@@ -861,10 +808,7 @@ class ScyllaServer:
if server_up_state == ServerUpState.PROCESS_STARTED:
server_up_state = ServerUpState.HOST_ID_QUERIED
server_up_state = await self.get_cql_up_state() or server_up_state
# Check for SERVING state (sd_notify "serving" message)
if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
server_up_state = ServerUpState.SERVING
if server_up_state >= expected_server_up_state:
if server_up_state == expected_server_up_state:
if expected_error is not None:
await report_error(
f"the node has reached {server_up_state} state,"
@@ -903,14 +847,13 @@ class ScyllaServer:
session.execute("DROP KEYSPACE k")
async def shutdown_control_connection(self) -> None:
"""Shut down driver connection and notify socket"""
"""Shut down driver connection"""
if self.control_connection is not None:
self.control_connection.shutdown()
self.control_connection = None
if self.control_cluster is not None:
self.control_cluster.shutdown()
self.control_cluster = None
self._cleanup_notify_socket()
@stop_event
@start_stop_lock

View File

@@ -1,92 +1,80 @@
# Copyright 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
"""Conftest for Scylla GDB tests"""
import logging
import os
# This file configures pytest for all tests in this directory, and also
# defines common test fixtures for all of them to use. A "fixture" is some
# setup which an individual test requires to run; The fixture has setup code
# and teardown code, and if multiple tests require the same fixture, it can
# be set up only once - while still allowing the user to run individual tests
# and automatically setting up the fixtures they need.
import pexpect
import pytest
import os
import sys
from test.pylib.suite.python import PythonTest
from test.pylib.util import LogPrefixAdapter
try:
import gdb as gdb_library
except:
print('This test must be run inside gdb. Run ./run instead.')
exit(1)
@pytest.fixture(scope="module")
async def scylla_server(testpy_test: PythonTest | None):
"""Return a running Scylla server instance from the active test cluster."""
logger_prefix = testpy_test.mode + "/" + testpy_test.uname
logger = LogPrefixAdapter(
logging.getLogger(logger_prefix), {"prefix": logger_prefix}
)
scylla_cluster = await testpy_test.suite.clusters.get(logger)
scylla_server = next(iter(scylla_cluster.running.values()))
def pytest_addoption(parser):
parser.addoption('--scylla-pid', action='store', default=None,
help='Process ID of running Scylla to attach gdb to')
parser.addoption('--scylla-tmp-dir', action='store', default=None,
help='Temporary directory where Scylla runs')
yield scylla_server
await testpy_test.suite.clusters.put(scylla_cluster, is_dirty=True)
@pytest.fixture(scope="module")
def gdb_process(scylla_server, request):
"""Spawn an interactive GDB attached to the Scylla process.
Loads `scylla-gdb.py` and test helpers (`gdb_utils.py`) so tests can run GDB/Python helpers
against the live Scylla process.
"""
scylla_gdb_py = os.path.join(request.fspath.dirname, "..", "..", "scylla-gdb.py")
script_py = os.path.join(request.fspath.dirname, "gdb_utils.py")
cmd = (
f"gdb -q "
"--nx "
"-iex 'set confirm off' "
"-iex 'set pagination off' "
f"-se {scylla_server.exe} "
f"-p {scylla_server.cmd.pid} "
f"-ex set python print-stack full "
f"-x {scylla_gdb_py} "
f"-x {script_py}"
)
gdb_process = pexpect.spawn(cmd, maxread=10, searchwindowsize=10)
gdb_process.expect_exact("(gdb)")
yield gdb_process
gdb_process.terminate()
def execute_gdb_command(
gdb_process, scylla_command: str = None, full_command: str = None
):
"""
Execute a command in an interactive GDB session and return its output.
The command can be provided either as a Scylla GDB command (which will be
wrapped and executed via GDB's Python interface) or as a full raw GDB
command string.
The function waits for the GDB prompt to reappear, enforces a timeout,
and fails the test if the command does not complete or if GDB reports an
error.
Args:
gdb_process (pexpect.pty_spawn.spawn): An active GDB process spawned via pexpect
scylla_command (str, optional): A GDB Scylla command (from scylla-gdb.py) to execute.
full_command (str, optional): A raw GDB command string to execute.
"""
command = f"python gdb.execute('scylla {scylla_command}')"
if full_command:
command = full_command
gdb_process.sendline(command)
# Scylla's "scylla-gdb.py" does two things: It configures gdb to add new
# "scylla" commands, and it implements a bunch of useful functions in Python.
# Doing just the former is easy (just add "-x scylla-gdb.py" when running
# gdb), but we also want the latter - we want to be able to use some of those
# extra functions in the test code. For that, we need to actually import the
# scylla-gdb.py module from the test code here - and remember the module
# object.
@pytest.fixture(scope="session")
def scylla_gdb(request):
save_sys_path = sys.path
sys.path.insert(1, sys.path[0] + '/../..')
# Unfortunately, the file's name includes a dash which requires some
# funky workarounds to import.
import importlib
try:
gdb_process.expect_exact("(gdb)", timeout=180)
except pexpect.exceptions.TIMEOUT:
gdb_process.sendcontrol("c")
gdb_process.expect_exact("(gdb)", timeout=1)
pytest.fail("GDB command did not complete within the timeout period")
result = gdb_process.before.decode("utf-8")
mod = importlib.import_module("scylla-gdb")
except Exception as e:
pytest.exit(f'Failed to load scylla-gdb: {e}')
sys.path = save_sys_path
yield mod
assert "Error" not in result
return result
# "gdb" fixture, attaching to a running Scylla and letting the tests
# run gdb commands on it. The fixture returns a module
# The gdb fixture depends on scylla_gdb, to ensure that the "scylla"
# subcommands are loaded into gdb.
@pytest.fixture(scope="session")
def gdb(request, scylla_gdb):
try:
gdb_library.lookup_type('size_t')
except:
pytest.exit('ERROR: Scylla executable was compiled without debugging '
'information (-g) so cannot be used to test gdb. Please '
'set SCYLLA environment variable.')
# The gdb tests are known to be broken on aarch64 (see
# https://sourceware.org/bugzilla/show_bug.cgi?id=27886) and untested
# on anything else. So skip them.
if os.uname().machine != 'x86_64':
pytest.skip('test/scylla-gdb/conftest.py: gdb tests skipped for non-x86_64')
gdb_library.execute('set python print-stack full')
scylla_pid = request.config.getoption('scylla_pid')
gdb_library.execute(f'attach {scylla_pid}')
# FIXME: We can start the test here, but at this point Scylla may be
# completely idle. To make the situation more interesting (and, e.g., have
# live live tasks for test_misc.py::task()), we can set a breakpoint and
# let Scylla run a bit more and stop in the middle of its work. However,
# I'm not sure where to set a break point that is actually guaranteed to
# happen :(
#gdb_library.execute('handle SIG34 SIG35 SIGUSR1 nostop noprint pass')
#gdb_library.execute('break sstables::compact_sstables')
#gdb_library.execute('continue')
yield gdb_library

View File

@@ -1,80 +0,0 @@
# Copyright 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
"""
GDB helper functions for `scylla_gdb` tests.
They should be loaded to GDB by "-x {dir}/gdb_utils.py}",
when loaded, they can be run in gdb e.g. `python get_sstables()`
Depends on helper functions injected to GDB by `scylla-gdb.py` script.
(sharded, for_each_table, seastar_lw_shared_ptr, find_sstables, find_vptrs, resolve,
get_seastar_memory_start_and_size).
"""
import gdb
import uuid
def get_schema():
"""Execute GDB commands to get schema information."""
db = sharded(gdb.parse_and_eval('::debug::the_database')).local()
table = next(for_each_table(db))
ptr = seastar_lw_shared_ptr(table['_schema']).get()
print('schema=', ptr)
def get_sstables():
"""Execute GDB commands to get sstables information."""
sst = next(find_sstables())
print(f"sst=(sstables::sstable *)", sst)
def get_task():
"""
Some commands need a task to work on. The following fixture finds one.
Because we stopped Scylla while it was idle, we don't expect to find
any ready task with get_local_tasks(), but we can find one with a
find_vptrs() loop. I noticed that a nice one (with multiple tasks chained
to it for "scylla fiber") is one from http_server::do_accept_one.
"""
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr, startswith='vtable for seastar::continuation')
if name and 'do_accept_one' in name:
print(f"task={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")
break
def get_coroutine():
"""Similar to get_task(), but looks for a coroutine frame."""
target = 'service::topology_coordinator::run() [clone .resume]'
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr)
if name and name.strip() == target:
print(f"coroutine_config={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")
def coroutine_debug_config(tmpdir):
"""
Check if scylla_find agrees with find_vptrs, for debugging.
Execute GDB commands for coroutine debugging with detailed output.
This test fails sometimes, but rarely and unreliably.
We want to get a coredump from it the next time it fails.
Sending a SIGSEGV should induce that.
https://github.com/scylladb/scylladb/issues/22501
"""
target = 'service::topology_coordinator::run() [clone .resume]'
target_addr = int(gdb.parse_and_eval(f"&'{target}'"))
find_command = f"scylla find -a 0x{target_addr:x}"
gdb.write(f"Didn't find {target} (0x{target_addr:x}). Running '{find_command}'\n")
mem_range = get_seastar_memory_start_and_size()
gdb.execute(find_command)
gdb.write(f"Memory range: 0x{mem_range[0]:x} 0x{mem_range[1]:x}\n")
gdb.write("Found coroutines:\n")
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr)
if name and '.resume' in name.strip():
gdb.write(f"{name}\n")
core_filename = f"{tmpdir}/../scylla_gdb_coro_task-{uuid.uuid4()}.core"
gdb.execute(f"gcore {core_filename}")
raise gdb.error(f"No coroutine frames found with expected name. Dumped Scylla core to {core_filename}")

119
test/scylla_gdb/run Executable file
View File

@@ -0,0 +1,119 @@
#!/usr/bin/env python3
import sys
import os
import subprocess
from pathlib import Path
# Use the run.py library from ../cqlpy:
sys.path.insert(1, sys.path[0] + '/../cqlpy')
import run
print('Scylla is: ' + run.find_scylla() + '.')
# Gdb will only work if the executable was built with debug symbols
# (e.g., Scylla's release or debug build modes mode, but not dev mode).
# Check this quickly up-front, instead of waiting for gdb to fail in a
# mysterious way when it can't look up types.
if not '.debug_info' in subprocess.run(['objdump', '-h', run.scylla],
capture_output=True, text=True).stdout:
print(f'Scylla executable was compiled without debugging information (-g)')
print(f'so cannot be used to test gdb. Please set SCYLLA environment variable.')
exit(1)
# Run Scylla, waiting until it can respond to CQL
pid = run.run_with_temporary_dir(run.run_scylla_cmd)
ip = run.pid_to_ip(pid)
run.wait_for_services(pid, [lambda: run.check_cql(ip)])
# We do something strange here: We start pytest *inside* gdb's Python
# interpreter. This will allow us to test various gdb commands added
# by scylla-gdb.py inside gdb using the pytest framework.
# TODO: consider a more straightforward implementation, where we don't
# run pytest inside gdb - and instead run gdb as a separate process and
# pytest just sends commands to it.
# TODO: think if we can avoid code duplication with run.run_ptest().
def run_pytest_in_gdb(pytest_dir, executable, additional_parameters):
sys.stdout.flush()
sys.stderr.flush()
pid = os.fork()
if pid == 0:
# child:
run.run_with_temporary_dir_pids = set() # no children to clean up on child
run.run_pytest_pids = set()
os.chdir(pytest_dir)
pytest_args = ['-o', 'junit_family=xunit2'] + additional_parameters
pytest_cmd = f'print("Starting pytest {" ".join(pytest_args)}"); import pytest; sys.argv[0]="pytest"; sys.exit(pytest.main({str(pytest_args)}))'
print(f'Starting gdb {executable}')
sys.stdout.flush()
args = ['gdb',
'-batch', '-n',
'-ex', 'set python print-stack full',
'-ex', 'python ' + pytest_cmd,
]
if executable:
args += ['-se', executable]
os.execvp('gdb', args)
exit(1)
# parent:
run.run_pytest_pids.add(pid)
if os.waitpid(pid, 0)[1]:
return False
else:
return True
def modify_junit_xml_filename(args, filename_modifier):
    """Insert a modifier into the filename of any --junit-xml argument.

    The directory part of the report path is preserved; the modifier is
    inserted between the stem and the extension (out.xml -> out.mod.xml).

    Args:
        args: list of command line arguments.
        filename_modifier: string inserted into the --junit-xml filename,
            or None to leave the arguments untouched.

    Returns:
        The (possibly rewritten) list of arguments.
    """
    if filename_modifier is None:
        return args
    prefix = '--junit-xml='

    def rewrite(arg):
        # Leave every argument that is not a junit-xml report path alone.
        if not arg.startswith(prefix):
            return arg
        report = Path(arg[len(prefix):])
        renamed = report.with_suffix(f'.{filename_modifier}{report.suffix}')
        return f'{prefix}{renamed}'

    return [rewrite(arg) for arg in args]
# Interesting note: We must use "--scylla-tmp-dir=DIR" here instead of
# "--scylla-tmp-dir DIR": While the latter does work, pytest has a bug that
# its command-line parser finds the given directory name in the original
# command line, saves it as "initialpaths", and uses it to print what it
# thinks are nice (but are really incorrect) relative paths for "nodes" (test
# source files).
#
# Run the gdb-based test suite twice: once attached to the live Scylla
# process (tests not marked "without_scylla"), once without an inferior
# (only tests marked "without_scylla").  Each pass writes its own
# junit-xml report, distinguished by a filename modifier.
success = True
for with_scylla in [True, False]:
    modified_argv = None
    if with_scylla:
        args = ['--scylla-pid='+str(pid),
                '--scylla-tmp-dir='+run.pid_to_dir(pid),
                '-m', 'not without_scylla']
        executable = run.scylla
        modified_argv = modify_junit_xml_filename(sys.argv[1:], 'with_scylla')
    else:
        args = ['-m', 'without_scylla']
        executable = ''  # start gdb without an inferior executable
        modified_argv = modify_junit_xml_filename(sys.argv[1:], 'without_scylla')
    # sys.path[0] is the directory containing this script - where the
    # pytest test files live.
    if not run_pytest_in_gdb(sys.path[0], executable, args + modified_argv):
        success = False
run.summary = 'Scylla GDB tests pass' if success else 'Scylla GDB tests failure'
exit(0 if success else 1)
# Note that the run.cleanup_all() function runs now, just like on any exit
# for any reason in this script. It will delete the temporary files and
# announce the failure or success of the test (printing run.summary).

View File

@@ -0,0 +1,3 @@
type: Run
run_in_release:
- run

View File

@@ -1,76 +0,0 @@
# Copyright 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
"""
Basic tests for commands that do not require additional options.
Each only checks that the command does not fail - but not what it does or returns.
"""
import pytest
from test.scylla_gdb.conftest import execute_gdb_command
# Skip in build modes that lack usable debug symbols, and on aarch64
# where gdb itself is broken (see linked sourceware bug).
pytestmark = [
    pytest.mark.skip_mode(
        mode=["dev", "debug"],
        reason="Scylla was built without debug symbols; use release mode",
    ),
    pytest.mark.skip_mode(
        mode=["dev", "debug", "release"],
        platform_key="aarch64",
        reason="GDB is broken on aarch64: https://sourceware.org/bugzilla/show_bug.cgi?id=27886",
    ),
]
@pytest.mark.parametrize(
    "command",
    [
        "features",
        "compaction-tasks",
        "databases",
        "commitlog",
        "tables",
        "table system.local",
        "tablet-metadata",
        "keyspaces",
        "active-sstables",
        "sstables",
        "memtables",
        "repairs",
        "gms",
        "heapprof",
        "io-queues",
        "cache",
        "mem-range",
        "mem-ranges",
        "memory",
        "segment-descs",
        "small-object -o 32 --random-page",
        "small-object -o 64 --summarize",
        "large-objects -o 131072 --random-page",
        "large-objects -o 32768 --summarize",
        "lsa",
        "netw",
        "smp-queues",
        "task-queues",
        "task_histogram",
        "task_histogram -a",
        "tasks",
        "threads",
        "timers",
        "get-config-value compaction_static_shares",
        "read-stats",
        "prepared-statements",
    ],
)
def test_scylla_commands(gdb_process, command):
    """Smoke test: the command must run without failing (output unchecked)."""
    execute_gdb_command(gdb_process, command)
def test_nonexistent_scylla_command(gdb_process):
    """An unknown scylla subcommand must fail with a clear error message."""
    bogus_command = "nonexistent_command"
    expected_error = r'Undefined scylla command: "nonexistent_command"'
    with pytest.raises(AssertionError, match=expected_error):
        execute_gdb_command(gdb_process, bogus_command)

View File

@@ -1 +0,0 @@
type: Python

View File

@@ -0,0 +1,234 @@
import pytest
import re
import uuid
# Convenience function to execute a scylla command in gdb, returning its
# output as a string - or a gdb.error exception.
def scylla(gdb, cmd):
    """Run 'scylla <cmd>' inside gdb and return its output as a string."""
    full_command = f'scylla {cmd}'
    return gdb.execute(full_command, from_tty=False, to_string=True)
# Check that trying an unknown subcommand of the "scylla" subcommand
# produces the right error message.
def test_nonexistent_scylla_command(gdb):
    """Running an unknown 'scylla' subcommand must raise gdb.error."""
    expected_message = 'Undefined scylla command'
    with pytest.raises(gdb.error, match=expected_message):
        scylla(gdb, 'nonexistent_command')
# Minimal test for some of the commands. Each only checks that the command
# does not fail - but not what it does or returns. These tests are still
# useful - importantly, they can detect that one of the commands relies on
# some internal implementation detail which no longer works, and needs to
# be fixed.
# Minimal smoke tests: each checks only that the command does not fail,
# not what it does or returns.
def test_features(gdb):
    scylla(gdb, 'features')
def test_compaction_tasks(gdb):
    scylla(gdb, 'compaction-tasks')
def test_databases(gdb):
    scylla(gdb, 'databases')
def test_commitlog(gdb):
    scylla(gdb, 'commitlog')
def test_tables(gdb):
    scylla(gdb, 'tables')
def test_table(gdb):
    scylla(gdb, 'table system.local')
def test_tablet_metadata(gdb):
    scylla(gdb, 'tablet-metadata')
def test_keyspaces(gdb):
    scylla(gdb, 'keyspaces')
def test_active_sstables(gdb):
    scylla(gdb, 'active-sstables')
def test_sstables(gdb):
    scylla(gdb, 'sstables')
def test_memtables(gdb):
    scylla(gdb, 'memtables')
def test_repairs(gdb):
    scylla(gdb, 'repairs')
def test_gms(gdb):
    scylla(gdb, 'gms')
def test_heapprof(gdb):
    scylla(gdb, 'heapprof')
def test_io_queues(gdb):
    scylla(gdb, 'io-queues')
def test_cache(gdb):
    scylla(gdb, 'cache')
# Memory-inspection commands.
def test_mem_range(gdb):
    scylla(gdb, 'mem-range')
def test_mem_ranges(gdb):
    scylla(gdb, 'mem-ranges')
def test_memory(gdb):
    scylla(gdb, 'memory')
def test_segment_descs(gdb):
    scylla(gdb, 'segment-descs')
def test_small_object_1(gdb):
    scylla(gdb, 'small-object -o 32 --random-page')
def test_small_object_2(gdb):
    scylla(gdb, 'small-object -o 64 --summarize')
def test_large_objects_1(gdb):
    scylla(gdb, 'large-objects -o 131072 --random-page')
def test_large_objects_2(gdb):
    scylla(gdb, 'large-objects -o 32768 --summarize')
def test_lsa(gdb):
    scylla(gdb, 'lsa')
def test_netw(gdb):
    scylla(gdb, 'netw')
def test_smp_queues(gdb):
    scylla(gdb, 'smp-queues')
def test_task_queues(gdb):
    scylla(gdb, 'task-queues')
def test_task_histogram(gdb):
    scylla(gdb, 'task_histogram')
def test_task_histogram_coro(gdb):
    # '-a' includes all tasks; a healthy histogram should contain at least
    # one coroutine clone entry of the form "...) [clone .resume]".
    h = scylla(gdb, 'task_histogram -a')
    if re.search(r'\) \[clone \.\w+\]', h) is None:
        raise gdb.error('no coroutine entries in task histogram')
def test_tasks(gdb):
    scylla(gdb, 'tasks')
def test_threads(gdb):
    scylla(gdb, 'threads')
def test_timers(gdb):
    scylla(gdb, 'timers')
# Some commands need a schema to work on. The following fixture finds
# one (the schema of the first table - note that even without any user
# tables, we will always have system tables).
@pytest.fixture(scope="module")
def schema(gdb, scylla_gdb):
    """Expose the first table's schema as gdb convenience variable $schema.

    Yields the string '$schema' so tests can splice it into gdb commands.
    """
    db = scylla_gdb.sharded(gdb.parse_and_eval('::debug::the_database')).local()
    table = next(scylla_gdb.for_each_table(db))
    # _schema is a seastar::lw_shared_ptr; store the raw pointer in gdb.
    gdb.set_convenience_variable('schema',
        scylla_gdb.seastar_lw_shared_ptr(table['_schema']).get())
    yield '$schema'
@pytest.fixture(scope="module")
def sstable(gdb, scylla_gdb):
    """Expose some loaded sstable as gdb convenience variable $sst.

    Yields the string '$sst' so tests can splice it into gdb commands.
    """
    # The previous version also evaluated ::debug::the_database into a
    # local that was never used; find_sstables() locates sstables on its
    # own, so that dead lookup has been dropped.
    sst = next(scylla_gdb.find_sstables())
    gdb.set_convenience_variable('sst', sst)
    yield '$sst'
# Smoke tests for commands that take a schema pointer.
def test_schema(gdb, schema):
    scylla(gdb, f'schema {schema}')
def test_find(gdb, schema):
    scylla(gdb, f'find -r {schema}')
def test_ptr(gdb, schema):
    scylla(gdb, f'ptr {schema}')
def test_generate_object_graph(gdb, schema, request):
    # Write the graph into the test's temporary dir, with bounded
    # depth (-d) and timeout (-t) to keep the test fast.
    tmpdir = request.config.getoption('scylla_tmp_dir')
    scylla(gdb, f'generate-object-graph -o {tmpdir}/og.dot -d 2 -t 10 {schema}')
# Some commands need a task to work on. The following fixture finds one.
# Because we stopped Scylla while it was idle, we don't expect to find
# any ready task with get_local_tasks(), but we can find one with a
# find_vptrs() loop. I noticed that a nice one (with multiple tasks chained
# to it for "scylla fiber") is one from http_server::do_accept_one.
@pytest.fixture(scope="module")
def task(gdb, scylla_gdb):
    """Find a seastar task address to exercise 'scylla fiber' on.

    Scylla was stopped while idle, so instead of get_local_tasks() we scan
    the heap with find_vptrs() for a continuation created by
    http_server::do_accept_one, which has multiple tasks chained to it.
    """
    for obj_addr, vtable_addr in scylla_gdb.find_vptrs():
        name = scylla_gdb.resolve(vtable_addr, startswith='vtable for seastar::continuation')
        if name and 'do_accept_one' in name:
            return obj_addr.cast(gdb.lookup_type('uintptr_t'))
    raise gdb.error("no tasks found with expected name")
def test_fiber(gdb, task):
    # 'scylla fiber' walks the continuation chain starting at the task.
    scylla(gdb, f'fiber {task}')
# Similar to task(), but looks for a coroutine frame.
@pytest.fixture(scope="module")
def coro_task(gdb, scylla_gdb, request):
    """Find the topology coordinator's coroutine frame via a heap scan.

    On failure, dumps diagnostics and a core file before raising, to help
    debug the rare flakiness tracked in scylladb/scylladb#22501.
    """
    target = 'service::topology_coordinator::run() [clone .resume]'
    for obj_addr, vtable_addr in scylla_gdb.find_vptrs():
        name = scylla_gdb.resolve(vtable_addr)
        if name and name.strip() == target:
            return obj_addr.cast(gdb.lookup_type('uintptr_t'))
    # Something is wrong. We should have found the target.
    # Check if scylla_find agrees with find_vptrs, for debugging.
    target_addr = int(gdb.parse_and_eval(f"&'{target}'"))
    find_command = f"scylla find -a 0x{target_addr:x}"
    gdb.write(f"Didn't find {target} (0x{target_addr:x}). Running '{find_command}'\n")
    mem_range = scylla_gdb.get_seastar_memory_start_and_size()
    gdb.execute(find_command)
    gdb.write(f"Memory range: 0x{mem_range[0]:x} 0x{mem_range[1]:x}\n")
    gdb.write(f"Found coroutines:\n")
    for obj_addr, vtable_addr in scylla_gdb.find_vptrs():
        name = scylla_gdb.resolve(vtable_addr)
        if name and '.resume' in name.strip():
            gdb.write(f'{name}\n')
    # This test fails sometimes, but rarely and unreliably.
    # We want to get a coredump from it the next time it fails.
    # https://github.com/scylladb/scylladb/issues/22501
    tmpdir = request.config.getoption('scylla_tmp_dir')
    core_filename = f"{tmpdir}/../scylla_gdb_coro_task-{uuid.uuid4()}.core"
    gdb.execute(f"gcore {core_filename}")
    raise gdb.error(f"No coroutine frames found with expected name. Dumped Scylla core to {core_filename}")
def test_coro_frame(gdb, coro_task):
    """Print the coroutine frame via the $coro_frame convenience function."""
    # Note the offset by two words (16 bytes).
    # This moves the pointer from the outer coroutine frame to the inner seastar::task.
    # $coro_frame expects a seastar::task*.
    gdb.execute(f'p *$coro_frame({coro_task} + 16)')
# Smoke tests for sstable- and configuration-related commands.
# (Dropped the pointless f-prefixes from constant command strings.)
def test_sstable_summary(gdb, sstable):
    scylla(gdb, f'sstable-summary {sstable}')
def test_sstable_index_cache(gdb, sstable):
    scylla(gdb, f'sstable-index-cache {sstable}')
def test_read_stats(gdb, sstable):
    # The sstable fixture is requested but unused directly - presumably to
    # guarantee an sstable exists before reading stats; TODO confirm.
    scylla(gdb, 'read-stats')
def test_get_config_value(gdb):
    scylla(gdb, 'get-config-value compaction_static_shares')
def test_prepared_statements(gdb):
    scylla(gdb, 'prepared-statements')
@pytest.mark.without_scylla
def test_run_without_scylla(scylla_gdb):
    # just try to load the scylla-gdb module without attaching to scylla.
    #
    # please note, if this test fails, there are good chances that scylla-gdb.py
    # is unable to load without debug symbols. Calls to "gdb.lookup_type()" and
    # similar functions that rely on debug symbols should be made within GDB
    # commands themselves when they get executed. To address potential
    # failures, consider moving code that references debug symbols into a code
    # path executed only when debug symbols are loaded. If the value of the
    # symbol is a constant, consider caching it using the functools.cache
    # decorator.
    _ = scylla_gdb
# FIXME: need a simple test for lsa-segment

View File

@@ -1,57 +0,0 @@
# Copyright 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
"""
Tests for commands, that need a schema to work on.
Each only checks that the command does not fail - but not what it does or returns.
"""
import pytest
import re
from test.scylla_gdb.conftest import execute_gdb_command
# Skip in build modes that lack usable debug symbols, and on aarch64
# where gdb itself is broken (see linked sourceware bug).
pytestmark = [
    pytest.mark.skip_mode(
        mode=["dev", "debug"],
        reason="Scylla was built without debug symbols; use release mode",
    ),
    pytest.mark.skip_mode(
        mode=["dev", "debug", "release"],
        platform_key="aarch64",
        reason="GDB is broken on aarch64: https://sourceware.org/bugzilla/show_bug.cgi?id=27886",
    ),
]
@pytest.fixture(scope="module")
def schema(gdb_process):
    """
    Returns pointer to schema of the first table it finds.
    Even without any user tables, we will always have system tables.
    """
    result = execute_gdb_command(gdb_process, full_command="python get_schema()")
    match = re.search(r"schema=\s*(0x[0-9a-fA-F]+)", result)
    assert match, f"Failed to find schema pointer in response: {result}"
    # The assert above guarantees a match, so the former
    # `match.group(1) if match else None` dead branch is unnecessary.
    return match.group(1)
@pytest.mark.parametrize(
    "command",
    [
        "find -r",
        "ptr",
        "schema (const schema *)",  # `schema` requires type-casted pointer
    ],
)
def test_schema(gdb_process, command, schema):
    """Smoke test: each schema-taking command must run without failing."""
    execute_gdb_command(gdb_process, f"{command} {schema}")
def test_generate_object_graph(gdb_process, schema, request):
    # Write the graph into the test tmpdir, with bounded depth and timeout.
    tmpdir = request.config.getoption("--tmpdir")
    execute_gdb_command(
        gdb_process, f"generate-object-graph -o {tmpdir}/og.dot -d 2 -t 10 {schema}"
    )

View File

@@ -1,46 +0,0 @@
# Copyright 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
"""
Tests for commands that need an sstable to work on.
Each only checks that the command does not fail - but not what it does or returns.
"""
import pytest
import re
from test.scylla_gdb.conftest import execute_gdb_command
# Skip in build modes that lack usable debug symbols, and on aarch64
# where gdb itself is broken (see linked sourceware bug).
pytestmark = [
    pytest.mark.skip_mode(
        mode=["dev", "debug"],
        reason="Scylla was built without debug symbols; use release mode",
    ),
    pytest.mark.skip_mode(
        mode=["dev", "debug", "release"],
        platform_key="aarch64",
        reason="GDB is broken on aarch64: https://sourceware.org/bugzilla/show_bug.cgi?id=27886",
    ),
]
@pytest.fixture(scope="module")
def sstable(gdb_process):
    """Finds an sstable; returns its type-cast pointer string."""
    result = execute_gdb_command(gdb_process, full_command="python get_sstables()")
    match = re.search(r"(\(sstables::sstable \*\) 0x)([0-9a-f]+)", result)
    # Report the actual gdb output on failure (the old message referred to
    # a `result.stdout` attribute that this string does not have).
    assert match is not None, f"No sstable was present in gdb output: {result}"
    # group(0) is the whole "(sstables::sstable *) 0x..." match - exactly
    # the type-cast pointer form the sstable commands expect.  The assert
    # guarantees a match, so the former dead `if match else None` is gone.
    return match.group(0).strip()
@pytest.mark.parametrize(
    "command",
    [
        "sstable-summary",
        "sstable-index-cache",
    ],
)
def test_sstable(gdb_process, command, sstable):
    """Smoke test: each sstable command must accept the pointer and not fail."""
    execute_gdb_command(gdb_process, f"{command} {sstable}")

View File

@@ -1,85 +0,0 @@
# Copyright 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
"""
Tests for commands that need some task to work on.
Each only checks that the command does not fail - but not what it does or returns.
"""
import re
import pytest
from test.scylla_gdb.conftest import execute_gdb_command
# Skip in build modes that lack usable debug symbols, and on aarch64
# where gdb itself is broken (see linked sourceware bug).
pytestmark = [
    pytest.mark.skip_mode(
        mode=["dev", "debug"],
        reason="Scylla was built without debug symbols; use release mode",
    ),
    pytest.mark.skip_mode(
        mode=["dev", "debug", "release"],
        platform_key="aarch64",
        reason="GDB is broken on aarch64: https://sourceware.org/bugzilla/show_bug.cgi?id=27886",
    ),
]
@pytest.fixture(scope="module")
def task(gdb_process):
    """
    Finds a Scylla fiber task using a `find_vptrs()` loop.
    Since Scylla is freshly booted, `get_local_tasks()` returns nothing.
    Nevertheless, a `find_vptrs()` scan can still discover the first task
    skeleton created by `http_server::do_accept_one` (often the earliest
    "Scylla fiber" to appear).
    """
    result = execute_gdb_command(gdb_process, full_command="python get_task()")
    match = re.search(r"task=(\d+)", result)
    # `result` is a plain string (sibling fixtures interpolate it
    # directly), so the old `{result.stdout}` in the assert message would
    # itself raise AttributeError when the assert fired.
    assert match is not None, f"No task was present in {result}"
    # The assert guarantees a match; no need for `if match else None`.
    return match.group(1)
def test_fiber(gdb_process, task):
    """'fiber' must accept the discovered task address and not fail."""
    fiber_command = f"fiber {task}"
    execute_gdb_command(gdb_process, fiber_command)
@pytest.fixture(scope="module")
def coroutine_task(gdb_process, scylla_server):
    """
    Finds a coroutine task, similar to the `task` fixture.
    This fixture executes the `coroutine_config` script in GDB to locate a
    specific coroutine task. If the task is not found, the `coroutine_debug_config`
    debugging script is called which checks if scylla_find agrees with find_vptrs.
    This debugging script then forces a coredump to capture additional
    diagnostic information before the test is marked as failed.
    Coredump is saved to `testlog/release/{scylla}`.
    """
    result = execute_gdb_command(gdb_process, full_command="python get_coroutine()")
    match = re.search(r"coroutine_config=\s*(.*)", result)
    if not match:
        # Collect diagnostics (including a core dump) before failing.
        result = execute_gdb_command(
            gdb_process,
            full_command=f"python coroutine_debug_config('{scylla_server.workdir}')",
        )
        pytest.fail(
            f"Failed to find coroutine task. Debugging logs have been collected\n"
            f"Debugging code result: {result}\n"
        )
    return match.group(1).strip()
def test_coroutine_frame(gdb_process, coroutine_task):
    """
    Offsets the pointer by two words to shift from the outer coroutine frame
    to the inner `seastar::task`, as required by `$coro_frame`, which expects
    a `seastar::task*`.
    """
    # + 16 == two 8-byte words past the coroutine frame start.
    execute_gdb_command(
        gdb_process, full_command=f"p *$coro_frame({coroutine_task} + 16)"
    )

View File

@@ -1 +1 @@
docker.io/scylladb/scylla-toolchain:fedora-43-20260128
docker.io/scylladb/scylla-toolchain:fedora-43-20260113