diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index 31b7a73aeb..33a73a7f7f 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -1289,9 +1289,14 @@ shared_ptr query_processor::bounce_to_s return ::make_shared(my_host_id, shard, std::move(cached_fn_calls)); } -shared_ptr query_processor::bounce_to_node(locator::tablet_replica replica, cql3::computed_function_values cached_fn_calls, seastar::lowres_clock::time_point timeout, bool is_write) { +shared_ptr query_processor::bounce_to_node( + locator::tablet_replica replica, + cql3::computed_function_values cached_fn_calls, + seastar::lowres_clock::time_point timeout, + bool is_write, + noncopyable_function on_node_resolved) { get_cql_stats().forwarded_requests++; - return ::make_shared(replica.host, replica.shard, std::move(cached_fn_calls), timeout, is_write); + return ::make_shared(replica.host, replica.shard, std::move(cached_fn_calls), timeout, is_write, std::move(on_node_resolved)); } query_processor::consistency_level_set query_processor::to_consistency_level_set(const query_processor::cl_option_list& levels) { diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index 2bcd8d584f..aca82d011b 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -522,7 +522,8 @@ public: locator::tablet_replica replica, cql3::computed_function_values cached_fn_calls, seastar::lowres_clock::time_point timeout, - bool is_write); + bool is_write, + noncopyable_function on_node_resolved = {}); void update_authorized_prepared_cache_config(); diff --git a/cql3/statements/strong_consistency/modification_statement.cc b/cql3/statements/strong_consistency/modification_statement.cc index 1e0783870f..ae02c1ca7f 100644 --- a/cql3/statements/strong_consistency/modification_statement.cc +++ b/cql3/statements/strong_consistency/modification_statement.cc @@ -62,7 +62,7 @@ future> modification_statement::execute_without_check auto [coordinator, holder] = qp.acquire_strongly_consistent_coordinator(); - const auto mutate_result = co_await coordinator.get().mutate(_statement->s, + auto mutate_result = co_await coordinator.get().mutate(_statement->s, keys[0].start()->value().token(), [&](api::timestamp_type ts) { const auto prefetch_data = update_parameters::prefetch_data(_statement->s); @@ -78,9 +78,9 @@ future> modification_statement::execute_without_check }, timeout, qs.get_client_state().get_abort_source()); using namespace service::strong_consistency; - if (const auto* redirect = get_if(&mutate_result)) { + if (auto* redirect = get_if(&mutate_result)) { bool is_write = true; - co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats()); + co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats(), std::move(redirect->on_node_resolved)); } utils::get_local_injector().inject("sc_modification_statement_timeout", [&] { throw exceptions::mutation_write_timeout_exception{"", "", options.get_consistency(), 0, 0, db::write_type::SIMPLE}; diff --git a/cql3/statements/strong_consistency/select_statement.cc b/cql3/statements/strong_consistency/select_statement.cc index 7d0454b8a8..612eb3978a 100644 --- a/cql3/statements/strong_consistency/select_statement.cc +++ b/cql3/statements/strong_consistency/select_statement.cc @@ -55,9 +55,9 @@ future<::shared_ptr> select_statement::do_execute(query_processo key_ranges, state.get_trace_state(), timeout, state.get_client_state().get_abort_source()); using namespace service::strong_consistency; - if (const auto* redirect = get_if(&query_result)) { + if (auto* redirect = get_if(&query_result)) { bool is_write = false; - co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats()); + co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats(), std::move(redirect->on_node_resolved)); } co_return co_await process_results(get>(std::move(query_result)), diff --git a/cql3/statements/strong_consistency/statement_helpers.cc b/cql3/statements/strong_consistency/statement_helpers.cc index 244eeefd29..e3e1106593 100644 --- a/cql3/statements/strong_consistency/statement_helpers.cc +++ b/cql3/statements/strong_consistency/statement_helpers.cc @@ -20,13 +20,14 @@ future<::shared_ptr> redirect_statement const locator::tablet_replica& target, db::timeout_clock::time_point timeout, bool is_write, - service::strong_consistency::stats& stats) + service::strong_consistency::stats& stats, + noncopyable_function on_node_resolved) { auto&& func_values_cache = const_cast(options).take_cached_pk_function_calls(); const auto my_host_id = qp.db().real_database().get_token_metadata().get_topology().my_host_id(); if (target.host != my_host_id) { ++(is_write ? stats.write_node_bounces : stats.read_node_bounces); - co_return qp.bounce_to_node(target, std::move(func_values_cache), timeout, is_write); + co_return qp.bounce_to_node(target, std::move(func_values_cache), timeout, is_write, std::move(on_node_resolved)); } ++(is_write ? stats.write_shard_bounces : stats.read_shard_bounces); co_return qp.bounce_to_shard(target.shard, std::move(func_values_cache)); diff --git a/cql3/statements/strong_consistency/statement_helpers.hh b/cql3/statements/strong_consistency/statement_helpers.hh index d0a6b33879..02db635be1 100644 --- a/cql3/statements/strong_consistency/statement_helpers.hh +++ b/cql3/statements/strong_consistency/statement_helpers.hh @@ -10,6 +10,7 @@ #include "cql3/cql_statement.hh" #include "locator/tablets.hh" +#include namespace service::strong_consistency { struct stats; } @@ -21,7 +22,8 @@ future<::shared_ptr> redirect_statement const locator::tablet_replica& target, db::timeout_clock::time_point timeout, bool is_write, - service::strong_consistency::stats& stats); + service::strong_consistency::stats& stats, + noncopyable_function on_node_resolved = {}); bool is_strongly_consistent(data_dictionary::database db, std::string_view ks_name); diff --git a/service/strong_consistency/coordinator.cc b/service/strong_consistency/coordinator.cc index ec55b8dcad..4e71f19ece 100644 --- a/service/strong_consistency/coordinator.cc +++ b/service/strong_consistency/coordinator.cc @@ -181,7 +181,24 @@ static locator::tablet_replica select_closest_replica(const gms::gossiper& gossi return *it; } -auto coordinator::create_operation_ctx(const schema& schema, const dht::token& token, abort_source& as) +static need_redirect redirect_to_leader(locator::tablet_replica target, groups_manager& gm, raft::group_id group_id) { + return { + .target = target, + // The `local()` here is needed to update the cache on the shard handling + // the client request which may be different from the shard currently + // executing the statement. + .on_node_resolved = [container = &gm.container(), group_id] (locator::host_id leader) { + container->local().leader_cache().put(group_id, leader); + }, + }; +} + +static need_redirect redirect_to_replica(locator::tablet_replica target) { + // When redirecting to a replica, there's no need to update the leader cache + return { .target = target }; +} + +auto coordinator::create_operation_ctx(const schema& schema, const dht::token& token, abort_source& as, bool use_leader_cache) -> future> { auto erm = schema.table().get_effective_replication_map(); @@ -204,14 +221,27 @@ auto coordinator::create_operation_ctx(const schema& schema, const dht::token& t const auto& tablet_map = erm->get_token_metadata().tablets().get_tablet_map(schema.id()); const auto tablet_id = tablet_map.get_tablet_id(token); const auto& tablet_info = tablet_map.get_tablet_info(tablet_id); + const auto& raft_info = tablet_map.get_tablet_raft_info(tablet_id); if (!contains(tablet_info.replicas, this_replica)) { - co_return need_redirect { - select_closest_replica(_gossiper, tablet_info.replicas, token, - erm->get_token_metadata().get_topology()) - }; + // For writes, check the leader cache to avoid an extra roundtrip. + // For now, reads skip the cache because any replica can serve them. + if (use_leader_cache) { + if (const auto cached = _groups_manager.leader_cache().get(raft_info.group_id)) { + if (const auto* target = find_replica(tablet_info, *cached)) { + co_return redirect_to_leader(*target, _groups_manager, raft_info.group_id); + } + // Cached leader is no longer a replica, evict it. + _groups_manager.leader_cache().erase(raft_info.group_id); + } + } + auto target = select_closest_replica(_gossiper, tablet_info.replicas, token, + erm->get_token_metadata().get_topology()); + if (use_leader_cache) { + co_return redirect_to_leader(target, _groups_manager, raft_info.group_id); + } + co_return redirect_to_replica(target); } - const auto& raft_info = tablet_map.get_tablet_raft_info(tablet_id); co_await utils::get_local_injector().inject("sc_coordinator_wait_before_acquire_server", utils::wait_for_message(5min)); @@ -250,9 +280,9 @@ future> coordinator::mutate(schema_ptr schema, bool commit_status_unknown_ex = false; try { - auto op_result = co_await create_operation_ctx(*schema, token, aoe.abort_source()); - if (const auto* redirect = get_if(&op_result)) { - co_return *redirect; + auto op_result = co_await create_operation_ctx(*schema, token, aoe.abort_source(), true); + if (auto* redirect = get_if(&op_result)) { + co_return std::move(*redirect); } auto& op = get(op_result); @@ -270,7 +300,7 @@ future> coordinator::mutate(schema_ptr schema, schema->ks_name(), schema->cf_name(), op.tablet_id, leader_host_id, op.tablet_info.replicas)); } - co_return need_redirect{*target}; + co_return redirect_to_leader(*target, _groups_manager, op.raft_info.group_id); } if (auto* wait_for_leader = get_if(&disposition)) { co_await std::move(wait_for_leader->future); @@ -373,9 +403,9 @@ auto coordinator::query(schema_ptr schema, auto mark_read_latency = defer([this, &lc] { _stats.read.mark(lc.stop().latency()); }); try { - auto op_result = co_await create_operation_ctx(*schema, ranges[0].start()->value().token(), aoe.abort_source()); - if (const auto* redirect = get_if(&op_result)) { - co_return *redirect; + auto op_result = co_await create_operation_ctx(*schema, ranges[0].start()->value().token(), aoe.abort_source(), false); + if (auto* redirect = get_if(&op_result)) { + co_return std::move(*redirect); } auto& op = get(op_result); diff --git a/service/strong_consistency/coordinator.hh b/service/strong_consistency/coordinator.hh index 57be3892a2..dd1500b345 100644 --- a/service/strong_consistency/coordinator.hh +++ b/service/strong_consistency/coordinator.hh @@ -12,6 +12,7 @@ #include "query/query-result.hh" #include "utils/histogram.hh" #include +#include namespace gms { @@ -25,6 +26,7 @@ class groups_manager; struct need_redirect { locator::tablet_replica target; + noncopyable_function on_node_resolved; }; template using value_or_redirect = std::variant; @@ -61,7 +63,8 @@ private: struct operation_ctx; future> create_operation_ctx(const schema& schema, const dht::token& token, - abort_source& as); + abort_source& as, + bool use_leader_cache); public: coordinator(groups_manager& groups_manager, replica::database& db, gms::gossiper& gossiper); diff --git a/service/strong_consistency/groups_manager.cc b/service/strong_consistency/groups_manager.cc index bc94a61d01..c395f0716f 100644 --- a/service/strong_consistency/groups_manager.cc +++ b/service/strong_consistency/groups_manager.cc @@ -48,28 +48,6 @@ public: } }; -static void for_each_sc_tablet(const token_metadata& tm, - noncopyable_function&& func) -{ - const auto this_replica = locator::tablet_replica { - .host = tm.get_my_id(), - .shard = this_shard_id() - }; - const auto& tablets = tm.tablets(); - for (const auto& [table_id, _]: tablets.all_table_groups()) { - const auto& tablet_map = tablets.get_tablet_map(table_id); - if (!tablet_map.has_raft_info()) { - continue; - } - for (const auto& tablet_id: tablet_map.tablet_ids()) { - if (tablet_map.has_replica(tablet_id, this_replica)) { - const auto group_id = tablet_map.get_tablet_raft_info(tablet_id).group_id; - func(global_tablet_id{table_id, tablet_id}, group_id); - } - } - } -} - raft_server::raft_server(groups_manager::raft_group_state& state, gate::holder holder) : _state(state) , _holder(std::move(holder)) @@ -368,44 +346,64 @@ void groups_manager::update(token_metadata_ptr new_tm) { state.has_tablet = false; } - for_each_sc_tablet(*new_tm, [&](global_tablet_id tablet, raft::group_id id) { - auto& state = _raft_groups[id]; - state.has_tablet = true; - - // Don't start the raft server if it is already (started or starting) and not stopping. - if (state.gate && !state.gate->is_closed()) { - return; + const auto this_replica = locator::tablet_replica { + .host = new_tm->get_my_id(), + .shard = this_shard_id() + }; + _leader_cache.begin_sweep(); + const auto& tablets = new_tm->tablets(); + for (const auto& [table_id, _]: tablets.all_table_groups()) { + const auto& tablet_map = tablets.get_tablet_map(table_id); + if (!tablet_map.has_raft_info()) { + continue; } + for (const auto& tid: tablet_map.tablet_ids()) { + const auto id = tablet_map.get_tablet_raft_info(tid).group_id; + const auto tablet = global_tablet_id{table_id, tid}; - logger.info("update(): starting raft server for tablet {}, group id {}", tablet, id); - state.gate = make_lw_shared(); - _starting_groups.push_back(state); - state.server_control_op = futurize_invoke([&state, this, tablet, id, new_tm](this auto) -> future<> { - co_await state.server_control_op.get_future(); - co_await start_raft_group(tablet, id, std::move(new_tm)); - state.server = &_raft_gr.get_server(id); - state.leader_info_updater = leader_info_updater(state, tablet, id); + _leader_cache.mark_seen(id); + if (!tablet_map.has_replica(tid, this_replica)) { + continue; + } + auto& state = _raft_groups[id]; + state.has_tablet = true; - // We want to make sure the server is ready to serve requests before - // we report it as started in wait_for_groups_to_start(). - abort_on_expiry aoe(lowres_clock::now() + std::chrono::seconds(60)); - while (true) { - auto srv = raft_server(state, state.gate->hold()); - auto res = srv.begin_mutate(aoe.abort_source()); - if (auto w = get_if(&res)) { - co_await std::move(w->future); - } else { - break; - } + // Don't start the raft server if it is already (started or starting) and not stopping. + if (state.gate && !state.gate->is_closed()) { + continue; } - _starting_groups.erase(_starting_groups.iterator_to(state)); + logger.info("update(): starting raft server for tablet {}, group id {}", tablet, id); + state.gate = make_lw_shared(); + _starting_groups.push_back(state); + state.server_control_op = futurize_invoke([&state, this, tablet, id, new_tm](this auto) -> future<> { + co_await state.server_control_op.get_future(); + co_await start_raft_group(tablet, id, std::move(new_tm)); + state.server = &_raft_gr.get_server(id); + state.leader_info_updater = leader_info_updater(state, tablet, id); - logger.info("update(): raft server for tablet {} and group id {} is started", tablet, id); - }); - }); + // We want to make sure the server is ready to serve requests before + // we report it as started in wait_for_groups_to_start(). + abort_on_expiry aoe(lowres_clock::now() + std::chrono::seconds(60)); + while (true) { + auto srv = raft_server(state, state.gate->hold()); + auto res = srv.begin_mutate(aoe.abort_source()); + if (auto w = get_if(&res)) { + co_await std::move(w->future); + } else { + break; + } + } + + _starting_groups.erase(_starting_groups.iterator_to(state)); + + logger.info("update(): raft server for tablet {} and group id {} is started", tablet, id); + }); + } + } schedule_raft_groups_deletion(false); + _leader_cache.end_sweep(); } future groups_manager::acquire_server(table_id table_id, raft::group_id group_id, abort_source& as) { diff --git a/service/strong_consistency/groups_manager.hh b/service/strong_consistency/groups_manager.hh index 09a231d4a9..e81de65c75 100644 --- a/service/strong_consistency/groups_manager.hh +++ b/service/strong_consistency/groups_manager.hh @@ -29,6 +29,57 @@ namespace service::strong_consistency { class raft_server; +/// A cache of leader locations for raft groups where this node is not a replica. +/// Populated by the CQL transport layer after a redirect reveals the actual leader. +/// +/// Uses a sweep-based eviction strategy tied to token_metadata updates: +/// begin_sweep() before iterating tablets, mark_seen() for each existing group, +/// end_sweep() to evict entries whose groups no longer exist. +class tablet_group_leader_cache { + struct entry { + locator::host_id leader; + bool seen = false; + }; + std::unordered_map _entries; + +public: + void put(raft::group_id group, locator::host_id leader) { + auto [it, inserted] = _entries.try_emplace(group, entry{leader}); + if (!inserted) { + it->second.leader = leader; + } + } + + std::optional get(raft::group_id group) const { + auto it = _entries.find(group); + if (it != _entries.end()) { + return it->second.leader; + } + return std::nullopt; + } + + void erase(raft::group_id group) { + _entries.erase(group); + } + + void begin_sweep() { + for (auto& [_, e] : _entries) { + e.seen = false; + } + } + + void mark_seen(raft::group_id group) { + auto it = _entries.find(group); + if (it != _entries.end()) { + it->second.seen = true; + } + } + + void end_sweep() { + std::erase_if(_entries, [](const auto& p) { return !p.second.seen; }); + } +}; + /// A sharded service responsible for the lifecycle and access /// management of all Raft groups for strongly consistent tablets hosted on this node. /// @@ -88,6 +139,8 @@ class groups_manager : public peering_sharded_service { locator::token_metadata_ptr _pending_tm = nullptr; bool _started = false; + tablet_group_leader_cache _leader_cache; + // Should be called on the shard that hosts the Raft group future<> start_raft_group(locator::global_tablet_id tablet, raft::group_id group_id, @@ -132,6 +185,8 @@ public: // until the raft groups for those tablets are started and ready to serve queries. // For the local node, waits directly without an RPC. future<> wait_for_table_raft_groups_on_all_hosts(table_id table, lowres_clock::time_point timeout); + + tablet_group_leader_cache& leader_cache() { return _leader_cache; } }; /// A temporary, RAII-style handle to an active Raft group server instance, diff --git a/test/cluster/test_strong_consistency.py b/test/cluster/test_strong_consistency.py index 1ae12e97eb..b4e27dd583 100644 --- a/test/cluster/test_strong_consistency.py +++ b/test/cluster/test_strong_consistency.py @@ -1367,3 +1367,102 @@ async def test_abort_state_machine_apply_during_shutdown(manager: ManagerClient) rows = await cql.run_async(f"SELECT * FROM {table} WHERE pk = 0", host=target_host) assert len(rows) == 1 assert rows[0].v == 13 + +async def test_leader_cache_eliminates_redirect(manager: ManagerClient): + """ + Verify that after a non-replica node learns the leader location via a redirect, + subsequent write requests from that node go directly to the leader without a redirect. + + Uses 4 nodes in 2 racks (2 per rack) with RF=2 and 1 tablet. + Tablet-aware replication places one replica per rack. + The non-replica node in the non-leader rack always picks the same-rack + (non-leader) replica as closest, causing a redirect to the leader. + After the first write populates the cache, subsequent writes skip the redirect. + + We verify this via the scylla_transport_requests_forwarded_redirected metric + on the non-replica node. + """ + cmdline = [ + '--logger-log-level', 'cql_server=trace', + '--experimental-features', 'strongly-consistent-tables', + ] + # 2 racks with 2 nodes each + property_file = [ + {"dc": "dc1", "rack": "rack1"}, + {"dc": "dc1", "rack": "rack1"}, + {"dc": "dc1", "rack": "rack2"}, + {"dc": "dc1", "rack": "rack2"}, + ] + servers = await manager.servers_add(4, cmdline=cmdline, property_file=property_file) + (cql, hosts) = await manager.get_ready_cql(servers) + host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers]) + + def host_by_host_id(host_id): + for hid, host in zip(host_ids, hosts): + if hid == host_id: + return host + raise RuntimeError(f"Can't find host for host_id {host_id}") + + ks_opts = ("WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}" + " AND tablets = {'initial': 1} AND consistency = 'global'") + async with new_test_keyspace(manager, ks_opts) as ks: + async with new_test_table(manager, ks, "pk int PRIMARY KEY, value int") as table: + table_name = table.split('.')[-1] + group_id = await get_table_raft_group_id(manager, ks, table_name) + + try: + leader_host_id = await wait_for_leader(manager, servers[0], group_id) + except: + leader_host_id = await wait_for_leader(manager, servers[1], group_id) + + tablet_replicas = await get_tablet_replicas(manager, servers[0], ks, table_name, 0) + assert len(tablet_replicas) == 2 + replica_host_ids = [replica[0] for replica in tablet_replicas] + + # Find the rack of the leader + leader_server = next(s for hid, s in zip(host_ids, servers) if str(hid) == leader_host_id) + leader_rack = leader_server.rack + + # Pick the non-replica in the non-leader rack. + # This node's closest replica is the same-rack (non-leader) replica, + # so writes always cause a redirect on the first attempt. + non_replica_host_id = None + for hid, s in zip(host_ids, servers): + if str(hid) not in replica_host_ids and s.rack != leader_rack: + non_replica_host_id = hid + break + assert non_replica_host_id is not None, "Could not find non-replica in non-leader rack" + non_replica_server = next(s for hid, s in zip(host_ids, servers) if hid == non_replica_host_id) + non_replica_host = host_by_host_id(non_replica_host_id) + + logger.info(f"Non-replica node: {non_replica_host} (rack {non_replica_server.rack}), leader: {leader_host_id} (rack {leader_rack})") + + # Warmup phase: run enough requests to populate the leader cache + # on all shards of the non-replica node. The driver may distribute + # requests across multiple connections (shards), so we need to + # warm them all up. 20 requests should be enough to cover all shards. + for i in range(20): + await cql.run_async(f"INSERT INTO {table} (pk, value) VALUES ({i}, {i})", host=non_replica_host) + + # Measure redirect counter after warmup (all shards should be warm). + metrics = await manager.metrics.query(non_replica_host.address) + redirects_before = metrics.get('scylla_transport_requests_forwarded_redirected') or 0 + + # Run another batch of requests; all should use the cached leader + # and NOT cause any redirects. + num_requests = 20 + for i in range(20, 20 + num_requests): + await cql.run_async(f"INSERT INTO {table} (pk, value) VALUES ({i}, {i})", host=non_replica_host) + + metrics = await manager.metrics.query(non_replica_host.address) + redirects_after = metrics.get('scylla_transport_requests_forwarded_redirected') or 0 + + new_redirects = redirects_after - redirects_before + logger.info(f"Redirects before: {redirects_before}, after: {redirects_after}, new: {new_redirects}") + assert new_redirects == 0, \ + f"Expected no new redirects after cache warmup, but got {new_redirects}" + + # Verify data correctness + rows = await cql.run_async(f"SELECT * FROM {table} WHERE pk = 25", host=non_replica_host) + assert len(rows) == 1 + assert rows[0].value == 25 diff --git a/transport/messages/result_message.hh b/transport/messages/result_message.hh index b402970167..8efd67f7e5 100644 --- a/transport/messages/result_message.hh +++ b/transport/messages/result_message.hh @@ -26,6 +26,7 @@ #include #include #include +#include namespace cql_transport { @@ -100,15 +101,18 @@ class result_message::bounce : public result_message { cql3::computed_function_values _cached_fn_calls; std::optional _timeout; std::optional _is_write; + noncopyable_function _on_node_resolved; public: bounce(locator::host_id host, unsigned shard, cql3::computed_function_values cached_fn_calls, - std::optional timeout = std::nullopt, std::optional is_write = std::nullopt) + std::optional timeout = std::nullopt, std::optional is_write = std::nullopt, + noncopyable_function on_node_resolved = {}) : _host(host) , _shard(shard) , _cached_fn_calls(std::move(cached_fn_calls)) , _timeout(std::move(timeout)) , _is_write(std::move(is_write)) + , _on_node_resolved(std::move(on_node_resolved)) {} virtual void accept(result_message::visitor& v) const override { v.visit(*this); @@ -136,6 +140,10 @@ public: cql3::computed_function_values&& take_cached_pk_function_calls() { return std::move(_cached_fn_calls); } + + const noncopyable_function& on_node_resolved() const { + return _on_node_resolved; + } }; std::ostream& operator<<(std::ostream& os, const result_message::bounce& msg); diff --git a/transport/server.cc b/transport/server.cc index 9c909cfb29..15eb5b21e2 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -538,7 +538,7 @@ future cql_server::handle_forward_execute( }; } -future>> +future cql_server::forward_cql( locator::host_id target_host, unsigned target_shard, @@ -585,7 +585,10 @@ cql_server::forward_cql( _stats.requests_forwarded_successfully++; clogger.trace("Forwarded CQL request executed successfully on replica: {}", target_host); tracing::trace(trace_state, "Forwarded CQL request executed successfully on replica: {}", target_host); - co_return std::make_unique(stream, cql_binary_opcode::RESULT, response.response_flags, std::move(response.response_body)); + co_return forward_cql_result{ + .response = std::make_unique(stream, cql_binary_opcode::RESULT, response.response_flags, std::move(response.response_body)), + .final_host = current_host, + }; case forward_cql_status::error: { _stats.requests_forwarded_failed++; clogger.trace("Forwarded CQL request failed on replica: {}", target_host); @@ -593,9 +596,15 @@ cql_server::forward_cql( auto result = std::make_unique(stream, cql_binary_opcode::ERROR, response.response_flags, std::move(response.response_body)); if (response.timeout) { - co_return co_await sleep_until_timeout_passes(*response.timeout, std::move(result)); + co_return forward_cql_result{ + .response = co_await sleep_until_timeout_passes(*response.timeout, std::move(result)), + .final_host = current_host, + }; } - co_return std::move(result); + co_return forward_cql_result{ + .response = std::move(result), + .final_host = current_host, + }; } case forward_cql_status::prepared_not_found: { _stats.requests_forwarded_prepared_not_found++; @@ -1909,11 +1918,16 @@ cql_server::process(uint16_t stream, request_reader in, service::client_state& c .cached_fn_calls = std::move(cached_fn_calls), }; - auto response = co_await forward_cql( + auto result = co_await forward_cql( target_host, shard, (*bounce_msg)->timeout().value(), (*bounce_msg)->is_write().value(), stream, trace_state, std::move(req)); - co_return cql_server::result_with_foreign_response_ptr(std::move(response)); + const auto& on_node_resolved = (*bounce_msg)->on_node_resolved(); + if (on_node_resolved) { + on_node_resolved(result.final_host); + } + + co_return cql_server::result_with_foreign_response_ptr(std::move(result.response)); } } co_return std::move(msg); diff --git a/transport/server.hh b/transport/server.hh index 7768afca33..ff02593996 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -380,7 +380,12 @@ private: void init_messaging_service(); future<> uninit_messaging_service(); future handle_forward_execute(service::query_state& qs, forward_cql_execute_request& req); - future>> forward_cql(locator::host_id target_host, unsigned target_shard, seastar::lowres_clock::time_point timeout, + + struct forward_cql_result { + foreign_ptr> response; + locator::host_id final_host; + }; + future forward_cql(locator::host_id target_host, unsigned target_shard, seastar::lowres_clock::time_point timeout, bool is_write, uint16_t stream, tracing::trace_state_ptr trace_state, forward_cql_execute_request req); virtual shared_ptr make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units initial_sem_units) override;