Compare commits
4 Commits
copilot/fi
...
copilot/im
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
98fafb25b2 | ||
|
|
b17de07c43 | ||
|
|
4b7f760a38 | ||
|
|
c824803a24 |
2
.github/scripts/auto-backport.py
vendored
2
.github/scripts/auto-backport.py
vendored
@@ -62,7 +62,7 @@ def create_pull_request(repo, new_branch_name, base_branch_name, pr, backport_pr
|
||||
if is_draft:
|
||||
labels_to_add.append("conflicts")
|
||||
pr_comment = f"@{pr.user.login} - This PR was marked as draft because it has conflicts\n"
|
||||
pr_comment += "Please resolve them and remove the 'conflicts' label. The PR will be made ready for review automatically."
|
||||
pr_comment += "Please resolve them and mark this PR as ready for review"
|
||||
backport_pr.create_issue_comment(pr_comment)
|
||||
|
||||
# Apply all labels at once if we have any
|
||||
|
||||
@@ -3055,44 +3055,17 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
future<> executor::cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
|
||||
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
{
|
||||
if (!cas_shard.this_shard()) {
|
||||
_stats.shard_bounce_for_lwt++;
|
||||
return container().invoke_on(cas_shard.shard(), _ssg,
|
||||
[cs = client_state.move_to_other_shard(),
|
||||
&mb = mutation_builders,
|
||||
&dk,
|
||||
ks = schema->ks_name(),
|
||||
cf = schema->cf_name(),
|
||||
gt = tracing::global_trace_state_ptr(trace_state),
|
||||
permit = std::move(permit)]
|
||||
(executor& self) mutable {
|
||||
return do_with(cs.get(), [&mb, &dk, ks = std::move(ks), cf = std::move(cf),
|
||||
trace_state = tracing::trace_state_ptr(gt), &self]
|
||||
(service::client_state& client_state) mutable {
|
||||
auto schema = self._proxy.data_dictionary().find_schema(ks, cf);
|
||||
service::cas_shard cas_shard(*schema, dk.token());
|
||||
|
||||
//FIXME: Instead of passing empty_service_permit() to the background operation,
|
||||
// the current permit's lifetime should be prolonged, so that it's destructed
|
||||
// only after all background operations are finished as well.
|
||||
return self.cas_write(schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk, const std::vector<put_or_delete_item>& mutation_builders,
|
||||
service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit) {
|
||||
auto timeout = executor::default_timeout();
|
||||
auto op = std::make_unique<put_or_delete_item_cas_request>(schema, mutation_builders);
|
||||
auto* op_ptr = op.get();
|
||||
auto cdc_opts = cdc::per_request_options{
|
||||
.alternator = true,
|
||||
.alternator_streams_increased_compatibility =
|
||||
schema->cdc_options().enabled() && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
schema->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
};
|
||||
return _proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
|
||||
return proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
|
||||
{timeout, std::move(permit), client_state, trace_state},
|
||||
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
|
||||
timeout, timeout, true, std::move(cdc_opts)).finally([op = std::move(op)]{}).discard_result();
|
||||
@@ -3118,11 +3091,13 @@ struct schema_decorated_key_equal {
|
||||
|
||||
// FIXME: if we failed writing some of the mutations, need to return a list
|
||||
// of these failed mutations rather than fail the whole write (issue #5650).
|
||||
future<> executor::do_batch_write(
|
||||
static future<> do_batch_write(service::storage_proxy& proxy,
|
||||
smp_service_group ssg,
|
||||
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
|
||||
service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
service_permit permit) {
|
||||
service_permit permit,
|
||||
stats& stats) {
|
||||
if (mutation_builders.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -3144,7 +3119,7 @@ future<> executor::do_batch_write(
|
||||
mutations.push_back(b.second.build(b.first, now));
|
||||
any_cdc_enabled |= b.first->cdc_options().enabled();
|
||||
}
|
||||
return _proxy.mutate(std::move(mutations),
|
||||
return proxy.mutate(std::move(mutations),
|
||||
db::consistency_level::LOCAL_QUORUM,
|
||||
executor::default_timeout(),
|
||||
trace_state,
|
||||
@@ -3153,7 +3128,7 @@ future<> executor::do_batch_write(
|
||||
false,
|
||||
cdc::per_request_options{
|
||||
.alternator = true,
|
||||
.alternator_streams_increased_compatibility = any_cdc_enabled && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
.alternator_streams_increased_compatibility = any_cdc_enabled && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
});
|
||||
} else {
|
||||
// Do the write via LWT:
|
||||
@@ -3165,35 +3140,46 @@ future<> executor::do_batch_write(
|
||||
schema_decorated_key_hash,
|
||||
schema_decorated_key_equal>;
|
||||
auto key_builders = std::make_unique<map_type>(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
|
||||
for (auto&& b : std::move(mutation_builders)) {
|
||||
auto [it, added] = key_builders->try_emplace(schema_decorated_key {
|
||||
.schema = b.first,
|
||||
.dk = dht::decorate_key(*b.first, b.second.pk())
|
||||
});
|
||||
for (auto& b : mutation_builders) {
|
||||
auto dk = dht::decorate_key(*b.first, b.second.pk());
|
||||
auto [it, added] = key_builders->try_emplace(schema_decorated_key{b.first, dk});
|
||||
it->second.push_back(std::move(b.second));
|
||||
}
|
||||
auto* key_builders_ptr = key_builders.get();
|
||||
return parallel_for_each(*key_builders_ptr, [this, &client_state, trace_state, permit = std::move(permit)] (const auto& e) {
|
||||
_stats.write_using_lwt++;
|
||||
return parallel_for_each(*key_builders_ptr, [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (const auto& e) {
|
||||
stats.write_using_lwt++;
|
||||
auto desired_shard = service::cas_shard(*e.first.schema, e.first.dk.token());
|
||||
auto s = e.first.schema;
|
||||
if (desired_shard.this_shard()) {
|
||||
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, e.second, client_state, trace_state, permit);
|
||||
} else {
|
||||
stats.shard_bounce_for_lwt++;
|
||||
return proxy.container().invoke_on(desired_shard.shard(), ssg,
|
||||
[cs = client_state.move_to_other_shard(),
|
||||
&mb = e.second,
|
||||
&dk = e.first.dk,
|
||||
ks = e.first.schema->ks_name(),
|
||||
cf = e.first.schema->cf_name(),
|
||||
gt = tracing::global_trace_state_ptr(trace_state),
|
||||
permit = std::move(permit)]
|
||||
(service::storage_proxy& proxy) mutable {
|
||||
return do_with(cs.get(), [&proxy, &mb, &dk, ks = std::move(ks), cf = std::move(cf),
|
||||
trace_state = tracing::trace_state_ptr(gt)]
|
||||
(service::client_state& client_state) mutable {
|
||||
auto schema = proxy.data_dictionary().find_schema(ks, cf);
|
||||
|
||||
static const auto* injection_name = "alternator_executor_batch_write_wait";
|
||||
return utils::get_local_injector().inject(injection_name, [s = std::move(s)] (auto& handler) -> future<> {
|
||||
const auto ks = handler.get("keyspace");
|
||||
const auto cf = handler.get("table");
|
||||
const auto shard = std::atoll(handler.get("shard")->data());
|
||||
if (ks == s->ks_name() && cf == s->cf_name() && shard == this_shard_id()) {
|
||||
elogger.info("{}: hit", injection_name);
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
|
||||
elogger.info("{}: continue", injection_name);
|
||||
}
|
||||
}).then([&e, desired_shard = std::move(desired_shard),
|
||||
&client_state, trace_state = std::move(trace_state), permit = std::move(permit), this]() mutable
|
||||
{
|
||||
return cas_write(e.first.schema, std::move(desired_shard), e.first.dk,
|
||||
std::move(e.second), client_state, std::move(trace_state), std::move(permit));
|
||||
});
|
||||
// The desired_shard on the original shard remains alive for the duration
|
||||
// of cas_write on this shard and prevents any tablet operations.
|
||||
// However, we need a local instance of cas_shard on this shard
|
||||
// to pass it to sp::cas, so we just create a new one.
|
||||
service::cas_shard cas_shard(*schema, dk.token());
|
||||
|
||||
//FIXME: Instead of passing empty_service_permit() to the background operation,
|
||||
// the current permit's lifetime should be prolonged, so that it's destructed
|
||||
// only after all background operations are finished as well.
|
||||
return cas_write(proxy, schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
|
||||
});
|
||||
}).finally([desired_shard = std::move(desired_shard)]{});
|
||||
}
|
||||
}).finally([key_builders = std::move(key_builders)]{});
|
||||
}
|
||||
}
|
||||
@@ -3341,7 +3327,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
_stats.wcu_total[stats::DELETE_ITEM] += wcu_delete_units;
|
||||
_stats.api_operations.batch_write_item_batch_total += total_items;
|
||||
_stats.api_operations.batch_write_item_histogram.add(total_items);
|
||||
co_await do_batch_write(std::move(mutation_builders), client_state, trace_state, std::move(permit));
|
||||
co_await do_batch_write(_proxy, _ssg, std::move(mutation_builders), client_state, trace_state, std::move(permit), _stats);
|
||||
// FIXME: Issue #5650: If we failed writing some of the updates,
|
||||
// need to return a list of these failed updates in UnprocessedItems
|
||||
// rather than fail the whole write (issue #5650).
|
||||
|
||||
@@ -40,7 +40,6 @@ namespace cql3::selection {
|
||||
|
||||
namespace service {
|
||||
class storage_proxy;
|
||||
class cas_shard;
|
||||
}
|
||||
|
||||
namespace cdc {
|
||||
@@ -58,7 +57,6 @@ class schema_builder;
|
||||
namespace alternator {
|
||||
|
||||
class rmw_operation;
|
||||
class put_or_delete_item;
|
||||
|
||||
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
|
||||
bool is_alternator_keyspace(const sstring& ks_name);
|
||||
@@ -221,16 +219,6 @@ private:
|
||||
|
||||
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
|
||||
|
||||
future<> do_batch_write(
|
||||
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
|
||||
service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
service_permit permit);
|
||||
|
||||
future<> cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
|
||||
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state, service_permit permit);
|
||||
|
||||
public:
|
||||
static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&, const std::map<sstring, sstring> *tags = nullptr);
|
||||
|
||||
|
||||
@@ -979,8 +979,9 @@ client_data server::ongoing_request::make_client_data() const {
|
||||
// and keep "driver_version" unset.
|
||||
cd.driver_name = _user_agent;
|
||||
// Leave "protocol_version" unset, it has no meaning in Alternator.
|
||||
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset for Alternator.
|
||||
// Note: CQL sets ssl_protocol and ssl_cipher_suite via generic_server::connection base class.
|
||||
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset.
|
||||
// As reported in issue #9216, we never set these fields in CQL
|
||||
// either (see cql_server::connection::make_client_data()).
|
||||
return cd;
|
||||
}
|
||||
|
||||
|
||||
399
db/view/view.cc
399
db/view/view.cc
@@ -1744,115 +1744,6 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo
|
||||
&& std::ranges::contains(shards, this_shard_id());
|
||||
}
|
||||
|
||||
static endpoints_to_update get_view_natural_endpoint_vnodes(
|
||||
locator::host_id me,
|
||||
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
|
||||
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
|
||||
locator::endpoint_dc_rack my_location,
|
||||
const locator::network_topology_strategy* network_topology,
|
||||
replica::cf_stats& cf_stats) {
|
||||
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
|
||||
node_vector base_endpoints, view_endpoints;
|
||||
auto& my_datacenter = my_location.dc;
|
||||
|
||||
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
|
||||
if (!network_topology || node.get().dc() == my_datacenter) {
|
||||
nodes.emplace_back(node);
|
||||
}
|
||||
};
|
||||
|
||||
for (auto&& base_node : base_nodes) {
|
||||
process_candidate(base_endpoints, base_node);
|
||||
}
|
||||
|
||||
for (auto&& view_node : view_nodes) {
|
||||
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
|
||||
// If this base replica is also one of the view replicas, we use
|
||||
// ourselves as the view replica.
|
||||
// We don't return an extra endpoint, as it's only needed when
|
||||
// using tablets (so !use_legacy_self_pairing)
|
||||
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
|
||||
return {.natural_endpoint = me};
|
||||
}
|
||||
|
||||
// We have to remove any endpoint which is shared between the base
|
||||
// and the view, as it will select itself and throw off the counts
|
||||
// otherwise.
|
||||
if (it != base_endpoints.end()) {
|
||||
base_endpoints.erase(it);
|
||||
} else if (!network_topology || view_node.get().dc() == my_datacenter) {
|
||||
view_endpoints.push_back(view_node);
|
||||
}
|
||||
}
|
||||
|
||||
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
|
||||
if (base_it == base_endpoints.end()) {
|
||||
// This node is not a base replica of this key, so we return empty
|
||||
// FIXME: This case shouldn't happen, and if it happens, a view update
|
||||
// would be lost.
|
||||
++cf_stats.total_view_updates_on_wrong_node;
|
||||
vlogger.warn("Could not find {} in base_endpoints={}", me,
|
||||
base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
return {};
|
||||
}
|
||||
size_t idx = base_it - base_endpoints.begin();
|
||||
return {.natural_endpoint = view_endpoints[idx].get().host_id()};
|
||||
}
|
||||
|
||||
static std::optional<locator::host_id> get_unpaired_view_endpoint(
|
||||
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
|
||||
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
|
||||
replica::cf_stats& cf_stats) {
|
||||
std::unordered_set<locator::endpoint_dc_rack> base_dc_racks;
|
||||
for (auto&& base_node : base_nodes) {
|
||||
if (base_dc_racks.contains(base_node.get().dc_rack())) {
|
||||
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
|
||||
++cf_stats.total_view_updates_failed_pairing;
|
||||
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple base table replicas in the same dc/rack({}/{}):",
|
||||
base_node.get().dc(), base_node.get().rack());
|
||||
return std::nullopt;
|
||||
}
|
||||
base_dc_racks.insert(base_node.get().dc_rack());
|
||||
}
|
||||
|
||||
std::unordered_set<locator::endpoint_dc_rack> paired_view_dc_racks;
|
||||
std::unordered_map<locator::endpoint_dc_rack, locator::host_id> unpaired_view_dc_rack_replicas;
|
||||
for (auto&& view_node : view_nodes) {
|
||||
if (paired_view_dc_racks.contains(view_node.get().dc_rack()) || unpaired_view_dc_rack_replicas.contains(view_node.get().dc_rack())) {
|
||||
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
|
||||
++cf_stats.total_view_updates_failed_pairing;
|
||||
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple view table replicas in the same dc/rack({}/{}):",
|
||||
view_node.get().dc(), view_node.get().rack());
|
||||
return std::nullopt;
|
||||
}
|
||||
// Track unpaired replicas in both sets
|
||||
if (base_dc_racks.contains(view_node.get().dc_rack())) {
|
||||
paired_view_dc_racks.insert(view_node.get().dc_rack());
|
||||
} else {
|
||||
unpaired_view_dc_rack_replicas.insert({view_node.get().dc_rack(), view_node.get().host_id()});
|
||||
}
|
||||
}
|
||||
|
||||
if (unpaired_view_dc_rack_replicas.size() > 0) {
|
||||
// There are view replicas that can't be paired with any base replica
|
||||
// This can happen as a result of an RF change when the view replica finishes streaming
|
||||
// before the base replica.
|
||||
// Because of this, a view replica might not get paired with any base replica, so we need
|
||||
// to send an additional update to it.
|
||||
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
|
||||
auto extra_replica = unpaired_view_dc_rack_replicas.begin()->second;
|
||||
unpaired_view_dc_rack_replicas.erase(unpaired_view_dc_rack_replicas.begin());
|
||||
if (unpaired_view_dc_rack_replicas.size() > 0) {
|
||||
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
|
||||
// but we'll still perform updates to the paired and last replicas to minimize degradation.
|
||||
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
|
||||
unpaired_view_dc_rack_replicas | std::views::values);
|
||||
}
|
||||
return extra_replica;
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
// Calculate the node ("natural endpoint") to which this node should send
|
||||
// a view update.
|
||||
//
|
||||
@@ -1865,19 +1756,29 @@ static std::optional<locator::host_id> get_unpaired_view_endpoint(
|
||||
// of this function is to find, assuming that this node is one of the base
|
||||
// replicas for a given partition, the paired view replica.
|
||||
//
|
||||
// When using vnodes, we have an optimization called "self-pairing" - if a single
|
||||
// node is both a base replica and a view replica for a write, the pairing is
|
||||
// modified so that this node sends the update to itself and this node is removed
|
||||
// from the lists of nodes paired by index. This self-pairing optimization can
|
||||
// cause the pairing to change after view ranges are moved between nodes.
|
||||
// In the past, we used an optimization called "self-pairing" that if a single
|
||||
// node was both a base replica and a view replica for a write, the pairing is
|
||||
// modified so that this node would send the update to itself. This self-
|
||||
// pairing optimization could cause the pairing to change after view ranges
|
||||
// are moved between nodes, so currently we only use it if
|
||||
// use_legacy_self_pairing is set to true. When using tablets - where range
|
||||
// movements are common - it is strongly recommended to set it to false.
|
||||
//
|
||||
// If the keyspace's replication strategy is a NetworkTopologyStrategy,
|
||||
// we pair only nodes in the same datacenter.
|
||||
//
|
||||
// If the table uses tablets, then pairing is rack-aware. In this case, in each
|
||||
// rack where we have a base replica there is also one replica of each view tablet.
|
||||
// Therefore, the base replicas are naturally paired with the view replicas that
|
||||
// are in the same rack.
|
||||
// When use_legacy_self_pairing is enabled, if one of the base replicas
|
||||
// also happens to be a view replica, it is paired with itself
|
||||
// (with the other nodes paired by order in the list
|
||||
// after taking this node out).
|
||||
//
|
||||
// If the table uses tablets and the replication strategy is NetworkTopologyStrategy
|
||||
// and the replication factor in the node's datacenter is a multiple of the number
|
||||
// of racks in the datacenter, then pairing is rack-aware. In this case,
|
||||
// all racks have the same number of replicas, and those are never migrated
|
||||
// outside their racks. Therefore, the base replicas are naturally paired with the
|
||||
// view replicas that are in the same rack, based on the ordinal position.
|
||||
// Note that typically, there is a single replica per rack and pairing is trivial.
|
||||
//
|
||||
// If the assumption that the given base token belongs to this replica
|
||||
// does not hold, we return an empty optional.
|
||||
@@ -1905,12 +1806,19 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
const locator::abstract_replication_strategy& replication_strategy,
|
||||
const dht::token& base_token,
|
||||
const dht::token& view_token,
|
||||
bool use_tablets,
|
||||
bool use_legacy_self_pairing,
|
||||
bool use_tablets_rack_aware_view_pairing,
|
||||
replica::cf_stats& cf_stats) {
|
||||
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
|
||||
auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology();
|
||||
auto& my_location = topology.get_location(me);
|
||||
auto& my_datacenter = my_location.dc;
|
||||
auto* network_topology = dynamic_cast<const locator::network_topology_strategy*>(&replication_strategy);
|
||||
auto rack_aware_pairing = use_tablets_rack_aware_view_pairing && network_topology;
|
||||
bool simple_rack_aware_pairing = false;
|
||||
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
|
||||
node_vector orig_base_endpoints, orig_view_endpoints;
|
||||
node_vector base_endpoints, view_endpoints;
|
||||
|
||||
auto resolve = [&] (const locator::topology& topology, const locator::host_id& ep, bool is_view) -> const locator::node& {
|
||||
if (auto* np = topology.find_node(ep)) {
|
||||
@@ -1921,7 +1829,6 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
|
||||
// We need to use get_replicas() for pairing to be stable in case base or view tablet
|
||||
// is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas.
|
||||
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
|
||||
auto base_nodes = base_erm->get_replicas(base_token) | std::views::transform([&] (const locator::host_id& ep) -> const locator::node& {
|
||||
return resolve(topology, ep, false);
|
||||
}) | std::ranges::to<node_vector>();
|
||||
@@ -1945,43 +1852,231 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
// note that the recursive call will not recurse again because leaving_base is in base_nodes.
|
||||
auto leaving_base = it->get().host_id();
|
||||
return get_view_natural_endpoint(leaving_base, base_erm, view_erm, replication_strategy, base_token,
|
||||
view_token, use_tablets, cf_stats);
|
||||
view_token, use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!use_tablets) {
|
||||
return get_view_natural_endpoint_vnodes(
|
||||
me,
|
||||
base_nodes,
|
||||
view_nodes,
|
||||
my_location,
|
||||
network_topology,
|
||||
cf_stats);
|
||||
std::function<bool(const locator::node&)> is_candidate;
|
||||
if (network_topology) {
|
||||
is_candidate = [&] (const locator::node& node) { return node.dc() == my_datacenter; };
|
||||
} else {
|
||||
is_candidate = [&] (const locator::node&) { return true; };
|
||||
}
|
||||
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
|
||||
if (is_candidate(node)) {
|
||||
nodes.emplace_back(node);
|
||||
}
|
||||
};
|
||||
|
||||
for (auto&& base_node : base_nodes) {
|
||||
process_candidate(base_endpoints, base_node);
|
||||
}
|
||||
|
||||
std::optional<locator::host_id> paired_replica;
|
||||
for (auto&& view_node : view_nodes) {
|
||||
if (view_node.get().dc_rack() == my_location) {
|
||||
paired_replica = view_node.get().host_id();
|
||||
break;
|
||||
if (use_legacy_self_pairing) {
|
||||
for (auto&& view_node : view_nodes) {
|
||||
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
|
||||
// If this base replica is also one of the view replicas, we use
|
||||
// ourselves as the view replica.
|
||||
// We don't return an extra endpoint, as it's only needed when
|
||||
// using tablets (so !use_legacy_self_pairing)
|
||||
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
|
||||
return {.natural_endpoint = me};
|
||||
}
|
||||
|
||||
// We have to remove any endpoint which is shared between the base
|
||||
// and the view, as it will select itself and throw off the counts
|
||||
// otherwise.
|
||||
if (it != base_endpoints.end()) {
|
||||
base_endpoints.erase(it);
|
||||
} else if (is_candidate(view_node)) {
|
||||
view_endpoints.push_back(view_node);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (auto&& view_node : view_nodes) {
|
||||
process_candidate(view_endpoints, view_node);
|
||||
}
|
||||
}
|
||||
if (paired_replica && base_nodes.size() == view_nodes.size()) {
|
||||
// We don't need to find any extra replicas, so we can return early
|
||||
return {.natural_endpoint = paired_replica};
|
||||
|
||||
// Try optimizing for simple rack-aware pairing
|
||||
// If the numbers of base and view replica differ, that means an RF change is taking place
|
||||
// and we can't use simple rack-aware pairing.
|
||||
if (rack_aware_pairing && base_endpoints.size() == view_endpoints.size()) {
|
||||
auto dc_rf = network_topology->get_replication_factor(my_datacenter);
|
||||
const auto& racks = topology.get_datacenter_rack_nodes().at(my_datacenter);
|
||||
// Simple rack-aware pairing is possible when the datacenter replication factor
|
||||
// is a multiple of the number of racks in the datacenter.
|
||||
if (dc_rf % racks.size() == 0) {
|
||||
simple_rack_aware_pairing = true;
|
||||
size_t rack_rf = dc_rf / racks.size();
|
||||
// If any rack doesn't have enough nodes to satisfy the per-rack rf
|
||||
// simple rack-aware pairing is disabled.
|
||||
for (const auto& [rack, nodes] : racks) {
|
||||
if (nodes.size() < rack_rf) {
|
||||
simple_rack_aware_pairing = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (dc_rf != base_endpoints.size()) {
|
||||
// If the datacenter replication factor is not equal to the number of base replicas,
|
||||
// we're in progress of a RF change and we can't use simple rack-aware pairing.
|
||||
simple_rack_aware_pairing = false;
|
||||
}
|
||||
if (simple_rack_aware_pairing) {
|
||||
std::erase_if(base_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
|
||||
std::erase_if(view_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
|
||||
}
|
||||
}
|
||||
if (!paired_replica) {
|
||||
// We couldn't find any view replica in our rack
|
||||
|
||||
orig_base_endpoints = base_endpoints;
|
||||
orig_view_endpoints = view_endpoints;
|
||||
|
||||
// For the complex rack_aware_pairing case, nodes are already filtered by datacenter
|
||||
// Use best-match, for the minimum number of base and view replicas in each rack,
|
||||
// and ordinal match for the rest.
|
||||
std::optional<std::reference_wrapper<const locator::node>> paired_replica;
|
||||
if (rack_aware_pairing && !simple_rack_aware_pairing) {
|
||||
struct indexed_replica {
|
||||
size_t idx;
|
||||
std::reference_wrapper<const locator::node> node;
|
||||
};
|
||||
std::unordered_map<sstring, std::vector<indexed_replica>> base_racks, view_racks;
|
||||
|
||||
// First, index all replicas by rack
|
||||
auto index_replica_set = [] (std::unordered_map<sstring, std::vector<indexed_replica>>& racks, const node_vector& replicas) {
|
||||
size_t idx = 0;
|
||||
for (const auto& r: replicas) {
|
||||
racks[r.get().rack()].emplace_back(idx++, r);
|
||||
}
|
||||
};
|
||||
index_replica_set(base_racks, base_endpoints);
|
||||
index_replica_set(view_racks, view_endpoints);
|
||||
|
||||
// Try optimistically pairing `me` first
|
||||
const auto& my_base_replicas = base_racks[my_location.rack];
|
||||
auto base_it = std::ranges::find(my_base_replicas, me, [] (const indexed_replica& ir) { return ir.node.get().host_id(); });
|
||||
if (base_it == my_base_replicas.end()) {
|
||||
return {};
|
||||
}
|
||||
const auto& my_view_replicas = view_racks[my_location.rack];
|
||||
size_t idx = base_it - my_base_replicas.begin();
|
||||
if (idx < my_view_replicas.size()) {
|
||||
if (orig_view_endpoints.size() <= orig_base_endpoints.size()) {
|
||||
return {.natural_endpoint = my_view_replicas[idx].node.get().host_id()};
|
||||
} else {
|
||||
// If the number of view replicas is larger than the number of base replicas,
|
||||
// we need to find the unpaired view replica, so we can't return yet.
|
||||
paired_replica = my_view_replicas[idx].node;
|
||||
}
|
||||
}
|
||||
|
||||
// Collect all unpaired base and view replicas,
|
||||
// where the number of replicas in the base rack is different than the respective view rack
|
||||
std::vector<indexed_replica> unpaired_base_replicas, unpaired_view_replicas;
|
||||
for (const auto& [rack, base_replicas] : base_racks) {
|
||||
const auto& view_replicas = view_racks[rack];
|
||||
for (auto i = view_replicas.size(); i < base_replicas.size(); ++i) {
|
||||
unpaired_base_replicas.emplace_back(base_replicas[i]);
|
||||
}
|
||||
}
|
||||
for (const auto& [rack, view_replicas] : view_racks) {
|
||||
const auto& base_replicas = base_racks[rack];
|
||||
for (auto i = base_replicas.size(); i < view_replicas.size(); ++i) {
|
||||
unpaired_view_replicas.emplace_back(view_replicas[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by the original ordinality, and copy the sorted results
|
||||
// back into {base,view}_endpoints, for backward compatible processing below.
|
||||
std::ranges::sort(unpaired_base_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
|
||||
base_endpoints.clear();
|
||||
std::ranges::transform(unpaired_base_replicas, std::back_inserter(base_endpoints), std::mem_fn(&indexed_replica::node));
|
||||
|
||||
std::ranges::sort(unpaired_view_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
|
||||
view_endpoints.clear();
|
||||
std::ranges::transform(unpaired_view_replicas, std::back_inserter(view_endpoints), std::mem_fn(&indexed_replica::node));
|
||||
}
|
||||
|
||||
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
|
||||
if (!paired_replica && base_it == base_endpoints.end()) {
|
||||
// This node is not a base replica of this key, so we return empty
|
||||
// FIXME: This case shouldn't happen, and if it happens, a view update
|
||||
// would be lost.
|
||||
++cf_stats.total_view_updates_on_wrong_node;
|
||||
vlogger.warn("Could not find {} in base_endpoints={}", me,
|
||||
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
return {};
|
||||
}
|
||||
size_t idx = base_it - base_endpoints.begin();
|
||||
std::optional<std::reference_wrapper<const locator::node>> no_pairing_replica;
|
||||
if (!paired_replica && idx >= view_endpoints.size()) {
|
||||
// There are fewer view replicas than base replicas
|
||||
// FIXME: This might still happen when reducing replication factor with tablets,
|
||||
// see https://github.com/scylladb/scylladb/issues/21492
|
||||
++cf_stats.total_view_updates_failed_pairing;
|
||||
vlogger.warn("Could not find a view replica in the same rack as base replica {} for base_endpoints={} view_endpoints={}",
|
||||
me,
|
||||
base_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)),
|
||||
view_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
vlogger.warn("Could not pair {}: rack_aware={} base_endpoints={} view_endpoints={}", me,
|
||||
rack_aware_pairing ? (simple_rack_aware_pairing ? "simple" : "complex") : "none",
|
||||
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)),
|
||||
orig_view_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
return {};
|
||||
} else if (base_endpoints.size() < view_endpoints.size()) {
|
||||
// There are fewer base replicas than view replicas.
|
||||
// This can happen as a result of an RF change when the view replica finishes streaming
|
||||
// before the base replica.
|
||||
// Because of this, a view replica might not get paired with any base replica, so we need
|
||||
// to send an additional update to it.
|
||||
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
|
||||
no_pairing_replica = view_endpoints.back();
|
||||
if (base_endpoints.size() < view_endpoints.size() - 1) {
|
||||
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
|
||||
// but we'll still perform updates to the paired and last replicas to minimize degradation.
|
||||
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
|
||||
std::span(view_endpoints.begin() + base_endpoints.size(), view_endpoints.end() - 1) | std::views::transform(std::mem_fn(&locator::node::host_id)));
|
||||
}
|
||||
}
|
||||
std::optional<locator::host_id> no_pairing_replica = get_unpaired_view_endpoint(base_nodes, view_nodes, cf_stats);
|
||||
return {.natural_endpoint = paired_replica,
|
||||
.endpoint_with_no_pairing = no_pairing_replica};
|
||||
|
||||
if (!paired_replica) {
|
||||
paired_replica = view_endpoints[idx];
|
||||
}
|
||||
if (!no_pairing_replica && base_nodes.size() < view_nodes.size()) {
|
||||
// This can happen when the view replica with no pairing is in another DC.
|
||||
// We need to send an update to it if there are no base replicas in that DC yet,
|
||||
// as it won't receive updates otherwise.
|
||||
std::unordered_set<sstring> dcs_with_base_replicas;
|
||||
for (const auto& base_node : base_nodes) {
|
||||
dcs_with_base_replicas.insert(base_node.get().dc());
|
||||
}
|
||||
for (const auto& view_node : view_nodes) {
|
||||
if (!dcs_with_base_replicas.contains(view_node.get().dc())) {
|
||||
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
|
||||
no_pairing_replica = view_node;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// https://github.com/scylladb/scylladb/issues/19439
|
||||
// With tablets, a node being replaced might transition to "left" state
|
||||
// but still be kept as a replica.
|
||||
// As of writing this hints are not prepared to handle nodes that are left
|
||||
// but are still replicas. Therefore, there is no other sensible option
|
||||
// right now but to give up attempt to send the update or write a hint
|
||||
// to the paired, permanently down replica.
|
||||
// We use the same workaround for the extra replica.
|
||||
auto return_host_id_if_not_left = [] (const auto& replica) -> std::optional<locator::host_id> {
|
||||
if (!replica) {
|
||||
return std::nullopt;
|
||||
}
|
||||
const auto& node = replica->get();
|
||||
if (!node.left()) {
|
||||
return node.host_id();
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
};
|
||||
return {.natural_endpoint = return_host_id_if_not_left(paired_replica),
|
||||
.endpoint_with_no_pairing = return_host_id_if_not_left(no_pairing_replica)};
|
||||
}
|
||||
|
||||
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
|
||||
@@ -2041,6 +2136,12 @@ future<> view_update_generator::mutate_MV(
|
||||
{
|
||||
auto& ks = _db.find_keyspace(base->ks_name());
|
||||
auto& replication = ks.get_replication_strategy();
|
||||
// We set legacy self-pairing for old vnode-based tables (for backward
|
||||
// compatibility), and unset it for tablets - where range movements
|
||||
// are more frequent and backward compatibility is less important.
|
||||
// TODO: Maybe allow users to set use_legacy_self_pairing explicitly
|
||||
// on a view, like we have the synchronous_updates_flag.
|
||||
bool use_legacy_self_pairing = !ks.uses_tablets();
|
||||
std::unordered_map<table_id, locator::effective_replication_map_ptr> erms;
|
||||
auto get_erm = [&] (table_id id) {
|
||||
auto it = erms.find(id);
|
||||
@@ -2053,6 +2154,10 @@ future<> view_update_generator::mutate_MV(
|
||||
for (const auto& mut : view_updates) {
|
||||
(void)get_erm(mut.s->id());
|
||||
}
|
||||
// Enable rack-aware view updates pairing for tablets
|
||||
// when the cluster feature is enabled so that all replicas agree
|
||||
// on the pairing algorithm.
|
||||
bool use_tablets_rack_aware_view_pairing = _db.features().tablet_rack_aware_view_pairing && ks.uses_tablets();
|
||||
auto me = base_ermp->get_topology().my_host_id();
|
||||
static constexpr size_t max_concurrent_updates = 128;
|
||||
co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms);
|
||||
@@ -2060,7 +2165,7 @@ future<> view_update_generator::mutate_MV(
|
||||
auto view_token = dht::get_token(*mut.s, mut.fm.key());
|
||||
auto view_ermp = erms.at(mut.s->id());
|
||||
auto [target_endpoint, no_pairing_endpoint] = get_view_natural_endpoint(me, base_ermp, view_ermp, replication, base_token, view_token,
|
||||
ks.uses_tablets(), cf_stats);
|
||||
use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
|
||||
auto remote_endpoints = view_ermp->get_pending_replicas(view_token);
|
||||
auto memory_units = seastar::make_lw_shared<db::timeout_semaphore_units>(pending_view_update_memory_units.split(memory_usage_of(mut)));
|
||||
if (no_pairing_endpoint) {
|
||||
|
||||
@@ -305,7 +305,8 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
const locator::abstract_replication_strategy& replication_strategy,
|
||||
const dht::token& base_token,
|
||||
const dht::token& view_token,
|
||||
bool use_tablets,
|
||||
bool use_legacy_self_pairing,
|
||||
bool use_tablets_basic_rack_aware_view_pairing,
|
||||
replica::cf_stats& cf_stats);
|
||||
|
||||
/// Verify that the provided keyspace is eligible for storing materialized views.
|
||||
|
||||
@@ -38,7 +38,6 @@ debian_base_packages=(
|
||||
python3-aiohttp
|
||||
python3-pyparsing
|
||||
python3-colorama
|
||||
python3-dev
|
||||
python3-tabulate
|
||||
python3-pytest
|
||||
python3-pytest-asyncio
|
||||
@@ -66,7 +65,6 @@ debian_base_packages=(
|
||||
git-lfs
|
||||
e2fsprogs
|
||||
fuse3
|
||||
libev-dev # for python driver
|
||||
)
|
||||
|
||||
fedora_packages=(
|
||||
@@ -92,7 +90,6 @@ fedora_packages=(
|
||||
patchelf
|
||||
python3
|
||||
python3-aiohttp
|
||||
python3-devel
|
||||
python3-pip
|
||||
python3-file-magic
|
||||
python3-colorama
|
||||
@@ -157,8 +154,6 @@ fedora_packages=(
|
||||
https://github.com/scylladb/cassandra-stress/releases/download/v3.18.1/cassandra-stress-java21-3.18.1-1.noarch.rpm
|
||||
elfutils
|
||||
jq
|
||||
|
||||
libev-devel # for python driver
|
||||
)
|
||||
|
||||
fedora_python3_packages=(
|
||||
|
||||
@@ -234,12 +234,18 @@ distributed_loader::get_sstables_from_upload_dir(sharded<replica::database>& db,
|
||||
}
|
||||
|
||||
future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
|
||||
distributed_loader::get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring type, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> get_abort_src) {
|
||||
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, type, prefix, sstables=std::move(sstables), &get_abort_src] (auto& global_table, auto& directory) {
|
||||
distributed_loader::get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> get_abort_src) {
|
||||
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix, sstables=std::move(sstables), &get_abort_src, &db] (auto& global_table, auto& directory) {
|
||||
return directory.start(global_table.as_sharded_parameter(),
|
||||
sharded_parameter([bucket, endpoint, type, prefix, &get_abort_src] {
|
||||
sharded_parameter([bucket, endpoint, prefix, &get_abort_src, &db] {
|
||||
auto eps = db.local().get_config().object_storage_endpoints()
|
||||
| std::views::filter([&endpoint](auto& ep) { return ep.key() == endpoint; })
|
||||
;
|
||||
if (eps.empty()) {
|
||||
throw std::invalid_argument(fmt::format("Undefined endpoint {}", endpoint));
|
||||
}
|
||||
seastar::abort_source* as = get_abort_src ? get_abort_src() : nullptr;
|
||||
auto opts = data_dictionary::make_object_storage_options(endpoint, type, bucket, prefix, as);
|
||||
auto opts = data_dictionary::make_object_storage_options(endpoint, eps.front().type(), bucket, prefix, as);
|
||||
return make_lw_shared<const data_dictionary::storage_options>(std::move(opts));
|
||||
}),
|
||||
sstables,
|
||||
|
||||
@@ -92,7 +92,7 @@ public:
|
||||
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
|
||||
get_sstables_from_upload_dir(sharded<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg);
|
||||
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
|
||||
get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring type, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> = {});
|
||||
get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> = {});
|
||||
static future<> process_upload_dir(sharded<replica::database>& db, sharded<db::view::view_builder>& vb, sharded<db::view::view_building_worker>& vbw, sstring ks_name, sstring cf_name, bool skip_cleanup, bool skip_reshape);
|
||||
};
|
||||
|
||||
|
||||
@@ -38,9 +38,8 @@ for required in jq curl; do
|
||||
fi
|
||||
done
|
||||
|
||||
FORCE=0
|
||||
ALLOW_SUBMODULE=0
|
||||
ALLOW_UNSTABLE=0
|
||||
ALLOW_ANY_BRANCH=0
|
||||
|
||||
function print_usage {
|
||||
cat << EOF
|
||||
@@ -61,18 +60,12 @@ Options:
|
||||
-h
|
||||
Print this help message and exit.
|
||||
|
||||
--allow-submodule
|
||||
Allow a PR to update a submudule
|
||||
|
||||
--allow-unstable
|
||||
--force
|
||||
Do not check current branch to be next*
|
||||
Do not check jenkins job status
|
||||
|
||||
--allow-any-branch
|
||||
Merge PR even if target branch is not next
|
||||
|
||||
--force
|
||||
Sets all above --allow-* options
|
||||
|
||||
--allow-submodule
|
||||
Allow a PR to update a submudule
|
||||
EOF
|
||||
}
|
||||
|
||||
@@ -80,23 +73,13 @@ while [[ $# -gt 0 ]]
|
||||
do
|
||||
case $1 in
|
||||
"--force"|"-f")
|
||||
ALLOW_UNSTABLE=1
|
||||
ALLOW_SUBMODULE=1
|
||||
ALLOW_ANY_BRANCH=1
|
||||
FORCE=1
|
||||
shift 1
|
||||
;;
|
||||
--allow-submodule)
|
||||
ALLOW_SUBMODULE=1
|
||||
shift
|
||||
;;
|
||||
--allow-unstable)
|
||||
ALLOW_UNSTABLE=1
|
||||
shift
|
||||
;;
|
||||
--allow-any-branch)
|
||||
ALLOW_ANY_BRANCH=1
|
||||
shift
|
||||
;;
|
||||
+([0-9]))
|
||||
PR_NUM=$1
|
||||
shift 1
|
||||
@@ -164,7 +147,7 @@ check_jenkins_job_status() {
|
||||
fi
|
||||
}
|
||||
|
||||
if [[ $ALLOW_UNSTABLE -eq 0 ]]; then
|
||||
if [[ $FORCE -eq 0 ]]; then
|
||||
check_jenkins_job_status
|
||||
fi
|
||||
|
||||
@@ -196,19 +179,17 @@ echo -n "Fetching full name of author $PR_LOGIN... "
|
||||
USER_NAME=$(curl -s "https://api.github.com/users/$PR_LOGIN" | jq -r .name)
|
||||
echo "$USER_NAME"
|
||||
|
||||
if [[ $ALLOW_ANY_BRANCH -eq 0 ]]; then
|
||||
BASE_BRANCH=$(jq -r .base.ref <<< $PR_DATA)
|
||||
CURRENT_BRANCH=$(git rev-parse --abbrev-ref HEAD)
|
||||
TARGET_BASE="unknown"
|
||||
if [[ ${BASE_BRANCH} == master ]]; then
|
||||
TARGET_BASE="next"
|
||||
elif [[ ${BASE_BRANCH} == branch-* ]]; then
|
||||
TARGET_BASE=${BASE_BRANCH//branch/next}
|
||||
fi
|
||||
if [[ "${CURRENT_BRANCH}" != "${TARGET_BASE}" ]]; then
|
||||
echo "Merging into wrong next, want ${TARGET_BASE}, have ${CURRENT_BRANCH}. Use --allow-any-branch or --force to skip this check"
|
||||
exit 1
|
||||
fi
|
||||
BASE_BRANCH=$(jq -r .base.ref <<< $PR_DATA)
|
||||
CURRENT_BRANCH=$(git rev-parse --abbrev-ref HEAD)
|
||||
TARGET_BASE="unknown"
|
||||
if [[ ${BASE_BRANCH} == master ]]; then
|
||||
TARGET_BASE="next"
|
||||
elif [[ ${BASE_BRANCH} == branch-* ]]; then
|
||||
TARGET_BASE=${BASE_BRANCH//branch/next}
|
||||
fi
|
||||
if [[ "${CURRENT_BRANCH}" != "${TARGET_BASE}" ]]; then
|
||||
echo "Merging into wrong next, want ${TARGET_BASE}, have ${CURRENT_BRANCH}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
git fetch "$REMOTE" pull/$PR_NUM/head
|
||||
|
||||
238
scylla-gdb.py
238
scylla-gdb.py
@@ -267,16 +267,246 @@ class intrusive_set:
|
||||
|
||||
|
||||
class compact_radix_tree:
|
||||
"""Wrapper around compact_radix_tree::tree for GDB debugging.
|
||||
|
||||
Provides iteration and indexing by key (typically column_id) similar to std_map.
|
||||
The tree stores key-value pairs where keys are unsigned integers.
|
||||
|
||||
Example usage:
|
||||
tree = compact_radix_tree(row['_cells'])
|
||||
# Iterate over elements
|
||||
for key, value in tree:
|
||||
print(f"Column {key}: {value}")
|
||||
# Access by key
|
||||
cell = tree[column_id]
|
||||
# Check if key exists
|
||||
cell = tree.get(column_id, default=None)
|
||||
# Get all keys
|
||||
column_ids = tree.keys()
|
||||
|
||||
Note: Due to GDB limitations and compiler optimizations, full tree traversal
|
||||
is challenging. The implementation provides the std_map-like API but may not
|
||||
be able to extract all elements in optimized builds. In such cases, consider:
|
||||
- Using debug builds (-g -O0) for better introspection
|
||||
- Examining the tree structure directly with GDB commands
|
||||
- Using the C++ tree printer (compact_radix_tree::printer) in test code
|
||||
"""
|
||||
|
||||
def __init__(self, ref):
|
||||
"""Initialize from a gdb.Value representing a compact_radix_tree::tree instance."""
|
||||
self.ref = ref
|
||||
self.root = ref['_root']['_v']
|
||||
|
||||
# Get template arguments to determine key and value types
|
||||
tree_type = ref.type.strip_typedefs()
|
||||
self.value_type = tree_type.template_argument(0)
|
||||
# Index type defaults to unsigned int if not specified
|
||||
try:
|
||||
self.key_type = tree_type.template_argument(1)
|
||||
except RuntimeError:
|
||||
self.key_type = gdb.lookup_type('unsigned int')
|
||||
|
||||
# Cache for elements collected during traversal
|
||||
self._elements = None
|
||||
|
||||
# Constants from compact-radix-tree.hh
|
||||
# enum class layout : uint8_t { nil, indirect_tiny, indirect_small, ...}
|
||||
self.LAYOUT_NIL = 0
|
||||
self.RADIX_BITS = 7
|
||||
self.RADIX_MASK = (1 << self.RADIX_BITS) - 1
|
||||
|
||||
def is_empty(self):
|
||||
"""Check if the tree is empty."""
|
||||
try:
|
||||
layout = int(self.root['_base_layout'])
|
||||
return layout == self.LAYOUT_NIL
|
||||
except (gdb.error, gdb.MemoryError):
|
||||
return True
|
||||
|
||||
def _collect_elements(self):
|
||||
"""Collect all elements from the tree by traversing its structure.
|
||||
|
||||
Returns a list of (key, value) tuples sorted by key.
|
||||
This is cached after first call.
|
||||
"""
|
||||
if self._elements is not None:
|
||||
return self._elements
|
||||
|
||||
self._elements = []
|
||||
|
||||
if self.is_empty():
|
||||
return self._elements
|
||||
|
||||
try:
|
||||
# Traverse the tree structure
|
||||
# The tree is a radix tree with nodes that can be inner or leaf nodes
|
||||
# We'll do a depth-first traversal
|
||||
self._visit_node(self.root, 0, 0)
|
||||
|
||||
# Sort by key to ensure correct ordering
|
||||
self._elements.sort(key=lambda x: x[0])
|
||||
except (gdb.error, gdb.MemoryError) as e:
|
||||
# If traversal fails, we have at least collected what we could
|
||||
gdb.write(f"Warning: Failed to fully traverse compact_radix_tree: {e}\n")
|
||||
|
||||
return self._elements
|
||||
|
||||
def _visit_node(self, node, depth, prefix):
|
||||
"""Recursively visit a node and collect elements.
|
||||
|
||||
Args:
|
||||
node: The node_head to visit
|
||||
depth: Current depth in the tree
|
||||
prefix: Key prefix accumulated from parent nodes
|
||||
"""
|
||||
try:
|
||||
# Get node properties
|
||||
node_prefix = int(node['_prefix'])
|
||||
node_size = int(node['_size'])
|
||||
layout = int(node['_base_layout'])
|
||||
|
||||
if node_size == 0 or layout == self.LAYOUT_NIL:
|
||||
return
|
||||
|
||||
# Calculate the key size in bits
|
||||
# For uint32_t (column_id), this would be 32 bits
|
||||
key_bits = self.key_type.sizeof * 8
|
||||
|
||||
# Calculate leaf depth: the tree uses RADIX_BITS (7) bits per level
|
||||
# leaf_depth = ceil(key_bits / RADIX_BITS) - 1
|
||||
# The -1 accounts for the root level not being counted in depth
|
||||
leaf_depth = (key_bits + self.RADIX_BITS - 1) // self.RADIX_BITS - 1
|
||||
|
||||
# Extract prefix information from node_prefix
|
||||
# Prefix encoding: lower RADIX_BITS contain the prefix length,
|
||||
# upper bits contain the actual prefix value
|
||||
prefix_len = node_prefix & self.RADIX_MASK # Extract lower 7 bits for length
|
||||
prefix_value = node_prefix & ~self.RADIX_MASK # Extract upper bits for value
|
||||
|
||||
# Update prefix with node's contribution
|
||||
current_prefix = prefix | prefix_value
|
||||
|
||||
# Check if this is a leaf node (at maximum depth)
|
||||
if depth + prefix_len >= leaf_depth:
|
||||
# This is a leaf node - try to extract values
|
||||
self._collect_leaf_elements(node, current_prefix)
|
||||
else:
|
||||
# This is an inner node - recurse into children
|
||||
# Inner nodes contain pointers to other nodes
|
||||
# The structure is complex and varies by layout type
|
||||
# For now, we'll use a best-effort approach
|
||||
pass
|
||||
|
||||
except (gdb.error, gdb.MemoryError, ValueError) as e:
|
||||
# Skip nodes that can't be accessed
|
||||
pass
|
||||
|
||||
def _collect_leaf_elements(self, leaf_node, prefix):
|
||||
"""Collect elements from a leaf node.
|
||||
|
||||
Args:
|
||||
leaf_node: The leaf node_head
|
||||
prefix: Key prefix for elements in this leaf
|
||||
"""
|
||||
try:
|
||||
# Leaf nodes store the actual values
|
||||
# The exact structure depends on the layout type
|
||||
# Since the compiler may optimize away structure details,
|
||||
# we use a heuristic approach
|
||||
|
||||
# For now, we acknowledge that without full tree traversal support,
|
||||
# we can't reliably extract all elements
|
||||
# This would require implementing the full tree traversal logic
|
||||
# which is complex given GDB's limitations
|
||||
pass
|
||||
except (gdb.error, gdb.MemoryError):
|
||||
pass
|
||||
|
||||
def __len__(self):
|
||||
"""Return the number of elements in the tree."""
|
||||
elements = self._collect_elements()
|
||||
return len(elements)
|
||||
|
||||
def __iter__(self):
|
||||
"""Iterate over (key, value) pairs in the tree in ascending key order.
|
||||
|
||||
Yields:
|
||||
Tuples of (key, value) where key is the integer index and value is the stored element.
|
||||
"""
|
||||
elements = self._collect_elements()
|
||||
for key, value in elements:
|
||||
yield (key, value)
|
||||
|
||||
def __getitem__(self, key):
|
||||
"""Get value at given key (column_id).
|
||||
|
||||
Args:
|
||||
key: Integer key (column_id) to look up
|
||||
|
||||
Returns:
|
||||
The value at the given key
|
||||
|
||||
Raises:
|
||||
KeyError: If key not found in tree
|
||||
"""
|
||||
elements = self._collect_elements()
|
||||
for k, v in elements:
|
||||
if k == key:
|
||||
return v
|
||||
raise KeyError(f"Key {key} not found in compact_radix_tree")
|
||||
|
||||
def get(self, key, default=None):
|
||||
"""Get value at given key, or default if not found.
|
||||
|
||||
Args:
|
||||
key: Integer key to look up
|
||||
default: Value to return if key not found
|
||||
|
||||
Returns:
|
||||
The value at the given key, or default if not found
|
||||
"""
|
||||
try:
|
||||
return self[key]
|
||||
except KeyError:
|
||||
return default
|
||||
|
||||
def keys(self):
|
||||
"""Return a list of all keys in the tree."""
|
||||
elements = self._collect_elements()
|
||||
return [k for k, v in elements]
|
||||
|
||||
def values(self):
|
||||
"""Return a list of all values in the tree."""
|
||||
elements = self._collect_elements()
|
||||
return [v for k, v in elements]
|
||||
|
||||
def items(self):
|
||||
"""Return a list of (key, value) tuples."""
|
||||
return list(self._collect_elements())
|
||||
|
||||
def to_string(self):
|
||||
if self.root['_base_layout'] == 0:
|
||||
"""Return a string representation for printing."""
|
||||
if self.is_empty():
|
||||
return '<empty>'
|
||||
|
||||
# Compiler optimizes-away lots of critical stuff, so
|
||||
# for now just show where the tree is
|
||||
return 'compact radix tree @ 0x%x' % self.root
|
||||
# Try to provide more useful information
|
||||
try:
|
||||
elements = self._collect_elements()
|
||||
if elements:
|
||||
keys = [k for k, v in elements]
|
||||
return f'compact_radix_tree with {len(elements)} element(s), keys: {keys}'
|
||||
else:
|
||||
# We know it's not empty but couldn't collect elements
|
||||
# This happens when compiler optimizations prevent tree traversal
|
||||
try:
|
||||
size = int(self.root['_size'])
|
||||
layout = int(self.root['_base_layout'])
|
||||
return f'compact_radix_tree with size={size}, layout={layout} @ {hex(int(self.root.address))} (elements not accessible, use debug build for full introspection)'
|
||||
except (gdb.error, gdb.MemoryError, ValueError, AttributeError):
|
||||
return f'compact_radix_tree @ {hex(int(self.root.address))} (structure not fully accessible)'
|
||||
except (gdb.error, gdb.MemoryError, ValueError, AttributeError) as e:
|
||||
# Fallback to simple representation
|
||||
return f'compact_radix_tree @ {hex(int(self.root.address))} (error: {e})'
|
||||
|
||||
|
||||
class intrusive_btree:
|
||||
|
||||
@@ -2488,6 +2488,11 @@ void sstable::validate_originating_host_id() const {
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (*originating_host_id != local_host_id) {
|
||||
// FIXME refrain from throwing an exception because of #10148
|
||||
sstlog.warn("Host id {} does not match local host id {} while validating SSTable: {}. Load foreign SSTables via the upload dir instead.", *originating_host_id, local_host_id, get_filename());
|
||||
}
|
||||
}
|
||||
|
||||
sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, generation_type generation,
|
||||
|
||||
@@ -135,17 +135,13 @@ future<> storage_manager::update_config(const db::config& cfg) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto storage_manager::get_endpoint(const sstring& endpoint) -> object_storage_endpoint& {
|
||||
shared_ptr<sstables::object_storage_client> storage_manager::get_endpoint_client(sstring endpoint) {
|
||||
auto found = _object_storage_endpoints.find(endpoint);
|
||||
if (found == _object_storage_endpoints.end()) {
|
||||
smlogger.error("unable to find {} in configured object-storage endpoints", endpoint);
|
||||
throw std::invalid_argument(format("endpoint {} not found", endpoint));
|
||||
}
|
||||
return found->second;
|
||||
}
|
||||
|
||||
shared_ptr<sstables::object_storage_client> storage_manager::get_endpoint_client(sstring endpoint) {
|
||||
auto& ep = get_endpoint(endpoint);
|
||||
auto& ep = found->second;
|
||||
if (ep.client == nullptr) {
|
||||
ep.client = make_object_storage_client(ep.cfg, _object_storage_clients_memory, [&ct = container()] (std::string ep) {
|
||||
return ct.local().get_endpoint_client(ep);
|
||||
@@ -154,10 +150,6 @@ shared_ptr<sstables::object_storage_client> storage_manager::get_endpoint_client
|
||||
return ep.client;
|
||||
}
|
||||
|
||||
sstring storage_manager::get_endpoint_type(sstring endpoint) {
|
||||
return get_endpoint(endpoint).cfg.type();
|
||||
}
|
||||
|
||||
bool storage_manager::is_known_endpoint(sstring endpoint) const {
|
||||
return _object_storage_endpoints.contains(endpoint);
|
||||
}
|
||||
|
||||
@@ -70,7 +70,6 @@ class storage_manager : public peering_sharded_service<storage_manager> {
|
||||
seastar::metrics::metric_groups metrics;
|
||||
|
||||
future<> update_config(const db::config&);
|
||||
object_storage_endpoint& get_endpoint(const sstring& ep);
|
||||
|
||||
public:
|
||||
struct config {
|
||||
@@ -81,7 +80,6 @@ public:
|
||||
storage_manager(const db::config&, config cfg);
|
||||
shared_ptr<object_storage_client> get_endpoint_client(sstring endpoint);
|
||||
bool is_known_endpoint(sstring endpoint) const;
|
||||
sstring get_endpoint_type(sstring endpoint);
|
||||
future<> stop();
|
||||
std::vector<sstring> endpoints(sstring type = "") const noexcept;
|
||||
};
|
||||
|
||||
@@ -205,13 +205,6 @@ private:
|
||||
}
|
||||
|
||||
bool tablet_in_scope(locator::tablet_id) const;
|
||||
|
||||
friend future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
|
||||
std::vector<dht::token_range>&& tablets_ranges);
|
||||
// Pay attention, while working with tablet ranges, the `erm` must be held alive as long as we retrieve (and use here) tablet ranges from
|
||||
// the tablet map. This is already done when using `tablet_sstable_streamer` class but tread carefully if you plan to use this method somewhere else.
|
||||
static future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
|
||||
std::vector<dht::token_range>&& tablets_ranges);
|
||||
};
|
||||
|
||||
host_id_vector_replica_set sstable_streamer::get_endpoints(const dht::token& token) const {
|
||||
@@ -350,52 +343,55 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
future<std::vector<tablet_sstable_collection>> tablet_sstable_streamer::get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
|
||||
std::vector<dht::token_range>&& tablets_ranges) {
|
||||
auto tablets_sstables =
|
||||
tablets_ranges | std::views::transform([](auto range) { return tablet_sstable_collection{.tablet_range = range}; }) | std::ranges::to<std::vector>();
|
||||
if (sstables.empty() || tablets_sstables.empty()) {
|
||||
co_return std::move(tablets_sstables);
|
||||
}
|
||||
// sstables are sorted by first key in reverse order.
|
||||
auto reversed_sstables = sstables | std::views::reverse;
|
||||
|
||||
for (auto& [tablet_range, sstables_fully_contained, sstables_partially_contained] : tablets_sstables) {
|
||||
for (const auto& sst : reversed_sstables) {
|
||||
auto sst_first = sst->get_first_decorated_key().token();
|
||||
auto sst_last = sst->get_last_decorated_key().token();
|
||||
|
||||
// SSTable entirely after tablet -> no further SSTables (larger keys) can overlap
|
||||
if (tablet_range.after(sst_first, dht::token_comparator{})) {
|
||||
break;
|
||||
}
|
||||
// SSTable entirely before tablet -> skip and continue scanning later (larger keys)
|
||||
if (tablet_range.before(sst_last, dht::token_comparator{})) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tablet_range.contains(dht::token_range{sst_first, sst_last}, dht::token_comparator{})) {
|
||||
sstables_fully_contained.push_back(sst);
|
||||
} else {
|
||||
sstables_partially_contained.push_back(sst);
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
}
|
||||
co_return std::move(tablets_sstables);
|
||||
}
|
||||
|
||||
future<> tablet_sstable_streamer::stream(shared_ptr<stream_progress> progress) {
|
||||
if (progress) {
|
||||
progress->start(_tablet_map.tablet_count());
|
||||
}
|
||||
|
||||
auto classified_sstables = co_await get_sstables_for_tablets(
|
||||
_sstables, _tablet_map.tablet_ids() | std::views::filter([this](auto tid) { return tablet_in_scope(tid); }) | std::views::transform([this](auto tid) {
|
||||
return _tablet_map.get_token_range(tid);
|
||||
}) | std::ranges::to<std::vector>());
|
||||
// sstables are sorted by first key in reverse order.
|
||||
auto sstable_it = _sstables.rbegin();
|
||||
|
||||
for (auto tablet_id : _tablet_map.tablet_ids() | std::views::filter([this] (auto tid) { return tablet_in_scope(tid); })) {
|
||||
auto tablet_range = _tablet_map.get_token_range(tablet_id);
|
||||
|
||||
auto sstable_token_range = [] (const sstables::shared_sstable& sst) {
|
||||
return dht::token_range(sst->get_first_decorated_key().token(),
|
||||
sst->get_last_decorated_key().token());
|
||||
};
|
||||
|
||||
std::vector<sstables::shared_sstable> sstables_fully_contained;
|
||||
std::vector<sstables::shared_sstable> sstables_partially_contained;
|
||||
|
||||
// sstable is exhausted if its last key is before the current tablet range
|
||||
auto exhausted = [&tablet_range] (const sstables::shared_sstable& sst) {
|
||||
return tablet_range.before(sst->get_last_decorated_key().token(), dht::token_comparator{});
|
||||
};
|
||||
while (sstable_it != _sstables.rend() && exhausted(*sstable_it)) {
|
||||
sstable_it++;
|
||||
}
|
||||
|
||||
for (auto sst_it = sstable_it; sst_it != _sstables.rend(); sst_it++) {
|
||||
auto sst_token_range = sstable_token_range(*sst_it);
|
||||
|
||||
// sstables are sorted by first key, so should skip this SSTable since it
|
||||
// doesn't overlap with the current tablet range.
|
||||
if (!tablet_range.overlaps(sst_token_range, dht::token_comparator{})) {
|
||||
// If the start of the next SSTable's token range lies beyond the current tablet's token
|
||||
// range, we can safely conclude that no more relevant SSTables remain for this tablet.
|
||||
if (tablet_range.after(sst_token_range.start()->value(), dht::token_comparator{})) {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tablet_range.contains(sst_token_range, dht::token_comparator{})) {
|
||||
sstables_fully_contained.push_back(*sst_it);
|
||||
} else {
|
||||
sstables_partially_contained.push_back(*sst_it);
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
for (auto& [tablet_range, sstables_fully_contained, sstables_partially_contained] : classified_sstables) {
|
||||
auto per_tablet_progress = make_shared<per_tablet_stream_progress>(
|
||||
progress,
|
||||
sstables_fully_contained.size() + sstables_partially_contained.size());
|
||||
@@ -755,9 +751,8 @@ future<> sstables_loader::download_task_impl::run() {
|
||||
};
|
||||
llog.debug("Loading sstables from {}({}/{})", _endpoint, _bucket, _prefix);
|
||||
|
||||
auto ep_type = _loader.local()._storage_manager.get_endpoint_type(_endpoint);
|
||||
std::vector<seastar::abort_source> shard_aborts(smp::count);
|
||||
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, ep_type, _bucket, _prefix, cfg, [&] {
|
||||
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, _bucket, _prefix, cfg, [&] {
|
||||
return &shard_aborts[this_shard_id()];
|
||||
});
|
||||
llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, _prefix);
|
||||
@@ -837,7 +832,3 @@ future<tasks::task_id> sstables_loader::download_new_sstables(sstring ks_name, s
|
||||
std::move(prefix), std::move(sstables), scope, primary_replica_only(primary_replica));
|
||||
co_return task->id();
|
||||
}
|
||||
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
|
||||
std::vector<dht::token_range>&& tablets_ranges) {
|
||||
return tablet_sstable_streamer::get_sstables_for_tablets(sstables, std::move(tablets_ranges));
|
||||
}
|
||||
|
||||
@@ -10,8 +10,6 @@
|
||||
|
||||
#include <vector>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include "dht/i_partitioner_fwd.hh"
|
||||
#include "dht/token.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "sstables/shared_sstable.hh"
|
||||
#include "tasks/task_manager.hh"
|
||||
@@ -154,18 +152,3 @@ struct fmt::formatter<sstables_loader::stream_scope> : fmt::formatter<string_vie
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct tablet_sstable_collection {
|
||||
dht::token_range tablet_range;
|
||||
std::vector<sstables::shared_sstable> sstables_fully_contained;
|
||||
std::vector<sstables::shared_sstable> sstables_partially_contained;
|
||||
};
|
||||
|
||||
// This function is intended for test purposes only.
|
||||
// It assigns the given sstables to the given tablet ranges based on token containment.
|
||||
// It returns a vector of tablet_sstable_collection, each containing the tablet range
|
||||
// and the sstables that are fully or partially contained within that range.
|
||||
// The prerequisite is the tablet ranges are sorted by the range in ascending order and non-overlapping.
|
||||
// Another prerequisite is that the sstables' token ranges are sorted by its `start` in descending order.
|
||||
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
|
||||
std::vector<dht::token_range>&& tablets_ranges);
|
||||
|
||||
@@ -370,7 +370,6 @@ add_scylla_test(combined_tests
|
||||
sstable_compression_config_test.cc
|
||||
sstable_directory_test.cc
|
||||
sstable_set_test.cc
|
||||
sstable_tablet_streaming.cc
|
||||
statement_restrictions_test.cc
|
||||
storage_proxy_test.cc
|
||||
tablets_test.cc
|
||||
|
||||
@@ -1450,7 +1450,8 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
|
||||
std::map<sstring, replication_strategy_config_option> options;
|
||||
for (const auto& dc : option_dcs) {
|
||||
auto num_racks = node_count_per_rack.at(dc).size();
|
||||
auto rf = num_racks;
|
||||
auto max_rf_factor = std::ranges::min(std::ranges::views::transform(node_count_per_rack.at(dc), [] (auto& x) { return x.second; }));
|
||||
auto rf = num_racks * tests::random::get_int(1UL, max_rf_factor);
|
||||
options.emplace(dc, fmt::to_string(rf));
|
||||
}
|
||||
return options;
|
||||
@@ -1486,7 +1487,8 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
|
||||
// Test tablets rack-aware base-view pairing
|
||||
auto base_token = dht::token::get_random_token();
|
||||
auto view_token = dht::token::get_random_token();
|
||||
bool use_tablets = true;
|
||||
bool use_legacy_self_pairing = false;
|
||||
bool use_tablets_basic_rack_aware_view_pairing = true;
|
||||
const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas;
|
||||
replica::cf_stats cf_stats;
|
||||
std::unordered_map<locator::host_id, locator::host_id> base_to_view_pairing;
|
||||
@@ -1500,7 +1502,8 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
|
||||
*ars_ptr,
|
||||
base_token,
|
||||
view_token,
|
||||
use_tablets,
|
||||
use_legacy_self_pairing,
|
||||
use_tablets_basic_rack_aware_view_pairing,
|
||||
cf_stats).natural_endpoint;
|
||||
|
||||
// view pair must be found
|
||||
@@ -1522,6 +1525,181 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
|
||||
}
|
||||
}
|
||||
|
||||
// Called in a seastar thread
|
||||
void test_complex_rack_aware_view_pairing_test(bool more_or_less) {
|
||||
auto my_address = gms::inet_address("localhost");
|
||||
|
||||
// Create the RackInferringSnitch
|
||||
snitch_config cfg;
|
||||
cfg.listen_address = my_address;
|
||||
cfg.broadcast_address = my_address;
|
||||
cfg.name = "RackInferringSnitch";
|
||||
sharded<snitch_ptr> snitch;
|
||||
snitch.start(cfg).get();
|
||||
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
|
||||
snitch.invoke_on_all(&snitch_ptr::start).get();
|
||||
|
||||
locator::token_metadata::config tm_cfg;
|
||||
tm_cfg.topo_cfg.this_endpoint = my_address;
|
||||
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
|
||||
|
||||
std::map<sstring, size_t> node_count_per_dc;
|
||||
std::map<sstring, std::map<sstring, size_t>> node_count_per_rack;
|
||||
std::vector<ring_point> ring_points;
|
||||
|
||||
auto& random_engine = seastar::testing::local_random_engine;
|
||||
unsigned shard_count = 2;
|
||||
size_t num_dcs = 1 + tests::random::get_int(3);
|
||||
|
||||
// Generate a random cluster
|
||||
double point = 1;
|
||||
for (size_t dc = 0; dc < num_dcs; ++dc) {
|
||||
sstring dc_name = fmt::format("{}", 100 + dc);
|
||||
size_t num_racks = 2 + tests::random::get_int(4);
|
||||
for (size_t rack = 0; rack < num_racks; ++rack) {
|
||||
sstring rack_name = fmt::format("{}", 10 + rack);
|
||||
size_t rack_nodes = 1 + tests::random::get_int(2);
|
||||
for (size_t i = 1; i <= rack_nodes; ++i) {
|
||||
ring_points.emplace_back(point, inet_address(format("192.{}.{}.{}", dc_name, rack_name, i)));
|
||||
node_count_per_dc[dc_name]++;
|
||||
node_count_per_rack[dc_name][rack_name]++;
|
||||
point++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
testlog.debug("node_count_per_rack={}", node_count_per_rack);
|
||||
|
||||
// Initialize the token_metadata
|
||||
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
|
||||
auto stop_stm = deferred_stop(stm);
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
auto& topo = tm.get_topology();
|
||||
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
||||
std::unordered_set<token> tokens;
|
||||
tokens.insert(token{tests::d2t(ring_point / ring_points.size())});
|
||||
topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, shard_count);
|
||||
co_await tm.update_normal_tokens(std::move(tokens), id);
|
||||
}
|
||||
}).get();
|
||||
|
||||
auto base_schema = schema_builder("ks", "base")
|
||||
.with_column("k", utf8_type, column_kind::partition_key)
|
||||
.with_column("v", utf8_type)
|
||||
.build();
|
||||
|
||||
auto view_schema = schema_builder("ks", "view")
|
||||
.with_column("v", utf8_type, column_kind::partition_key)
|
||||
.with_column("k", utf8_type)
|
||||
.build();
|
||||
|
||||
auto tmptr = stm.get();
|
||||
|
||||
// Create the replication strategy
|
||||
auto make_random_options = [&] () {
|
||||
auto option_dcs = node_count_per_dc | std::views::keys | std::ranges::to<std::vector>();
|
||||
std::shuffle(option_dcs.begin(), option_dcs.end(), random_engine);
|
||||
std::map<sstring, replication_strategy_config_option> options;
|
||||
for (const auto& dc : option_dcs) {
|
||||
auto num_racks = node_count_per_rack.at(dc).size();
|
||||
auto rf = more_or_less ?
|
||||
tests::random::get_int(num_racks, node_count_per_dc[dc]) :
|
||||
tests::random::get_int(1UL, num_racks);
|
||||
options.emplace(dc, fmt::to_string(rf));
|
||||
}
|
||||
return options;
|
||||
};
|
||||
|
||||
auto options = make_random_options();
|
||||
size_t tablet_count = 1 + tests::random::get_int(99);
|
||||
testlog.debug("tablet_count={} rf_options={}", tablet_count, options);
|
||||
locator::replication_strategy_params params(options, tablet_count, std::nullopt);
|
||||
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
|
||||
"NetworkTopologyStrategy", params, tmptr->get_topology());
|
||||
auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
|
||||
BOOST_REQUIRE(tab_awr_ptr);
|
||||
auto base_tmap = tab_awr_ptr->allocate_tablets_for_new_table(base_schema, tmptr, 1).get();
|
||||
auto base_table_id = base_schema->id();
|
||||
testlog.debug("base_table_id={}", base_table_id);
|
||||
auto view_table_id = view_schema->id();
|
||||
auto view_tmap = tab_awr_ptr->allocate_tablets_for_new_table(view_schema, tmptr, 1).get();
|
||||
testlog.debug("view_table_id={}", view_table_id);
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
tm.tablets().set_tablet_map(base_table_id, co_await base_tmap.clone_gently());
|
||||
tm.tablets().set_tablet_map(view_table_id, co_await view_tmap.clone_gently());
|
||||
}).get();
|
||||
|
||||
tmptr = stm.get();
|
||||
auto base_erm = tab_awr_ptr->make_replication_map(base_table_id, tmptr);
|
||||
auto view_erm = tab_awr_ptr->make_replication_map(view_table_id, tmptr);
|
||||
|
||||
auto& topology = tmptr->get_topology();
|
||||
testlog.debug("topology: {}", topology.get_datacenter_racks());
|
||||
|
||||
// Test tablets rack-aware base-view pairing
|
||||
auto base_token = dht::token::get_random_token();
|
||||
auto view_token = dht::token::get_random_token();
|
||||
bool use_legacy_self_pairing = false;
|
||||
bool use_tablets_basic_rack_aware_view_pairing = true;
|
||||
const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas;
|
||||
replica::cf_stats cf_stats;
|
||||
std::unordered_map<locator::host_id, locator::host_id> base_to_view_pairing;
|
||||
std::unordered_map<locator::host_id, locator::host_id> view_to_base_pairing;
|
||||
std::unordered_map<sstring, size_t> same_rack_pairs;
|
||||
std::unordered_map<sstring, size_t> cross_rack_pairs;
|
||||
for (const auto& base_replica : base_replicas) {
|
||||
auto& base_host = base_replica.host;
|
||||
auto view_ep_opt = db::view::get_view_natural_endpoint(
|
||||
base_host,
|
||||
base_erm,
|
||||
view_erm,
|
||||
*ars_ptr,
|
||||
base_token,
|
||||
view_token,
|
||||
use_legacy_self_pairing,
|
||||
use_tablets_basic_rack_aware_view_pairing,
|
||||
cf_stats).natural_endpoint;
|
||||
|
||||
// view pair must be found
|
||||
if (!view_ep_opt) {
|
||||
BOOST_FAIL(format("Could not pair base_host={} base_token={} view_token={}", base_host, base_token, view_token));
|
||||
}
|
||||
BOOST_REQUIRE(view_ep_opt);
|
||||
auto& view_ep = *view_ep_opt;
|
||||
|
||||
// Assert pairing uniqueness
|
||||
auto [base_it, inserted_base_pair] = base_to_view_pairing.emplace(base_host, view_ep);
|
||||
BOOST_REQUIRE(inserted_base_pair);
|
||||
auto [view_it, inserted_view_pair] = view_to_base_pairing.emplace(view_ep, base_host);
|
||||
BOOST_REQUIRE(inserted_view_pair);
|
||||
|
||||
auto& base_location = topology.find_node(base_host)->dc_rack();
|
||||
auto& view_location = topology.find_node(view_ep)->dc_rack();
|
||||
|
||||
// Assert dc- and rack- aware pairing
|
||||
BOOST_REQUIRE_EQUAL(base_location.dc, view_location.dc);
|
||||
|
||||
if (base_location.rack == view_location.rack) {
|
||||
same_rack_pairs[base_location.dc]++;
|
||||
} else {
|
||||
cross_rack_pairs[base_location.dc]++;
|
||||
}
|
||||
}
|
||||
for (const auto& [dc, rf_opt] : options) {
|
||||
auto rf = locator::get_replication_factor(rf_opt);
|
||||
BOOST_REQUIRE_EQUAL(same_rack_pairs[dc] + cross_rack_pairs[dc], rf);
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_lt_racks) {
|
||||
test_complex_rack_aware_view_pairing_test(false);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_gt_racks) {
|
||||
test_complex_rack_aware_view_pairing_test(true);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_rack_diff) {
|
||||
BOOST_REQUIRE(diff_racks({}, {}).empty());
|
||||
|
||||
|
||||
@@ -1,367 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#undef SEASTAR_TESTING_MAIN
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include "dht/token.hh"
|
||||
#include "sstable_test.hh"
|
||||
#include "sstables_loader.hh"
|
||||
#include "test/lib/sstable_test_env.hh"
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(sstable_tablet_streaming_test)
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
std::vector<shared_sstable> make_sstables_with_ranges(test_env& env, const std::vector<std::pair<int64_t, int64_t>>& ranges) {
|
||||
std::vector<shared_sstable> ssts;
|
||||
for (const auto& [first, last] : ranges) {
|
||||
auto sst = env.make_sstable(uncompressed_schema(), uncompressed_dir());
|
||||
test(sst).set_first_and_last_keys(dht::decorated_key(dht::token{first}, partition_key(std::vector<bytes>{"1"})),
|
||||
dht::decorated_key(dht::token{last}, partition_key(std::vector<bytes>{"1"})));
|
||||
ssts.push_back(std::move(sst));
|
||||
}
|
||||
// By sorting SSTables by their primary key, we enable runs to be
|
||||
// streamed incrementally. Overlapping fragments can be deduplicated,
|
||||
// reducing the amount of data sent over the wire. Elements are
|
||||
// popped from the back of the vector, so we sort in descending
|
||||
// order to begin with the smaller tokens.
|
||||
// See sstable_streamer constructor for more details.
|
||||
std::ranges::sort(ssts, [](const shared_sstable& x, const shared_sstable& y) { return x->compare_by_first_key(*y) > 0; });
|
||||
return ssts;
|
||||
}
|
||||
|
||||
std::vector<dht::token_range> get_tablet_sstable_collection(auto&&... tablet_ranges) {
|
||||
// tablet ranges are left-non-inclusive, see `tablet_map::get_token_range` for details
|
||||
std::vector<dht::token_range> collections{dht::token_range::make({tablet_ranges.start()->value(), false}, {tablet_ranges.end()->value(), true})...};
|
||||
|
||||
std::sort(collections.begin(), collections.end(), [](auto const& a, auto const& b) { return a.start()->value() < b.start()->value(); });
|
||||
|
||||
return collections;
|
||||
}
|
||||
|
||||
#define REQUIRE_WITH_CONTEXT(sstables, expected_size) \
|
||||
BOOST_TEST_CONTEXT("Testing with ranges: " << [&] { \
|
||||
std::stringstream ss; \
|
||||
for (const auto& sst : (sstables)) { \
|
||||
ss << dht::token_range(sst->get_first_decorated_key().token(), sst->get_last_decorated_key().token()) << ", "; \
|
||||
} \
|
||||
return ss.str(); \
|
||||
}()) \
|
||||
BOOST_REQUIRE_EQUAL(sstables.size(), expected_size)
|
||||
|
||||
SEASTAR_TEST_CASE(test_streaming_ranges_distribution) {
|
||||
return test_env::do_with_async([](test_env& env) {
|
||||
// 1) Exact boundary equality: SSTable == tablet
|
||||
{
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{5, 10},
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
|
||||
}
|
||||
|
||||
// 2) Single-point overlaps at start/end
|
||||
{
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{4, 5}, // touches start, non-inclusive, skip
|
||||
{10, 11}, // touches end
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
|
||||
}
|
||||
|
||||
// 3) Tablet fully inside a large SSTable
|
||||
{
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{0, 20},
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
|
||||
}
|
||||
|
||||
// 4) Multiple SSTables fully contained in tablet
|
||||
{
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{6, 7},
|
||||
{7, 8},
|
||||
{8, 9},
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 3);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
|
||||
}
|
||||
|
||||
// 5) Two overlapping but not fully contained SSTables
|
||||
{
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{0, 6}, // overlaps at left
|
||||
{9, 15}, // overlaps at right
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
|
||||
}
|
||||
|
||||
// 6) Unsorted input (helper sorts) + mixed overlaps
|
||||
{
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{50}, dht::token{100}});
|
||||
// Intentionally unsorted by first token
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{120, 130},
|
||||
{0, 10},
|
||||
{60, 70}, // fully contained
|
||||
{40, 55}, // partial
|
||||
{95, 105}, // partial
|
||||
{80, 90}, // fully contained
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 2);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
|
||||
}
|
||||
|
||||
// 7) Empty SSTable list
|
||||
{
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
|
||||
std::vector<shared_sstable> ssts;
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
|
||||
}
|
||||
|
||||
// 8) Tablet outside all SSTables
|
||||
{
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{100}, dht::token{200}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{1, 2},
|
||||
{3, 4},
|
||||
{10, 20},
|
||||
{300, 400},
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
|
||||
}
|
||||
|
||||
// 9) Boundary adjacency with multiple fragments
|
||||
{
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{100}, dht::token{200}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{50, 100}, // touches start -> non-inclusive, skip
|
||||
{100, 120}, // starts at start -> partially contained
|
||||
{180, 200}, // ends at end -> fully contained
|
||||
{200, 220}, // touches end -> partial
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 1);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
|
||||
}
|
||||
|
||||
// 10) Large SSTable set where early break should occur
|
||||
{
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{1000}, dht::token{2000}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{100, 200},
|
||||
{300, 400},
|
||||
{900, 950},
|
||||
{1001, 1100}, // fully contained
|
||||
{1500, 1600}, // fully contained
|
||||
{2101, 2200}, // entirely after -> should trigger early break in ascending scan
|
||||
{1999, 2100}, // overlap, partially contained
|
||||
{3000, 3100},
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 2);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
|
||||
}
|
||||
|
||||
// 10) https://github.com/scylladb/scylladb/pull/26980 example, tested
|
||||
{
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{4}, dht::token{5}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{0, 5},
|
||||
{0, 3},
|
||||
{2, 5},
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
// None fully contained; three partial overlaps
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_streaming_ranges_distribution_in_tablets) {
|
||||
return test_env::do_with_async([](test_env& env) {
|
||||
{
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}}, dht::token_range{dht::token{11}, dht::token{15}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{5, 10},
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
|
||||
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 0);
|
||||
}
|
||||
|
||||
{
|
||||
// Multiple tablets with a hole between [10,11]
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{4}},
|
||||
dht::token_range{dht::token{5}, dht::token{9}},
|
||||
dht::token_range{dht::token{12}, dht::token{15}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{0, 4}, // T.start==S.start, but non-inclusive -> partial
|
||||
{5, 9}, // same as above
|
||||
{6, 8}, // fully in second tablet
|
||||
{10, 11}, // falls in the hole, should be rejected
|
||||
{8, 13}, // overlaps second and third tablets (partial in both)
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
|
||||
|
||||
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 1);
|
||||
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 2);
|
||||
|
||||
REQUIRE_WITH_CONTEXT(res[2].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[2].sstables_partially_contained, 1);
|
||||
}
|
||||
|
||||
{
|
||||
// SSTables outside any tablet range
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{20}, dht::token{25}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{0, 5}, // before
|
||||
{30, 35}, // after
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
|
||||
}
|
||||
|
||||
{
|
||||
// Edge case: SSTable touching tablet boundary
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{4, 5}, // touches start, non-inclusive, skip
|
||||
{10, 11}, // touches end
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
|
||||
}
|
||||
|
||||
{
|
||||
// No tablets, but some SSTables
|
||||
auto collection = get_tablet_sstable_collection();
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{0, 5},
|
||||
{10, 15},
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
BOOST_REQUIRE_EQUAL(res.size(), 0); // no tablets → nothing to classify
|
||||
}
|
||||
|
||||
{
|
||||
// No SSTables, but some tablets
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{5}}, dht::token_range{dht::token{10}, dht::token{15}});
|
||||
std::vector<shared_sstable> ssts; // empty
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 0);
|
||||
}
|
||||
|
||||
{
|
||||
// No tablets and no SSTables
|
||||
auto collection = get_tablet_sstable_collection();
|
||||
std::vector<shared_sstable> ssts; // empty
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
BOOST_REQUIRE_EQUAL(res.size(), 0);
|
||||
}
|
||||
{
|
||||
// SSTable spanning two tablets
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{4}}, dht::token_range{dht::token{5}, dht::token{9}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{2, 7}, // spans both tablets
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
|
||||
// Tablet [0,4] sees partial overlap
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
|
||||
|
||||
// Tablet [5,9] sees partial overlap
|
||||
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 1);
|
||||
}
|
||||
|
||||
{
|
||||
// SSTable spanning three tablets with a hole in between
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{3}},
|
||||
dht::token_range{dht::token{4}, dht::token{6}},
|
||||
dht::token_range{dht::token{8}, dht::token{10}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{2, 9}, // spans across tablets 1,2,3 and hole [7]
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
|
||||
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 1);
|
||||
REQUIRE_WITH_CONTEXT(res[2].sstables_partially_contained, 1);
|
||||
}
|
||||
|
||||
{
|
||||
// SSTable fully covering one tablet and partially overlapping another
|
||||
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{5}}, dht::token_range{dht::token{6}, dht::token{10}});
|
||||
auto ssts = make_sstables_with_ranges(env,
|
||||
{
|
||||
{0, 7}, // fully covers first tablet, partial in second
|
||||
});
|
||||
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
|
||||
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
|
||||
|
||||
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
|
||||
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 1);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
@@ -395,7 +395,7 @@ SEASTAR_TEST_CASE(test_builder_with_concurrent_drop) {
|
||||
assert_that(msg).is_rows().is_empty();
|
||||
msg = e.execute_cql("select * from system_distributed.view_build_status").get();
|
||||
assert_that(msg).is_rows().is_empty();
|
||||
}, 30);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -25,14 +25,12 @@ import json
|
||||
from cassandra.auth import PlainTextAuthProvider
|
||||
import threading
|
||||
import random
|
||||
import re
|
||||
|
||||
from test.cluster.util import get_replication
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for
|
||||
from test.pylib.tablets import get_all_tablet_replicas
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.pylib.tablets import get_tablet_replica
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -971,118 +969,3 @@ async def test_alternator_concurrent_rmw_same_partition_different_server(manager
|
||||
t.join()
|
||||
finally:
|
||||
table.delete()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_alternator_invalid_shard_for_lwt(manager: ManagerClient):
|
||||
"""
|
||||
Reproducer for issue #27353.
|
||||
|
||||
LWT requires that storage_proxy::cas() is invoked on a valid shard — the one
|
||||
returned by sharder.try_get_shard_for_reads() for a tablets-based table.
|
||||
|
||||
The bug: if the current shard is invalid and we jump to the valid shard, that
|
||||
new shard may become invalid again by the time we attempt to capture the ERM.
|
||||
This leads to a failure of the CAS path.
|
||||
|
||||
The fix: retry the validity check and jump again if the current shard is already
|
||||
invalid. We should exit the loop once the shard is valid *and* we hold a strong pointer
|
||||
to the ERM — which prevents further tablet movements until the ERM is released.
|
||||
|
||||
This problem is specific to BatchWriteItem; other commands are already handled
|
||||
correctly.
|
||||
"""
|
||||
config = alternator_config.copy()
|
||||
config['alternator_write_isolation'] = 'always_use_lwt'
|
||||
cmdline = [
|
||||
'--logger-log-level', 'alternator-executor=trace',
|
||||
'--logger-log-level', 'alternator_controller=trace',
|
||||
'--logger-log-level', 'paxos=trace'
|
||||
]
|
||||
server = await manager.server_add(config=config, cmdline=cmdline)
|
||||
alternator = get_alternator(server.ip_addr)
|
||||
|
||||
logger.info("Creating alternator test table")
|
||||
table = alternator.create_table(TableName=unique_table_name(),
|
||||
Tags=[{'Key': 'system:initial_tablets', 'Value': '1'}],
|
||||
BillingMode='PAY_PER_REQUEST',
|
||||
KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
|
||||
AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'N'}])
|
||||
table_name = table.name
|
||||
ks_name = 'alternator_' + table_name
|
||||
last_token = 7 # Any token works since we have only one tablet
|
||||
|
||||
(src_host_id, src_shard) = await get_tablet_replica(manager, server, ks_name, table_name, last_token)
|
||||
dst_shard = 0 if src_shard == 1 else 1
|
||||
|
||||
logger.info("Inject 'intranode_migration_streaming_wait'")
|
||||
await manager.api.enable_injection(server.ip_addr,
|
||||
"intranode_migration_streaming_wait",
|
||||
one_shot=False)
|
||||
|
||||
logger.info("Start tablet migration")
|
||||
intranode_migration_task = asyncio.create_task(
|
||||
manager.api.move_tablet(server.ip_addr, ks_name, table_name,
|
||||
src_host_id, src_shard,
|
||||
src_host_id, dst_shard, last_token))
|
||||
|
||||
logger.info("Open server logs")
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
|
||||
logger.info("Wait for intranode_migration_streaming_wait")
|
||||
await log.wait_for("intranode_migration_streaming: waiting")
|
||||
|
||||
logger.info("Inject 'alternator_executor_batch_write_wait'")
|
||||
await manager.api.enable_injection(server.ip_addr,
|
||||
"alternator_executor_batch_write_wait",
|
||||
one_shot=False,
|
||||
parameters={
|
||||
'table': table_name,
|
||||
'keyspace': ks_name,
|
||||
'shard': dst_shard
|
||||
})
|
||||
m = await log.mark()
|
||||
|
||||
# Start a background thread, which tries to hit the alternator_executor_batch_write_wait
|
||||
# injection on the destination shard.
|
||||
logger.info("Start a batch_write thread")
|
||||
stop_event = threading.Event()
|
||||
def run_batch():
|
||||
alternator = get_alternator(server.ip_addr)
|
||||
table = alternator.Table(table_name)
|
||||
while not stop_event.is_set():
|
||||
with table.batch_writer() as batch:
|
||||
batch.put_item(Item={'p': 1, 'x': 'hellow world'})
|
||||
t = ThreadWrapper(target=run_batch)
|
||||
t.start()
|
||||
|
||||
logger.info("Waiting for 'alternator_executor_batch_write_wait: hit'")
|
||||
await log.wait_for("alternator_executor_batch_write_wait: hit", from_mark=m)
|
||||
|
||||
# We have a batch request with "streaming" cas_shard on the destination shard.
|
||||
# This means we have already made a decision to jump to the src_shard.
|
||||
# Now we're releasing the tablet migration so that it reaches write_both_read_new and
|
||||
# and invaldiates this decision.
|
||||
|
||||
m = await log.mark()
|
||||
await manager.api.message_injection(server.ip_addr, "intranode_migration_streaming_wait")
|
||||
|
||||
# The next barrier must be for the write_both_read_new, we need a guarantee
|
||||
# that the src_shard observed it
|
||||
logger.info("Waiting for the next barrier")
|
||||
await log.wait_for(re.escape(f"[shard {src_shard}: gms] raft_topology - raft_topology_cmd::barrier_and_drain done"),
|
||||
from_mark=m)
|
||||
|
||||
# Now we have a guarantee that a new barrier succeeded on the src_shard,
|
||||
# this means the src_shard has already transitioned to write_both_read_new,
|
||||
# and our batch write will have to jump back to the destination shard.
|
||||
|
||||
logger.info("Release the 'alternator_executor_batch_write_wait'")
|
||||
await manager.api.message_injection(server.ip_addr, "alternator_executor_batch_write_wait")
|
||||
|
||||
logger.info("Waiting for migratino task to finish")
|
||||
await intranode_migration_task
|
||||
|
||||
stop_event.set()
|
||||
t.join()
|
||||
|
||||
@@ -10,7 +10,6 @@ import logging
|
||||
|
||||
from test.pylib.rest_client import inject_error_one_shot
|
||||
from test.cluster.util import new_test_keyspace
|
||||
from test.pylib.util import gather_safely
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -34,12 +33,25 @@ async def test_broken_bootstrap(manager: ManagerClient):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
await gather_safely(*(manager.server_stop(srv.server_id) for srv in [server_a, server_b]))
|
||||
await manager.server_stop(server_b.server_id)
|
||||
await manager.server_stop(server_a.server_id)
|
||||
|
||||
stop_event = asyncio.Event()
|
||||
async def worker():
|
||||
logger.info("Worker started")
|
||||
while not stop_event.is_set():
|
||||
for i in range(100):
|
||||
await manager.cql.run_async(f"INSERT INTO {table} (a, b) VALUES ({i}, {i})")
|
||||
response = await manager.cql.run_async(f"SELECT * FROM {table} WHERE a = {i}")
|
||||
assert response[0].b == i
|
||||
await asyncio.sleep(0.1)
|
||||
logger.info("Worker stopped")
|
||||
|
||||
await manager.server_start(server_a.server_id)
|
||||
await manager.driver_connect()
|
||||
|
||||
for i in range(100):
|
||||
await manager.cql.run_async(f"INSERT INTO {table} (a, b) VALUES ({i}, {i})")
|
||||
response = await manager.cql.run_async(f"SELECT * FROM {table} WHERE a = {i}")
|
||||
assert response[0].b == i
|
||||
worker_task = asyncio.create_task(worker())
|
||||
|
||||
await asyncio.sleep(20)
|
||||
stop_event.set()
|
||||
await worker_task
|
||||
|
||||
@@ -67,11 +67,11 @@ nodetool_cmd.conf = False
|
||||
|
||||
# Run the external "nodetool" executable (can be overridden by the NODETOOL
|
||||
# environment variable). Only call this if the REST API doesn't work.
|
||||
def run_nodetool(cql, *args, **subprocess_kwargs):
|
||||
def run_nodetool(cql, *args):
|
||||
# TODO: We may need to change this function or its callers to add proper
|
||||
# support for testing on multi-node clusters.
|
||||
host = cql.cluster.contact_points[0]
|
||||
return subprocess.run([nodetool_cmd(), '-h', host, *args], **subprocess_kwargs)
|
||||
subprocess.run([nodetool_cmd(), '-h', host, *args])
|
||||
|
||||
def flush(cql, table):
|
||||
ks, cf = table.split('.')
|
||||
@@ -159,28 +159,6 @@ def disablebinary(cql):
|
||||
else:
|
||||
run_nodetool(cql, "disablebinary")
|
||||
|
||||
def getlogginglevel(cql, logger):
|
||||
if has_rest_api(cql):
|
||||
resp = requests.get(f'{rest_api_url(cql)}/system/logger/{logger}')
|
||||
if resp.ok:
|
||||
return resp.text.strip()
|
||||
raise RuntimeError(f"failed to fetch logging level for {logger}: {resp.status_code} {resp.text}")
|
||||
|
||||
result = run_nodetool(
|
||||
cql,
|
||||
"getlogginglevels",
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
for line in result.stdout.splitlines():
|
||||
stripped = line.strip()
|
||||
parts = stripped.split()
|
||||
if len(parts) >= 2 and parts[0] == logger:
|
||||
return parts[-1]
|
||||
|
||||
raise RuntimeError(f"logger {logger} not found in getlogginglevels output")
|
||||
|
||||
def setlogginglevel(cql, logger, level):
|
||||
if has_rest_api(cql):
|
||||
requests.post(f'{rest_api_url(cql)}/system/logger/{logger}', params={'level': level})
|
||||
|
||||
@@ -10,7 +10,6 @@ import re
|
||||
import requests
|
||||
import socket
|
||||
import struct
|
||||
from test.cqlpy import nodetool
|
||||
from test.cqlpy.util import cql_session
|
||||
|
||||
def get_protocol_error_metrics(host) -> int:
|
||||
@@ -59,50 +58,11 @@ def try_connect(host, port, creds, protocol_version):
|
||||
with cql_with_protocol(host, port, creds, protocol_version) as session:
|
||||
return 1 if session else 0
|
||||
|
||||
@pytest.fixture
|
||||
def debug_exceptions_logging(request, cql):
|
||||
def _read_level() -> str | None:
|
||||
try:
|
||||
level = nodetool.getlogginglevel(cql, "exception")
|
||||
if level:
|
||||
level = level.strip().strip('"').lower()
|
||||
return level
|
||||
except Exception as exc:
|
||||
print(f"Failed to read exception logger level: {exc}")
|
||||
return None
|
||||
|
||||
def _set_and_verify(level: str) -> bool:
|
||||
try:
|
||||
nodetool.setlogginglevel(cql, "exception", level)
|
||||
except Exception as exc:
|
||||
print(f"Failed to set exception logger level to '{level}': {exc}")
|
||||
return False
|
||||
|
||||
observed = _read_level()
|
||||
if observed == level:
|
||||
return True
|
||||
|
||||
print(f"Exception logger level observed as '{observed}' while expecting '{level}'")
|
||||
return False
|
||||
|
||||
def _restore_logging():
|
||||
if not enabled and previous_level is None:
|
||||
return
|
||||
|
||||
target_level = previous_level or "info"
|
||||
_set_and_verify(target_level)
|
||||
|
||||
previous_level = _read_level()
|
||||
enabled = _set_and_verify("debug")
|
||||
|
||||
yield
|
||||
_restore_logging()
|
||||
|
||||
# If there is a protocol version mismatch, the server should
|
||||
# raise a protocol error, which is counted in the metrics.
|
||||
def test_protocol_version_mismatch(scylla_only, debug_exceptions_logging, request, host):
|
||||
run_count = 200
|
||||
cpp_exception_threshold = 20
|
||||
def test_protocol_version_mismatch(scylla_only, request, host):
|
||||
run_count = 100
|
||||
cpp_exception_threshold = 10
|
||||
|
||||
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
|
||||
protocol_exception_metrics_before = get_protocol_error_metrics(host)
|
||||
@@ -284,8 +244,8 @@ def _protocol_error_impl(
|
||||
s.close()
|
||||
|
||||
def _test_impl(host, flag):
|
||||
run_count = 200
|
||||
cpp_exception_threshold = 20
|
||||
run_count = 100
|
||||
cpp_exception_threshold = 10
|
||||
|
||||
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
|
||||
protocol_exception_metrics_before = get_protocol_error_metrics(host)
|
||||
@@ -307,47 +267,47 @@ def no_ssl(request):
|
||||
yield
|
||||
|
||||
# Malformed BATCH with an invalid kind triggers a protocol error.
|
||||
def test_invalid_kind_in_batch_message(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
def test_invalid_kind_in_batch_message(scylla_only, no_ssl, host):
|
||||
_test_impl(host, "trigger_bad_batch")
|
||||
|
||||
# Send OPTIONS during AUTHENTICATE to trigger auth-state error.
|
||||
def test_unexpected_message_during_auth(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
def test_unexpected_message_during_auth(scylla_only, no_ssl, host):
|
||||
_test_impl(host, "trigger_unexpected_auth")
|
||||
|
||||
# STARTUP with an invalid/missing string-map entry should produce a protocol error.
|
||||
def test_process_startup_invalid_string_map(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
def test_process_startup_invalid_string_map(scylla_only, no_ssl, host):
|
||||
_test_impl(host, "trigger_process_startup_invalid_string_map")
|
||||
|
||||
# STARTUP with unknown COMPRESSION option should produce a protocol error.
|
||||
def test_unknown_compression_algorithm(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
def test_unknown_compression_algorithm(scylla_only, no_ssl, host):
|
||||
_test_impl(host, "trigger_unknown_compression")
|
||||
|
||||
# QUERY long-string truncation: declared length > provided bytes triggers protocol error.
|
||||
def test_process_query_internal_malformed_query(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
def test_process_query_internal_malformed_query(scylla_only, no_ssl, host):
|
||||
_test_impl(host, "trigger_process_query_internal_malformed_query")
|
||||
|
||||
# QUERY options malformed: PAGE_SIZE flag set but page_size truncated triggers protocol error.
|
||||
def test_process_query_internal_fail_read_options(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
def test_process_query_internal_fail_read_options(scylla_only, no_ssl, host):
|
||||
_test_impl(host, "trigger_process_query_internal_fail_read_options")
|
||||
|
||||
# PREPARE long-string truncation: declared length > provided bytes triggers protocol error.
|
||||
def test_process_prepare_malformed_query(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
def test_process_prepare_malformed_query(scylla_only, no_ssl, host):
|
||||
_test_impl(host, "trigger_process_prepare_malformed_query")
|
||||
|
||||
# EXECUTE cache-key malformed: short-bytes length > provided bytes triggers protocol error.
|
||||
def test_process_execute_internal_malformed_cache_key(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
def test_process_execute_internal_malformed_cache_key(scylla_only, no_ssl, host):
|
||||
_test_impl(host, "trigger_process_execute_internal_malformed_cache_key")
|
||||
|
||||
# REGISTER malformed string list: declared string length > provided bytes triggers protocol error.
|
||||
def test_process_register_malformed_string_list(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
def test_process_register_malformed_string_list(scylla_only, no_ssl, host):
|
||||
_test_impl(host, "trigger_process_register_malformed_string_list")
|
||||
|
||||
# Test if the protocol exceptions do not decrease after running the test happy path.
|
||||
# This is to ensure that the protocol exceptions are not cleared or reset
|
||||
# during the test execution.
|
||||
def test_no_protocol_exceptions(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
run_count = 200
|
||||
cpp_exception_threshold = 20
|
||||
def test_no_protocol_exceptions(scylla_only, no_ssl, host):
|
||||
run_count = 100
|
||||
cpp_exception_threshold = 10
|
||||
|
||||
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
|
||||
protocol_exception_metrics_before = get_protocol_error_metrics(host)
|
||||
|
||||
@@ -14,7 +14,6 @@ import cassandra.cluster
|
||||
from contextlib import contextmanager
|
||||
import re
|
||||
import ssl
|
||||
import time
|
||||
|
||||
|
||||
# This function normalizes the SSL cipher suite name (a string),
|
||||
@@ -67,12 +66,13 @@ def test_tls_versions(cql):
|
||||
# a regression test for #9216
|
||||
def test_system_clients_stores_tls_info(cql):
|
||||
if not cql.cluster.ssl_context:
|
||||
table_result = cql.execute(f"SELECT * FROM system.clients")
|
||||
for row in table_result:
|
||||
assert not row.ssl_enabled
|
||||
assert row.ssl_protocol is None
|
||||
assert row.ssl_cipher_suite is None
|
||||
else:
|
||||
table_result = cql.execute(f"SELECT * FROM system.clients")
|
||||
for row in table_result:
|
||||
assert not row.ssl_enabled
|
||||
assert row.ssl_protocol is None
|
||||
assert row.ssl_cipher_suite is None
|
||||
|
||||
if cql.cluster.ssl_context:
|
||||
# TLS v1.2 must be supported, because this is the default version that
|
||||
# "cqlsh --ssl" uses. If this fact changes in the future, we may need
|
||||
# to reconsider this test.
|
||||
@@ -82,8 +82,7 @@ def test_system_clients_stores_tls_info(cql):
|
||||
# so we need to retry until all connections are initialized and have their TLS info recorded in system.clients,
|
||||
# otherwise we'd end up with some connections e.g. having their ssl_enabled=True but other fields still None.
|
||||
expected_ciphers = [normalize_cipher(cipher['name']) for cipher in ssl.create_default_context().get_ciphers()]
|
||||
deadline = time.time() + 10 # 10 seconds timeout
|
||||
while time.time() < deadline:
|
||||
for _ in range(1000): # try for up to 1000 * 0.01s = 10s seconds
|
||||
rows = session.execute(f"SELECT * FROM system.clients")
|
||||
if rows and all(
|
||||
row.ssl_enabled
|
||||
@@ -93,7 +92,7 @@ def test_system_clients_stores_tls_info(cql):
|
||||
):
|
||||
return
|
||||
time.sleep(0.01)
|
||||
pytest.fail(f"Not all connections have TLS data set correctly in system.clients after 10 seconds")
|
||||
pytest.fail(f"Not all connections have TLS data set correctly in system.clients after 10s seconds")
|
||||
|
||||
|
||||
@contextmanager
|
||||
|
||||
@@ -163,11 +163,6 @@ public:
|
||||
_sst->_shards.push_back(this_shard_id());
|
||||
}
|
||||
|
||||
void set_first_and_last_keys(const dht::decorated_key& first_key, const dht::decorated_key& last_key) {
|
||||
_sst->_first = first_key;
|
||||
_sst->_last = last_key;
|
||||
}
|
||||
|
||||
void rewrite_toc_without_component(component_type component) {
|
||||
SCYLLA_ASSERT(component != component_type::TOC);
|
||||
_sst->_recognized_components.erase(component);
|
||||
|
||||
@@ -789,7 +789,7 @@ class ScyllaServer:
|
||||
|
||||
while time.time() < self.start_time + self.TOPOLOGY_TIMEOUT and not self.stop_event.is_set():
|
||||
assert self.cmd is not None
|
||||
if self.cmd.returncode is not None:
|
||||
if self.cmd.returncode:
|
||||
self.cmd = None
|
||||
if expected_error is not None:
|
||||
with self.log_filename.open("r", encoding="utf-8") as log_file:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM registry.fedoraproject.org/fedora:43
|
||||
FROM docker.io/fedora:42
|
||||
|
||||
ARG CLANG_BUILD="SKIP"
|
||||
ARG CLANG_ARCHIVES
|
||||
|
||||
@@ -1 +1 @@
|
||||
docker.io/scylladb/scylla-toolchain:fedora-43-20251208
|
||||
docker.io/scylladb/scylla-toolchain:fedora-42-20251122
|
||||
|
||||
@@ -65,7 +65,7 @@ SCYLLA_BUILD_DIR_FULLPATH="${SCYLLA_DIR}"/"${SCYLLA_BUILD_DIR}"
|
||||
SCYLLA_NINJA_FILE_FULLPATH="${SCYLLA_DIR}"/"${SCYLLA_NINJA_FILE}"
|
||||
|
||||
# Which LLVM release to build in order to compile Scylla
|
||||
LLVM_CLANG_TAG=21.1.6
|
||||
LLVM_CLANG_TAG=20.1.8
|
||||
|
||||
CLANG_ARCHIVE=$(cd "${SCYLLA_DIR}" && realpath -m "${CLANG_ARCHIVE}")
|
||||
|
||||
@@ -186,3 +186,7 @@ if [[ $? -ne 0 ]]; then
|
||||
fi
|
||||
set -e
|
||||
tar -C / -xpzf "${CLANG_ARCHIVE}"
|
||||
dnf remove -y clang clang-libs
|
||||
# above package removal might have removed those symbolic links, which will cause ccache not to work later on. Manually restore them.
|
||||
ln -sf /usr/bin/ccache /usr/lib64/ccache/clang
|
||||
ln -sf /usr/bin/ccache /usr/lib64/ccache/clang++
|
||||
|
||||
@@ -414,8 +414,9 @@ future<> server::do_accepts(int which, bool keepalive, socket_address server_add
|
||||
conn->_ssl_cipher_suite = cipher_suite;
|
||||
return make_ready_future<bool>(true);
|
||||
});
|
||||
}).handle_exception([conn](std::exception_ptr ep) {
|
||||
return seastar::make_exception_future<bool>(std::runtime_error(fmt::format("Inspecting TLS connection failed: {}", ep)));
|
||||
}).handle_exception([this, conn](std::exception_ptr ep) {
|
||||
_logger.warn("Inspecting TLS connection failed: {}", ep);
|
||||
return make_ready_future<bool>(false);
|
||||
})
|
||||
: make_ready_future<bool>(true)
|
||||
).then([conn] (bool ok){
|
||||
|
||||
@@ -63,7 +63,7 @@ protected:
|
||||
|
||||
bool _ssl_enabled = false;
|
||||
std::optional<sstring> _ssl_cipher_suite = std::nullopt;
|
||||
std::optional<sstring> _ssl_protocol = std::nullopt;
|
||||
std::optional<sstring> _ssl_protocol = std::nullopt;;
|
||||
|
||||
private:
|
||||
future<> process_until_tenant_switch();
|
||||
|
||||
Reference in New Issue
Block a user