From 13c043903dd3189ea2bb1cbbae63994932d72384 Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Thu, 9 Apr 2026 00:08:55 +0200 Subject: [PATCH] strong_consistency: cache leader location for non-replica nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a non-replica node handles a strongly consistent write, it must forward the request to a replica. If the closest replica is not the leader, the request gets redirected again, causing an extra roundtrip. Add a leader location cache in groups_manager, keyed by raft group_id. After a write request is forwarded, the CQL transport layer records the final node as the leader in the cache. Subsequent write requests from the same node for the same group are forwarded directly to the cached leader, eliminating the extra roundtrip. The cache is only used for writes. Reads can be served by any replica, so they skip the cache and use proximity-based routing instead. Cache entries are validated at use time: if the cached leader is no longer a replica (e.g. after tablet migration), the entry is evicted and the normal closest-replica path is taken. This prevents a scenario where two nodes keep redirecting to each other because both think that the other is the leader but actually both are non-replicas - such loop is broken as soon as the tablet maps are updated. On token_metadata updates, entries for groups that no longer exist (e.g. table dropped, tablet merged) are evicted. Entries for groups that still exist are kept — use-time validation handles staleness. An on_node_resolved callback is propagated through the redirect/bounce path so the transport layer can update the cache generically without coupling to the strong-consistency coordinator. The coordinator creates the callback only for writes (capturing the groups_manager and group_id) and attaches it to the bounce message; the transport layer invokes it once the final node is known, keeping the forwarding infrastructure subsystem-agnostic. We also add a test which verifies that after the initial redirect, following requests to the same node avoid the extra redirect and forward directly to the leader. Fixes: SCYLLADB-1064 Closes scylladb/scylladb#29392 --- cql3/query_processor.cc | 9 +- cql3/query_processor.hh | 3 +- .../modification_statement.cc | 6 +- .../strong_consistency/select_statement.cc | 4 +- .../strong_consistency/statement_helpers.cc | 5 +- .../strong_consistency/statement_helpers.hh | 4 +- service/strong_consistency/coordinator.cc | 56 +++++++--- service/strong_consistency/coordinator.hh | 5 +- service/strong_consistency/groups_manager.cc | 102 +++++++++--------- service/strong_consistency/groups_manager.hh | 55 ++++++++++ test/cluster/test_strong_consistency.py | 99 +++++++++++++++++ transport/messages/result_message.hh | 10 +- transport/server.cc | 26 +++-- transport/server.hh | 7 +- 14 files changed, 306 insertions(+), 85 deletions(-) diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index 31b7a73aeb..33a73a7f7f 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -1289,9 +1289,14 @@ shared_ptr query_processor::bounce_to_s return ::make_shared(my_host_id, shard, std::move(cached_fn_calls)); } -shared_ptr query_processor::bounce_to_node(locator::tablet_replica replica, cql3::computed_function_values cached_fn_calls, seastar::lowres_clock::time_point timeout, bool is_write) { +shared_ptr query_processor::bounce_to_node( + locator::tablet_replica replica, + cql3::computed_function_values cached_fn_calls, + seastar::lowres_clock::time_point timeout, + bool is_write, + noncopyable_function on_node_resolved) { get_cql_stats().forwarded_requests++; - return ::make_shared(replica.host, replica.shard, std::move(cached_fn_calls), timeout, is_write); + return ::make_shared(replica.host, replica.shard, std::move(cached_fn_calls), timeout, is_write, std::move(on_node_resolved)); } query_processor::consistency_level_set query_processor::to_consistency_level_set(const query_processor::cl_option_list& levels) { diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index 2bcd8d584f..aca82d011b 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -522,7 +522,8 @@ public: locator::tablet_replica replica, cql3::computed_function_values cached_fn_calls, seastar::lowres_clock::time_point timeout, - bool is_write); + bool is_write, + noncopyable_function on_node_resolved = {}); void update_authorized_prepared_cache_config(); diff --git a/cql3/statements/strong_consistency/modification_statement.cc b/cql3/statements/strong_consistency/modification_statement.cc index 1e0783870f..ae02c1ca7f 100644 --- a/cql3/statements/strong_consistency/modification_statement.cc +++ b/cql3/statements/strong_consistency/modification_statement.cc @@ -62,7 +62,7 @@ future> modification_statement::execute_without_check auto [coordinator, holder] = qp.acquire_strongly_consistent_coordinator(); - const auto mutate_result = co_await coordinator.get().mutate(_statement->s, + auto mutate_result = co_await coordinator.get().mutate(_statement->s, keys[0].start()->value().token(), [&](api::timestamp_type ts) { const auto prefetch_data = update_parameters::prefetch_data(_statement->s); @@ -78,9 +78,9 @@ future> modification_statement::execute_without_check }, timeout, qs.get_client_state().get_abort_source()); using namespace service::strong_consistency; - if (const auto* redirect = get_if(&mutate_result)) { + if (auto* redirect = get_if(&mutate_result)) { bool is_write = true; - co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats()); + co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats(), std::move(redirect->on_node_resolved)); } utils::get_local_injector().inject("sc_modification_statement_timeout", [&] { throw exceptions::mutation_write_timeout_exception{"", "", options.get_consistency(), 0, 0, db::write_type::SIMPLE}; diff --git a/cql3/statements/strong_consistency/select_statement.cc b/cql3/statements/strong_consistency/select_statement.cc index 7d0454b8a8..612eb3978a 100644 --- a/cql3/statements/strong_consistency/select_statement.cc +++ b/cql3/statements/strong_consistency/select_statement.cc @@ -55,9 +55,9 @@ future<::shared_ptr> select_statement::do_execute(query_processo key_ranges, state.get_trace_state(), timeout, state.get_client_state().get_abort_source()); using namespace service::strong_consistency; - if (const auto* redirect = get_if(&query_result)) { + if (auto* redirect = get_if(&query_result)) { bool is_write = false; - co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats()); + co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats(), std::move(redirect->on_node_resolved)); } co_return co_await process_results(get>(std::move(query_result)), diff --git a/cql3/statements/strong_consistency/statement_helpers.cc b/cql3/statements/strong_consistency/statement_helpers.cc index 244eeefd29..e3e1106593 100644 --- a/cql3/statements/strong_consistency/statement_helpers.cc +++ b/cql3/statements/strong_consistency/statement_helpers.cc @@ -20,13 +20,14 @@ future<::shared_ptr> redirect_statement const locator::tablet_replica& target, db::timeout_clock::time_point timeout, bool is_write, - service::strong_consistency::stats& stats) + service::strong_consistency::stats& stats, + noncopyable_function on_node_resolved) { auto&& func_values_cache = const_cast(options).take_cached_pk_function_calls(); const auto my_host_id = qp.db().real_database().get_token_metadata().get_topology().my_host_id(); if (target.host != my_host_id) { ++(is_write ? stats.write_node_bounces : stats.read_node_bounces); - co_return qp.bounce_to_node(target, std::move(func_values_cache), timeout, is_write); + co_return qp.bounce_to_node(target, std::move(func_values_cache), timeout, is_write, std::move(on_node_resolved)); } ++(is_write ? stats.write_shard_bounces : stats.read_shard_bounces); co_return qp.bounce_to_shard(target.shard, std::move(func_values_cache)); diff --git a/cql3/statements/strong_consistency/statement_helpers.hh b/cql3/statements/strong_consistency/statement_helpers.hh index d0a6b33879..02db635be1 100644 --- a/cql3/statements/strong_consistency/statement_helpers.hh +++ b/cql3/statements/strong_consistency/statement_helpers.hh @@ -10,6 +10,7 @@ #include "cql3/cql_statement.hh" #include "locator/tablets.hh" +#include namespace service::strong_consistency { struct stats; } @@ -21,7 +22,8 @@ future<::shared_ptr> redirect_statement const locator::tablet_replica& target, db::timeout_clock::time_point timeout, bool is_write, - service::strong_consistency::stats& stats); + service::strong_consistency::stats& stats, + noncopyable_function on_node_resolved = {}); bool is_strongly_consistent(data_dictionary::database db, std::string_view ks_name); diff --git a/service/strong_consistency/coordinator.cc b/service/strong_consistency/coordinator.cc index ec55b8dcad..4e71f19ece 100644 --- a/service/strong_consistency/coordinator.cc +++ b/service/strong_consistency/coordinator.cc @@ -181,7 +181,24 @@ static locator::tablet_replica select_closest_replica(const gms::gossiper& gossi return *it; } -auto coordinator::create_operation_ctx(const schema& schema, const dht::token& token, abort_source& as) +static need_redirect redirect_to_leader(locator::tablet_replica target, groups_manager& gm, raft::group_id group_id) { + return { + .target = target, + // The `local()` here is needed to update the cache on the shard handling + // the client request which may be different from the shard currently + // executing the statement. + .on_node_resolved = [container = &gm.container(), group_id] (locator::host_id leader) { + container->local().leader_cache().put(group_id, leader); + }, + }; +} + +static need_redirect redirect_to_replica(locator::tablet_replica target) { + // When redirecting to a replica, there's no need to update the leader cache + return { .target = target }; +} + +auto coordinator::create_operation_ctx(const schema& schema, const dht::token& token, abort_source& as, bool use_leader_cache) -> future> { auto erm = schema.table().get_effective_replication_map(); @@ -204,14 +221,27 @@ auto coordinator::create_operation_ctx(const schema& schema, const dht::token& t const auto& tablet_map = erm->get_token_metadata().tablets().get_tablet_map(schema.id()); const auto tablet_id = tablet_map.get_tablet_id(token); const auto& tablet_info = tablet_map.get_tablet_info(tablet_id); + const auto& raft_info = tablet_map.get_tablet_raft_info(tablet_id); if (!contains(tablet_info.replicas, this_replica)) { - co_return need_redirect { - select_closest_replica(_gossiper, tablet_info.replicas, token, - erm->get_token_metadata().get_topology()) - }; + // For writes, check the leader cache to avoid an extra roundtrip. + // For now, reads skip the cache because any replica can serve them. + if (use_leader_cache) { + if (const auto cached = _groups_manager.leader_cache().get(raft_info.group_id)) { + if (const auto* target = find_replica(tablet_info, *cached)) { + co_return redirect_to_leader(*target, _groups_manager, raft_info.group_id); + } + // Cached leader is no longer a replica, evict it. + _groups_manager.leader_cache().erase(raft_info.group_id); + } + } + auto target = select_closest_replica(_gossiper, tablet_info.replicas, token, + erm->get_token_metadata().get_topology()); + if (use_leader_cache) { + co_return redirect_to_leader(target, _groups_manager, raft_info.group_id); + } + co_return redirect_to_replica(target); } - const auto& raft_info = tablet_map.get_tablet_raft_info(tablet_id); co_await utils::get_local_injector().inject("sc_coordinator_wait_before_acquire_server", utils::wait_for_message(5min)); @@ -250,9 +280,9 @@ future> coordinator::mutate(schema_ptr schema, bool commit_status_unknown_ex = false; try { - auto op_result = co_await create_operation_ctx(*schema, token, aoe.abort_source()); - if (const auto* redirect = get_if(&op_result)) { - co_return *redirect; + auto op_result = co_await create_operation_ctx(*schema, token, aoe.abort_source(), true); + if (auto* redirect = get_if(&op_result)) { + co_return std::move(*redirect); } auto& op = get(op_result); @@ -270,7 +300,7 @@ future> coordinator::mutate(schema_ptr schema, schema->ks_name(), schema->cf_name(), op.tablet_id, leader_host_id, op.tablet_info.replicas)); } - co_return need_redirect{*target}; + co_return redirect_to_leader(*target, _groups_manager, op.raft_info.group_id); } if (auto* wait_for_leader = get_if(&disposition)) { co_await std::move(wait_for_leader->future); @@ -373,9 +403,9 @@ auto coordinator::query(schema_ptr schema, auto mark_read_latency = defer([this, &lc] { _stats.read.mark(lc.stop().latency()); }); try { - auto op_result = co_await create_operation_ctx(*schema, ranges[0].start()->value().token(), aoe.abort_source()); - if (const auto* redirect = get_if(&op_result)) { - co_return *redirect; + auto op_result = co_await create_operation_ctx(*schema, ranges[0].start()->value().token(), aoe.abort_source(), false); + if (auto* redirect = get_if(&op_result)) { + co_return std::move(*redirect); } auto& op = get(op_result); diff --git a/service/strong_consistency/coordinator.hh b/service/strong_consistency/coordinator.hh index 57be3892a2..dd1500b345 100644 --- a/service/strong_consistency/coordinator.hh +++ b/service/strong_consistency/coordinator.hh @@ -12,6 +12,7 @@ #include "query/query-result.hh" #include "utils/histogram.hh" #include +#include namespace gms { @@ -25,6 +26,7 @@ class groups_manager; struct need_redirect { locator::tablet_replica target; + noncopyable_function on_node_resolved; }; template using value_or_redirect = std::variant; @@ -61,7 +63,8 @@ private: struct operation_ctx; future> create_operation_ctx(const schema& schema, const dht::token& token, - abort_source& as); + abort_source& as, + bool use_leader_cache); public: coordinator(groups_manager& groups_manager, replica::database& db, gms::gossiper& gossiper); diff --git a/service/strong_consistency/groups_manager.cc b/service/strong_consistency/groups_manager.cc index bc94a61d01..c395f0716f 100644 --- a/service/strong_consistency/groups_manager.cc +++ b/service/strong_consistency/groups_manager.cc @@ -48,28 +48,6 @@ public: } }; -static void for_each_sc_tablet(const token_metadata& tm, - noncopyable_function&& func) -{ - const auto this_replica = locator::tablet_replica { - .host = tm.get_my_id(), - .shard = this_shard_id() - }; - const auto& tablets = tm.tablets(); - for (const auto& [table_id, _]: tablets.all_table_groups()) { - const auto& tablet_map = tablets.get_tablet_map(table_id); - if (!tablet_map.has_raft_info()) { - continue; - } - for (const auto& tablet_id: tablet_map.tablet_ids()) { - if (tablet_map.has_replica(tablet_id, this_replica)) { - const auto group_id = tablet_map.get_tablet_raft_info(tablet_id).group_id; - func(global_tablet_id{table_id, tablet_id}, group_id); - } - } - } -} - raft_server::raft_server(groups_manager::raft_group_state& state, gate::holder holder) : _state(state) , _holder(std::move(holder)) @@ -368,44 +346,64 @@ void groups_manager::update(token_metadata_ptr new_tm) { state.has_tablet = false; } - for_each_sc_tablet(*new_tm, [&](global_tablet_id tablet, raft::group_id id) { - auto& state = _raft_groups[id]; - state.has_tablet = true; - - // Don't start the raft server if it is already (started or starting) and not stopping. - if (state.gate && !state.gate->is_closed()) { - return; + const auto this_replica = locator::tablet_replica { + .host = new_tm->get_my_id(), + .shard = this_shard_id() + }; + _leader_cache.begin_sweep(); + const auto& tablets = new_tm->tablets(); + for (const auto& [table_id, _]: tablets.all_table_groups()) { + const auto& tablet_map = tablets.get_tablet_map(table_id); + if (!tablet_map.has_raft_info()) { + continue; } + for (const auto& tid: tablet_map.tablet_ids()) { + const auto id = tablet_map.get_tablet_raft_info(tid).group_id; + const auto tablet = global_tablet_id{table_id, tid}; - logger.info("update(): starting raft server for tablet {}, group id {}", tablet, id); - state.gate = make_lw_shared(); - _starting_groups.push_back(state); - state.server_control_op = futurize_invoke([&state, this, tablet, id, new_tm](this auto) -> future<> { - co_await state.server_control_op.get_future(); - co_await start_raft_group(tablet, id, std::move(new_tm)); - state.server = &_raft_gr.get_server(id); - state.leader_info_updater = leader_info_updater(state, tablet, id); + _leader_cache.mark_seen(id); + if (!tablet_map.has_replica(tid, this_replica)) { + continue; + } + auto& state = _raft_groups[id]; + state.has_tablet = true; - // We want to make sure the server is ready to serve requests before - // we report it as started in wait_for_groups_to_start(). - abort_on_expiry aoe(lowres_clock::now() + std::chrono::seconds(60)); - while (true) { - auto srv = raft_server(state, state.gate->hold()); - auto res = srv.begin_mutate(aoe.abort_source()); - if (auto w = get_if(&res)) { - co_await std::move(w->future); - } else { - break; - } + // Don't start the raft server if it is already (started or starting) and not stopping. + if (state.gate && !state.gate->is_closed()) { + continue; } - _starting_groups.erase(_starting_groups.iterator_to(state)); + logger.info("update(): starting raft server for tablet {}, group id {}", tablet, id); + state.gate = make_lw_shared(); + _starting_groups.push_back(state); + state.server_control_op = futurize_invoke([&state, this, tablet, id, new_tm](this auto) -> future<> { + co_await state.server_control_op.get_future(); + co_await start_raft_group(tablet, id, std::move(new_tm)); + state.server = &_raft_gr.get_server(id); + state.leader_info_updater = leader_info_updater(state, tablet, id); - logger.info("update(): raft server for tablet {} and group id {} is started", tablet, id); - }); - }); + // We want to make sure the server is ready to serve requests before + // we report it as started in wait_for_groups_to_start(). + abort_on_expiry aoe(lowres_clock::now() + std::chrono::seconds(60)); + while (true) { + auto srv = raft_server(state, state.gate->hold()); + auto res = srv.begin_mutate(aoe.abort_source()); + if (auto w = get_if(&res)) { + co_await std::move(w->future); + } else { + break; + } + } + + _starting_groups.erase(_starting_groups.iterator_to(state)); + + logger.info("update(): raft server for tablet {} and group id {} is started", tablet, id); + }); + } + } schedule_raft_groups_deletion(false); + _leader_cache.end_sweep(); } future groups_manager::acquire_server(table_id table_id, raft::group_id group_id, abort_source& as) { diff --git a/service/strong_consistency/groups_manager.hh b/service/strong_consistency/groups_manager.hh index 09a231d4a9..e81de65c75 100644 --- a/service/strong_consistency/groups_manager.hh +++ b/service/strong_consistency/groups_manager.hh @@ -29,6 +29,57 @@ namespace service::strong_consistency { class raft_server; +/// A cache of leader locations for raft groups where this node is not a replica. +/// Populated by the CQL transport layer after a redirect reveals the actual leader. +/// +/// Uses a sweep-based eviction strategy tied to token_metadata updates: +/// begin_sweep() before iterating tablets, mark_seen() for each existing group, +/// end_sweep() to evict entries whose groups no longer exist. +class tablet_group_leader_cache { + struct entry { + locator::host_id leader; + bool seen = false; + }; + std::unordered_map _entries; + +public: + void put(raft::group_id group, locator::host_id leader) { + auto [it, inserted] = _entries.try_emplace(group, entry{leader}); + if (!inserted) { + it->second.leader = leader; + } + } + + std::optional get(raft::group_id group) const { + auto it = _entries.find(group); + if (it != _entries.end()) { + return it->second.leader; + } + return std::nullopt; + } + + void erase(raft::group_id group) { + _entries.erase(group); + } + + void begin_sweep() { + for (auto& [_, e] : _entries) { + e.seen = false; + } + } + + void mark_seen(raft::group_id group) { + auto it = _entries.find(group); + if (it != _entries.end()) { + it->second.seen = true; + } + } + + void end_sweep() { + std::erase_if(_entries, [](const auto& p) { return !p.second.seen; }); + } +}; + /// A sharded service responsible for the lifecycle and access /// management of all Raft groups for strongly consistent tablets hosted on this node. /// @@ -88,6 +139,8 @@ class groups_manager : public peering_sharded_service { locator::token_metadata_ptr _pending_tm = nullptr; bool _started = false; + tablet_group_leader_cache _leader_cache; + // Should be called on the shard that hosts the Raft group future<> start_raft_group(locator::global_tablet_id tablet, raft::group_id group_id, @@ -132,6 +185,8 @@ public: // until the raft groups for those tablets are started and ready to serve queries. // For the local node, waits directly without an RPC. future<> wait_for_table_raft_groups_on_all_hosts(table_id table, lowres_clock::time_point timeout); + + tablet_group_leader_cache& leader_cache() { return _leader_cache; } }; /// A temporary, RAII-style handle to an active Raft group server instance, diff --git a/test/cluster/test_strong_consistency.py b/test/cluster/test_strong_consistency.py index 1ae12e97eb..b4e27dd583 100644 --- a/test/cluster/test_strong_consistency.py +++ b/test/cluster/test_strong_consistency.py @@ -1367,3 +1367,102 @@ async def test_abort_state_machine_apply_during_shutdown(manager: ManagerClient) rows = await cql.run_async(f"SELECT * FROM {table} WHERE pk = 0", host=target_host) assert len(rows) == 1 assert rows[0].v == 13 + +async def test_leader_cache_eliminates_redirect(manager: ManagerClient): + """ + Verify that after a non-replica node learns the leader location via a redirect, + subsequent write requests from that node go directly to the leader without a redirect. + + Uses 4 nodes in 2 racks (2 per rack) with RF=2 and 1 tablet. + Tablet-aware replication places one replica per rack. + The non-replica node in the non-leader rack always picks the same-rack + (non-leader) replica as closest, causing a redirect to the leader. + After the first write populates the cache, subsequent writes skip the redirect. + + We verify this via the scylla_transport_requests_forwarded_redirected metric + on the non-replica node. + """ + cmdline = [ + '--logger-log-level', 'cql_server=trace', + '--experimental-features', 'strongly-consistent-tables', + ] + # 2 racks with 2 nodes each + property_file = [ + {"dc": "dc1", "rack": "rack1"}, + {"dc": "dc1", "rack": "rack1"}, + {"dc": "dc1", "rack": "rack2"}, + {"dc": "dc1", "rack": "rack2"}, + ] + servers = await manager.servers_add(4, cmdline=cmdline, property_file=property_file) + (cql, hosts) = await manager.get_ready_cql(servers) + host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers]) + + def host_by_host_id(host_id): + for hid, host in zip(host_ids, hosts): + if hid == host_id: + return host + raise RuntimeError(f"Can't find host for host_id {host_id}") + + ks_opts = ("WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}" + " AND tablets = {'initial': 1} AND consistency = 'global'") + async with new_test_keyspace(manager, ks_opts) as ks: + async with new_test_table(manager, ks, "pk int PRIMARY KEY, value int") as table: + table_name = table.split('.')[-1] + group_id = await get_table_raft_group_id(manager, ks, table_name) + + try: + leader_host_id = await wait_for_leader(manager, servers[0], group_id) + except: + leader_host_id = await wait_for_leader(manager, servers[1], group_id) + + tablet_replicas = await get_tablet_replicas(manager, servers[0], ks, table_name, 0) + assert len(tablet_replicas) == 2 + replica_host_ids = [replica[0] for replica in tablet_replicas] + + # Find the rack of the leader + leader_server = next(s for hid, s in zip(host_ids, servers) if str(hid) == leader_host_id) + leader_rack = leader_server.rack + + # Pick the non-replica in the non-leader rack. + # This node's closest replica is the same-rack (non-leader) replica, + # so writes always cause a redirect on the first attempt. + non_replica_host_id = None + for hid, s in zip(host_ids, servers): + if str(hid) not in replica_host_ids and s.rack != leader_rack: + non_replica_host_id = hid + break + assert non_replica_host_id is not None, "Could not find non-replica in non-leader rack" + non_replica_server = next(s for hid, s in zip(host_ids, servers) if hid == non_replica_host_id) + non_replica_host = host_by_host_id(non_replica_host_id) + + logger.info(f"Non-replica node: {non_replica_host} (rack {non_replica_server.rack}), leader: {leader_host_id} (rack {leader_rack})") + + # Warmup phase: run enough requests to populate the leader cache + # on all shards of the non-replica node. The driver may distribute + # requests across multiple connections (shards), so we need to + # warm them all up. 20 requests should be enough to cover all shards. + for i in range(20): + await cql.run_async(f"INSERT INTO {table} (pk, value) VALUES ({i}, {i})", host=non_replica_host) + + # Measure redirect counter after warmup (all shards should be warm). + metrics = await manager.metrics.query(non_replica_host.address) + redirects_before = metrics.get('scylla_transport_requests_forwarded_redirected') or 0 + + # Run another batch of requests; all should use the cached leader + # and NOT cause any redirects. + num_requests = 20 + for i in range(20, 20 + num_requests): + await cql.run_async(f"INSERT INTO {table} (pk, value) VALUES ({i}, {i})", host=non_replica_host) + + metrics = await manager.metrics.query(non_replica_host.address) + redirects_after = metrics.get('scylla_transport_requests_forwarded_redirected') or 0 + + new_redirects = redirects_after - redirects_before + logger.info(f"Redirects before: {redirects_before}, after: {redirects_after}, new: {new_redirects}") + assert new_redirects == 0, \ + f"Expected no new redirects after cache warmup, but got {new_redirects}" + + # Verify data correctness + rows = await cql.run_async(f"SELECT * FROM {table} WHERE pk = 25", host=non_replica_host) + assert len(rows) == 1 + assert rows[0].value == 25 diff --git a/transport/messages/result_message.hh b/transport/messages/result_message.hh index b402970167..8efd67f7e5 100644 --- a/transport/messages/result_message.hh +++ b/transport/messages/result_message.hh @@ -26,6 +26,7 @@ #include #include #include +#include namespace cql_transport { @@ -100,15 +101,18 @@ class result_message::bounce : public result_message { cql3::computed_function_values _cached_fn_calls; std::optional _timeout; std::optional _is_write; + noncopyable_function _on_node_resolved; public: bounce(locator::host_id host, unsigned shard, cql3::computed_function_values cached_fn_calls, - std::optional timeout = std::nullopt, std::optional is_write = std::nullopt) + std::optional timeout = std::nullopt, std::optional is_write = std::nullopt, + noncopyable_function on_node_resolved = {}) : _host(host) , _shard(shard) , _cached_fn_calls(std::move(cached_fn_calls)) , _timeout(std::move(timeout)) , _is_write(std::move(is_write)) + , _on_node_resolved(std::move(on_node_resolved)) {} virtual void accept(result_message::visitor& v) const override { v.visit(*this); @@ -136,6 +140,10 @@ public: cql3::computed_function_values&& take_cached_pk_function_calls() { return std::move(_cached_fn_calls); } + + const noncopyable_function& on_node_resolved() const { + return _on_node_resolved; + } }; std::ostream& operator<<(std::ostream& os, const result_message::bounce& msg); diff --git a/transport/server.cc b/transport/server.cc index 9c909cfb29..15eb5b21e2 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -538,7 +538,7 @@ future cql_server::handle_forward_execute( }; } -future>> +future cql_server::forward_cql( locator::host_id target_host, unsigned target_shard, @@ -585,7 +585,10 @@ cql_server::forward_cql( _stats.requests_forwarded_successfully++; clogger.trace("Forwarded CQL request executed successfully on replica: {}", target_host); tracing::trace(trace_state, "Forwarded CQL request executed successfully on replica: {}", target_host); - co_return std::make_unique(stream, cql_binary_opcode::RESULT, response.response_flags, std::move(response.response_body)); + co_return forward_cql_result{ + .response = std::make_unique(stream, cql_binary_opcode::RESULT, response.response_flags, std::move(response.response_body)), + .final_host = current_host, + }; case forward_cql_status::error: { _stats.requests_forwarded_failed++; clogger.trace("Forwarded CQL request failed on replica: {}", target_host); @@ -593,9 +596,15 @@ cql_server::forward_cql( auto result = std::make_unique(stream, cql_binary_opcode::ERROR, response.response_flags, std::move(response.response_body)); if (response.timeout) { - co_return co_await sleep_until_timeout_passes(*response.timeout, std::move(result)); + co_return forward_cql_result{ + .response = co_await sleep_until_timeout_passes(*response.timeout, std::move(result)), + .final_host = current_host, + }; } - co_return std::move(result); + co_return forward_cql_result{ + .response = std::move(result), + .final_host = current_host, + }; } case forward_cql_status::prepared_not_found: { _stats.requests_forwarded_prepared_not_found++; @@ -1909,11 +1918,16 @@ cql_server::process(uint16_t stream, request_reader in, service::client_state& c .cached_fn_calls = std::move(cached_fn_calls), }; - auto response = co_await forward_cql( + auto result = co_await forward_cql( target_host, shard, (*bounce_msg)->timeout().value(), (*bounce_msg)->is_write().value(), stream, trace_state, std::move(req)); - co_return cql_server::result_with_foreign_response_ptr(std::move(response)); + const auto& on_node_resolved = (*bounce_msg)->on_node_resolved(); + if (on_node_resolved) { + on_node_resolved(result.final_host); + } + + co_return cql_server::result_with_foreign_response_ptr(std::move(result.response)); } } co_return std::move(msg); diff --git a/transport/server.hh b/transport/server.hh index 7768afca33..ff02593996 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -380,7 +380,12 @@ private: void init_messaging_service(); future<> uninit_messaging_service(); future handle_forward_execute(service::query_state& qs, forward_cql_execute_request& req); - future>> forward_cql(locator::host_id target_host, unsigned target_shard, seastar::lowres_clock::time_point timeout, + + struct forward_cql_result { + foreign_ptr> response; + locator::host_id final_host; + }; + future forward_cql(locator::host_id target_host, unsigned target_shard, seastar::lowres_clock::time_point timeout, bool is_write, uint16_t stream, tracing::trace_state_ptr trace_state, forward_cql_execute_request req); virtual shared_ptr make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units initial_sem_units) override;