diff --git a/db/hints/internal/hint_sender.cc b/db/hints/internal/hint_sender.cc
index 52927c9d0b..4af54ee145 100644
--- a/db/hints/internal/hint_sender.cc
+++ b/db/hints/internal/hint_sender.cc
@@ -246,9 +246,10 @@ void hint_sender::start() {
 future<> hint_sender::send_one_mutation(frozen_mutation_and_schema m) {
     auto ermp = _db.find_column_family(m.s).get_effective_replication_map();
     auto token = dht::get_token(*m.s, m.fm.key());
-    host_id_vector_replica_set natural_endpoints = ermp->get_natural_replicas(std::move(token));
+    host_id_vector_replica_set natural_endpoints = ermp->get_natural_replicas(token);
+    host_id_vector_topology_change pending_endpoints = ermp->get_pending_replicas(token);
 
-    return futurize_invoke([this, m = std::move(m), ermp = std::move(ermp), &natural_endpoints] () mutable -> future<> {
+    return futurize_invoke([this, m = std::move(m), ermp = std::move(ermp), &natural_endpoints, &pending_endpoints] () mutable -> future<> {
         // The fact that we send with CL::ALL in both cases below ensures that new hints are not going
         // to be generated as a result of hints sending.
         const auto& tm = ermp->get_token_metadata();
@@ -256,7 +257,8 @@ future<> hint_sender::send_one_mutation(frozen_mutation_and_schema m) {
 
         if (std::ranges::contains(natural_endpoints, dst) && !tm.is_leaving(dst)) {
             manager_logger.trace("hint_sender[{}]:send_one_mutation: Sending directly", dst);
-            return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), dst);
+            // dst is not duplicated in pending_endpoints because it's in natural_endpoints
+            return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), dst, std::move(pending_endpoints));
         } else {
             if (manager_logger.is_enabled(log_level::trace)) {
                 if (tm.is_leaving(end_point_key())) {
diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index 0bab4cbbf0..3bb2745e5a 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -4255,12 +4255,12 @@ future<> storage_proxy::send_to_endpoint(
         cancellable);
 }
 
-future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, locator::host_id target) {
+future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, locator::host_id target, host_id_vector_topology_change pending_endpoints) {
     return send_to_endpoint(
         std::make_unique<hint_mutation>(std::move(fm_a_s)),
         std::move(ermp),
         std::move(target),
-        host_id_vector_topology_change{},
+        std::move(pending_endpoints),
         db::write_type::SIMPLE,
         tracing::trace_state_ptr(),
         get_stats(),
diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh
index 58ea704f57..882df04fdf 100644
--- a/service/storage_proxy.hh
+++ b/service/storage_proxy.hh
@@ -647,7 +647,7 @@ public:
     // Send a mutation to a specific remote target as a hint.
     // Unlike regular mutations during write operations, hints are sent on the streaming connection
     // and use different RPC verb.
-    future<> send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, locator::host_id target);
+    future<> send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, locator::host_id target, host_id_vector_topology_change pending_endpoints);
 
     /**
      * Performs the truncate operatoin, which effectively deletes all data from
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 839d3945de..e78a7ece55 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -6406,7 +6406,9 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
             auto& table = db.find_column_family(tablet.table);
             return table.maybe_split_compaction_group_of(tablet.tablet);
         });
-
+        co_await utils::get_local_injector().inject("pause_after_streaming_tablet", [] (auto& handler) {
+            return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(1));
+        });
         co_return tablet_operation_result();
     });
 }
diff --git a/test/cluster/test_hints.py b/test/cluster/test_hints.py
index 68ccd8fa55..579157059d 100644
--- a/test/cluster/test_hints.py
+++ b/test/cluster/test_hints.py
@@ -16,6 +16,7 @@ from cassandra.query import SimpleStatement, ConsistencyLevel
 from test.pylib.internal_types import ServerInfo
 from test.pylib.manager_client import ManagerClient
 from test.pylib.rest_client import inject_error
+from test.pylib.tablets import get_tablet_replicas
 from test.pylib.util import wait_for
 from test.cluster.conftest import skip_mode
 
@@ -321,3 +322,59 @@ async def test_canceling_hint_draining(manager: ManagerClient):
     # Make sure draining finishes successfully.
     assert await_sync_point(s1, sync_point, 60)
     await s1_log.wait_for(f"Removed hint directory for {host_id2}")
+
+@pytest.mark.asyncio
+@skip_mode("release", "error injections are not supported in release mode")
+async def test_hint_to_pending(manager: ManagerClient):
+    """
+    This test reproduces the scenario where sending a hint to a pending replica is needed
+    for consistency as in https://github.com/scylladb/scylladb/issues/19835.
+    In the test, we have 2 servers and a table with RF=1. One server is stopped, and we
+    perform a write generating a hint to it. Then, we start the stopped server again and
+    immediately request a tablet migration from that server. The hint is sent after the
+    tablet migration performs streaming but before it completes. The order of operations
+    is induced using error injections.
+    At the end, we verify that the hint was successfully applied.
+    """
+    servers = await manager.servers_add(2, property_file=[
+        {"dc": "dc1", "rack": "r1"},
+        {"dc": "dc1", "rack": "r1"},
+    ])
+    cql = await manager.get_cql_exclusive(servers[0])
+    await manager.api.disable_tablet_balancing(servers[0].ip_addr)
+    await manager.api.disable_tablet_balancing(servers[1].ip_addr)
+
+    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
+        table = f"{ks}.t"
+        await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
+        replica = (await get_tablet_replicas(manager, servers[0], ks, "t", 0))[0]
+        host_ids = [await manager.get_host_id(server.server_id) for server in servers]
+        if replica[0] != host_ids[1]:
+            # We'll use server 0 as the source of the hint, so the tablet replica needs to be on server 1
+            await manager.api.move_tablet(servers[0].ip_addr, ks, "t", replica[0], replica[1], host_ids[1], 0, 0)
+
+        await manager.server_stop_gracefully(servers[1].server_id)
+        await manager.others_not_see_server(servers[1].ip_addr)
+
+        await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES (0, 0)", consistency_level=ConsistencyLevel.ANY))
+
+        await manager.api.enable_injection(servers[0].ip_addr, "hinted_handoff_pause_hint_replay", False)
+        await manager.server_start(servers[1].server_id)
+        sync_point = create_sync_point(servers[0])
+
+        await manager.api.enable_injection(servers[0].ip_addr, "pause_after_streaming_tablet", False)
+        tablet_migration = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "t", host_ids[1], 0, host_ids[0], 0, 0))
+
+        async def migration_reached_streaming():
+            stages = await cql.run_async(f"SELECT stage FROM system.tablets WHERE keyspace_name='{ks}' ALLOW FILTERING")
+            logger.info(f"Current stages: {[row.stage for row in stages]}")
+            return set(["streaming"]) == set([row.stage for row in stages]) or None
+        await wait_for(migration_reached_streaming, time.time() + 60)
+
+        await manager.api.disable_injection(servers[0].ip_addr, "hinted_handoff_pause_hint_replay")
+        assert await_sync_point(servers[0], sync_point, 30)
+
+        await manager.api.message_injection(servers[0].ip_addr, "pause_after_streaming_tablet")
+        await asyncio.wait([tablet_migration])
+
+        assert list(await cql.run_async(f"SELECT v FROM {table} WHERE pk = 0")) == [(0,)]