mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-25 09:11:10 +00:00
cql: fix missing TABLETS_ROUTING_V1 payload after CAS shard bounce
After an internal CAS shard bounce, check_locality() compared the tablet
replica's shard against this_shard_id() of the post-bounce shard. That
shard is the correct tablet shard, so the check returned nullopt ("routed
correctly") even though the client had originally sent the request to a
different shard, and LWT/SERIAL responses omitted the tablets-routing-v1
custom payload. The client never learned the correct tablet map.
Fix by recording the original entry shard in client_state (initialized
to this_shard_id() at construction, preserved across shard bounces via
client_state_for_another_shard) and passing it to check_locality() so
it compares against the client's actual routing decision.
No host_id tracking or forwarded_client_state IDL changes are needed
because CAS shard bounces are always intra-node.
Fixes SCYLLADB-2041
(cherry picked from commit 167a3c9c50)
This commit is contained in:
@@ -294,7 +294,7 @@ modification_statement::do_execute(query_processor& qp, service::query_state& qs
|
||||
auto&& table = s->table();
|
||||
if (_may_use_token_aware_routing && table.uses_tablets() && qs.get_client_state().is_protocol_extension_set(cql_transport::cql_protocol_extension::TABLETS_ROUTING_V1)) {
|
||||
auto erm = table.get_effective_replication_map();
|
||||
auto tablet_info = erm->check_locality(token);
|
||||
auto tablet_info = erm->check_locality(token, qs.get_client_state().get_original_shard());
|
||||
if (tablet_info.has_value()) {
|
||||
result->add_tablet_info(tablet_info->tablet_replicas, tablet_info->token_range);
|
||||
}
|
||||
@@ -427,7 +427,7 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
|
||||
auto&& table = s->table();
|
||||
if (_may_use_token_aware_routing && table.uses_tablets() && qs.get_client_state().is_protocol_extension_set(cql_transport::cql_protocol_extension::TABLETS_ROUTING_V1)) {
|
||||
auto erm = table.get_effective_replication_map();
|
||||
tablet_info = erm->check_locality(token);
|
||||
tablet_info = erm->check_locality(token, qs.get_client_state().get_original_shard());
|
||||
}
|
||||
|
||||
return qp.proxy().cas(s, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
|
||||
|
||||
@@ -479,7 +479,7 @@ select_statement::do_execute(query_processor& qp,
|
||||
token = key_ranges[0].start()->value().as_decorated_key().token();
|
||||
|
||||
auto erm = table.get_effective_replication_map();
|
||||
tablet_info = erm->check_locality(token);
|
||||
tablet_info = erm->check_locality(token, state.get_client_state().get_original_shard());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -112,7 +112,7 @@ host_id_vector_replica_set vnode_effective_replication_map::get_replicas_for_rea
|
||||
return *endpoints | std::ranges::to<host_id_vector_replica_set>();
|
||||
}
|
||||
|
||||
std::optional<tablet_routing_info> vnode_effective_replication_map::check_locality(const token& token) const {
|
||||
std::optional<tablet_routing_info> vnode_effective_replication_map::check_locality(const token&, unsigned) const {
|
||||
return {};
|
||||
}
|
||||
|
||||
|
||||
@@ -306,7 +306,13 @@ public:
|
||||
/// replaced but not yet rebuilt.
|
||||
virtual host_id_vector_replica_set get_replicas(const token& search_token, bool is_vnode = false) const = 0;
|
||||
|
||||
virtual std::optional<tablet_routing_info> check_locality(const token& token) const = 0;
|
||||
/// Checks whether the request was routed to the correct tablet replica.
|
||||
/// \param token the partition token
|
||||
/// \param original_shard the shard where the CQL request originally entered the node;
|
||||
/// after an internal CAS shard bounce this differs from this_shard_id()
|
||||
/// \returns nullopt if routed correctly, otherwise the tablet routing info
|
||||
/// so the client can learn the correct tablet map
|
||||
virtual std::optional<tablet_routing_info> check_locality(const token& token, unsigned original_shard) const = 0;
|
||||
|
||||
|
||||
/// Returns true if there are any pending ranges for this endpoint.
|
||||
@@ -492,7 +498,7 @@ public: // effective_replication_map
|
||||
host_id_vector_topology_change get_pending_replicas(const token& search_token) const override;
|
||||
host_id_vector_replica_set get_replicas_for_reading(const token& token, bool is_vnode = false) const override;
|
||||
host_id_vector_replica_set get_replicas(const token& search_token, bool is_vnode = false) const override;
|
||||
std::optional<tablet_routing_info> check_locality(const token& token) const override;
|
||||
std::optional<tablet_routing_info> check_locality(const token& token, unsigned original_shard) const override;
|
||||
bool has_pending_ranges(locator::host_id endpoint) const override;
|
||||
std::unique_ptr<token_range_splitter> make_splitter() const override;
|
||||
const dht::sharder& get_sharder(const schema& s) const override;
|
||||
@@ -604,7 +610,7 @@ public:
|
||||
host_id_vector_topology_change get_pending_replicas(const token& search_token) const override;
|
||||
host_id_vector_replica_set get_replicas_for_reading(const token& token, bool is_vnode = false) const override;
|
||||
host_id_vector_replica_set get_replicas(const token& search_token, bool is_vnode = false) const override;
|
||||
std::optional<tablet_routing_info> check_locality(const token& token) const override;
|
||||
std::optional<tablet_routing_info> check_locality(const token& token, unsigned original_shard) const override;
|
||||
bool has_pending_ranges(locator::host_id endpoint) const override;
|
||||
std::unique_ptr<token_range_splitter> make_splitter() const override;
|
||||
const dht::sharder& get_sharder(const schema& s) const override;
|
||||
|
||||
@@ -61,7 +61,7 @@ host_id_vector_replica_set local_effective_replication_map::get_replicas(const t
|
||||
return _replica_set;
|
||||
}
|
||||
|
||||
std::optional<tablet_routing_info> local_effective_replication_map::check_locality(const token& token) const {
|
||||
std::optional<tablet_routing_info> local_effective_replication_map::check_locality(const token&, unsigned) const {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
|
||||
@@ -1356,12 +1356,11 @@ public:
|
||||
return get_for_reading_helper(search_token);
|
||||
}
|
||||
|
||||
std::optional<tablet_routing_info> check_locality(const token& search_token) const override {
|
||||
std::optional<tablet_routing_info> check_locality(const token& search_token, unsigned original_shard) const override {
|
||||
auto&& tablets = get_tablet_map();
|
||||
auto tid = tablets.get_tablet_id(search_token);
|
||||
auto&& info = tablets.get_tablet_info(tid);
|
||||
auto host = get_token_metadata().get_my_id();
|
||||
auto shard = this_shard_id();
|
||||
|
||||
auto make_tablet_routing_info = [&] {
|
||||
dht::token first_token;
|
||||
@@ -1376,7 +1375,7 @@ public:
|
||||
|
||||
for (auto&& r : info.replicas) {
|
||||
if (r.host == host) {
|
||||
if (r.shard == shard) {
|
||||
if (r.shard == original_shard) {
|
||||
return std::nullopt; // routed correctly
|
||||
} else {
|
||||
return make_tablet_routing_info();
|
||||
@@ -1385,7 +1384,7 @@ public:
|
||||
}
|
||||
|
||||
auto tinfo = tablets.get_tablet_transition_info(tid);
|
||||
if (tinfo && tinfo->pending_replica && tinfo->pending_replica->host == host && tinfo->pending_replica->shard == shard) {
|
||||
if (tinfo && tinfo->pending_replica && tinfo->pending_replica->host == host && tinfo->pending_replica->shard == original_shard) {
|
||||
return std::nullopt; // routed correctly
|
||||
}
|
||||
|
||||
|
||||
@@ -76,6 +76,7 @@ private:
|
||||
, _default_timeout_config(cs->_default_timeout_config)
|
||||
, _timeout_config(cs->_timeout_config)
|
||||
, _enabled_protocol_extensions(cs->_enabled_protocol_extensions)
|
||||
, _original_shard(cs->_original_shard)
|
||||
{}
|
||||
friend client_state_for_another_shard;
|
||||
private:
|
||||
@@ -436,6 +437,10 @@ private:
|
||||
|
||||
cql_transport::cql_protocol_extension_enum_set _enabled_protocol_extensions;
|
||||
|
||||
// The shard where the current CQL request originally entered the node.
|
||||
// After an internal CAS shard bounce this differs from this_shard_id().
|
||||
unsigned _original_shard = this_shard_id();
|
||||
|
||||
public:
|
||||
|
||||
bool is_protocol_extension_set(cql_transport::cql_protocol_extension ext) const {
|
||||
@@ -445,6 +450,10 @@ public:
|
||||
void set_protocol_extensions(cql_transport::cql_protocol_extension_enum_set exts) {
|
||||
_enabled_protocol_extensions = std::move(exts);
|
||||
}
|
||||
|
||||
unsigned get_original_shard() const noexcept {
|
||||
return _original_shard;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user