storage_proxy: send hints to pending replicas

Consider the following scenario:
- Current replica set is [A, B, C]
- write succeeds on [A, B], and a hint is logged for node C
- before the hint is replayed, D bootstraps and the token migrates from C to D
- hint is replayed to node C while D is pending, but it's too late, since streaming for that token is already done
- C is cleaned up, replayed data is lost, and D has a stale copy until next repair.
In this scenario the hint is effectively lost. The problem is also more likely to occur with tablets,
since it can happen on every tablet migration.
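
A minimal, self-contained sketch of the timeline above (hypothetical names and toy data structures, not
ScyllaDB code), showing why the replayed hint no longer helps once streaming to D has finished:

    // Toy model of the race described above.
    #include <iostream>
    #include <map>
    #include <set>
    #include <string>

    int main() {
        std::map<std::string, std::set<std::string>> data; // node -> rows it stores

        // The write succeeds on A and B; a hint is logged for C instead.
        data["A"].insert("k1");
        data["B"].insert("k1");

        // D bootstraps and streams the token from C before the hint is replayed,
        // so it copies C's state, which does not yet contain the hinted write.
        data["D"] = data["C"];

        // The hint is finally replayed to C - too late, streaming is already done.
        data["C"].insert("k1");

        // C is cleaned up: the replayed data is gone and D has a stale copy.
        data.erase("C");
        std::cout << "D has k1: " << data["D"].count("k1") << "\n"; // prints 0

        // With this patch the hint is also sent to the pending replica D,
        // so D receives k1 even though streaming has already completed.
    }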

This issue is particularly detrimental to materialized views. View updates use hints by default and a specific
view update may be sent to just one view replica (when a single base replica has a different row state due to
reordering or missed writes). When we lose a hint for such a view update, we can generate a persistent inconsistency
between the base and view - ghost rows can appear due to a lost tombstone and rows may be missing in the view due
to a lost row update. Such inconsistencies cannot be fixed by repairing either the view or the base table.

To handle this, this patch adds the pending replicas to the list of targets of each hint, even if the original
target is still alive.
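
As a rough illustration of the resulting target selection (simplified stand-in types, not the actual
storage_proxy/hint_sender API - the real change is in the diff below):

    #include <algorithm>
    #include <iostream>
    #include <string>
    #include <vector>

    using host_id = std::string; // stand-in for the real host id type

    // Hypothetical helper: a replayed hint goes to its original target plus
    // every pending replica of the hinted token.
    std::vector<host_id> hint_targets(const host_id& original_target,
                                      const std::vector<host_id>& pending_replicas) {
        std::vector<host_id> targets{original_target};
        for (const auto& pending : pending_replicas) {
            // A pending replica is not yet a natural replica, so it normally cannot
            // equal the original target; the check is only defensive.
            if (std::find(targets.begin(), targets.end(), pending) == targets.end()) {
                targets.push_back(pending);
            }
        }
        return targets;
    }

    int main() {
        for (const auto& target : hint_targets("C", {"D"})) {
            std::cout << target << "\n"; // C, then the pending replica D
        }
    }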

This will cause some updates to be redundant. These updates are probably unavoidable for now, but they shouldn't
be too common either. The scenarios for them are:
1. managing to send the hint to the source of a migrating replica before its token has been streamed - the write
will then arrive on the pending replica through streaming anyway
2. the hint target not being the source of the migration - if we managed to apply the original write of the hint to
the actual source of the migration, the pending replica will get it during streaming
3. sending the same hint to many targets at a similar time - while sending to each target, we'll see the same pending
replica for the hint so we'll send it multiple times
4. possible retries where even though the hint was successfully sent to the main target, we failed to send it to the
pending replica, so we need to retry the entire write

This patch handles both tablet migrations and tablet rebuilds. In the future, for tablet migrations, we can avoid
sending the hint to pending replicas if the hint target is not the source of the migration, which would allow us to
avoid the redundant writes from scenarios 2 and 3. For rack-aware RF, this will be as simple as checking whether the
replicas are in the same rack.
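
A hypothetical sketch of that future check (invented names and signature, not part of this patch): with
rack-aware placement a tablet migration stays within one rack, so a pending replica only needs the hint
when it shares a rack with the hint target, i.e. when the target is the migration source.

    #include <string>
    #include <unordered_map>
    #include <vector>

    using host_id = std::string;                               // illustration only
    using rack_map = std::unordered_map<host_id, std::string>; // host -> rack

    // Keep only the pending replicas that share a rack with the hint target;
    // the others will receive the write from the actual migration source via streaming.
    std::vector<host_id> pending_targets_for_hint(const host_id& hint_target,
                                                  const std::vector<host_id>& pending_replicas,
                                                  const rack_map& rack_of) {
        std::vector<host_id> result;
        for (const auto& pending : pending_replicas) {
            if (rack_of.at(pending) == rack_of.at(hint_target)) {
                result.push_back(pending);
            }
        }
        return result;
    }

    int main() {
        rack_map racks{{"C", "r1"}, {"D", "r1"}, {"E", "r2"}};
        auto targets = pending_targets_for_hint("C", {"D", "E"}, racks);
        // Only the same-rack pending replica D still needs the hint.
        return targets.size() == 1 ? 0 : 1;
    }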

We also add a test case reproducing the issue.

Co-Authored-By: Raphael S. Carvalho <raphaelsc@scylladb.com>

Fixes https://github.com/scylladb/scylladb/issues/19835

Closes scylladb/scylladb#25590

(cherry picked from commit 10b8e1c51c)

Closes scylladb/scylladb#25882
Author:    Wojciech Mitros
Date:      2025-08-20 03:38:09 +02:00
Committer: Pavel Emelyanov
Parent:    81e4c65f8c
Commit:    055a6c2cee

5 changed files with 68 additions and 7 deletions

@@ -246,9 +246,10 @@ void hint_sender::start() {
 future<> hint_sender::send_one_mutation(frozen_mutation_and_schema m) {
     auto ermp = _db.find_column_family(m.s).get_effective_replication_map();
     auto token = dht::get_token(*m.s, m.fm.key());
-    host_id_vector_replica_set natural_endpoints = ermp->get_natural_replicas(std::move(token));
+    host_id_vector_replica_set natural_endpoints = ermp->get_natural_replicas(token);
+    host_id_vector_topology_change pending_endpoints = ermp->get_pending_replicas(token);
-    return futurize_invoke([this, m = std::move(m), ermp = std::move(ermp), &natural_endpoints] () mutable -> future<> {
+    return futurize_invoke([this, m = std::move(m), ermp = std::move(ermp), &natural_endpoints, &pending_endpoints] () mutable -> future<> {
         // The fact that we send with CL::ALL in both cases below ensures that new hints are not going
         // to be generated as a result of hints sending.
         const auto& tm = ermp->get_token_metadata();
@@ -256,7 +257,8 @@ future<> hint_sender::send_one_mutation(frozen_mutation_and_schema m) {
         if (std::ranges::contains(natural_endpoints, dst) && !tm.is_leaving(dst)) {
             manager_logger.trace("hint_sender[{}]:send_one_mutation: Sending directly", dst);
-            return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), dst);
+            // dst is not duplicated in pending_endpoints because it's in natural_endpoints
+            return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), dst, std::move(pending_endpoints));
         } else {
             if (manager_logger.is_enabled(log_level::trace)) {
                 if (tm.is_leaving(end_point_key())) {

@@ -4255,12 +4255,12 @@ future<> storage_proxy::send_to_endpoint(
             cancellable);
 }
 
-future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, locator::host_id target) {
+future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, locator::host_id target, host_id_vector_topology_change pending_endpoints) {
     return send_to_endpoint(
             std::make_unique<hint_mutation>(std::move(fm_a_s)),
             std::move(ermp),
             std::move(target),
-            host_id_vector_topology_change{},
+            std::move(pending_endpoints),
             db::write_type::SIMPLE,
             tracing::trace_state_ptr(),
             get_stats(),

@@ -647,7 +647,7 @@ public:
     // Send a mutation to a specific remote target as a hint.
     // Unlike regular mutations during write operations, hints are sent on the streaming connection
     // and use different RPC verb.
-    future<> send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, locator::host_id target);
+    future<> send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, locator::host_id target, host_id_vector_topology_change pending_endpoints);
 
     /**
      * Performs the truncate operatoin, which effectively deletes all data from

@@ -6406,7 +6406,9 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
             auto& table = db.find_column_family(tablet.table);
             return table.maybe_split_compaction_group_of(tablet.tablet);
         });
+        co_await utils::get_local_injector().inject("pause_after_streaming_tablet", [] (auto& handler) {
+            return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(1));
+        });
         co_return tablet_operation_result();
     });
 }

@@ -16,6 +16,7 @@ from cassandra.query import SimpleStatement, ConsistencyLevel
 from test.pylib.internal_types import ServerInfo
 from test.pylib.manager_client import ManagerClient
 from test.pylib.rest_client import inject_error
+from test.pylib.tablets import get_tablet_replicas
 from test.pylib.util import wait_for
 from test.cluster.conftest import skip_mode
@@ -321,3 +322,59 @@ async def test_canceling_hint_draining(manager: ManagerClient):
     # Make sure draining finishes successfully.
     assert await_sync_point(s1, sync_point, 60)
     await s1_log.wait_for(f"Removed hint directory for {host_id2}")
+
+
+@pytest.mark.asyncio
+@skip_mode("release", "error injections are not supported in release mode")
+async def test_hint_to_pending(manager: ManagerClient):
+    """
+    This test reproduces the scenario where sending a hint to a pending replica is needed
+    for consistency as in https://github.com/scylladb/scylladb/issues/19835.
+    In the test, we have 2 servers and a table with RF=1. One server is stopped, and we
+    perform a write generating a hint to it. Then, we start the stopped server again and
+    immediately request a tablet migration from that server. The hint is sent after the
+    tablet migration performs streaming but before it completes. The order of operations
+    is induced using error injections.
+    At the end, we verify that the hint was successfully applied.
+    """
+    servers = await manager.servers_add(2, property_file=[
+        {"dc": "dc1", "rack": "r1"},
+        {"dc": "dc1", "rack": "r1"},
+    ])
+    cql = await manager.get_cql_exclusive(servers[0])
+    await manager.api.disable_tablet_balancing(servers[0].ip_addr)
+    await manager.api.disable_tablet_balancing(servers[1].ip_addr)
+    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
+        table = f"{ks}.t"
+        await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
+        replica = (await get_tablet_replicas(manager, servers[0], ks, "t", 0))[0]
+        host_ids = [await manager.get_host_id(server.server_id) for server in servers]
+        if replica[0] != host_ids[1]:
+            # We'll use server 0 as the source of the hint, so the tablet replica needs to be on server 1
+            await manager.api.move_tablet(servers[0].ip_addr, ks, "t", replica[0], replica[1], host_ids[1], 0, 0)
+        await manager.server_stop_gracefully(servers[1].server_id)
+        await manager.others_not_see_server(servers[1].ip_addr)
+        await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES (0, 0)", consistency_level=ConsistencyLevel.ANY))
+        await manager.api.enable_injection(servers[0].ip_addr, "hinted_handoff_pause_hint_replay", False)
+        await manager.server_start(servers[1].server_id)
+        sync_point = create_sync_point(servers[0])
+        await manager.api.enable_injection(servers[0].ip_addr, "pause_after_streaming_tablet", False)
+        tablet_migration = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "t", host_ids[1], 0, host_ids[0], 0, 0))
+
+        async def migration_reached_streaming():
+            stages = await cql.run_async(f"SELECT stage FROM system.tablets WHERE keyspace_name='{ks}' ALLOW FILTERING")
+            logger.info(f"Current stages: {[row.stage for row in stages]}")
+            return set(["streaming"]) == set([row.stage for row in stages]) or None
+
+        await wait_for(migration_reached_streaming, time.time() + 60)
+        await manager.api.disable_injection(servers[0].ip_addr, "hinted_handoff_pause_hint_replay")
+        assert await_sync_point(servers[0], sync_point, 30)
+        await manager.api.message_injection(servers[0].ip_addr, "pause_after_streaming_tablet")
+        await asyncio.wait([tablet_migration])
+        assert list(await cql.run_async(f"SELECT v FROM {table} WHERE pk = 0")) == [(0,)]