strong_consistency: cache leader location for non-replica nodes

When a non-replica node handles a strongly consistent write, it must
forward the request to a replica. If the closest replica is not the
leader, the request gets redirected again, causing an extra roundtrip.

Add a leader location cache in groups_manager, keyed by raft group_id.
After a write request is forwarded, the CQL transport layer records the
final node as the leader in the cache. Subsequent write requests from
the same node for the same group are forwarded directly to the cached
leader, eliminating the extra roundtrip.

The cache is only used for writes. Reads can be served by any replica,
so they skip the cache and use proximity-based routing instead.

Cache entries are validated at use time: if the cached leader is no
longer a replica (e.g. after tablet migration), the entry is evicted
and the normal closest-replica path is taken. This prevents a scenario
where two nodes keep redirecting to each other because each thinks that
the other is the leader while in fact both are non-replicas - such a loop
is broken as soon as the tablet maps are updated.

On token_metadata updates, entries for groups that no longer exist
(e.g. table dropped, tablet merged) are evicted. Entries for groups
that still exist are kept — use-time validation handles staleness.

An on_node_resolved callback is propagated through the redirect/bounce
path so the transport layer can update the cache generically without
coupling to the strong-consistency coordinator. The coordinator creates
the callback only for writes (capturing the groups_manager and
group_id) and attaches it to the bounce message; the transport layer
invokes it once the final node is known, keeping the forwarding
infrastructure subsystem-agnostic.

We also add a test which verifies that, after the initial redirect,
subsequent write requests sent to the same node avoid the extra redirect
and are forwarded directly to the leader.

Fixes: SCYLLADB-1064

Closes scylladb/scylladb#29392
This commit is contained in:
Wojciech Mitros
2026-04-09 00:08:55 +02:00
committed by Piotr Dulikowski
parent cc034f84c5
commit 13c043903d
14 changed files with 306 additions and 85 deletions

View File

@@ -1289,9 +1289,14 @@ shared_ptr<cql_transport::messages::result_message> query_processor::bounce_to_s
return ::make_shared<cql_transport::messages::result_message::bounce>(my_host_id, shard, std::move(cached_fn_calls));
}
shared_ptr<cql_transport::messages::result_message> query_processor::bounce_to_node(locator::tablet_replica replica, cql3::computed_function_values cached_fn_calls, seastar::lowres_clock::time_point timeout, bool is_write) {
shared_ptr<cql_transport::messages::result_message> query_processor::bounce_to_node(
locator::tablet_replica replica,
cql3::computed_function_values cached_fn_calls,
seastar::lowres_clock::time_point timeout,
bool is_write,
noncopyable_function<void(locator::host_id)> on_node_resolved) {
get_cql_stats().forwarded_requests++;
return ::make_shared<cql_transport::messages::result_message::bounce>(replica.host, replica.shard, std::move(cached_fn_calls), timeout, is_write);
return ::make_shared<cql_transport::messages::result_message::bounce>(replica.host, replica.shard, std::move(cached_fn_calls), timeout, is_write, std::move(on_node_resolved));
}
query_processor::consistency_level_set query_processor::to_consistency_level_set(const query_processor::cl_option_list& levels) {

View File

@@ -522,7 +522,8 @@ public:
locator::tablet_replica replica,
cql3::computed_function_values cached_fn_calls,
seastar::lowres_clock::time_point timeout,
bool is_write);
bool is_write,
noncopyable_function<void(locator::host_id)> on_node_resolved = {});
void update_authorized_prepared_cache_config();

View File

@@ -62,7 +62,7 @@ future<shared_ptr<result_message>> modification_statement::execute_without_check
auto [coordinator, holder] = qp.acquire_strongly_consistent_coordinator();
const auto mutate_result = co_await coordinator.get().mutate(_statement->s,
auto mutate_result = co_await coordinator.get().mutate(_statement->s,
keys[0].start()->value().token(),
[&](api::timestamp_type ts) {
const auto prefetch_data = update_parameters::prefetch_data(_statement->s);
@@ -78,9 +78,9 @@ future<shared_ptr<result_message>> modification_statement::execute_without_check
}, timeout, qs.get_client_state().get_abort_source());
using namespace service::strong_consistency;
if (const auto* redirect = get_if<need_redirect>(&mutate_result)) {
if (auto* redirect = get_if<need_redirect>(&mutate_result)) {
bool is_write = true;
co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats());
co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats(), std::move(redirect->on_node_resolved));
}
utils::get_local_injector().inject("sc_modification_statement_timeout", [&] {
throw exceptions::mutation_write_timeout_exception{"", "", options.get_consistency(), 0, 0, db::write_type::SIMPLE};

View File

@@ -55,9 +55,9 @@ future<::shared_ptr<result_message>> select_statement::do_execute(query_processo
key_ranges, state.get_trace_state(), timeout, state.get_client_state().get_abort_source());
using namespace service::strong_consistency;
if (const auto* redirect = get_if<need_redirect>(&query_result)) {
if (auto* redirect = get_if<need_redirect>(&query_result)) {
bool is_write = false;
co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats());
co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats(), std::move(redirect->on_node_resolved));
}
co_return co_await process_results(get<lw_shared_ptr<query::result>>(std::move(query_result)),

View File

@@ -20,13 +20,14 @@ future<::shared_ptr<cql_transport::messages::result_message>> redirect_statement
const locator::tablet_replica& target,
db::timeout_clock::time_point timeout,
bool is_write,
service::strong_consistency::stats& stats)
service::strong_consistency::stats& stats,
noncopyable_function<void(locator::host_id)> on_node_resolved)
{
auto&& func_values_cache = const_cast<cql3::query_options&>(options).take_cached_pk_function_calls();
const auto my_host_id = qp.db().real_database().get_token_metadata().get_topology().my_host_id();
if (target.host != my_host_id) {
++(is_write ? stats.write_node_bounces : stats.read_node_bounces);
co_return qp.bounce_to_node(target, std::move(func_values_cache), timeout, is_write);
co_return qp.bounce_to_node(target, std::move(func_values_cache), timeout, is_write, std::move(on_node_resolved));
}
++(is_write ? stats.write_shard_bounces : stats.read_shard_bounces);
co_return qp.bounce_to_shard(target.shard, std::move(func_values_cache));

View File

@@ -10,6 +10,7 @@
#include "cql3/cql_statement.hh"
#include "locator/tablets.hh"
#include <seastar/util/noncopyable_function.hh>
namespace service::strong_consistency { struct stats; }
@@ -21,7 +22,8 @@ future<::shared_ptr<cql_transport::messages::result_message>> redirect_statement
const locator::tablet_replica& target,
db::timeout_clock::time_point timeout,
bool is_write,
service::strong_consistency::stats& stats);
service::strong_consistency::stats& stats,
noncopyable_function<void(locator::host_id)> on_node_resolved = {});
bool is_strongly_consistent(data_dictionary::database db, std::string_view ks_name);

View File

@@ -181,7 +181,24 @@ static locator::tablet_replica select_closest_replica(const gms::gossiper& gossi
return *it;
}
auto coordinator::create_operation_ctx(const schema& schema, const dht::token& token, abort_source& as)
/// Builds a redirect to `target` whose `on_node_resolved` callback stores the
/// host it is given as the cached leader of `group_id`.
///
/// Only a pointer to the groups_manager container and the group id are
/// captured, so the callback does not refer to `gm` itself.
static need_redirect redirect_to_leader(locator::tablet_replica target, groups_manager& gm, raft::group_id group_id) {
    auto* const managers = &gm.container();
    // `local()` is resolved when the callback runs rather than here: the cache
    // to update is the one on the shard handling the client request, which may
    // be different from the shard currently executing the statement.
    auto remember_leader = [managers, group_id] (locator::host_id leader) {
        managers->local().leader_cache().put(group_id, leader);
    };
    return need_redirect{
        .target = target,
        .on_node_resolved = std::move(remember_leader),
    };
}
/// Builds a redirect to `target` with an empty `on_node_resolved` callback.
static need_redirect redirect_to_replica(locator::tablet_replica target) {
    // A redirect to a plain replica carries no information about the leader,
    // so the callback is left empty and the leader cache is not updated.
    need_redirect redirect{ .target = target };
    return redirect;
}
auto coordinator::create_operation_ctx(const schema& schema, const dht::token& token, abort_source& as, bool use_leader_cache)
-> future<value_or_redirect<operation_ctx>>
{
auto erm = schema.table().get_effective_replication_map();
@@ -204,14 +221,27 @@ auto coordinator::create_operation_ctx(const schema& schema, const dht::token& t
const auto& tablet_map = erm->get_token_metadata().tablets().get_tablet_map(schema.id());
const auto tablet_id = tablet_map.get_tablet_id(token);
const auto& tablet_info = tablet_map.get_tablet_info(tablet_id);
const auto& raft_info = tablet_map.get_tablet_raft_info(tablet_id);
if (!contains(tablet_info.replicas, this_replica)) {
co_return need_redirect {
select_closest_replica(_gossiper, tablet_info.replicas, token,
erm->get_token_metadata().get_topology())
};
// For writes, check the leader cache to avoid an extra roundtrip.
// For now, reads skip the cache because any replica can serve them.
if (use_leader_cache) {
if (const auto cached = _groups_manager.leader_cache().get(raft_info.group_id)) {
if (const auto* target = find_replica(tablet_info, *cached)) {
co_return redirect_to_leader(*target, _groups_manager, raft_info.group_id);
}
// Cached leader is no longer a replica, evict it.
_groups_manager.leader_cache().erase(raft_info.group_id);
}
}
auto target = select_closest_replica(_gossiper, tablet_info.replicas, token,
erm->get_token_metadata().get_topology());
if (use_leader_cache) {
co_return redirect_to_leader(target, _groups_manager, raft_info.group_id);
}
co_return redirect_to_replica(target);
}
const auto& raft_info = tablet_map.get_tablet_raft_info(tablet_id);
co_await utils::get_local_injector().inject("sc_coordinator_wait_before_acquire_server",
utils::wait_for_message(5min));
@@ -250,9 +280,9 @@ future<value_or_redirect<>> coordinator::mutate(schema_ptr schema,
bool commit_status_unknown_ex = false;
try {
auto op_result = co_await create_operation_ctx(*schema, token, aoe.abort_source());
if (const auto* redirect = get_if<need_redirect>(&op_result)) {
co_return *redirect;
auto op_result = co_await create_operation_ctx(*schema, token, aoe.abort_source(), true);
if (auto* redirect = get_if<need_redirect>(&op_result)) {
co_return std::move(*redirect);
}
auto& op = get<operation_ctx>(op_result);
@@ -270,7 +300,7 @@ future<value_or_redirect<>> coordinator::mutate(schema_ptr schema,
schema->ks_name(), schema->cf_name(), op.tablet_id,
leader_host_id, op.tablet_info.replicas));
}
co_return need_redirect{*target};
co_return redirect_to_leader(*target, _groups_manager, op.raft_info.group_id);
}
if (auto* wait_for_leader = get_if<raft_server::need_wait_for_leader>(&disposition)) {
co_await std::move(wait_for_leader->future);
@@ -373,9 +403,9 @@ auto coordinator::query(schema_ptr schema,
auto mark_read_latency = defer([this, &lc] { _stats.read.mark(lc.stop().latency()); });
try {
auto op_result = co_await create_operation_ctx(*schema, ranges[0].start()->value().token(), aoe.abort_source());
if (const auto* redirect = get_if<need_redirect>(&op_result)) {
co_return *redirect;
auto op_result = co_await create_operation_ctx(*schema, ranges[0].start()->value().token(), aoe.abort_source(), false);
if (auto* redirect = get_if<need_redirect>(&op_result)) {
co_return std::move(*redirect);
}
auto& op = get<operation_ctx>(op_result);

View File

@@ -12,6 +12,7 @@
#include "query/query-result.hh"
#include "utils/histogram.hh"
#include <seastar/core/metrics.hh>
#include <seastar/util/noncopyable_function.hh>
namespace gms {
@@ -25,6 +26,7 @@ class groups_manager;
// Result returned instead of a value when the request has to be executed on
// another replica (see value_or_redirect below).
struct need_redirect {
// The replica (host and shard) the request should be sent to.
locator::tablet_replica target;
// Optional callback handed on to the bounce message; it may be empty.
// When set, it is invoked with the host that finally handled the request.
noncopyable_function<void(locator::host_id)> on_node_resolved;
};
template <typename T = std::monostate>
using value_or_redirect = std::variant<T, need_redirect>;
@@ -61,7 +63,8 @@ private:
struct operation_ctx;
future<value_or_redirect<operation_ctx>> create_operation_ctx(const schema& schema,
const dht::token& token,
abort_source& as);
abort_source& as,
bool use_leader_cache);
public:
coordinator(groups_manager& groups_manager, replica::database& db, gms::gossiper& gossiper);

View File

@@ -48,28 +48,6 @@ public:
}
};
static void for_each_sc_tablet(const token_metadata& tm,
noncopyable_function<void(global_tablet_id, raft::group_id)>&& func)
{
const auto this_replica = locator::tablet_replica {
.host = tm.get_my_id(),
.shard = this_shard_id()
};
const auto& tablets = tm.tablets();
for (const auto& [table_id, _]: tablets.all_table_groups()) {
const auto& tablet_map = tablets.get_tablet_map(table_id);
if (!tablet_map.has_raft_info()) {
continue;
}
for (const auto& tablet_id: tablet_map.tablet_ids()) {
if (tablet_map.has_replica(tablet_id, this_replica)) {
const auto group_id = tablet_map.get_tablet_raft_info(tablet_id).group_id;
func(global_tablet_id{table_id, tablet_id}, group_id);
}
}
}
}
raft_server::raft_server(groups_manager::raft_group_state& state, gate::holder holder)
: _state(state)
, _holder(std::move(holder))
@@ -368,44 +346,64 @@ void groups_manager::update(token_metadata_ptr new_tm) {
state.has_tablet = false;
}
for_each_sc_tablet(*new_tm, [&](global_tablet_id tablet, raft::group_id id) {
auto& state = _raft_groups[id];
state.has_tablet = true;
// Don't start the raft server if it is already (started or starting) and not stopping.
if (state.gate && !state.gate->is_closed()) {
return;
const auto this_replica = locator::tablet_replica {
.host = new_tm->get_my_id(),
.shard = this_shard_id()
};
_leader_cache.begin_sweep();
const auto& tablets = new_tm->tablets();
for (const auto& [table_id, _]: tablets.all_table_groups()) {
const auto& tablet_map = tablets.get_tablet_map(table_id);
if (!tablet_map.has_raft_info()) {
continue;
}
for (const auto& tid: tablet_map.tablet_ids()) {
const auto id = tablet_map.get_tablet_raft_info(tid).group_id;
const auto tablet = global_tablet_id{table_id, tid};
logger.info("update(): starting raft server for tablet {}, group id {}", tablet, id);
state.gate = make_lw_shared<gate>();
_starting_groups.push_back(state);
state.server_control_op = futurize_invoke([&state, this, tablet, id, new_tm](this auto) -> future<> {
co_await state.server_control_op.get_future();
co_await start_raft_group(tablet, id, std::move(new_tm));
state.server = &_raft_gr.get_server(id);
state.leader_info_updater = leader_info_updater(state, tablet, id);
_leader_cache.mark_seen(id);
if (!tablet_map.has_replica(tid, this_replica)) {
continue;
}
auto& state = _raft_groups[id];
state.has_tablet = true;
// We want to make sure the server is ready to serve requests before
// we report it as started in wait_for_groups_to_start().
abort_on_expiry aoe(lowres_clock::now() + std::chrono::seconds(60));
while (true) {
auto srv = raft_server(state, state.gate->hold());
auto res = srv.begin_mutate(aoe.abort_source());
if (auto w = get_if<raft_server::need_wait_for_leader>(&res)) {
co_await std::move(w->future);
} else {
break;
}
// Don't start the raft server if it is already (started or starting) and not stopping.
if (state.gate && !state.gate->is_closed()) {
continue;
}
_starting_groups.erase(_starting_groups.iterator_to(state));
logger.info("update(): starting raft server for tablet {}, group id {}", tablet, id);
state.gate = make_lw_shared<gate>();
_starting_groups.push_back(state);
state.server_control_op = futurize_invoke([&state, this, tablet, id, new_tm](this auto) -> future<> {
co_await state.server_control_op.get_future();
co_await start_raft_group(tablet, id, std::move(new_tm));
state.server = &_raft_gr.get_server(id);
state.leader_info_updater = leader_info_updater(state, tablet, id);
logger.info("update(): raft server for tablet {} and group id {} is started", tablet, id);
});
});
// We want to make sure the server is ready to serve requests before
// we report it as started in wait_for_groups_to_start().
abort_on_expiry aoe(lowres_clock::now() + std::chrono::seconds(60));
while (true) {
auto srv = raft_server(state, state.gate->hold());
auto res = srv.begin_mutate(aoe.abort_source());
if (auto w = get_if<raft_server::need_wait_for_leader>(&res)) {
co_await std::move(w->future);
} else {
break;
}
}
_starting_groups.erase(_starting_groups.iterator_to(state));
logger.info("update(): raft server for tablet {} and group id {} is started", tablet, id);
});
}
}
schedule_raft_groups_deletion(false);
_leader_cache.end_sweep();
}
future<raft_server> groups_manager::acquire_server(table_id table_id, raft::group_id group_id, abort_source& as) {

View File

@@ -29,6 +29,57 @@ namespace service::strong_consistency {
class raft_server;
/// A cache of leader locations for raft groups where this node is not a replica.
/// Populated by the CQL transport layer after a redirect reveals the actual leader.
///
/// Uses a sweep-based eviction strategy tied to token_metadata updates:
/// begin_sweep() before iterating tablets, mark_seen() for each existing group,
/// end_sweep() to evict entries whose groups no longer exist.
class tablet_group_leader_cache {
struct entry {
locator::host_id leader;
bool seen = false;
};
std::unordered_map<raft::group_id, entry> _entries;
public:
void put(raft::group_id group, locator::host_id leader) {
auto [it, inserted] = _entries.try_emplace(group, entry{leader});
if (!inserted) {
it->second.leader = leader;
}
}
std::optional<locator::host_id> get(raft::group_id group) const {
auto it = _entries.find(group);
if (it != _entries.end()) {
return it->second.leader;
}
return std::nullopt;
}
void erase(raft::group_id group) {
_entries.erase(group);
}
void begin_sweep() {
for (auto& [_, e] : _entries) {
e.seen = false;
}
}
void mark_seen(raft::group_id group) {
auto it = _entries.find(group);
if (it != _entries.end()) {
it->second.seen = true;
}
}
void end_sweep() {
std::erase_if(_entries, [](const auto& p) { return !p.second.seen; });
}
};
/// A sharded service responsible for the lifecycle and access
/// management of all Raft groups for strongly consistent tablets hosted on this node.
///
@@ -88,6 +139,8 @@ class groups_manager : public peering_sharded_service<groups_manager> {
locator::token_metadata_ptr _pending_tm = nullptr;
bool _started = false;
tablet_group_leader_cache _leader_cache;
// Should be called on the shard that hosts the Raft group
future<> start_raft_group(locator::global_tablet_id tablet,
raft::group_id group_id,
@@ -132,6 +185,8 @@ public:
// until the raft groups for those tablets are started and ready to serve queries.
// For the local node, waits directly without an RPC.
future<> wait_for_table_raft_groups_on_all_hosts(table_id table, lowres_clock::time_point timeout);
tablet_group_leader_cache& leader_cache() { return _leader_cache; }
};
/// A temporary, RAII-style handle to an active Raft group server instance,

View File

@@ -1367,3 +1367,102 @@ async def test_abort_state_machine_apply_during_shutdown(manager: ManagerClient)
rows = await cql.run_async(f"SELECT * FROM {table} WHERE pk = 0", host=target_host)
assert len(rows) == 1
assert rows[0].v == 13
async def test_leader_cache_eliminates_redirect(manager: ManagerClient):
    """
    Verify that after a non-replica node learns the leader location via a redirect,
    subsequent write requests from that node go directly to the leader without a redirect.
    Uses 4 nodes in 2 racks (2 per rack) with RF=2 and 1 tablet.
    Tablet-aware replication places one replica per rack.
    The non-replica node in the non-leader rack always picks the same-rack
    (non-leader) replica as closest, causing a redirect to the leader.
    After the first write populates the cache, subsequent writes skip the redirect.
    We verify this via the scylla_transport_requests_forwarded_redirected metric
    on the non-replica node.
    """
    cmdline = [
        '--logger-log-level', 'cql_server=trace',
        '--experimental-features', 'strongly-consistent-tables',
    ]
    # 2 racks with 2 nodes each
    property_file = [
        {"dc": "dc1", "rack": "rack1"},
        {"dc": "dc1", "rack": "rack1"},
        {"dc": "dc1", "rack": "rack2"},
        {"dc": "dc1", "rack": "rack2"},
    ]
    servers = await manager.servers_add(4, cmdline=cmdline, property_file=property_file)
    (cql, hosts) = await manager.get_ready_cql(servers)
    host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers])

    def host_by_host_id(host_id):
        """Return the driver host object that corresponds to the given host id."""
        for hid, host in zip(host_ids, hosts):
            if hid == host_id:
                return host
        raise RuntimeError(f"Can't find host for host_id {host_id}")

    ks_opts = ("WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}"
               " AND tablets = {'initial': 1} AND consistency = 'global'")
    async with new_test_keyspace(manager, ks_opts) as ks:
        async with new_test_table(manager, ks, "pk int PRIMARY KEY, value int") as table:
            table_name = table.split('.')[-1]
            group_id = await get_table_raft_group_id(manager, ks, table_name)
            # servers[0] and servers[1] share a rack, so presumably only one of
            # them hosts a replica; if asking the first one fails, ask the second.
            # Catch Exception rather than using a bare `except:` so that
            # asyncio.CancelledError and KeyboardInterrupt still propagate
            # instead of silently triggering the retry.
            try:
                leader_host_id = await wait_for_leader(manager, servers[0], group_id)
            except Exception:
                leader_host_id = await wait_for_leader(manager, servers[1], group_id)

            tablet_replicas = await get_tablet_replicas(manager, servers[0], ks, table_name, 0)
            assert len(tablet_replicas) == 2
            replica_host_ids = [replica[0] for replica in tablet_replicas]

            # Find the rack of the leader
            leader_server = next(s for hid, s in zip(host_ids, servers) if str(hid) == leader_host_id)
            leader_rack = leader_server.rack

            # Pick the non-replica in the non-leader rack.
            # This node's closest replica is the same-rack (non-leader) replica,
            # so writes always cause a redirect on the first attempt.
            non_replica_host_id = None
            for hid, s in zip(host_ids, servers):
                if str(hid) not in replica_host_ids and s.rack != leader_rack:
                    non_replica_host_id = hid
                    break
            assert non_replica_host_id is not None, "Could not find non-replica in non-leader rack"
            non_replica_server = next(s for hid, s in zip(host_ids, servers) if hid == non_replica_host_id)
            non_replica_host = host_by_host_id(non_replica_host_id)
            logger.info(f"Non-replica node: {non_replica_host} (rack {non_replica_server.rack}), leader: {leader_host_id} (rack {leader_rack})")

            # Warmup phase: run enough requests to populate the leader cache
            # on all shards of the non-replica node. The driver may distribute
            # requests across multiple connections (shards), so we need to
            # warm them all up. 20 requests should be enough to cover all shards.
            for i in range(20):
                await cql.run_async(f"INSERT INTO {table} (pk, value) VALUES ({i}, {i})", host=non_replica_host)

            # Measure redirect counter after warmup (all shards should be warm).
            metrics = await manager.metrics.query(non_replica_host.address)
            redirects_before = metrics.get('scylla_transport_requests_forwarded_redirected') or 0

            # Run another batch of requests; all should use the cached leader
            # and NOT cause any redirects.
            num_requests = 20
            for i in range(20, 20 + num_requests):
                await cql.run_async(f"INSERT INTO {table} (pk, value) VALUES ({i}, {i})", host=non_replica_host)
            metrics = await manager.metrics.query(non_replica_host.address)
            redirects_after = metrics.get('scylla_transport_requests_forwarded_redirected') or 0
            new_redirects = redirects_after - redirects_before
            logger.info(f"Redirects before: {redirects_before}, after: {redirects_after}, new: {new_redirects}")
            assert new_redirects == 0, \
                f"Expected no new redirects after cache warmup, but got {new_redirects}"

            # Verify data correctness
            rows = await cql.run_async(f"SELECT * FROM {table} WHERE pk = 25", host=non_replica_host)
            assert len(rows) == 1
            assert rows[0].value == 25

View File

@@ -26,6 +26,7 @@
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sstring.hh>
#include <seastar/util/noncopyable_function.hh>
namespace cql_transport {
@@ -100,15 +101,18 @@ class result_message::bounce : public result_message {
cql3::computed_function_values _cached_fn_calls;
std::optional<seastar::lowres_clock::time_point> _timeout;
std::optional<bool> _is_write;
noncopyable_function<void(locator::host_id)> _on_node_resolved;
public:
bounce(locator::host_id host, unsigned shard, cql3::computed_function_values cached_fn_calls,
std::optional<seastar::lowres_clock::time_point> timeout = std::nullopt, std::optional<bool> is_write = std::nullopt)
std::optional<seastar::lowres_clock::time_point> timeout = std::nullopt, std::optional<bool> is_write = std::nullopt,
noncopyable_function<void(locator::host_id)> on_node_resolved = {})
: _host(host)
, _shard(shard)
, _cached_fn_calls(std::move(cached_fn_calls))
, _timeout(std::move(timeout))
, _is_write(std::move(is_write))
, _on_node_resolved(std::move(on_node_resolved))
{}
virtual void accept(result_message::visitor& v) const override {
v.visit(*this);
@@ -136,6 +140,10 @@ public:
cql3::computed_function_values&& take_cached_pk_function_calls() {
return std::move(_cached_fn_calls);
}
/// Returns the callback attached to this bounce message.
/// The callback may be empty (the constructor defaults it to `{}`), so
/// callers should test it before invoking.
const noncopyable_function<void(locator::host_id)>& on_node_resolved() const {
return _on_node_resolved;
}
};
std::ostream& operator<<(std::ostream& os, const result_message::bounce& msg);

View File

@@ -538,7 +538,7 @@ future<forward_cql_execute_response> cql_server::handle_forward_execute(
};
}
future<foreign_ptr<std::unique_ptr<cql_transport::response>>>
future<cql_server::forward_cql_result>
cql_server::forward_cql(
locator::host_id target_host,
unsigned target_shard,
@@ -585,7 +585,10 @@ cql_server::forward_cql(
_stats.requests_forwarded_successfully++;
clogger.trace("Forwarded CQL request executed successfully on replica: {}", target_host);
tracing::trace(trace_state, "Forwarded CQL request executed successfully on replica: {}", target_host);
co_return std::make_unique<cql_transport::response>(stream, cql_binary_opcode::RESULT, response.response_flags, std::move(response.response_body));
co_return forward_cql_result{
.response = std::make_unique<cql_transport::response>(stream, cql_binary_opcode::RESULT, response.response_flags, std::move(response.response_body)),
.final_host = current_host,
};
case forward_cql_status::error: {
_stats.requests_forwarded_failed++;
clogger.trace("Forwarded CQL request failed on replica: {}", target_host);
@@ -593,9 +596,15 @@ cql_server::forward_cql(
auto result = std::make_unique<cql_transport::response>(stream, cql_binary_opcode::ERROR,
response.response_flags, std::move(response.response_body));
if (response.timeout) {
co_return co_await sleep_until_timeout_passes(*response.timeout, std::move(result));
co_return forward_cql_result{
.response = co_await sleep_until_timeout_passes(*response.timeout, std::move(result)),
.final_host = current_host,
};
}
co_return std::move(result);
co_return forward_cql_result{
.response = std::move(result),
.final_host = current_host,
};
}
case forward_cql_status::prepared_not_found: {
_stats.requests_forwarded_prepared_not_found++;
@@ -1909,11 +1918,16 @@ cql_server::process(uint16_t stream, request_reader in, service::client_state& c
.cached_fn_calls = std::move(cached_fn_calls),
};
auto response = co_await forward_cql(
auto result = co_await forward_cql(
target_host, shard, (*bounce_msg)->timeout().value(), (*bounce_msg)->is_write().value(),
stream, trace_state, std::move(req));
co_return cql_server::result_with_foreign_response_ptr(std::move(response));
const auto& on_node_resolved = (*bounce_msg)->on_node_resolved();
if (on_node_resolved) {
on_node_resolved(result.final_host);
}
co_return cql_server::result_with_foreign_response_ptr(std::move(result.response));
}
}
co_return std::move(msg);

View File

@@ -380,7 +380,12 @@ private:
void init_messaging_service();
future<> uninit_messaging_service();
future<forward_cql_execute_response> handle_forward_execute(service::query_state& qs, forward_cql_execute_request& req);
future<foreign_ptr<std::unique_ptr<cql_transport::response>>> forward_cql(locator::host_id target_host, unsigned target_shard, seastar::lowres_clock::time_point timeout,
// Result of forward_cql(): the response to send back to the client, together
// with the host that forward_cql() reports as the one that produced it.
struct forward_cql_result {
// Response built from the forwarded request's reply.
foreign_ptr<std::unique_ptr<cql_transport::response>> response;
// Host reported as the final handler of the request; process() passes it
// to the bounce message's on_node_resolved callback when one is set.
locator::host_id final_host;
};
future<forward_cql_result> forward_cql(locator::host_id target_host, unsigned target_shard, seastar::lowres_clock::time_point timeout,
bool is_write, uint16_t stream, tracing::trace_state_ptr trace_state, forward_cql_execute_request req);
virtual shared_ptr<generic_server::connection> make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units<named_semaphore_exception_factory> initial_sem_units) override;