Compare commits
3 Commits
copilot/co
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6851de4899 | ||
|
|
79aa26becd | ||
|
|
eb9852499c |
2
.github/workflows/iwyu.yaml
vendored
2
.github/workflows/iwyu.yaml
vendored
@@ -35,6 +35,8 @@ jobs:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
- run: |
|
||||
sudo dnf -y install clang-tools-extra
|
||||
- name: Generate compilation database
|
||||
run: |
|
||||
cmake \
|
||||
|
||||
@@ -244,10 +244,7 @@ static bool is_set_of(const rjson::value& type1, const rjson::value& type2) {
|
||||
|
||||
// Check if two JSON-encoded values match with the CONTAINS relation
|
||||
bool check_CONTAINS(const rjson::value* v1, const rjson::value& v2, bool v1_from_query, bool v2_from_query) {
|
||||
if (!v1 || !v1->IsObject() || v1->MemberCount() == 0) {
|
||||
return false;
|
||||
}
|
||||
if (!v2.IsObject() || v2.MemberCount() == 0) {
|
||||
if (!v1) {
|
||||
return false;
|
||||
}
|
||||
const auto& kv1 = *v1->MemberBegin();
|
||||
|
||||
@@ -45,7 +45,7 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
|
||||
}
|
||||
|
||||
void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjson::value& response) const noexcept {
|
||||
if (_should_add_to_response) {
|
||||
if (_should_add_to_reponse) {
|
||||
auto consumption = rjson::empty_object();
|
||||
rjson::add(consumption, "CapacityUnits", get_consumed_capacity_units());
|
||||
rjson::add(response, "ConsumedCapacity", std::move(consumption));
|
||||
@@ -53,9 +53,7 @@ void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjso
|
||||
}
|
||||
|
||||
static uint64_t calculate_half_units(uint64_t unit_block_size, uint64_t total_bytes, bool is_quorum) {
|
||||
// Avoid potential integer overflow when total_bytes is close to UINT64_MAX
|
||||
// by using division with modulo instead of addition before division
|
||||
uint64_t half_units = total_bytes / unit_block_size + (total_bytes % unit_block_size != 0 ? 1 : 0);
|
||||
uint64_t half_units = (total_bytes + unit_block_size -1) / unit_block_size; //divide by unit_block_size and round up
|
||||
|
||||
if (is_quorum) {
|
||||
half_units *= 2;
|
||||
|
||||
@@ -28,9 +28,9 @@ namespace alternator {
|
||||
class consumed_capacity_counter {
|
||||
public:
|
||||
consumed_capacity_counter() = default;
|
||||
consumed_capacity_counter(bool should_add_to_response) : _should_add_to_response(should_add_to_response){}
|
||||
consumed_capacity_counter(bool should_add_to_reponse) : _should_add_to_reponse(should_add_to_reponse){}
|
||||
bool operator()() const noexcept {
|
||||
return _should_add_to_response;
|
||||
return _should_add_to_reponse;
|
||||
}
|
||||
|
||||
consumed_capacity_counter& operator +=(uint64_t bytes);
|
||||
@@ -44,7 +44,7 @@ public:
|
||||
uint64_t _total_bytes = 0;
|
||||
static bool should_add_capacity(const rjson::value& request);
|
||||
protected:
|
||||
bool _should_add_to_response = false;
|
||||
bool _should_add_to_reponse = false;
|
||||
};
|
||||
|
||||
class rcu_consumed_capacity_counter : public consumed_capacity_counter {
|
||||
|
||||
@@ -834,13 +834,11 @@ future<> executor::fill_table_size(rjson::value &table_description, schema_ptr s
|
||||
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
|
||||
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
|
||||
// Note: we don't care when the notification of other shards will finish, as long as it will be done
|
||||
// A race condition is possible: if a DescribeTable request arrives on a different shard before
|
||||
// that shard receives the cached size, it will recalculate independently. This is acceptable because:
|
||||
// 1. Both calculations will cache their results with an expiry time
|
||||
// 2. Expiry times are unlikely to be identical, so eventually all shards converge to the most recent value
|
||||
// 3. Even if expiry times match, different shards may briefly return different table sizes
|
||||
// 4. This temporary inconsistency is acceptable per DynamoDB specification, which doesn't guarantee
|
||||
// exact precision for DescribeTable size information
|
||||
// It's possible to hit a race condition (the next DescribeTable arrives on another shard, and that shard doesn't have
|
||||
// the size yet, so it will calculate it again) - this is not a problem, because it will call cache_newly_calculated_size_on_all_shards
|
||||
// with an expiry that is extremely unlikely to be exactly the same as the previous one, so all shards will keep the size whose expiry is further in the future.
|
||||
// If the expiries happen to be the same, some shards will have a different size, which means DescribeTable will return different values depending on the shard,
|
||||
// which is also fine, as the specification doesn't give precision guarantees of any kind.
|
||||
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,6 +52,13 @@ static const class_registrator<
|
||||
::service::migration_manager&,
|
||||
cache&> registration("org.apache.cassandra.auth.CassandraRoleManager");
|
||||
|
||||
struct record final {
|
||||
sstring name;
|
||||
bool is_superuser;
|
||||
bool can_login;
|
||||
role_set member_of;
|
||||
};
|
||||
|
||||
static db::consistency_level consistency_for_role(std::string_view role_name) noexcept {
|
||||
if (role_name == meta::DEFAULT_SUPERUSER_NAME) {
|
||||
return db::consistency_level::QUORUM;
|
||||
@@ -60,13 +67,13 @@ static db::consistency_level consistency_for_role(std::string_view role_name) no
|
||||
return db::consistency_level::LOCAL_ONE;
|
||||
}
|
||||
|
||||
future<std::optional<standard_role_manager::record>> standard_role_manager::legacy_find_record(std::string_view role_name) {
|
||||
static future<std::optional<record>> find_record(cql3::query_processor& qp, std::string_view role_name) {
|
||||
const sstring query = seastar::format("SELECT * FROM {}.{} WHERE {} = ?",
|
||||
get_auth_ks_name(_qp),
|
||||
get_auth_ks_name(qp),
|
||||
meta::roles_table::name,
|
||||
meta::roles_table::role_col_name);
|
||||
|
||||
const auto results = co_await _qp.execute_internal(
|
||||
const auto results = co_await qp.execute_internal(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_query_state(),
|
||||
@@ -86,25 +93,8 @@ future<std::optional<standard_role_manager::record>> standard_role_manager::lega
|
||||
: role_set())});
|
||||
}
|
||||
|
||||
future<std::optional<standard_role_manager::record>> standard_role_manager::find_record(std::string_view role_name) {
|
||||
if (legacy_mode(_qp)) {
|
||||
return legacy_find_record(role_name);
|
||||
}
|
||||
auto name = sstring(role_name);
|
||||
auto role = _cache.get(name);
|
||||
if (!role) {
|
||||
return make_ready_future<std::optional<record>>(std::nullopt);
|
||||
}
|
||||
return make_ready_future<std::optional<record>>(std::make_optional(record{
|
||||
.name = std::move(name),
|
||||
.is_superuser = role->is_superuser,
|
||||
.can_login = role->can_login,
|
||||
.member_of = role->member_of
|
||||
}));
|
||||
}
|
||||
|
||||
future<standard_role_manager::record> standard_role_manager::require_record(std::string_view role_name) {
|
||||
return find_record(role_name).then([role_name](std::optional<record> mr) {
|
||||
static future<record> require_record(cql3::query_processor& qp, std::string_view role_name) {
|
||||
return find_record(qp, role_name).then([role_name](std::optional<record> mr) {
|
||||
if (!mr) {
|
||||
throw nonexistant_role(role_name);
|
||||
}
|
||||
@@ -396,7 +386,7 @@ standard_role_manager::alter(std::string_view role_name, const role_config_updat
|
||||
return fmt::to_string(fmt::join(assignments, ", "));
|
||||
};
|
||||
|
||||
return require_record(role_name).then([this, role_name, &u, &mc](record) {
|
||||
return require_record(_qp, role_name).then([this, role_name, &u, &mc](record) {
|
||||
if (!u.is_superuser && !u.can_login) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -630,17 +620,18 @@ standard_role_manager::revoke(std::string_view revokee_name, std::string_view ro
|
||||
});
|
||||
}
|
||||
|
||||
future<> standard_role_manager::collect_roles(
|
||||
static future<> collect_roles(
|
||||
cql3::query_processor& qp,
|
||||
std::string_view grantee_name,
|
||||
bool recurse,
|
||||
role_set& roles) {
|
||||
return require_record(grantee_name).then([this, &roles, recurse](standard_role_manager::record r) {
|
||||
return do_with(std::move(r.member_of), [this, &roles, recurse](const role_set& memberships) {
|
||||
return do_for_each(memberships.begin(), memberships.end(), [this, &roles, recurse](const sstring& role_name) {
|
||||
return require_record(qp, grantee_name).then([&qp, &roles, recurse](record r) {
|
||||
return do_with(std::move(r.member_of), [&qp, &roles, recurse](const role_set& memberships) {
|
||||
return do_for_each(memberships.begin(), memberships.end(), [&qp, &roles, recurse](const sstring& role_name) {
|
||||
roles.insert(role_name);
|
||||
|
||||
if (recurse) {
|
||||
return collect_roles(role_name, true, roles);
|
||||
return collect_roles(qp, role_name, true, roles);
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
@@ -655,7 +646,7 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
|
||||
return do_with(
|
||||
role_set{sstring(grantee_name)},
|
||||
[this, grantee_name, recurse](role_set& roles) {
|
||||
return collect_roles(grantee_name, recurse, roles).then([&roles] { return roles; });
|
||||
return collect_roles(_qp, grantee_name, recurse, roles).then([&roles] { return roles; });
|
||||
});
|
||||
}
|
||||
|
||||
@@ -715,21 +706,27 @@ future<role_set> standard_role_manager::query_all(::service::query_state& qs) {
|
||||
}
|
||||
|
||||
future<bool> standard_role_manager::exists(std::string_view role_name) {
|
||||
return find_record(role_name).then([](std::optional<record> mr) {
|
||||
return find_record(_qp, role_name).then([](std::optional<record> mr) {
|
||||
return static_cast<bool>(mr);
|
||||
});
|
||||
}
|
||||
|
||||
future<bool> standard_role_manager::is_superuser(std::string_view role_name) {
|
||||
return require_record(role_name).then([](record r) {
|
||||
return require_record(_qp, role_name).then([](record r) {
|
||||
return r.is_superuser;
|
||||
});
|
||||
}
|
||||
|
||||
future<bool> standard_role_manager::can_login(std::string_view role_name) {
|
||||
return require_record(role_name).then([](record r) {
|
||||
return r.can_login;
|
||||
});
|
||||
if (legacy_mode(_qp)) {
|
||||
const auto r = co_await require_record(_qp, role_name);
|
||||
co_return r.can_login;
|
||||
}
|
||||
auto role = _cache.get(sstring(role_name));
|
||||
if (!role) {
|
||||
throw nonexistant_role(role_name);
|
||||
}
|
||||
co_return role->can_login;
|
||||
}
|
||||
|
||||
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
|
||||
|
||||
@@ -90,12 +90,6 @@ public:
|
||||
|
||||
private:
|
||||
enum class membership_change { add, remove };
|
||||
struct record final {
|
||||
sstring name;
|
||||
bool is_superuser;
|
||||
bool can_login;
|
||||
role_set member_of;
|
||||
};
|
||||
|
||||
future<> create_legacy_metadata_tables_if_missing() const;
|
||||
|
||||
@@ -113,14 +107,6 @@ private:
|
||||
future<> legacy_modify_membership(std::string_view role_name, std::string_view grantee_name, membership_change);
|
||||
|
||||
future<> modify_membership(std::string_view role_name, std::string_view grantee_name, membership_change, ::service::group0_batch& mc);
|
||||
|
||||
future<std::optional<record>> legacy_find_record(std::string_view role_name);
|
||||
future<std::optional<record>> find_record(std::string_view role_name);
|
||||
future<record> require_record(std::string_view role_name);
|
||||
future<> collect_roles(
|
||||
std::string_view grantee_name,
|
||||
bool recurse,
|
||||
role_set& roles);
|
||||
};
|
||||
|
||||
} // namespace auth
|
||||
|
||||
@@ -329,13 +329,13 @@ public:
|
||||
auto it = candidates.begin();
|
||||
auto& first_sstable = *it;
|
||||
it++;
|
||||
dht::token first = first_sstable->get_first_decorated_key()._token;
|
||||
dht::token last = first_sstable->get_last_decorated_key()._token;
|
||||
dht::token first = first_sstable->get_first_decorated_key().token();
|
||||
dht::token last = first_sstable->get_last_decorated_key().token();
|
||||
while (it != candidates.end()) {
|
||||
auto& candidate_sstable = *it;
|
||||
it++;
|
||||
dht::token first_candidate = candidate_sstable->get_first_decorated_key()._token;
|
||||
dht::token last_candidate = candidate_sstable->get_last_decorated_key()._token;
|
||||
dht::token first_candidate = candidate_sstable->get_first_decorated_key().token();
|
||||
dht::token last_candidate = candidate_sstable->get_last_decorated_key().token();
|
||||
|
||||
first = first <= first_candidate? first : first_candidate;
|
||||
last = last >= last_candidate ? last : last_candidate;
|
||||
@@ -345,7 +345,7 @@ public:
|
||||
|
||||
template <typename T>
|
||||
static std::vector<sstables::shared_sstable> overlapping(const schema& s, const sstables::shared_sstable& sstable, const T& others) {
|
||||
return overlapping(s, sstable->get_first_decorated_key()._token, sstable->get_last_decorated_key()._token, others);
|
||||
return overlapping(s, sstable->get_first_decorated_key().token(), sstable->get_last_decorated_key().token(), others);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -359,7 +359,7 @@ public:
|
||||
auto range = ::wrapping_interval<dht::token>::make(start, end);
|
||||
|
||||
for (auto& candidate : sstables) {
|
||||
auto candidate_range = ::wrapping_interval<dht::token>::make(candidate->get_first_decorated_key()._token, candidate->get_last_decorated_key()._token);
|
||||
auto candidate_range = ::wrapping_interval<dht::token>::make(candidate->get_first_decorated_key().token(), candidate->get_last_decorated_key().token());
|
||||
|
||||
if (range.overlaps(candidate_range, dht::token_comparator())) {
|
||||
overlapped.push_back(candidate);
|
||||
|
||||
42
db/config.cc
42
db/config.cc
@@ -621,6 +621,25 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
* @GroupDescription: Provides an overview of the group.
|
||||
*/
|
||||
/**
|
||||
* @Group Ungrouped properties
|
||||
*/
|
||||
, background_writer_scheduling_quota(this, "background_writer_scheduling_quota", value_status::Deprecated, 1.0,
|
||||
"max cpu usage ratio (between 0 and 1) for compaction process. Not intended for setting in normal operations. Setting it to 1 or higher will disable it, recommended operational setting is 0.5.")
|
||||
, auto_adjust_flush_quota(this, "auto_adjust_flush_quota", value_status::Deprecated, false,
|
||||
"true: auto-adjust memtable shares for flush processes")
|
||||
, memtable_flush_static_shares(this, "memtable_flush_static_shares", liveness::LiveUpdate, value_status::Used, 0,
|
||||
"If set to higher than 0, ignore the controller's output and set the memtable shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
|
||||
, compaction_static_shares(this, "compaction_static_shares", liveness::LiveUpdate, value_status::Used, 0,
|
||||
"If set to higher than 0, ignore the controller's output and set the compaction shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
|
||||
, compaction_max_shares(this, "compaction_max_shares", liveness::LiveUpdate, value_status::Used, default_compaction_maximum_shares,
|
||||
"Set the maximum shares of regular compaction to the specific value. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
|
||||
, compaction_enforce_min_threshold(this, "compaction_enforce_min_threshold", liveness::LiveUpdate, value_status::Used, false,
|
||||
"If set to true, enforce the min_threshold option for compactions strictly. If false (default), Scylla may decide to compact even if below min_threshold.")
|
||||
, compaction_flush_all_tables_before_major_seconds(this, "compaction_flush_all_tables_before_major_seconds", value_status::Used, 86400,
|
||||
"Set the minimum interval in seconds between flushing all tables before each major compaction (default is 86400)."
|
||||
"This option is useful for maximizing tombstone garbage collection by releasing all active commitlog segments."
|
||||
"Set to 0 to disable automatic flushing all tables before major compaction.")
|
||||
/**
|
||||
* @Group Initialization properties
|
||||
* @GroupDescription The minimal properties needed for configuring a cluster.
|
||||
*/
|
||||
@@ -1375,10 +1394,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"Start killing reads after their collective memory consumption goes above $normal_limit * $multiplier.")
|
||||
, reader_concurrency_semaphore_cpu_concurrency(this, "reader_concurrency_semaphore_cpu_concurrency", liveness::LiveUpdate, value_status::Used, 2,
|
||||
"Admit new reads while there are less than this number of requests that need CPU.")
|
||||
, reader_concurrency_semaphore_preemptive_abort_factor(this, "reader_concurrency_semaphore_preemptive_abort_factor", liveness::LiveUpdate, value_status::Used, 0.3,
|
||||
"Admit new reads while their remaining time is more than this factor times their timeout times when arrived to a semaphore. Its vale means\n"
|
||||
"* <= 0.0 means new reads will never get rejected during admission\n"
|
||||
"* >= 1.0 means new reads will always get rejected during admission\n")
|
||||
, view_update_reader_concurrency_semaphore_serialize_limit_multiplier(this, "view_update_reader_concurrency_semaphore_serialize_limit_multiplier", liveness::LiveUpdate, value_status::Used, 2,
|
||||
"Start serializing view update reads after their collective memory consumption goes above $normal_limit * $multiplier.")
|
||||
, view_update_reader_concurrency_semaphore_kill_limit_multiplier(this, "view_update_reader_concurrency_semaphore_kill_limit_multiplier", liveness::LiveUpdate, value_status::Used, 4,
|
||||
@@ -1587,25 +1602,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"Sets the maximum difference in percentages between the most loaded and least loaded nodes, below which the load balancer considers nodes balanced.")
|
||||
, minimal_tablet_size_for_balancing(this, "minimal_tablet_size_for_balancing", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size / 100,
|
||||
"Sets the minimal tablet size for the load balancer. For any tablet smaller than this, the balancer will use this size instead of the actual tablet size.")
|
||||
/**
|
||||
* @Group Ungrouped properties
|
||||
*/
|
||||
, background_writer_scheduling_quota(this, "background_writer_scheduling_quota", value_status::Deprecated, 1.0,
|
||||
"max cpu usage ratio (between 0 and 1) for compaction process. Not intended for setting in normal operations. Setting it to 1 or higher will disable it, recommended operational setting is 0.5.")
|
||||
, auto_adjust_flush_quota(this, "auto_adjust_flush_quota", value_status::Deprecated, false,
|
||||
"true: auto-adjust memtable shares for flush processes")
|
||||
, memtable_flush_static_shares(this, "memtable_flush_static_shares", liveness::LiveUpdate, value_status::Used, 0,
|
||||
"If set to higher than 0, ignore the controller's output and set the memtable shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
|
||||
, compaction_static_shares(this, "compaction_static_shares", liveness::LiveUpdate, value_status::Used, 0,
|
||||
"If set to higher than 0, ignore the controller's output and set the compaction shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
|
||||
, compaction_max_shares(this, "compaction_max_shares", liveness::LiveUpdate, value_status::Used, default_compaction_maximum_shares,
|
||||
"Set the maximum shares of regular compaction to the specific value. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
|
||||
, compaction_enforce_min_threshold(this, "compaction_enforce_min_threshold", liveness::LiveUpdate, value_status::Used, false,
|
||||
"If set to true, enforce the min_threshold option for compactions strictly. If false (default), Scylla may decide to compact even if below min_threshold.")
|
||||
, compaction_flush_all_tables_before_major_seconds(this, "compaction_flush_all_tables_before_major_seconds", value_status::Used, 86400,
|
||||
"Set the minimum interval in seconds between flushing all tables before each major compaction (default is 86400)."
|
||||
"This option is useful for maximizing tombstone garbage collection by releasing all active commitlog segments."
|
||||
"Set to 0 to disable automatic flushing all tables before major compaction.")
|
||||
, default_log_level(this, "default_log_level", value_status::Used, seastar::log_level::info, "Default log level for log messages")
|
||||
, logger_log_level(this, "logger_log_level", value_status::Used, {}, "Map of logger name to log level. Valid log levels are 'error', 'warn', 'info', 'debug' and 'trace'")
|
||||
, log_to_stdout(this, "log_to_stdout", value_status::Used, true, "Send log output to stdout")
|
||||
|
||||
16
db/config.hh
16
db/config.hh
@@ -185,6 +185,13 @@ public:
|
||||
* All values and documentation taken from
|
||||
* http://docs.datastax.com/en/cassandra/2.1/cassandra/configuration/configCassandra_yaml_r.html
|
||||
*/
|
||||
named_value<double> background_writer_scheduling_quota;
|
||||
named_value<bool> auto_adjust_flush_quota;
|
||||
named_value<float> memtable_flush_static_shares;
|
||||
named_value<float> compaction_static_shares;
|
||||
named_value<float> compaction_max_shares;
|
||||
named_value<bool> compaction_enforce_min_threshold;
|
||||
named_value<uint32_t> compaction_flush_all_tables_before_major_seconds;
|
||||
named_value<sstring> cluster_name;
|
||||
named_value<sstring> listen_address;
|
||||
named_value<sstring> listen_interface;
|
||||
@@ -439,7 +446,6 @@ public:
|
||||
named_value<uint32_t> reader_concurrency_semaphore_serialize_limit_multiplier;
|
||||
named_value<uint32_t> reader_concurrency_semaphore_kill_limit_multiplier;
|
||||
named_value<uint32_t> reader_concurrency_semaphore_cpu_concurrency;
|
||||
named_value<float> reader_concurrency_semaphore_preemptive_abort_factor;
|
||||
named_value<uint32_t> view_update_reader_concurrency_semaphore_serialize_limit_multiplier;
|
||||
named_value<uint32_t> view_update_reader_concurrency_semaphore_kill_limit_multiplier;
|
||||
named_value<uint32_t> view_update_reader_concurrency_semaphore_cpu_concurrency;
|
||||
@@ -606,14 +612,6 @@ public:
|
||||
named_value<float> size_based_balance_threshold_percentage;
|
||||
named_value<uint64_t> minimal_tablet_size_for_balancing;
|
||||
|
||||
named_value<double> background_writer_scheduling_quota;
|
||||
named_value<bool> auto_adjust_flush_quota;
|
||||
named_value<float> memtable_flush_static_shares;
|
||||
named_value<float> compaction_static_shares;
|
||||
named_value<float> compaction_max_shares;
|
||||
named_value<bool> compaction_enforce_min_threshold;
|
||||
named_value<uint32_t> compaction_flush_all_tables_before_major_seconds;
|
||||
|
||||
static const sstring default_tls_priority;
|
||||
private:
|
||||
template<typename T>
|
||||
|
||||
@@ -24,7 +24,7 @@
|
||||
#include "readers/forwardable.hh"
|
||||
#include "readers/nonforwardable.hh"
|
||||
#include "cache_mutation_reader.hh"
|
||||
#include "replica/partition_snapshot_reader.hh"
|
||||
#include "partition_snapshot_reader.hh"
|
||||
#include "keys/clustering_key_filter.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
@@ -845,7 +845,7 @@ mutation_reader row_cache::make_nonpopulating_reader(schema_ptr schema, reader_p
|
||||
cache_entry& e = *i;
|
||||
upgrade_entry(e);
|
||||
tracing::trace(ts, "Reading partition {} from cache", pos);
|
||||
return replica::make_partition_snapshot_reader<false, dummy_accounter>(
|
||||
return make_partition_snapshot_flat_reader<false, dummy_accounter>(
|
||||
schema,
|
||||
std::move(permit),
|
||||
e.key(),
|
||||
|
||||
@@ -30,11 +30,13 @@ namespace dht {
|
||||
// Total ordering defined by comparators is compatible with Origin's ordering.
|
||||
class decorated_key {
|
||||
public:
|
||||
dht::token _token;
|
||||
// Store only the token data as int64_t to avoid the bloat of storing
|
||||
// token_kind, which is always token_kind::key for decorated_key.
|
||||
int64_t _token_data;
|
||||
partition_key _key;
|
||||
|
||||
decorated_key(dht::token t, partition_key k)
|
||||
: _token(std::move(t))
|
||||
: _token_data(t._data)
|
||||
, _key(std::move(k)) {
|
||||
}
|
||||
|
||||
@@ -56,8 +58,8 @@ public:
|
||||
std::strong_ordering tri_compare(const schema& s, const decorated_key& other) const;
|
||||
std::strong_ordering tri_compare(const schema& s, const ring_position& other) const;
|
||||
|
||||
const dht::token& token() const noexcept {
|
||||
return _token;
|
||||
dht::token token() const noexcept {
|
||||
return dht::token(_token_data);
|
||||
}
|
||||
|
||||
const partition_key& key() const {
|
||||
@@ -65,7 +67,7 @@ public:
|
||||
}
|
||||
|
||||
size_t external_memory_usage() const {
|
||||
return _key.external_memory_usage() + _token.external_memory_usage();
|
||||
return _key.external_memory_usage();
|
||||
}
|
||||
|
||||
size_t memory_usage() const {
|
||||
@@ -102,6 +104,6 @@ template <> struct fmt::formatter<dht::decorated_key> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
template <typename FormatContext>
|
||||
auto format(const dht::decorated_key& dk, FormatContext& ctx) const {
|
||||
return fmt::format_to(ctx.out(), "{{key: {}, token: {}}}", dk._key, dk._token);
|
||||
return fmt::format_to(ctx.out(), "{{key: {}, token: {}}}", dk._key, dk.token());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -95,7 +95,7 @@ std::unique_ptr<dht::i_partitioner> make_partitioner(sstring partitioner_name) {
|
||||
|
||||
bool
|
||||
decorated_key::equal(const schema& s, const decorated_key& other) const {
|
||||
if (_token == other._token) {
|
||||
if (_token_data == other._token_data) {
|
||||
return _key.legacy_equal(s, other._key);
|
||||
}
|
||||
return false;
|
||||
@@ -103,7 +103,7 @@ decorated_key::equal(const schema& s, const decorated_key& other) const {
|
||||
|
||||
std::strong_ordering
|
||||
decorated_key::tri_compare(const schema& s, const decorated_key& other) const {
|
||||
auto r = _token <=> other._token;
|
||||
auto r = _token_data <=> other._token_data;
|
||||
if (r != 0) {
|
||||
return r;
|
||||
} else {
|
||||
@@ -113,13 +113,24 @@ decorated_key::tri_compare(const schema& s, const decorated_key& other) const {
|
||||
|
||||
std::strong_ordering
|
||||
decorated_key::tri_compare(const schema& s, const ring_position& other) const {
|
||||
auto r = _token <=> other.token();
|
||||
if (r != 0) {
|
||||
return r;
|
||||
} else if (other.has_key()) {
|
||||
return _key.legacy_tri_compare(s, *other.key());
|
||||
// decorated_key tokens are always of token_kind::key, so we need to
|
||||
// account for ring_position tokens that might be before_all_keys or after_all_keys
|
||||
const auto& other_token = other.token();
|
||||
if (other_token._kind == token_kind::key) [[likely]] {
|
||||
auto r = _token_data <=> other_token._data;
|
||||
if (r != 0) {
|
||||
return r;
|
||||
} else if (other.has_key()) {
|
||||
return _key.legacy_tri_compare(s, *other.key());
|
||||
}
|
||||
return 0 <=> other.relation_to_keys();
|
||||
} else if (other_token._kind == token_kind::before_all_keys) {
|
||||
// decorated_key (token_kind::key) > before_all_keys
|
||||
return std::strong_ordering::greater;
|
||||
} else {
|
||||
// decorated_key (token_kind::key) < after_all_keys
|
||||
return std::strong_ordering::less;
|
||||
}
|
||||
return 0 <=> other.relation_to_keys();
|
||||
}
|
||||
|
||||
bool
|
||||
|
||||
@@ -93,12 +93,12 @@ public:
|
||||
{ }
|
||||
|
||||
ring_position(const dht::decorated_key& dk)
|
||||
: _token(dk._token)
|
||||
: _token(dk.token())
|
||||
, _key(std::make_optional(dk._key))
|
||||
{ }
|
||||
|
||||
ring_position(dht::decorated_key&& dk)
|
||||
: _token(std::move(dk._token))
|
||||
: _token(dk.token())
|
||||
, _key(std::make_optional(std::move(dk._key)))
|
||||
{ }
|
||||
|
||||
|
||||
@@ -78,7 +78,6 @@ Permits are in one of the following states:
|
||||
* `active/await` - a previously `active/need_cpu` permit, which needs something other than CPU to proceed, it is waiting on I/O or a remote shard, other permits can be admitted while the permit is in this state, pending resource availability;
|
||||
* `inactive` - the permit was marked inactive, it can be evicted to make room for admitting more permits if needed;
|
||||
* `evicted` - a former inactive permit which was evicted, the permit has to undergo admission again for the read to resume;
|
||||
* `preemptive_aborted` - the permit timed out or was rejected during admission as it was detected the read might time out later during execution;
|
||||
|
||||
Note that some older releases will have different names for some of these states or lack some of the states altogether:
|
||||
|
||||
|
||||
@@ -124,7 +124,6 @@ There are several test directories that are excluded from orchestration by `test
|
||||
- test/cql
|
||||
- test/cqlpy
|
||||
- test/rest_api
|
||||
- test/scylla_gdb
|
||||
|
||||
This means that `test.py` will not run tests directly, but will delegate all work to `pytest`.
|
||||
That's why all these directories do not have `suite.yaml` files.
|
||||
|
||||
@@ -31,7 +31,6 @@ fi
|
||||
|
||||
debian_base_packages=(
|
||||
clang
|
||||
clang-tools
|
||||
gdb
|
||||
cargo
|
||||
wabt
|
||||
@@ -73,7 +72,6 @@ debian_base_packages=(
|
||||
|
||||
fedora_packages=(
|
||||
clang
|
||||
clang-tools-extra
|
||||
compiler-rt
|
||||
libasan
|
||||
libubsan
|
||||
|
||||
@@ -316,7 +316,7 @@ auto fmt::formatter<mutation>::format(const mutation& m, fmt::format_context& ct
|
||||
++column_iterator;
|
||||
}
|
||||
|
||||
return fmt::format_to(out, "token: {}}}, {}\n}}", dk._token, mutation_partition::printer(s, m.partition()));
|
||||
return fmt::format_to(out, "token: {}}}, {}\n}}", dk.token(), mutation_partition::printer(s, m.partition()));
|
||||
}
|
||||
|
||||
namespace mutation_json {
|
||||
|
||||
@@ -126,7 +126,7 @@ public:
|
||||
const partition_key& key() const { return _ptr->_dk._key; };
|
||||
const dht::decorated_key& decorated_key() const { return _ptr->_dk; };
|
||||
dht::ring_position ring_position() const { return { decorated_key() }; }
|
||||
const dht::token& token() const { return _ptr->_dk._token; }
|
||||
dht::token token() const { return _ptr->_dk.token(); }
|
||||
const schema_ptr& schema() const { return _ptr->_schema; }
|
||||
const mutation_partition& partition() const { return _ptr->_p; }
|
||||
mutation_partition& partition() { return _ptr->_p; }
|
||||
|
||||
@@ -9,7 +9,9 @@
|
||||
#pragma once
|
||||
|
||||
#include "mutation/partition_version.hh"
|
||||
#include "readers/mutation_reader_fwd.hh"
|
||||
#include "readers/mutation_reader.hh"
|
||||
#include "readers/range_tombstone_change_merger.hh"
|
||||
#include "keys/clustering_key_filter.hh"
|
||||
#include "query/query-request.hh"
|
||||
#include "db/partition_snapshot_row_cursor.hh"
|
||||
@@ -17,10 +19,8 @@
|
||||
|
||||
extern seastar::logger mplog;
|
||||
|
||||
namespace replica {
|
||||
|
||||
template <bool Reversing, typename Accounter>
|
||||
class partition_snapshot_reader : public mutation_reader::impl, public Accounter {
|
||||
class partition_snapshot_flat_reader : public mutation_reader::impl, public Accounter {
|
||||
struct row_info {
|
||||
mutation_fragment_v2 row;
|
||||
tombstone rt_for_row;
|
||||
@@ -232,7 +232,7 @@ private:
|
||||
}
|
||||
public:
|
||||
template <typename... Args>
|
||||
partition_snapshot_reader(schema_ptr s, reader_permit permit, dht::decorated_key dk, partition_snapshot_ptr snp,
|
||||
partition_snapshot_flat_reader(schema_ptr s, reader_permit permit, dht::decorated_key dk, partition_snapshot_ptr snp,
|
||||
query::clustering_key_filter_ranges crr, bool digest_requested,
|
||||
logalloc::region& region, logalloc::allocating_section& read_section,
|
||||
std::any pointer_to_container, Args&&... args)
|
||||
@@ -285,7 +285,7 @@ public:
|
||||
|
||||
template <bool Reversing, typename Accounter, typename... Args>
|
||||
inline mutation_reader
|
||||
make_partition_snapshot_reader(schema_ptr s,
|
||||
make_partition_snapshot_flat_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
dht::decorated_key dk,
|
||||
query::clustering_key_filter_ranges crr,
|
||||
@@ -297,7 +297,7 @@ make_partition_snapshot_reader(schema_ptr s,
|
||||
streamed_mutation::forwarding fwd,
|
||||
Args&&... args)
|
||||
{
|
||||
auto res = make_mutation_reader<partition_snapshot_reader<Reversing, Accounter>>(std::move(s), std::move(permit), std::move(dk),
|
||||
auto res = make_mutation_reader<partition_snapshot_flat_reader<Reversing, Accounter>>(std::move(s), std::move(permit), std::move(dk),
|
||||
snp, std::move(crr), digest_requested, region, read_section, std::move(pointer_to_container), std::forward<Args>(args)...);
|
||||
if (fwd) {
|
||||
return make_forwardable(std::move(res)); // FIXME: optimize
|
||||
@@ -305,5 +305,3 @@ make_partition_snapshot_reader(schema_ptr s,
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace replica
|
||||
@@ -148,7 +148,6 @@ public:
|
||||
};
|
||||
|
||||
private:
|
||||
const db::timeout_clock::time_point _created;
|
||||
reader_concurrency_semaphore& _semaphore;
|
||||
schema_ptr _schema;
|
||||
|
||||
@@ -238,25 +237,17 @@ private:
|
||||
break;
|
||||
case state::inactive:
|
||||
_semaphore.evict(*this, reader_concurrency_semaphore::evict_reason::time);
|
||||
// Return here on purpose. The evicted permit is destroyed when closing a reader.
|
||||
// As a consequence, any member access beyond this point is invalid.
|
||||
return;
|
||||
break;
|
||||
case state::evicted:
|
||||
case state::preemptive_aborted:
|
||||
break;
|
||||
}
|
||||
|
||||
// The function call not only sets state to reader_permit::state::preemptive_aborted
|
||||
// but also correctly decreases the statistics i.e. need_cpu_permits and awaits_permits.
|
||||
on_permit_inactive(reader_permit::state::preemptive_aborted);
|
||||
}
|
||||
|
||||
public:
|
||||
struct value_tag {};
|
||||
|
||||
impl(reader_concurrency_semaphore& semaphore, schema_ptr schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
|
||||
: _created(db::timeout_clock::now())
|
||||
, _semaphore(semaphore)
|
||||
: _semaphore(semaphore)
|
||||
, _schema(std::move(schema))
|
||||
, _op_name_view(op_name)
|
||||
, _base_resources(base_resources)
|
||||
@@ -267,8 +258,7 @@ public:
|
||||
_semaphore.on_permit_created(*this);
|
||||
}
|
||||
impl(reader_concurrency_semaphore& semaphore, schema_ptr schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
|
||||
: _created(db::timeout_clock::now())
|
||||
, _semaphore(semaphore)
|
||||
: _semaphore(semaphore)
|
||||
, _schema(std::move(schema))
|
||||
, _op_name(std::move(op_name))
|
||||
, _op_name_view(_op_name)
|
||||
@@ -370,17 +360,6 @@ public:
|
||||
on_permit_active();
|
||||
}
|
||||
|
||||
void on_preemptive_aborted() {
|
||||
if (_state != reader_permit::state::waiting_for_admission && _state != reader_permit::state::waiting_for_memory) {
|
||||
on_internal_error(rcslog, format("on_preemptive_aborted(): permit in invalid state {}", _state));
|
||||
}
|
||||
|
||||
_ttl_timer.cancel();
|
||||
_state = reader_permit::state::preemptive_aborted;
|
||||
_aux_data.pr.set_exception(named_semaphore_aborted(_semaphore._name));
|
||||
_semaphore.on_permit_preemptive_aborted();
|
||||
}
|
||||
|
||||
void on_register_as_inactive() {
|
||||
SCYLLA_ASSERT(_state == reader_permit::state::active || _state == reader_permit::state::active_need_cpu || _state == reader_permit::state::waiting_for_memory);
|
||||
on_permit_inactive(reader_permit::state::inactive);
|
||||
@@ -488,10 +467,6 @@ public:
|
||||
return _semaphore.do_wait_admission(*this);
|
||||
}
|
||||
|
||||
db::timeout_clock::time_point created() const noexcept {
|
||||
return _created;
|
||||
}
|
||||
|
||||
db::timeout_clock::time_point timeout() const noexcept {
|
||||
return _ttl_timer.armed() ? _ttl_timer.get_timeout() : db::no_timeout;
|
||||
}
|
||||
@@ -714,9 +689,6 @@ auto fmt::formatter<reader_permit::state>::format(reader_permit::state s, fmt::f
|
||||
case reader_permit::state::evicted:
|
||||
name = "evicted";
|
||||
break;
|
||||
case reader_permit::state::preemptive_aborted:
|
||||
name = "preemptive_aborted";
|
||||
break;
|
||||
}
|
||||
return formatter<string_view>::format(name, ctx);
|
||||
}
|
||||
@@ -1066,7 +1038,6 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(
|
||||
utils::updateable_value<uint32_t> serialize_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> kill_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> cpu_concurrency,
|
||||
utils::updateable_value<float> preemptive_abort_factor,
|
||||
register_metrics metrics)
|
||||
: _initial_resources(count, memory)
|
||||
, _resources(count, memory)
|
||||
@@ -1076,7 +1047,6 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(
|
||||
, _serialize_limit_multiplier(std::move(serialize_limit_multiplier))
|
||||
, _kill_limit_multiplier(std::move(kill_limit_multiplier))
|
||||
, _cpu_concurrency(cpu_concurrency)
|
||||
, _preemptive_abort_factor(preemptive_abort_factor)
|
||||
, _close_readers_gate(format("[reader_concurrency_semaphore {}] close_readers", _name))
|
||||
, _permit_gate(format("[reader_concurrency_semaphore {}] permit", _name))
|
||||
{
|
||||
@@ -1144,7 +1114,6 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(uint32_t(1)),
|
||||
utils::updateable_value(float(0.0)),
|
||||
metrics) {}
|
||||
|
||||
reader_concurrency_semaphore::~reader_concurrency_semaphore() {
|
||||
@@ -1520,25 +1489,6 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
|
||||
auto& permit = _wait_list.front();
|
||||
dequeue_permit(permit);
|
||||
try {
|
||||
// Do not admit the read as it is unlikely to finish before its timeout. The condition is:
|
||||
// permit's remaining time <= preemptive_abort_factor * permit's time budget
|
||||
//
|
||||
// The additional check for remaining_time > 0 is to avoid preemptive aborting reads
|
||||
// that already timed out but are still in the wait list due to scheduling delays.
|
||||
// It also effectively disables preemptive aborting when the factor is set to 0.
|
||||
const auto time_budget = permit.timeout() - permit.created();
|
||||
const auto remaining_time = permit.timeout() - db::timeout_clock::now();
|
||||
if (remaining_time > db::timeout_clock::duration::zero() && remaining_time <= _preemptive_abort_factor() * time_budget) {
|
||||
permit.on_preemptive_aborted();
|
||||
using ms = std::chrono::milliseconds;
|
||||
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})",
|
||||
_name,
|
||||
std::chrono::duration_cast<ms>(time_budget - remaining_time),
|
||||
std::chrono::duration_cast<ms>(time_budget),
|
||||
_preemptive_abort_factor());
|
||||
continue;
|
||||
}
|
||||
|
||||
if (permit.get_state() == reader_permit::state::waiting_for_memory) {
|
||||
_blessed_permit = &permit;
|
||||
permit.on_granted_memory();
|
||||
@@ -1599,11 +1549,7 @@ void reader_concurrency_semaphore::dequeue_permit(reader_permit::impl& permit) {
|
||||
case reader_permit::state::waiting_for_admission:
|
||||
case reader_permit::state::waiting_for_memory:
|
||||
case reader_permit::state::waiting_for_execution:
|
||||
if (_stats.waiters > 0) {
|
||||
--_stats.waiters;
|
||||
} else {
|
||||
on_internal_error_noexcept(rcslog, "reader_concurrency_semaphore::dequeue_permit(): invalid state: no waiters yet dequeueing a waiting permit");
|
||||
}
|
||||
--_stats.waiters;
|
||||
break;
|
||||
case reader_permit::state::inactive:
|
||||
case reader_permit::state::evicted:
|
||||
@@ -1612,17 +1558,12 @@ void reader_concurrency_semaphore::dequeue_permit(reader_permit::impl& permit) {
|
||||
case reader_permit::state::active:
|
||||
case reader_permit::state::active_need_cpu:
|
||||
case reader_permit::state::active_await:
|
||||
case reader_permit::state::preemptive_aborted:
|
||||
on_internal_error_noexcept(rcslog, format("reader_concurrency_semaphore::dequeue_permit(): unrecognized queued state: {}", permit.get_state()));
|
||||
}
|
||||
permit.unlink();
|
||||
_permit_list.push_back(permit);
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::on_permit_preemptive_aborted() noexcept {
|
||||
++_stats.total_reads_shed_due_to_overload;
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::on_permit_created(reader_permit::impl& permit) {
|
||||
_permit_gate.enter();
|
||||
_permit_list.push_back(permit);
|
||||
|
||||
@@ -42,7 +42,7 @@ using mutation_reader_opt = optimized_optional<mutation_reader>;
|
||||
/// number of waiting readers becomes equal or greater than
|
||||
/// `max_queue_length` (upon calling `obtain_permit()`) an exception of
|
||||
/// type `std::runtime_error` is thrown. Optionally, some additional
|
||||
/// code can be executed just before throwing (`prethrow_action`
|
||||
/// code can be executed just before throwing (`prethrow_action`
|
||||
/// constructor parameter).
|
||||
///
|
||||
/// The semaphore has 3 layers of defense against consuming more memory
|
||||
@@ -89,7 +89,6 @@ public:
|
||||
// Total number of failed reads executed through this semaphore.
|
||||
uint64_t total_failed_reads = 0;
|
||||
// Total number of reads rejected because the admission queue reached its max capacity
|
||||
// or rejected due to a high probability of not getting finalized on time.
|
||||
uint64_t total_reads_shed_due_to_overload = 0;
|
||||
// Total number of reads killed due to the memory consumption reaching the kill limit.
|
||||
uint64_t total_reads_killed_due_to_kill_limit = 0;
|
||||
@@ -193,8 +192,6 @@ private:
|
||||
utils::updateable_value<uint32_t> _serialize_limit_multiplier;
|
||||
utils::updateable_value<uint32_t> _kill_limit_multiplier;
|
||||
utils::updateable_value<uint32_t> _cpu_concurrency;
|
||||
utils::updateable_value<float> _preemptive_abort_factor;
|
||||
|
||||
stats _stats;
|
||||
std::optional<seastar::metrics::metric_groups> _metrics;
|
||||
bool _stopped = false;
|
||||
@@ -253,8 +250,6 @@ private:
|
||||
void on_permit_created(reader_permit::impl&);
|
||||
void on_permit_destroyed(reader_permit::impl&) noexcept;
|
||||
|
||||
void on_permit_preemptive_aborted() noexcept;
|
||||
|
||||
void on_permit_need_cpu() noexcept;
|
||||
void on_permit_not_need_cpu() noexcept;
|
||||
|
||||
@@ -292,7 +287,6 @@ public:
|
||||
utils::updateable_value<uint32_t> serialize_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> kill_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> cpu_concurrency,
|
||||
utils::updateable_value<float> preemptive_abort_factor,
|
||||
register_metrics metrics);
|
||||
|
||||
reader_concurrency_semaphore(
|
||||
@@ -302,12 +296,9 @@ public:
|
||||
size_t max_queue_length,
|
||||
utils::updateable_value<uint32_t> serialize_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> kill_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> cpu_concurrency,
|
||||
utils::updateable_value<float> preemptive_abort_factor,
|
||||
register_metrics metrics)
|
||||
: reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length,
|
||||
std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), std::move(cpu_concurrency),
|
||||
std::move(preemptive_abort_factor), metrics)
|
||||
std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), utils::updateable_value<uint32_t>(1), metrics)
|
||||
{ }
|
||||
|
||||
/// Create a semaphore with practically unlimited count and memory.
|
||||
@@ -327,10 +318,9 @@ public:
|
||||
utils::updateable_value<uint32_t> serialize_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value<uint32_t> kill_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value<uint32_t> cpu_concurrency = utils::updateable_value<uint32_t>(1),
|
||||
utils::updateable_value<float> preemptive_abort_factor = utils::updateable_value<float>(0.0f),
|
||||
register_metrics metrics = register_metrics::no)
|
||||
: reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler),
|
||||
std::move(kill_limit_multipler), std::move(cpu_concurrency), std::move(preemptive_abort_factor), metrics)
|
||||
std::move(kill_limit_multipler), std::move(cpu_concurrency), metrics)
|
||||
{}
|
||||
|
||||
virtual ~reader_concurrency_semaphore();
|
||||
|
||||
@@ -70,8 +70,7 @@ reader_concurrency_semaphore& reader_concurrency_semaphore_group::add_or_update(
|
||||
_max_queue_length,
|
||||
_serialize_limit_multiplier,
|
||||
_kill_limit_multiplier,
|
||||
_cpu_concurrency,
|
||||
_preemptive_abort_factor
|
||||
_cpu_concurrency
|
||||
);
|
||||
auto&& it = result.first;
|
||||
// since we serialize all group changes this change wait will be queues and no further operations
|
||||
|
||||
@@ -26,7 +26,6 @@ class reader_concurrency_semaphore_group {
|
||||
utils::updateable_value<uint32_t> _serialize_limit_multiplier;
|
||||
utils::updateable_value<uint32_t> _kill_limit_multiplier;
|
||||
utils::updateable_value<uint32_t> _cpu_concurrency;
|
||||
utils::updateable_value<float> _preemptive_abort_factor;
|
||||
|
||||
friend class database_test_wrapper;
|
||||
|
||||
@@ -37,12 +36,11 @@ class reader_concurrency_semaphore_group {
|
||||
weighted_reader_concurrency_semaphore(size_t shares, int count, sstring name, size_t max_queue_length,
|
||||
utils::updateable_value<uint32_t> serialize_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> kill_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> cpu_concurrency,
|
||||
utils::updateable_value<float> preemptive_abort_factor)
|
||||
utils::updateable_value<uint32_t> cpu_concurrency)
|
||||
: weight(shares)
|
||||
, memory_share(0)
|
||||
, sem(utils::updateable_value(count), 0, name, max_queue_length, std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier),
|
||||
std::move(cpu_concurrency), std::move(preemptive_abort_factor), reader_concurrency_semaphore::register_metrics::yes) {}
|
||||
std::move(cpu_concurrency), reader_concurrency_semaphore::register_metrics::yes) {}
|
||||
};
|
||||
|
||||
std::unordered_map<scheduling_group, weighted_reader_concurrency_semaphore> _semaphores;
|
||||
@@ -56,7 +54,6 @@ public:
|
||||
utils::updateable_value<uint32_t> serialize_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> kill_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> cpu_concurrency,
|
||||
utils::updateable_value<float> preemptive_abort_factor,
|
||||
std::optional<sstring> name_prefix = std::nullopt)
|
||||
: _total_memory(memory)
|
||||
, _total_weight(0)
|
||||
@@ -65,7 +62,6 @@ public:
|
||||
, _serialize_limit_multiplier(std::move(serialize_limit_multiplier))
|
||||
, _kill_limit_multiplier(std::move(kill_limit_multiplier))
|
||||
, _cpu_concurrency(std::move(cpu_concurrency))
|
||||
, _preemptive_abort_factor(std::move(preemptive_abort_factor))
|
||||
, _operations_serializer(1)
|
||||
, _name_prefix(std::move(name_prefix)) { }
|
||||
|
||||
|
||||
@@ -92,7 +92,6 @@ public:
|
||||
active_await,
|
||||
inactive,
|
||||
evicted,
|
||||
preemptive_aborted,
|
||||
};
|
||||
|
||||
class impl;
|
||||
|
||||
@@ -412,7 +412,6 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(uint32_t(1)),
|
||||
utils::updateable_value(0.0f),
|
||||
reader_concurrency_semaphore::register_metrics::yes)
|
||||
// No limits, just for accounting.
|
||||
, _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction", reader_concurrency_semaphore::register_metrics::no)
|
||||
@@ -424,8 +423,6 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
std::numeric_limits<size_t>::max(),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(uint32_t(1)),
|
||||
utils::updateable_value(0.0f),
|
||||
reader_concurrency_semaphore::register_metrics::yes)
|
||||
, _view_update_read_concurrency_semaphores_group(
|
||||
max_memory_concurrent_view_update_reads(),
|
||||
@@ -434,7 +431,6 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
_cfg.view_update_reader_concurrency_semaphore_serialize_limit_multiplier,
|
||||
_cfg.view_update_reader_concurrency_semaphore_kill_limit_multiplier,
|
||||
_cfg.view_update_reader_concurrency_semaphore_cpu_concurrency,
|
||||
utils::updateable_value(0.0f),
|
||||
"view_update")
|
||||
, _row_cache_tracker(_cfg.index_cache_fraction.operator utils::updateable_value<double>(), cache_tracker::register_metrics::yes)
|
||||
, _apply_stage("db_apply", &database::do_apply)
|
||||
@@ -464,8 +460,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
, _reader_concurrency_semaphores_group(max_memory_concurrent_reads(), max_count_concurrent_reads, max_inactive_queue_length(),
|
||||
_cfg.reader_concurrency_semaphore_serialize_limit_multiplier,
|
||||
_cfg.reader_concurrency_semaphore_kill_limit_multiplier,
|
||||
_cfg.reader_concurrency_semaphore_cpu_concurrency,
|
||||
_cfg.reader_concurrency_semaphore_preemptive_abort_factor)
|
||||
_cfg.reader_concurrency_semaphore_cpu_concurrency)
|
||||
, _stop_barrier(std::move(barrier))
|
||||
, _update_memtable_flush_static_shares_action([this, &cfg] { return _memtable_controller.update_static_shares(cfg.memtable_flush_static_shares()); })
|
||||
, _memtable_flush_static_shares_observer(cfg.memtable_flush_static_shares.observe(_update_memtable_flush_static_shares_action.make_observer()))
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
#include "memtable.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "mutation/frozen_mutation.hh"
|
||||
#include "replica/partition_snapshot_reader.hh"
|
||||
#include "partition_snapshot_reader.hh"
|
||||
#include "partition_builder.hh"
|
||||
#include "mutation/mutation_partition_view.hh"
|
||||
#include "readers/empty.hh"
|
||||
@@ -19,7 +19,7 @@
|
||||
|
||||
namespace replica {
|
||||
|
||||
static mutation_reader make_partition_snapshot_reader_from_snp_schema(
|
||||
static mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
|
||||
bool is_reversed,
|
||||
reader_permit permit,
|
||||
dht::decorated_key dk,
|
||||
@@ -482,7 +482,7 @@ public:
|
||||
auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), _slice, key_and_snp->first.key());
|
||||
bool digest_requested = _slice.options.contains<query::partition_slice::option::with_digest>();
|
||||
bool is_reversed = _slice.is_reversed();
|
||||
_delegate = make_partition_snapshot_reader_from_snp_schema(is_reversed, _permit, std::move(key_and_snp->first), std::move(cr), std::move(key_and_snp->second), digest_requested, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *mtbl());
|
||||
_delegate = make_partition_snapshot_flat_reader_from_snp_schema(is_reversed, _permit, std::move(key_and_snp->first), std::move(cr), std::move(key_and_snp->second), digest_requested, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *mtbl());
|
||||
_delegate->upgrade_schema(schema());
|
||||
} else {
|
||||
_end_of_stream = true;
|
||||
@@ -604,7 +604,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
static mutation_reader make_partition_snapshot_reader_from_snp_schema(
|
||||
static mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
|
||||
bool is_reversed,
|
||||
reader_permit permit,
|
||||
dht::decorated_key dk,
|
||||
@@ -617,10 +617,10 @@ static mutation_reader make_partition_snapshot_reader_from_snp_schema(
|
||||
streamed_mutation::forwarding fwd, memtable& memtable) {
|
||||
if (is_reversed) {
|
||||
schema_ptr rev_snp_schema = snp->schema()->make_reversed();
|
||||
return make_partition_snapshot_reader<true, partition_snapshot_read_accounter>(std::move(rev_snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
|
||||
return make_partition_snapshot_flat_reader<true, partition_snapshot_read_accounter>(std::move(rev_snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
|
||||
} else {
|
||||
schema_ptr snp_schema = snp->schema();
|
||||
return make_partition_snapshot_reader<false, partition_snapshot_read_accounter>(std::move(snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
|
||||
return make_partition_snapshot_flat_reader<false, partition_snapshot_read_accounter>(std::move(snp_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -660,7 +660,7 @@ private:
|
||||
update_last(key_and_snp->first);
|
||||
auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), schema()->full_slice(), key_and_snp->first.key());
|
||||
auto snp_schema = key_and_snp->second->schema();
|
||||
_partition_reader = make_partition_snapshot_reader<false, partition_snapshot_flush_accounter>(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr),
|
||||
_partition_reader = make_partition_snapshot_flat_reader<false, partition_snapshot_flush_accounter>(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr),
|
||||
std::move(key_and_snp->second), false, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *snp_schema, _flushed_memory);
|
||||
_partition_reader->upgrade_schema(schema());
|
||||
}
|
||||
@@ -737,7 +737,7 @@ memtable::make_mutation_reader_opt(schema_ptr query_schema,
|
||||
auto dk = pos.as_decorated_key();
|
||||
auto cr = query::clustering_key_filter_ranges::get_ranges(*query_schema, slice, dk.key());
|
||||
bool digest_requested = slice.options.contains<query::partition_slice::option::with_digest>();
|
||||
auto rd = make_partition_snapshot_reader_from_snp_schema(is_reversed, std::move(permit), std::move(dk), std::move(cr), std::move(snp), digest_requested, *this, _table_shared_data.read_section, shared_from_this(), fwd, *this);
|
||||
auto rd = make_partition_snapshot_flat_reader_from_snp_schema(is_reversed, std::move(permit), std::move(dk), std::move(cr), std::move(snp), digest_requested, *this, _table_shared_data.read_section, shared_from_this(), fwd, *this);
|
||||
rd.upgrade_schema(query_schema);
|
||||
return rd;
|
||||
} else {
|
||||
|
||||
175
replica/table.cc
175
replica/table.cc
@@ -1746,97 +1746,100 @@ table::seal_active_memtable(compaction_group& cg, flush_permit&& flush_permit) n
|
||||
}
|
||||
|
||||
future<>
|
||||
table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr<memtable> old, sstable_write_permit&& permit_) {
|
||||
table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr<memtable> old, sstable_write_permit&& permit) {
|
||||
co_await utils::get_local_injector().inject("flush_memtable_to_sstable_wait", utils::wait_for_message(60s));
|
||||
|
||||
auto permit = make_lw_shared(std::move(permit_));
|
||||
co_await coroutine::switch_to(_config.memtable_scheduling_group);
|
||||
// Note that due to our sharded architecture, it is possible that
|
||||
// in the face of a value change some shards will backup sstables
|
||||
// while others won't.
|
||||
//
|
||||
// This is, in theory, possible to mitigate through a rwlock.
|
||||
// However, this doesn't differ from the situation where all tables
|
||||
// are coming from a single shard and the toggle happens in the
|
||||
// middle of them.
|
||||
//
|
||||
// The code as is guarantees that we'll never partially backup a
|
||||
// single sstable, so that is enough of a guarantee.
|
||||
auto try_flush = [this, old = std::move(old), permit = make_lw_shared(std::move(permit)), &cg] () mutable -> future<> {
|
||||
// Note that due to our sharded architecture, it is possible that
|
||||
// in the face of a value change some shards will backup sstables
|
||||
// while others won't.
|
||||
//
|
||||
// This is, in theory, possible to mitigate through a rwlock.
|
||||
// However, this doesn't differ from the situation where all tables
|
||||
// are coming from a single shard and the toggle happens in the
|
||||
// middle of them.
|
||||
//
|
||||
// The code as is guarantees that we'll never partially backup a
|
||||
// single sstable, so that is enough of a guarantee.
|
||||
|
||||
auto newtabs = std::vector<sstables::shared_sstable>();
|
||||
auto metadata = mutation_source_metadata{};
|
||||
metadata.min_timestamp = old->get_min_timestamp();
|
||||
metadata.max_timestamp = old->get_max_timestamp();
|
||||
auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count(), _schema);
|
||||
auto newtabs = std::vector<sstables::shared_sstable>();
|
||||
auto metadata = mutation_source_metadata{};
|
||||
metadata.min_timestamp = old->get_min_timestamp();
|
||||
metadata.max_timestamp = old->get_max_timestamp();
|
||||
auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count(), _schema);
|
||||
|
||||
if (!cg.async_gate().is_closed()) {
|
||||
co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(cg.view_for_unrepaired_data());
|
||||
}
|
||||
|
||||
auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, estimated_partitions, &cg] (mutation_reader reader) mutable -> future<> {
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer("memtable");
|
||||
cfg.backup = incremental_backups_enabled();
|
||||
|
||||
auto newtab = make_sstable();
|
||||
newtabs.push_back(newtab);
|
||||
tlogger.debug("Flushing to {}", newtab->get_filename());
|
||||
|
||||
auto monitor = database_sstable_write_monitor(permit, newtab, cg,
|
||||
old->get_max_timestamp());
|
||||
|
||||
co_return co_await write_memtable_to_sstable(std::move(reader), *old, newtab, estimated_partitions, monitor, cfg);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await reader.close();
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
});
|
||||
|
||||
auto f = consumer(old->make_flush_reader(
|
||||
old->schema(),
|
||||
compaction_concurrency_semaphore().make_tracking_only_permit(old->schema(), "try_flush_memtable_to_sstable()", db::no_timeout, {})));
|
||||
|
||||
// Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush
|
||||
// controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to
|
||||
// priority inversion.
|
||||
co_await coroutine::switch_to(default_scheduling_group());
|
||||
try {
|
||||
co_await std::move(f);
|
||||
co_await coroutine::parallel_for_each(newtabs, [] (auto& newtab) -> future<> {
|
||||
co_await newtab->open_data();
|
||||
tlogger.debug("Flushing to {} done", newtab->get_filename());
|
||||
});
|
||||
|
||||
co_await with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, &newtabs, &cg] {
|
||||
return update_cache(cg, old, newtabs);
|
||||
});
|
||||
|
||||
co_await utils::get_local_injector().inject("replica_post_flush_after_update_cache", [this] (auto& handler) -> future<> {
|
||||
const auto this_table_name = format("{}.{}", _schema->ks_name(), _schema->cf_name());
|
||||
if (this_table_name == handler.get("table_name")) {
|
||||
tlogger.info("error injection handler replica_post_flush_after_update_cache: suspending flush for table {}", this_table_name);
|
||||
handler.set("suspended", true);
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
|
||||
tlogger.info("error injection handler replica_post_flush_after_update_cache: resuming flush for table {}", this_table_name);
|
||||
}
|
||||
});
|
||||
|
||||
cg.memtables()->erase(old);
|
||||
tlogger.debug("Memtable for {}.{} replaced, into {} sstables", old->schema()->ks_name(), old->schema()->cf_name(), newtabs.size());
|
||||
co_return;
|
||||
} catch (const std::exception& e) {
|
||||
for (auto& newtab : newtabs) {
|
||||
newtab->mark_for_deletion();
|
||||
tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e);
|
||||
if (!cg.async_gate().is_closed()) {
|
||||
co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(cg.view_for_unrepaired_data());
|
||||
}
|
||||
_config.cf_stats->failed_memtables_flushes_count++;
|
||||
// If we failed this write we will try the write again and that will create a new flush reader
|
||||
// that will decrease dirty memory again. So we need to reset the accounting.
|
||||
old->revert_flushed_memory();
|
||||
throw;
|
||||
}
|
||||
|
||||
auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, estimated_partitions, &cg] (mutation_reader reader) mutable -> future<> {
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer("memtable");
|
||||
cfg.backup = incremental_backups_enabled();
|
||||
|
||||
auto newtab = make_sstable();
|
||||
newtabs.push_back(newtab);
|
||||
tlogger.debug("Flushing to {}", newtab->get_filename());
|
||||
|
||||
auto monitor = database_sstable_write_monitor(permit, newtab, cg,
|
||||
old->get_max_timestamp());
|
||||
|
||||
co_return co_await write_memtable_to_sstable(std::move(reader), *old, newtab, estimated_partitions, monitor, cfg);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await reader.close();
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
});
|
||||
|
||||
auto f = consumer(old->make_flush_reader(
|
||||
old->schema(),
|
||||
compaction_concurrency_semaphore().make_tracking_only_permit(old->schema(), "try_flush_memtable_to_sstable()", db::no_timeout, {})));
|
||||
|
||||
// Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush
|
||||
// controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to
|
||||
// priority inversion.
|
||||
auto post_flush = [this, old = std::move(old), &newtabs, f = std::move(f), &cg] () mutable -> future<> {
|
||||
try {
|
||||
co_await std::move(f);
|
||||
co_await coroutine::parallel_for_each(newtabs, [] (auto& newtab) -> future<> {
|
||||
co_await newtab->open_data();
|
||||
tlogger.debug("Flushing to {} done", newtab->get_filename());
|
||||
});
|
||||
|
||||
co_await with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, &newtabs, &cg] {
|
||||
return update_cache(cg, old, newtabs);
|
||||
});
|
||||
|
||||
co_await utils::get_local_injector().inject("replica_post_flush_after_update_cache", [this] (auto& handler) -> future<> {
|
||||
const auto this_table_name = format("{}.{}", _schema->ks_name(), _schema->cf_name());
|
||||
if (this_table_name == handler.get("table_name")) {
|
||||
tlogger.info("error injection handler replica_post_flush_after_update_cache: suspending flush for table {}", this_table_name);
|
||||
handler.set("suspended", true);
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
|
||||
tlogger.info("error injection handler replica_post_flush_after_update_cache: resuming flush for table {}", this_table_name);
|
||||
}
|
||||
});
|
||||
|
||||
cg.memtables()->erase(old);
|
||||
tlogger.debug("Memtable for {}.{} replaced, into {} sstables", old->schema()->ks_name(), old->schema()->cf_name(), newtabs.size());
|
||||
co_return;
|
||||
} catch (const std::exception& e) {
|
||||
for (auto& newtab : newtabs) {
|
||||
newtab->mark_for_deletion();
|
||||
tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e);
|
||||
}
|
||||
_config.cf_stats->failed_memtables_flushes_count++;
|
||||
// If we failed this write we will try the write again and that will create a new flush reader
|
||||
// that will decrease dirty memory again. So we need to reset the accounting.
|
||||
old->revert_flushed_memory();
|
||||
throw;
|
||||
}
|
||||
};
|
||||
co_return co_await with_scheduling_group(default_scheduling_group(), std::ref(post_flush));
|
||||
};
|
||||
co_return co_await with_scheduling_group(_config.memtable_scheduling_group, std::ref(try_flush));
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -45,8 +45,6 @@ sstables_manager::sstables_manager(
|
||||
std::numeric_limits<size_t>::max(),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(uint32_t(1)),
|
||||
utils::updateable_value(0.0f),
|
||||
reader_concurrency_semaphore::register_metrics::no)
|
||||
, _dir_semaphore(dir_sem)
|
||||
, _resolve_host_id(std::move(resolve_host_id))
|
||||
|
||||
@@ -43,7 +43,7 @@ lazy_comparable_bytes_from_ring_position::lazy_comparable_bytes_from_ring_positi
|
||||
, _weight(bound_weight::equal)
|
||||
, _pk(std::move(dk._key))
|
||||
{
|
||||
init_first_fragment(dk._token);
|
||||
init_first_fragment(dk.token());
|
||||
}
|
||||
|
||||
void lazy_comparable_bytes_from_ring_position::init_first_fragment(dht::token dht_token) {
|
||||
|
||||
1
test.py
1
test.py
@@ -68,7 +68,6 @@ PYTEST_RUNNER_DIRECTORIES = [
|
||||
TEST_DIR / 'cqlpy',
|
||||
TEST_DIR / 'rest_api',
|
||||
TEST_DIR / 'nodetool',
|
||||
TEST_DIR / 'scylla_gdb',
|
||||
]
|
||||
|
||||
launch_time = time.monotonic()
|
||||
|
||||
@@ -201,7 +201,7 @@ SEASTAR_THREAD_TEST_CASE(test_conversion_to_legacy_form_same_token_singular) {
|
||||
auto key1 = partition_key::from_single_value(*s1, b);
|
||||
auto dk1 = partitioner.decorate_key(*s1, key1);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(dk._token, dk1._token);
|
||||
BOOST_REQUIRE_EQUAL(dk.token(), dk1.token());
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_conversion_to_legacy_form_same_token_two_components) {
|
||||
@@ -223,7 +223,7 @@ SEASTAR_THREAD_TEST_CASE(test_conversion_to_legacy_form_same_token_two_component
|
||||
auto key1 = partition_key::from_single_value(*s1, b);
|
||||
auto dk1 = partitioner.decorate_key(*s1, key1);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(dk._token, dk1._token);
|
||||
BOOST_REQUIRE_EQUAL(dk.token(), dk1.token());
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_legacy_ordering_of_singular) {
|
||||
|
||||
@@ -2152,7 +2152,6 @@ struct scoped_execption_log_level {
|
||||
|
||||
SEASTAR_TEST_CASE(replica_read_timeout_no_exception) {
|
||||
cql_test_config cfg;
|
||||
cfg.db_config->reader_concurrency_semaphore_preemptive_abort_factor.set(0.0);
|
||||
const auto read_timeout = 10ms;
|
||||
const auto write_timeout = 10s;
|
||||
cfg.query_timeout.emplace(timeout_config{
|
||||
|
||||
@@ -1185,13 +1185,6 @@ SEASTAR_TEST_CASE(failed_flush_prevents_writes) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) {
|
||||
#ifdef DEBUG
|
||||
// This test was observed to take multiple minutes to run in debug mode on CI machines.
|
||||
// This test checks that a certain behaviour is triggered when compaction falls behind.
|
||||
// Not critical to run in debug mode. Both compaction and memtable have their own
|
||||
// correctness tests, which do run in debug mode.
|
||||
return make_ready_future<>();
|
||||
#else
|
||||
BOOST_ASSERT(smp::count == 2);
|
||||
// The test simulates a situation where 2 threads issue flushes to 2
|
||||
// tables. Both issue small flushes, but one has injected reactor stalls.
|
||||
@@ -1266,7 +1259,6 @@ SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) {
|
||||
sleep_ms *= 2;
|
||||
}
|
||||
});
|
||||
#endif
|
||||
}
|
||||
|
||||
static future<> exceptions_in_flush_helper(std::unique_ptr<sstables::file_io_extension> mep, bool& should_fail, const bool& did_fail, const schema*& schema_filter, bool expect_isolate) {
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
#include "mutation/partition_version.hh"
|
||||
#include "db/partition_snapshot_row_cursor.hh"
|
||||
#include "partition_snapshot_reader.hh"
|
||||
#include "keys/clustering_interval_set.hh"
|
||||
|
||||
#include "test/lib/scylla_test_case.hh"
|
||||
|
||||
@@ -120,7 +120,7 @@ SEASTAR_THREAD_TEST_CASE(test_decorated_key_is_compatible_with_origin) {
|
||||
auto dk = partitioner.decorate_key(*s, key);
|
||||
|
||||
// Expected value was taken from Origin
|
||||
BOOST_REQUIRE_EQUAL(dk._token, token_from_long(4958784316840156970));
|
||||
BOOST_REQUIRE_EQUAL(dk.token(), token_from_long(4958784316840156970));
|
||||
BOOST_REQUIRE(dk._key.equal(*s, key));
|
||||
}
|
||||
|
||||
|
||||
@@ -517,38 +517,6 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_abort) {
|
||||
const auto preemptive_abort_factor = 0.5f;
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost,
|
||||
100, utils::updateable_value(std::numeric_limits<uint32_t>::max()), utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value<uint32_t>(1), utils::updateable_value<float>(preemptive_abort_factor));
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
{
|
||||
BOOST_REQUIRE(semaphore.get_stats().total_reads_shed_due_to_overload == 0);
|
||||
|
||||
auto timeout = db::timeout_clock::now() + 500ms;
|
||||
|
||||
reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, timeout, {}).get();
|
||||
|
||||
auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, timeout, {});
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 1);
|
||||
|
||||
// The permits are rejected when the remaining time is less than half of its timeout when arrived to the semaphore.
|
||||
// Hence, sleep 300ms to reject the permits in the waitlist during admission.
|
||||
seastar::sleep(300ms).get();
|
||||
|
||||
permit1 = {};
|
||||
const auto futures_failed = eventually_true([&] { return permit2_fut.failed(); });
|
||||
BOOST_CHECK(futures_failed);
|
||||
BOOST_CHECK_THROW(std::rethrow_exception(permit2_fut.get_exception()), semaphore_aborted);
|
||||
BOOST_CHECK(semaphore.get_stats().total_reads_shed_due_to_overload > 0);
|
||||
}
|
||||
|
||||
// All units should have been deposited back.
|
||||
REQUIRE_EVENTUALLY_EQUAL<ssize_t>([&] { return semaphore.available_resources().memory; }, replica::new_reader_base_cost);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) {
|
||||
return async([&] () {
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost, 2);
|
||||
@@ -629,8 +597,7 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) {
|
||||
|
||||
permit.resources = permit.permit->consume_resources(reader_resources(tests::random::get_int<unsigned>(0, 1), tests::random::get_int<unsigned>(1024, 16 * 1024 * 1024)));
|
||||
} else {
|
||||
//Ensure timeout_seconds > 0 to avoid permits being rejected during admission. The test will become flaky.
|
||||
const auto timeout_seconds = tests::random::get_int<unsigned>(1, 4);
|
||||
const auto timeout_seconds = tests::random::get_int<unsigned>(0, 3);
|
||||
|
||||
permit.permit_fut = semaphore.obtain_permit(
|
||||
schema,
|
||||
@@ -1259,14 +1226,11 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_group) {
|
||||
auto serialize_multiplier = utils::updateable_value_source<uint32_t>(2);
|
||||
auto kill_multiplier = utils::updateable_value_source<uint32_t>(3);
|
||||
auto cpu_concurrency = utils::updateable_value_source<uint32_t>(1);
|
||||
auto preemptive_abort_factor = utils::updateable_value_source<float>(0.0f);
|
||||
|
||||
reader_concurrency_semaphore_group sem_group(initial_resources.memory, initial_resources.count, 1000,
|
||||
utils::updateable_value(serialize_multiplier),
|
||||
utils::updateable_value(kill_multiplier),
|
||||
utils::updateable_value(cpu_concurrency),
|
||||
utils::updateable_value(preemptive_abort_factor));
|
||||
|
||||
utils::updateable_value(cpu_concurrency));
|
||||
auto stop_sem = deferred_stop(sem_group);
|
||||
|
||||
circular_buffer<scheduling_group> recycle_bin;
|
||||
@@ -1508,8 +1472,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_no_leaks
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{4, 4 * 1024};
|
||||
const auto serialize_multiplier = 2;
|
||||
const auto kill_multiplier = 3;
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100,
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
const size_t reader_count_target = 6;
|
||||
@@ -1762,8 +1726,9 @@ SEASTAR_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_engages) {
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preserves_state) {
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
|
||||
const auto serialize_multiplier = 2;
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count,
|
||||
initial_resources.memory, 100, utils::updateable_value<uint32_t>(serialize_multiplier));
|
||||
const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto sponge_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();
|
||||
@@ -1824,8 +1789,9 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preser
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_blessed_read_goes_inactive) {
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
|
||||
const auto serialize_multiplier = 2;
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count,
|
||||
initial_resources.memory, 100, utils::updateable_value<uint32_t>(serialize_multiplier));
|
||||
const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
simple_schema ss;
|
||||
@@ -1885,8 +1851,9 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_permit_waiting_for_memory_goes_inactive) {
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
|
||||
const auto serialize_multiplier = 2;
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count,
|
||||
initial_resources.memory, 100, utils::updateable_value<uint32_t>(serialize_multiplier));
|
||||
const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();
|
||||
@@ -1930,7 +1897,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_permit_waiting_for_me
|
||||
// This test covers all the cases where eviction should **not** happen.
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_no_unnecessary_evicting) {
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024};
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100);
|
||||
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
|
||||
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
simple_schema ss;
|
||||
@@ -2020,7 +1990,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_no_unnecessary_evicti
|
||||
// Check that inactive reads are evicted when they are blocking admission
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) {
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024};
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100);
|
||||
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
|
||||
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
simple_schema ss;
|
||||
@@ -2174,7 +2147,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) {
|
||||
// resources.
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_execution_stage_wakeup) {
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024};
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100);
|
||||
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
|
||||
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();
|
||||
@@ -2210,8 +2186,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_count) {
|
||||
const uint32_t initial_memory = 4 * 1024;
|
||||
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
|
||||
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
|
||||
const auto cpu_concurrency = 1;
|
||||
const auto preemptive_abort_factor = 0.0f;
|
||||
|
||||
reader_concurrency_semaphore semaphore(
|
||||
utils::updateable_value(count),
|
||||
@@ -2220,8 +2194,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_count) {
|
||||
100,
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier),
|
||||
utils::updateable_value<uint32_t>(kill_multiplier),
|
||||
utils::updateable_value<uint32_t>(cpu_concurrency),
|
||||
utils::updateable_value<float>(preemptive_abort_factor),
|
||||
utils::updateable_value<uint32_t>(1),
|
||||
reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
@@ -2241,7 +2214,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_cpu_concu
|
||||
const uint32_t initial_memory = 4 * 1024;
|
||||
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
|
||||
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
|
||||
const auto preemptive_abort_factor = 0.0f;
|
||||
|
||||
reader_concurrency_semaphore semaphore(
|
||||
utils::updateable_value<int>(initial_count),
|
||||
@@ -2251,7 +2223,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_cpu_concu
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier),
|
||||
utils::updateable_value<uint32_t>(kill_multiplier),
|
||||
utils::updateable_value(cpu_concurrency),
|
||||
utils::updateable_value<float>(preemptive_abort_factor),
|
||||
reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
@@ -2304,7 +2275,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_wait_queue_overload_c
|
||||
utils::updateable_value<uint32_t>(2),
|
||||
utils::updateable_value<uint32_t>(4),
|
||||
utils::updateable_value<uint32_t>(1),
|
||||
utils::updateable_value<float>(0.0f),
|
||||
reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
@@ -2358,7 +2328,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_double_permit_abort)
|
||||
utils::updateable_value<uint32_t>(2),
|
||||
utils::updateable_value<uint32_t>(400),
|
||||
utils::updateable_value<uint32_t>(2),
|
||||
utils::updateable_value<float>(0.0f),
|
||||
reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
@@ -2423,7 +2392,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_always_admit_one_perm
|
||||
utils::updateable_value<uint32_t>(200),
|
||||
utils::updateable_value<uint32_t>(400),
|
||||
utils::updateable_value<uint32_t>(1),
|
||||
utils::updateable_value<float>(0.0f),
|
||||
reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
@@ -2465,7 +2433,6 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_release_base_resource
|
||||
utils::updateable_value<uint32_t>(200),
|
||||
utils::updateable_value<uint32_t>(400),
|
||||
utils::updateable_value<uint32_t>(1),
|
||||
utils::updateable_value<float>(0.0f),
|
||||
reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
|
||||
@@ -4837,8 +4837,8 @@ SEASTAR_TEST_CASE(test_compact_range_tombstones_on_read) {
|
||||
// of course doesn't necessarily help release pressure on the semaphore.
|
||||
SEASTAR_THREAD_TEST_CASE(test_cache_reader_semaphore_oom_kill) {
|
||||
simple_schema s;
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 100, 1, std::numeric_limits<size_t>::max(),
|
||||
utils::updateable_value<uint32_t>(1), utils::updateable_value<uint32_t>(1));
|
||||
reader_concurrency_semaphore semaphore(100, 1, get_name(), std::numeric_limits<size_t>::max(), utils::updateable_value<uint32_t>(1),
|
||||
utils::updateable_value<uint32_t>(1), reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_semaphore = deferred_stop(semaphore);
|
||||
|
||||
cache_tracker tracker;
|
||||
|
||||
@@ -653,8 +653,8 @@ static bool key_range_overlaps(table_for_tests& cf, const dht::decorated_key& a,
|
||||
}
|
||||
|
||||
static bool sstable_overlaps(const lw_shared_ptr<replica::column_family>& cf, sstables::shared_sstable candidate1, sstables::shared_sstable candidate2) {
|
||||
auto range1 = wrapping_interval<dht::token>::make(candidate1->get_first_decorated_key()._token, candidate1->get_last_decorated_key()._token);
|
||||
auto range2 = wrapping_interval<dht::token>::make(candidate2->get_first_decorated_key()._token, candidate2->get_last_decorated_key()._token);
|
||||
auto range1 = wrapping_interval<dht::token>::make(candidate1->get_first_decorated_key().token(), candidate1->get_last_decorated_key().token());
|
||||
auto range2 = wrapping_interval<dht::token>::make(candidate2->get_first_decorated_key().token(), candidate2->get_last_decorated_key().token());
|
||||
return range1.overlaps(range2, dht::token_comparator());
|
||||
}
|
||||
|
||||
|
||||
@@ -2986,8 +2986,8 @@ SEASTAR_TEST_CASE(test_index_fast_forwarding_after_eof) {
|
||||
sst->load(sst->get_schema()->get_sharder()).get();
|
||||
}
|
||||
|
||||
const auto t1 = muts.front().decorated_key()._token;
|
||||
const auto t2 = muts.back().decorated_key()._token;
|
||||
const auto t1 = muts.front().decorated_key().token();
|
||||
const auto t2 = muts.back().decorated_key().token();
|
||||
dht::partition_range_vector prs;
|
||||
|
||||
prs.emplace_back(dht::ring_position::starting_at(dht::token{t1.raw() - 200}), dht::ring_position::ending_at(dht::token{t1.raw() - 100}));
|
||||
|
||||
@@ -25,7 +25,6 @@ import pytest
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import inject_error
|
||||
from test.pylib.internal_types import ServerUpState
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -198,14 +197,8 @@ ALTERNATOR_PROXY_SERVER_CONFIG = {
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
async def alternator_proxy_server(manager: ManagerClient):
|
||||
"""Fixture that creates a server with Alternator proxy protocol ports enabled.
|
||||
|
||||
Waits for SERVING state to ensure Alternator ports are ready.
|
||||
"""
|
||||
server = await manager.server_add(
|
||||
config=ALTERNATOR_PROXY_SERVER_CONFIG,
|
||||
expected_server_up_state=ServerUpState.SERVING
|
||||
)
|
||||
"""Fixture that creates a server with Alternator proxy protocol ports enabled."""
|
||||
server = await manager.server_add(config=ALTERNATOR_PROXY_SERVER_CONFIG)
|
||||
yield (server, manager)
|
||||
|
||||
|
||||
|
||||
@@ -322,7 +322,7 @@ future<> require_column_has_value(cql_test_env& e, const sstring& table_name,
|
||||
auto ckey = clustering_key::from_deeply_exploded(*schema, ck);
|
||||
auto exp = expected.type()->decompose(expected);
|
||||
auto dk = dht::decorate_key(*schema, pkey);
|
||||
auto shard = cf.get_effective_replication_map()->shard_for_reads(*schema, dk._token);
|
||||
auto shard = cf.get_effective_replication_map()->shard_for_reads(*schema, dk.token());
|
||||
return e.db().invoke_on(shard, [&e, dk = std::move(dk),
|
||||
ckey = std::move(ckey),
|
||||
column_name = std::move(column_name),
|
||||
|
||||
@@ -39,4 +39,3 @@ class ServerUpState(IntEnum):
|
||||
HOST_ID_QUERIED = auto()
|
||||
CQL_CONNECTED = auto()
|
||||
CQL_QUERIED = auto()
|
||||
SERVING = auto() # Scylla sent sd_notify("serving")
|
||||
|
||||
@@ -44,7 +44,6 @@ import platform
|
||||
import contextlib
|
||||
import fcntl
|
||||
import urllib
|
||||
import socket
|
||||
|
||||
import psutil
|
||||
|
||||
@@ -386,10 +385,6 @@ class ScyllaServer:
|
||||
prefix=f"scylladb-{f'{xdist_worker_id}-' if xdist_worker_id else ''}{self.server_id}-test.py-"
|
||||
)
|
||||
self.maintenance_socket_path = f"{self.maintenance_socket_dir.name}/cql.m"
|
||||
# Unix socket for receiving sd_notify messages from Scylla
|
||||
self.notify_socket_path = pathlib.Path(self.maintenance_socket_dir.name) / "notify.sock"
|
||||
self.notify_socket: Optional[socket.socket] = None
|
||||
self._received_serving = False
|
||||
self.exe = pathlib.Path(version.path).resolve()
|
||||
self.vardir = pathlib.Path(vardir)
|
||||
self.logger = logger
|
||||
@@ -717,50 +712,6 @@ class ScyllaServer:
|
||||
caslog.setLevel(oldlevel)
|
||||
# Any other exception may indicate a problem, and is passed to the caller.
|
||||
|
||||
def _setup_notify_socket(self) -> None:
|
||||
"""Create a Unix datagram socket for receiving sd_notify messages from Scylla."""
|
||||
if self.notify_socket is not None:
|
||||
return
|
||||
# Remove existing socket file if present
|
||||
self.notify_socket_path.unlink(missing_ok=True)
|
||||
self.notify_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_NONBLOCK | socket.SOCK_CLOEXEC)
|
||||
self.notify_socket.bind(str(self.notify_socket_path))
|
||||
self._received_serving = False
|
||||
|
||||
def _cleanup_notify_socket(self) -> None:
|
||||
"""Clean up the sd_notify socket."""
|
||||
if self.notify_socket is not None:
|
||||
self.notify_socket.close()
|
||||
self.notify_socket = None
|
||||
self.notify_socket_path.unlink(missing_ok=True)
|
||||
|
||||
def check_serving_notification(self) -> bool:
|
||||
"""Check if Scylla has sent the 'serving' sd_notify message.
|
||||
|
||||
Returns True if the SERVING state has been reached.
|
||||
"""
|
||||
if self._received_serving:
|
||||
return True
|
||||
if self.notify_socket is None:
|
||||
return False
|
||||
# Try to read all available messages from the socket
|
||||
while True:
|
||||
try:
|
||||
data = self.notify_socket.recv(4096)
|
||||
# sd_notify message format: "STATUS=serving\n" or "READY=1\nSTATUS=serving\n"
|
||||
message = data.decode('utf-8', errors='replace')
|
||||
if 'STATUS=serving' in message:
|
||||
self._received_serving = True
|
||||
self.logger.debug("Received sd_notify 'serving' message")
|
||||
return True
|
||||
except BlockingIOError:
|
||||
# No more messages available
|
||||
break
|
||||
except Exception as e:
|
||||
self.logger.debug("Error reading from notify socket: %s", e)
|
||||
break
|
||||
return False
|
||||
|
||||
async def try_get_host_id(self, api: ScyllaRESTAPIClient) -> Optional[HostID]:
|
||||
"""Try to get the host id (also tests Scylla REST API is serving)"""
|
||||
|
||||
@@ -803,10 +754,6 @@ class ScyllaServer:
|
||||
env['UBSAN_OPTIONS'] = f'halt_on_error=1:abort_on_error=1:suppressions={TOP_SRC_DIR / "ubsan-suppressions.supp"}'
|
||||
env['ASAN_OPTIONS'] = f'disable_coredump=0:abort_on_error=1:detect_stack_use_after_return=1'
|
||||
|
||||
# Set up socket for receiving sd_notify messages from Scylla
|
||||
self._setup_notify_socket()
|
||||
env['NOTIFY_SOCKET'] = self.notify_socket_path
|
||||
|
||||
# Reopen log file if it was closed (e.g., after a previous stop)
|
||||
if self.log_file is None or self.log_file.closed:
|
||||
self.log_file = self.log_filename.open("ab") # append mode to preserve previous logs
|
||||
@@ -861,10 +808,7 @@ class ScyllaServer:
|
||||
if server_up_state == ServerUpState.PROCESS_STARTED:
|
||||
server_up_state = ServerUpState.HOST_ID_QUERIED
|
||||
server_up_state = await self.get_cql_up_state() or server_up_state
|
||||
# Check for SERVING state (sd_notify "serving" message)
|
||||
if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
|
||||
server_up_state = ServerUpState.SERVING
|
||||
if server_up_state >= expected_server_up_state:
|
||||
if server_up_state == expected_server_up_state:
|
||||
if expected_error is not None:
|
||||
await report_error(
|
||||
f"the node has reached {server_up_state} state,"
|
||||
@@ -903,14 +847,13 @@ class ScyllaServer:
|
||||
session.execute("DROP KEYSPACE k")
|
||||
|
||||
async def shutdown_control_connection(self) -> None:
|
||||
"""Shut down driver connection and notify socket"""
|
||||
"""Shut down driver connection"""
|
||||
if self.control_connection is not None:
|
||||
self.control_connection.shutdown()
|
||||
self.control_connection = None
|
||||
if self.control_cluster is not None:
|
||||
self.control_cluster.shutdown()
|
||||
self.control_cluster = None
|
||||
self._cleanup_notify_socket()
|
||||
|
||||
@stop_event
|
||||
@start_stop_lock
|
||||
|
||||
@@ -1,92 +1,80 @@
|
||||
# Copyright 2022-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
"""Conftest for Scylla GDB tests"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
# This file configures pytest for all tests in this directory, and also
|
||||
# defines common test fixtures for all of them to use. A "fixture" is some
|
||||
# setup which an individual test requires to run; The fixture has setup code
|
||||
# and teardown code, and if multiple tests require the same fixture, it can
|
||||
# be set up only once - while still allowing the user to run individual tests
|
||||
# and automatically setting up the fixtures they need.
|
||||
|
||||
import pexpect
|
||||
import pytest
|
||||
import os
|
||||
import sys
|
||||
|
||||
from test.pylib.suite.python import PythonTest
|
||||
from test.pylib.util import LogPrefixAdapter
|
||||
try:
|
||||
import gdb as gdb_library
|
||||
except:
|
||||
print('This test must be run inside gdb. Run ./run instead.')
|
||||
exit(1)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
async def scylla_server(testpy_test: PythonTest | None):
|
||||
"""Return a running Scylla server instance from the active test cluster."""
|
||||
logger_prefix = testpy_test.mode + "/" + testpy_test.uname
|
||||
logger = LogPrefixAdapter(
|
||||
logging.getLogger(logger_prefix), {"prefix": logger_prefix}
|
||||
)
|
||||
scylla_cluster = await testpy_test.suite.clusters.get(logger)
|
||||
scylla_server = next(iter(scylla_cluster.running.values()))
|
||||
def pytest_addoption(parser):
    """Register the command-line options used by the Scylla GDB tests.

    Both options store a string and default to None when not given.
    """
    option_help = (
        ('--scylla-pid', 'Process ID of running Scylla to attach gdb to'),
        ('--scylla-tmp-dir', 'Temporary directory where Scylla runs'),
    )
    for flag, description in option_help:
        parser.addoption(flag, action='store', default=None, help=description)
|
||||
|
||||
yield scylla_server
|
||||
|
||||
await testpy_test.suite.clusters.put(scylla_cluster, is_dirty=True)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
def gdb_process(scylla_server, request):
    """Spawn an interactive GDB attached to the Scylla process.

    Loads `scylla-gdb.py` and the test helpers (`gdb_utils.py`) so tests can
    run GDB/Python helpers against the live Scylla process.

    Yields the pexpect-spawned GDB process once its first "(gdb)" prompt has
    been seen, and terminates it during fixture teardown.
    """
    scylla_gdb_py = os.path.join(request.fspath.dirname, "..", "..", "scylla-gdb.py")
    script_py = os.path.join(request.fspath.dirname, "gdb_utils.py")
    cmd = (
        "gdb -q "
        "--nx "
        "-iex 'set confirm off' "
        "-iex 'set pagination off' "
        f"-se {scylla_server.exe} "
        f"-p {scylla_server.cmd.pid} "
        # -ex takes exactly one argument, so the multi-word command must be
        # quoted like the -iex commands above; unquoted, only "set" would be
        # passed as the command and the remaining words as stray arguments.
        "-ex 'set python print-stack full' "
        f"-x {scylla_gdb_py} "
        f"-x {script_py}"
    )
    gdb_process = pexpect.spawn(cmd, maxread=10, searchwindowsize=10)
    gdb_process.expect_exact("(gdb)")

    yield gdb_process

    gdb_process.terminate()
|
||||
|
||||
|
||||
def execute_gdb_command(
|
||||
gdb_process, scylla_command: str = None, full_command: str = None
|
||||
):
|
||||
"""
|
||||
Execute a command in an interactive GDB session and return its output.
|
||||
|
||||
The command can be provided either as a Scylla GDB command (which will be
|
||||
wrapped and executed via GDB's Python interface) or as a full raw GDB
|
||||
command string.
|
||||
|
||||
The function waits for the GDB prompt to reappear, enforces a timeout,
|
||||
and fails the test if the command does not complete or if GDB reports an
|
||||
error.
|
||||
|
||||
Args:
|
||||
gdb_process (pexpect.pty_spawn.spawn): An active GDB process spawned via pexpect
|
||||
scylla_command (str, optional): A GDB Scylla command (from scylla-gdb.py) to execute.
|
||||
full_command (str, optional): A raw GDB command string to execute.
|
||||
"""
|
||||
command = f"python gdb.execute('scylla {scylla_command}')"
|
||||
if full_command:
|
||||
command = full_command
|
||||
|
||||
gdb_process.sendline(command)
|
||||
# Scylla's "scylla-gdb.py" does two things: It configures gdb to add new
|
||||
# "scylla" commands, and it implements a bunch of useful functions in Python.
|
||||
# Doing just the former is easy (just add "-x scylla-gdb.py" when running
|
||||
# gdb), but we also want the latter - we want to be able to use some of those
|
||||
# extra functions in the test code. For that, we need to actually import the
|
||||
# scylla-gdb.py module from the test code here - and remember the module
|
||||
# object.
|
||||
@pytest.fixture(scope="session")
|
||||
def scylla_gdb(request):
|
||||
save_sys_path = sys.path
|
||||
sys.path.insert(1, sys.path[0] + '/../..')
|
||||
# Unfortunately, the file's name includes a dash which requires some
|
||||
# funky workarounds to import.
|
||||
import importlib
|
||||
try:
|
||||
gdb_process.expect_exact("(gdb)", timeout=180)
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
gdb_process.sendcontrol("c")
|
||||
gdb_process.expect_exact("(gdb)", timeout=1)
|
||||
pytest.fail("GDB command did not complete within the timeout period")
|
||||
result = gdb_process.before.decode("utf-8")
|
||||
mod = importlib.import_module("scylla-gdb")
|
||||
except Exception as e:
|
||||
pytest.exit(f'Failed to load scylla-gdb: {e}')
|
||||
sys.path = save_sys_path
|
||||
yield mod
|
||||
|
||||
assert "Error" not in result
|
||||
return result
|
||||
# "gdb" fixture, attaching to a running Scylla and letting the tests
|
||||
# run gdb commands on it. The fixture returns a module
|
||||
# The gdb fixture depends on scylla_gdb, to ensure that the "scylla"
|
||||
# subcommands are loaded into gdb.
|
||||
@pytest.fixture(scope="session")
def gdb(request, scylla_gdb):
    """Attach gdb to the running Scylla and yield the gdb Python module.

    Depends on the scylla_gdb fixture so that the "scylla" subcommands are
    loaded into gdb before any test uses them. The process to attach to is
    taken from the --scylla-pid option.
    """
    try:
        gdb_library.lookup_type('size_t')
    except Exception:
        # Only real lookup failures are reported as "no debug info";
        # KeyboardInterrupt and SystemExit are left to propagate.
        pytest.exit('ERROR: Scylla executable was compiled without debugging '
                    'information (-g) so cannot be used to test gdb. Please '
                    'set SCYLLA environment variable.')

    # The gdb tests are known to be broken on aarch64 (see
    # https://sourceware.org/bugzilla/show_bug.cgi?id=27886) and untested
    # on anything else. So skip them.
    if os.uname().machine != 'x86_64':
        pytest.skip('test/scylla-gdb/conftest.py: gdb tests skipped for non-x86_64')
    gdb_library.execute('set python print-stack full')
    scylla_pid = request.config.getoption('scylla_pid')
    gdb_library.execute(f'attach {scylla_pid}')
    # FIXME: We can start the test here, but at this point Scylla may be
    # completely idle. To make the situation more interesting (and, e.g., have
    # live tasks for test_misc.py::task()), we can set a breakpoint and
    # let Scylla run a bit more and stop in the middle of its work. However,
    # I'm not sure where to set a break point that is actually guaranteed to
    # happen :(
    #gdb_library.execute('handle SIG34 SIG35 SIGUSR1 nostop noprint pass')
    #gdb_library.execute('break sstables::compact_sstables')
    #gdb_library.execute('continue')
    yield gdb_library
|
||||
|
||||
@@ -1,80 +0,0 @@
|
||||
# Copyright 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
"""
|
||||
GDB helper functions for `scylla_gdb` tests.
|
||||
They should be loaded to GDB by "-x {dir}/gdb_utils.py",
|
||||
when loaded, they can be run in gdb e.g. `python get_sstables()`
|
||||
|
||||
Depends on helper functions injected to GDB by `scylla-gdb.py` script.
|
||||
(sharded, for_each_table, seastar_lw_shared_ptr, find_sstables, find_vptrs, resolve,
|
||||
get_seastar_memory_start_and_size).
|
||||
"""
|
||||
|
||||
import gdb
|
||||
import uuid
|
||||
|
||||
|
||||
def get_schema():
    """Print the schema pointer of the first table in the local database shard.

    The pointer is printed after a ``schema=`` prefix.
    """
    local_db = sharded(gdb.parse_and_eval('::debug::the_database')).local()
    first_table = next(for_each_table(local_db))
    schema_ptr = seastar_lw_shared_ptr(first_table['_schema']).get()
    print('schema=', schema_ptr)
|
||||
|
||||
|
||||
def get_sstables():
    """Print the first sstable yielded by find_sstables().

    The value is printed after an ``sst=(sstables::sstable *)`` prefix.
    """
    first_sstable = next(find_sstables())
    print("sst=(sstables::sstable *)", first_sstable)
|
||||
|
||||
|
||||
def get_task():
    """
    Print the address of one task, for commands that need a task to work on.

    Because Scylla was stopped while it was idle, we don't expect to find
    any ready task with get_local_tasks(), but one can be found with a
    find_vptrs() loop. A nice one (with multiple tasks chained to it for
    "scylla fiber") is the one from http_server::do_accept_one.

    Only the first match is printed, as ``task=<address>``. Nothing is printed
    when no match is found.
    """
    for obj_addr, vtable_addr in find_vptrs():
        symbol = resolve(vtable_addr, startswith='vtable for seastar::continuation')
        if not symbol or 'do_accept_one' not in symbol:
            continue
        print(f"task={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")
        return
|
||||
|
||||
|
||||
def get_coroutine():
    """Similar to get_task(), but looks for a coroutine frame.

    Prints ``coroutine_config=<address>`` for every object whose resolved
    vtable name equals the topology coordinator's resume function.
    """
    wanted = 'service::topology_coordinator::run() [clone .resume]'
    for obj_addr, vtable_addr in find_vptrs():
        symbol = resolve(vtable_addr)
        if not symbol or symbol.strip() != wanted:
            continue
        print(f"coroutine_config={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")
|
||||
|
||||
|
||||
def coroutine_debug_config(tmpdir):
    """
    Collect diagnostics for a missing coroutine frame, then dump a core.

    Runs "scylla find" on the address of the expected resume function, so its
    result can be compared with what find_vptrs() reports, and lists every
    name containing ".resume" that find_vptrs() can resolve.

    The lookup this supports fails sometimes, but rarely and unreliably, so a
    core of Scylla is written with gcore next to `tmpdir` for later analysis.
    See https://github.com/scylladb/scylladb/issues/22501

    Always raises gdb.error, naming the core file that was written.
    """
    wanted = 'service::topology_coordinator::run() [clone .resume]'
    wanted_addr = int(gdb.parse_and_eval(f"&'{wanted}'"))
    find_command = f"scylla find -a 0x{wanted_addr:x}"
    gdb.write(f"Didn't find {wanted} (0x{wanted_addr:x}). Running '{find_command}'\n")
    memory_start, memory_size = get_seastar_memory_start_and_size()[:2]
    gdb.execute(find_command)
    gdb.write(f"Memory range: 0x{memory_start:x} 0x{memory_size:x}\n")
    gdb.write("Found coroutines:\n")
    for obj_addr, vtable_addr in find_vptrs():
        symbol = resolve(vtable_addr)
        if not symbol or '.resume' not in symbol.strip():
            continue
        gdb.write(f"{symbol}\n")
    core_filename = f"{tmpdir}/../scylla_gdb_coro_task-{uuid.uuid4()}.core"
    gdb.execute(f"gcore {core_filename}")
    raise gdb.error(f"No coroutine frames found with expected name. Dumped Scylla core to {core_filename}")
|
||||
119
test/scylla_gdb/run
Executable file
119
test/scylla_gdb/run
Executable file
@@ -0,0 +1,119 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import os
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
# Use the run.py library from ../cqlpy:
|
||||
sys.path.insert(1, sys.path[0] + '/../cqlpy')
|
||||
import run
|
||||
|
||||
print('Scylla is: ' + run.find_scylla() + '.')
|
||||
|
||||
# Gdb will only work if the executable was built with debug symbols
|
||||
# (e.g., Scylla's release or debug build modes, but not dev mode).
|
||||
# Check this quickly up-front, instead of waiting for gdb to fail in a
|
||||
# mysterious way when it can't look up types.
|
||||
objdump_headers = subprocess.run(['objdump', '-h', run.scylla],
                                 capture_output=True, text=True).stdout
if '.debug_info' not in objdump_headers:
    print('Scylla executable was compiled without debugging information (-g)')
    print('so cannot be used to test gdb. Please set SCYLLA environment variable.')
    exit(1)

# Run Scylla, waiting until it can respond to CQL
pid = run.run_with_temporary_dir(run.run_scylla_cmd)
ip = run.pid_to_ip(pid)
run.wait_for_services(pid, [lambda: run.check_cql(ip)])
|
||||
|
||||
# We do something strange here: We start pytest *inside* gdb's Python
|
||||
# interpreter. This will allow us to test various gdb commands added
|
||||
# by scylla-gdb.py inside gdb using the pytest framework.
|
||||
# TODO: consider a more straightforward implementation, where we don't
|
||||
# run pytest inside gdb - and instead run gdb as a separate process and
|
||||
# pytest just sends commands to it.
|
||||
# TODO: think if we can avoid code duplication with run.run_pytest().
|
||||
def run_pytest_in_gdb(pytest_dir, executable, additional_parameters):
    """Run pytest inside gdb's Python interpreter, in a forked child.

    Args:
        pytest_dir: directory the child changes into before starting gdb.
        executable: path passed to gdb with -se; skipped when empty.
        additional_parameters: list of extra pytest command-line arguments.

    Returns:
        True if the child's wait status is zero, False otherwise.
    """
    sys.stdout.flush()
    sys.stderr.flush()
    pid = os.fork()
    if pid == 0:
        # child:
        run.run_with_temporary_dir_pids = set()  # no children to clean up on child
        run.run_pytest_pids = set()
        os.chdir(pytest_dir)
        pytest_args = ['-o', 'junit_family=xunit2'] + additional_parameters
        joined_args = " ".join(pytest_args)
        pytest_cmd = (
            f'print("Starting pytest {joined_args}"); '
            'import pytest; sys.argv[0]="pytest"; '
            f'sys.exit(pytest.main({pytest_args}))'
        )
        print(f'Starting gdb {executable}')
        sys.stdout.flush()
        gdb_argv = [
            'gdb',
            '-batch', '-n',
            '-ex', 'set python print-stack full',
            '-ex', 'python ' + pytest_cmd,
        ]
        if executable:
            gdb_argv.extend(['-se', executable])
        os.execvp('gdb', gdb_argv)
        exit(1)
    # parent:
    run.run_pytest_pids.add(pid)
    _, wait_status = os.waitpid(pid, 0)
    return not wait_status
|
||||
|
||||
|
||||
def modify_junit_xml_filename(args, filename_modifier):
    """
    Insert a marker before the extension of the --junit-xml=PATH file name.

    Args:
        args: List of command line arguments
        filename_modifier: text inserted, after a dot, before the file's
            suffix; None means "leave the arguments alone"

    Returns:
        The same list object when filename_modifier is None, otherwise a new
        list in which every '--junit-xml=' argument has the modified path.
    """
    if filename_modifier is None:
        return args

    prefix = '--junit-xml='

    def rewrite(argument):
        if not argument.startswith(prefix):
            return argument
        report_path = Path(argument.split('=', 1)[1])
        renamed = report_path.with_suffix(f'.{filename_modifier}{report_path.suffix}')
        return f'--junit-xml={renamed}'

    return [rewrite(argument) for argument in args]
|
||||
|
||||
# Interesting note: We must use "--scylla-tmp-dir=DIR" here instead of
|
||||
# "--scylla-tmp-dir DIR": While the latter does work, pytest has a bug that
|
||||
# its command-line parser finds the given directory name in the original
|
||||
# command line, saves it as "initialpaths", and uses it to print what it
|
||||
# thinks are nice (but are really incorrect) relative paths for "nodes" (test
|
||||
# source files).
|
||||
# Two pytest runs: first the tests that need a running Scylla (gdb is given
# the executable and the process to attach to), then the tests marked
# "without_scylla" (gdb is started with no executable).
success = True
for with_scylla in (True, False):
    if with_scylla:
        args = [f'--scylla-pid={pid}',
                '--scylla-tmp-dir=' + run.pid_to_dir(pid),
                '-m', 'not without_scylla']
        executable = run.scylla
        junit_label = 'with_scylla'
    else:
        args = ['-m', 'without_scylla']
        executable = ''
        junit_label = 'without_scylla'
    # Each run gets its own junit XML file name.
    modified_argv = modify_junit_xml_filename(sys.argv[1:], junit_label)
    if not run_pytest_in_gdb(sys.path[0], executable, args + modified_argv):
        success = False

run.summary = 'Scylla GDB tests pass' if success else 'Scylla GDB tests failure'

exit(0 if success else 1)
|
||||
|
||||
# Note that the run.cleanup_all() function runs now, just like on any exit
|
||||
# for any reason in this script. It will delete the temporary files and
|
||||
# announce the failure or success of the test (printing run.summary).
|
||||
3
test/scylla_gdb/suite.yaml
Normal file
3
test/scylla_gdb/suite.yaml
Normal file
@@ -0,0 +1,3 @@
|
||||
type: Run
|
||||
run_in_release:
|
||||
- run
|
||||
@@ -1,76 +0,0 @@
|
||||
# Copyright 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
"""
|
||||
Basic tests for commands that do not require additional options.
|
||||
Each only checks that the command does not fail - but not what it does or returns.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from test.scylla_gdb.conftest import execute_gdb_command
|
||||
|
||||
pytestmark = [
|
||||
pytest.mark.skip_mode(
|
||||
mode=["dev", "debug"],
|
||||
reason="Scylla was built without debug symbols; use release mode",
|
||||
),
|
||||
pytest.mark.skip_mode(
|
||||
mode=["dev", "debug", "release"],
|
||||
platform_key="aarch64",
|
||||
reason="GDB is broken on aarch64: https://sourceware.org/bugzilla/show_bug.cgi?id=27886",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "command",
    [
        "features",
        "compaction-tasks",
        "databases",
        "commitlog",
        "tables",
        "table system.local",
        "tablet-metadata",
        "keyspaces",
        "active-sstables",
        "sstables",
        "memtables",
        "repairs",
        "gms",
        "heapprof",
        "io-queues",
        "cache",
        "mem-range",
        "mem-ranges",
        "memory",
        "segment-descs",
        "small-object -o 32 --random-page",
        "small-object -o 64 --summarize",
        "large-objects -o 131072 --random-page",
        "large-objects -o 32768 --summarize",
        "lsa",
        "netw",
        "smp-queues",
        "task-queues",
        "task_histogram",
        "task_histogram -a",
        "tasks",
        "threads",
        "timers",
        "get-config-value compaction_static_shares",
        "read-stats",
        "prepared-statements",
    ],
)
def test_scylla_commands(gdb_process, command):
    """Run one "scylla <command>" in GDB; passes when the helper reports no error."""
    execute_gdb_command(gdb_process, scylla_command=command)
|
||||
|
||||
|
||||
def test_nonexistent_scylla_command(gdb_process):
    """An unknown "scylla" subcommand must be reported with the expected message."""
    expected_message = r'Undefined scylla command: "nonexistent_command"'
    with pytest.raises(AssertionError, match=expected_message):
        execute_gdb_command(gdb_process, "nonexistent_command")
|
||||
@@ -1 +0,0 @@
|
||||
type: Python
|
||||
234
test/scylla_gdb/test_misc.py
Normal file
234
test/scylla_gdb/test_misc.py
Normal file
@@ -0,0 +1,234 @@
|
||||
import pytest
|
||||
import re
|
||||
import uuid
|
||||
|
||||
# Convenience function to execute a scylla command in gdb, returning its
|
||||
# output as a string - or a gdb.error exception.
|
||||
def scylla(gdb, cmd):
    """Run "scylla <cmd>" through gdb.execute() and return what it returns.

    The command is executed with from_tty=False and to_string=True; any
    exception raised by gdb.execute() propagates to the caller.
    """
    full_command = 'scylla ' + cmd
    return gdb.execute(full_command, from_tty=False, to_string=True)
|
||||
|
||||
# Check that trying an unknown subcommand of the "scylla" subcommand
|
||||
# produces the right error message.
|
||||
def test_nonexistent_scylla_command(gdb):
    """An unknown "scylla" subcommand must raise gdb.error with the expected text."""
    expected_message = 'Undefined scylla command'
    with pytest.raises(gdb.error, match=expected_message):
        scylla(gdb, 'nonexistent_command')
|
||||
|
||||
# Minimal test for some of the commands. Each only checks that the command
|
||||
# does not fail - but not what it does or returns. These tests are still
|
||||
# useful - importantly, they can detect that one of the commands relies on
|
||||
# some internal implementation detail which no longer works, and needs to
|
||||
# be fixed.
|
||||
|
||||
def test_features(gdb):
    """Smoke test: "scylla features" must not fail."""
    scylla(gdb, "features")


def test_compaction_tasks(gdb):
    """Smoke test: "scylla compaction-tasks" must not fail."""
    scylla(gdb, "compaction-tasks")


def test_databases(gdb):
    """Smoke test: "scylla databases" must not fail."""
    scylla(gdb, "databases")


def test_commitlog(gdb):
    """Smoke test: "scylla commitlog" must not fail."""
    scylla(gdb, "commitlog")


def test_tables(gdb):
    """Smoke test: "scylla tables" must not fail."""
    scylla(gdb, "tables")


def test_table(gdb):
    """Smoke test: "scylla table" on system.local must not fail."""
    scylla(gdb, "table system.local")


def test_tablet_metadata(gdb):
    """Smoke test: "scylla tablet-metadata" must not fail."""
    scylla(gdb, "tablet-metadata")


def test_keyspaces(gdb):
    """Smoke test: "scylla keyspaces" must not fail."""
    scylla(gdb, "keyspaces")


def test_active_sstables(gdb):
    """Smoke test: "scylla active-sstables" must not fail."""
    scylla(gdb, "active-sstables")


def test_sstables(gdb):
    """Smoke test: "scylla sstables" must not fail."""
    scylla(gdb, "sstables")


def test_memtables(gdb):
    """Smoke test: "scylla memtables" must not fail."""
    scylla(gdb, "memtables")


def test_repairs(gdb):
    """Smoke test: "scylla repairs" must not fail."""
    scylla(gdb, "repairs")


def test_gms(gdb):
    """Smoke test: "scylla gms" must not fail."""
    scylla(gdb, "gms")


def test_heapprof(gdb):
    """Smoke test: "scylla heapprof" must not fail."""
    scylla(gdb, "heapprof")


def test_io_queues(gdb):
    """Smoke test: "scylla io-queues" must not fail."""
    scylla(gdb, "io-queues")


def test_cache(gdb):
    """Smoke test: "scylla cache" must not fail."""
    scylla(gdb, "cache")


def test_mem_range(gdb):
    """Smoke test: "scylla mem-range" must not fail."""
    scylla(gdb, "mem-range")


def test_mem_ranges(gdb):
    """Smoke test: "scylla mem-ranges" must not fail."""
    scylla(gdb, "mem-ranges")


def test_memory(gdb):
    """Smoke test: "scylla memory" must not fail."""
    scylla(gdb, "memory")


def test_segment_descs(gdb):
    """Smoke test: "scylla segment-descs" must not fail."""
    scylla(gdb, "segment-descs")


def test_small_object_1(gdb):
    """Smoke test: "scylla small-object" with --random-page must not fail."""
    scylla(gdb, "small-object -o 32 --random-page")


def test_small_object_2(gdb):
    """Smoke test: "scylla small-object" with --summarize must not fail."""
    scylla(gdb, "small-object -o 64 --summarize")


def test_large_objects_1(gdb):
    """Smoke test: "scylla large-objects" with --random-page must not fail."""
    scylla(gdb, "large-objects -o 131072 --random-page")


def test_large_objects_2(gdb):
    """Smoke test: "scylla large-objects" with --summarize must not fail."""
    scylla(gdb, "large-objects -o 32768 --summarize")


def test_lsa(gdb):
    """Smoke test: "scylla lsa" must not fail."""
    scylla(gdb, "lsa")


def test_netw(gdb):
    """Smoke test: "scylla netw" must not fail."""
    scylla(gdb, "netw")


def test_smp_queues(gdb):
    """Smoke test: "scylla smp-queues" must not fail."""
    scylla(gdb, "smp-queues")


def test_task_queues(gdb):
    """Smoke test: "scylla task-queues" must not fail."""
    scylla(gdb, "task-queues")


def test_task_histogram(gdb):
    """Smoke test: "scylla task_histogram" must not fail."""
    scylla(gdb, "task_histogram")
|
||||
|
||||
def test_task_histogram_coro(gdb):
    """The full task histogram must contain at least one "[clone .<name>]" entry."""
    histogram = scylla(gdb, 'task_histogram -a')
    coroutine_entry = re.search(r'\) \[clone \.\w+\]', histogram)
    if coroutine_entry is None:
        raise gdb.error('no coroutine entries in task histogram')
|
||||
|
||||
def test_tasks(gdb):
    """Smoke test: "scylla tasks" must not fail."""
    scylla(gdb, "tasks")


def test_threads(gdb):
    """Smoke test: "scylla threads" must not fail."""
    scylla(gdb, "threads")


def test_timers(gdb):
    """Smoke test: "scylla timers" must not fail."""
    scylla(gdb, "timers")
|
||||
|
||||
# Some commands need a schema to work on. The following fixture finds
|
||||
# one (the schema of the first table - note that even without any user
|
||||
# tables, we will always have system tables).
|
||||
@pytest.fixture(scope="module")
def schema(gdb, scylla_gdb):
    """Store the first table's schema pointer in gdb's $schema variable.

    Yields the string '$schema', which tests can put into gdb commands.
    """
    local_db = scylla_gdb.sharded(gdb.parse_and_eval('::debug::the_database')).local()
    first_table = next(scylla_gdb.for_each_table(local_db))
    schema_ptr = scylla_gdb.seastar_lw_shared_ptr(first_table['_schema']).get()
    gdb.set_convenience_variable('schema', schema_ptr)
    yield '$schema'
|
||||
|
||||
@pytest.fixture(scope="module")
def sstable(gdb, scylla_gdb):
    """Store the first sstable found in gdb's $sst convenience variable.

    Yields the string '$sst', which tests can put into gdb commands.
    """
    # The database object is not needed here: find_sstables() is called
    # without arguments, so the previously computed `db` local was unused.
    sst = next(scylla_gdb.find_sstables())
    gdb.set_convenience_variable('sst', sst)
    yield '$sst'
|
||||
|
||||
def test_schema(gdb, schema):
    """Smoke test: "scylla schema" on the schema fixture must not fail."""
    scylla(gdb, 'schema ' + schema)


def test_find(gdb, schema):
    """Smoke test: "scylla find -r" on the schema fixture must not fail."""
    scylla(gdb, 'find -r ' + schema)


def test_ptr(gdb, schema):
    """Smoke test: "scylla ptr" on the schema fixture must not fail."""
    scylla(gdb, 'ptr ' + schema)
|
||||
|
||||
def test_generate_object_graph(gdb, schema, request):
    """Smoke test: generate an object graph for the schema into the Scylla tmp dir."""
    output_dir = request.config.getoption('scylla_tmp_dir')
    graph_command = f'generate-object-graph -o {output_dir}/og.dot -d 2 -t 10 {schema}'
    scylla(gdb, graph_command)
|
||||
|
||||
# Some commands need a task to work on. The following fixture finds one.
|
||||
# Because we stopped Scylla while it was idle, we don't expect to find
|
||||
# any ready task with get_local_tasks(), but we can find one with a
|
||||
# find_vptrs() loop. I noticed that a nice one (with multiple tasks chained
|
||||
# to it for "scylla fiber") is one from http_server::do_accept_one.
|
||||
@pytest.fixture(scope="module")
def task(gdb, scylla_gdb):
    """Return the address, cast to uintptr_t, of one do_accept_one continuation.

    Raises gdb.error when find_vptrs() yields no such object.
    """
    for obj_addr, vtable_addr in scylla_gdb.find_vptrs():
        symbol = scylla_gdb.resolve(vtable_addr, startswith='vtable for seastar::continuation')
        if not symbol or 'do_accept_one' not in symbol:
            continue
        return obj_addr.cast(gdb.lookup_type('uintptr_t'))
    raise gdb.error("no tasks found with expected name")
|
||||
|
||||
def test_fiber(gdb, task):
    """Smoke test: "scylla fiber" on the task fixture must not fail."""
    fiber_command = f'fiber {task}'
    scylla(gdb, fiber_command)
|
||||
|
||||
# Similar to task(), but looks for a coroutine frame.
|
||||
@pytest.fixture(scope="module")
def coro_task(gdb, scylla_gdb, request):
    """Return the address of the topology coordinator's coroutine frame.

    When no object resolves to the expected resume function, diagnostics are
    written, a core of Scylla is dumped next to the Scylla tmp dir, and
    gdb.error is raised.
    """
    wanted = 'service::topology_coordinator::run() [clone .resume]'
    for obj_addr, vtable_addr in scylla_gdb.find_vptrs():
        symbol = scylla_gdb.resolve(vtable_addr)
        if not symbol or symbol.strip() != wanted:
            continue
        return obj_addr.cast(gdb.lookup_type('uintptr_t'))

    # Something is wrong. We should have found the target.
    # Check if scylla_find agrees with find_vptrs, for debugging.
    wanted_addr = int(gdb.parse_and_eval(f"&'{wanted}'"))
    find_command = f"scylla find -a 0x{wanted_addr:x}"
    gdb.write(f"Didn't find {wanted} (0x{wanted_addr:x}). Running '{find_command}'\n")
    memory_range = scylla_gdb.get_seastar_memory_start_and_size()
    gdb.execute(find_command)
    gdb.write(f"Memory range: 0x{memory_range[0]:x} 0x{memory_range[1]:x}\n")
    gdb.write("Found coroutines:\n")
    for obj_addr, vtable_addr in scylla_gdb.find_vptrs():
        symbol = scylla_gdb.resolve(vtable_addr)
        if symbol and '.resume' in symbol.strip():
            gdb.write(f'{symbol}\n')

    # This test fails sometimes, but rarely and unreliably.
    # We want to get a coredump from it the next time it fails.
    # https://github.com/scylladb/scylladb/issues/22501
    scylla_tmp_dir = request.config.getoption('scylla_tmp_dir')
    core_filename = f"{scylla_tmp_dir}/../scylla_gdb_coro_task-{uuid.uuid4()}.core"
    gdb.execute(f"gcore {core_filename}")
    raise gdb.error(f"No coroutine frames found with expected name. Dumped Scylla core to {core_filename}")
|
||||
|
||||
def test_coro_frame(gdb, coro_task):
    """Smoke test: $coro_frame must accept the task inside the coroutine frame."""
    # $coro_frame expects a seastar::task*. Offsetting by two words (16 bytes)
    # moves the pointer from the outer coroutine frame to the inner seastar::task.
    print_command = f'p *$coro_frame({coro_task} + 16)'
    gdb.execute(print_command)
|
||||
|
||||
def test_sstable_summary(gdb, sstable):
    """Smoke test: "scylla sstable-summary" on the sstable fixture must not fail."""
    scylla(gdb, 'sstable-summary ' + sstable)


def test_sstable_index_cache(gdb, sstable):
    """Smoke test: "scylla sstable-index-cache" on the sstable fixture must not fail."""
    scylla(gdb, 'sstable-index-cache ' + sstable)


def test_read_stats(gdb, sstable):
    """Smoke test: "scylla read-stats" must not fail (the sstable fixture is only set up)."""
    scylla(gdb, 'read-stats')


def test_get_config_value(gdb):
    """Smoke test: "scylla get-config-value" for compaction_static_shares must not fail."""
    scylla(gdb, 'get-config-value compaction_static_shares')


def test_prepared_statements(gdb):
    """Smoke test: "scylla prepared-statements" must not fail."""
    scylla(gdb, 'prepared-statements')
|
||||
|
||||
@pytest.mark.without_scylla
def test_run_without_scylla(scylla_gdb):
    """Check that the scylla-gdb module loads without attaching to Scylla."""
    # Requesting the scylla_gdb fixture is the whole test.
    #
    # If this test fails, there is a good chance that scylla-gdb.py cannot be
    # loaded without debug symbols. Calls to "gdb.lookup_type()" and similar
    # functions that rely on debug symbols should be made inside the GDB
    # commands themselves, when they get executed. To address such a failure,
    # consider moving the code that references debug symbols into a code path
    # that runs only when debug symbols are loaded. If the value of the symbol
    # is a constant, consider caching it using the functools.cache decorator.
    _ = scylla_gdb
|
||||
|
||||
# FIXME: need a simple test for lsa-segment
|
||||
@@ -1,57 +0,0 @@
|
||||
# Copyright 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
"""
|
||||
Tests for commands that need a schema to work on.
|
||||
Each only checks that the command does not fail - but not what it does or returns.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import re
|
||||
|
||||
from test.scylla_gdb.conftest import execute_gdb_command
|
||||
|
||||
pytestmark = [
|
||||
pytest.mark.skip_mode(
|
||||
mode=["dev", "debug"],
|
||||
reason="Scylla was built without debug symbols; use release mode",
|
||||
),
|
||||
pytest.mark.skip_mode(
|
||||
mode=["dev", "debug", "release"],
|
||||
platform_key="aarch64",
|
||||
reason="GDB is broken on aarch64: https://sourceware.org/bugzilla/show_bug.cgi?id=27886",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
def schema(gdb_process):
    """
    Return the schema pointer (a hex string) of the first table found.

    Even without any user tables, we will always have system tables.
    The pointer is parsed from the "schema=" line printed by get_schema().
    """
    output = execute_gdb_command(gdb_process, full_command="python get_schema()")
    found = re.search(r"schema=\s*(0x[0-9a-fA-F]+)", output)
    assert found, f"Failed to find schema pointer in response: {output}"
    return found.group(1) if found else None
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "command",
    [
        "find -r",
        "ptr",
        "schema (const schema *)",  # `schema` requires type-casted pointer
    ],
)
def test_schema(gdb_process, command, schema):
    """Smoke test: run each schema-related command on the schema pointer."""
    scylla_command = f"{command} {schema}"
    execute_gdb_command(gdb_process, scylla_command)
|
||||
|
||||
|
||||
def test_generate_object_graph(gdb_process, schema, request):
    """Smoke test: generate an object graph for the schema into the --tmpdir directory."""
    output_dir = request.config.getoption("--tmpdir")
    graph_command = f"generate-object-graph -o {output_dir}/og.dot -d 2 -t 10 {schema}"
    execute_gdb_command(gdb_process, graph_command)
|
||||
@@ -1,46 +0,0 @@
|
||||
# Copyright 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
"""
|
||||
Tests for commands that need an sstable to work on.
|
||||
Each only checks that the command does not fail - but not what it does or returns.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import re
|
||||
|
||||
from test.scylla_gdb.conftest import execute_gdb_command
|
||||
|
||||
pytestmark = [
|
||||
pytest.mark.skip_mode(
|
||||
mode=["dev", "debug"],
|
||||
reason="Scylla was built without debug symbols; use release mode",
|
||||
),
|
||||
pytest.mark.skip_mode(
|
||||
mode=["dev", "debug", "release"],
|
||||
platform_key="aarch64",
|
||||
reason="GDB is broken on aarch64: https://sourceware.org/bugzilla/show_bug.cgi?id=27886",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
def sstable(gdb_process):
    """Return a type-cast sstable pointer parsed from the output of get_sstables().

    The returned string has the form "(sstables::sstable *) 0x<hex>".
    """
    output = execute_gdb_command(gdb_process, full_command="python get_sstables()")
    found = re.search(r"(\(sstables::sstable \*\) 0x)([0-9a-f]+)", output)
    assert found is not None, "No sstable was present in result.stdout"
    return found.group(0).strip() if found else None
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "command",
    [
        "sstable-summary",
        "sstable-index-cache",
    ],
)
def test_sstable(gdb_process, command, sstable):
    """Smoke test: run each sstable-related command on the sstable pointer."""
    scylla_command = f"{command} {sstable}"
    execute_gdb_command(gdb_process, scylla_command)
|
||||
@@ -1,85 +0,0 @@
|
||||
# Copyright 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
"""
|
||||
Tests for commands that need some task to work on.
|
||||
Each only checks that the command does not fail - but not what it does or returns.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
from test.scylla_gdb.conftest import execute_gdb_command
|
||||
|
||||
pytestmark = [
|
||||
pytest.mark.skip_mode(
|
||||
mode=["dev", "debug"],
|
||||
reason="Scylla was built without debug symbols; use release mode",
|
||||
),
|
||||
pytest.mark.skip_mode(
|
||||
mode=["dev", "debug", "release"],
|
||||
platform_key="aarch64",
|
||||
reason="GDB is broken on aarch64: https://sourceware.org/bugzilla/show_bug.cgi?id=27886",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
def task(gdb_process):
    """
    Finds a Scylla fiber task using a `find_vptrs()` loop.

    Since Scylla is fresh-booted, `get_local_tasks()` is not expected to
    return anything. Nevertheless, a `find_vptrs()` scan can still discover
    the task created by `http_server::do_accept_one`.

    Returns the task address as a decimal string, parsed from the "task="
    line printed by `get_task()`.
    """
    result = execute_gdb_command(gdb_process, full_command="python get_task()")
    match = re.search(r"task=(\d+)", result)
    # `result` is the GDB output as a plain string and has no `.stdout`
    # attribute; report it directly so a failed match produces this assertion
    # message instead of an AttributeError.
    assert match is not None, f"No task was present in {result}"
    return match.group(1)
|
||||
|
||||
|
||||
def test_fiber(gdb_process, task):
    """Smoke test: "scylla fiber" on the task fixture must not fail."""
    fiber_command = f"fiber {task}"
    execute_gdb_command(gdb_process, fiber_command)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
def coroutine_task(gdb_process, scylla_server):
    """
    Finds a coroutine task, similar to the `task` fixture.

    Runs the `get_coroutine()` helper in GDB and returns the text that follows
    "coroutine_config=" in its output. When that marker is missing, the
    `coroutine_debug_config()` helper is run with the server's work directory
    to collect diagnostics, and the test is failed with its output.
    """
    output = execute_gdb_command(gdb_process, full_command="python get_coroutine()")
    found = re.search(r"coroutine_config=\s*(.*)", output)
    if found:
        return found.group(1).strip()

    output = execute_gdb_command(
        gdb_process,
        full_command=f"python coroutine_debug_config('{scylla_server.workdir}')",
    )
    pytest.fail(
        "Failed to find coroutine task. Debugging logs have been collected\n"
        f"Debugging code result: {output}\n"
    )

    return found.group(1).strip()
|
||||
|
||||
|
||||
def test_coroutine_frame(gdb_process, coroutine_task):
    """
    Smoke test for `$coro_frame`, which expects a `seastar::task*`.

    The pointer is offset by two words (16 bytes) to move from the outer
    coroutine frame to the inner `seastar::task`.
    """
    print_command = f"p *$coro_frame({coroutine_task} + 16)"
    execute_gdb_command(gdb_process, full_command=print_command)
|
||||
@@ -1 +1 @@
|
||||
docker.io/scylladb/scylla-toolchain:fedora-43-20260128
|
||||
docker.io/scylladb/scylla-toolchain:fedora-43-20260113
|
||||
|
||||
Reference in New Issue
Block a user