sstables_loader: Don't bypass synchronization with busy topology

The patch c543059f86 fixed the synchronization issue between tablet
split and load-and-stream. The synchronization worked only with
raft topology, and therefore was disabled with gossip.
To do the check, storage_service::raft_topology_change_enabled()
but the topology kind is only available/set on shard 0, so it caused
the synchronization to be bypassed when load-and-stream runs on
any shard other than 0.

The reason the reproducer didn't catch it is that it was restricted
to single cpu. It will now run with multi cpu and catch the
problem observed.

Fixes #22707

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes scylladb/scylladb#26730

(cherry picked from commit 7f34366b9d)
This commit is contained in:
Raphael S. Carvalho
2025-10-27 20:34:25 -03:00
committed by GitHub Action
parent 4d3e896eae
commit 4c466ace4f
4 changed files with 262 additions and 6 deletions

View File

@@ -7890,6 +7890,20 @@ void storage_service::set_topology_change_kind(topology_change_kind kind) {
_gossiper.set_topology_state_machine(kind == topology_change_kind::raft ? & _topology_state_machine : nullptr);
}
bool storage_service::raft_topology_change_enabled() const {
if (this_shard_id() != 0) {
on_internal_error(slogger, "raft_topology_change_enabled() must run on shard 0");
}
return _topology_change_kind_enabled == topology_change_kind::raft;
}
bool storage_service::legacy_topology_change_enabled() const {
if (this_shard_id() != 0) {
on_internal_error(slogger, "legacy_topology_change_enabled() must run on shard 0");
}
return _topology_change_kind_enabled == topology_change_kind::legacy;
}
future<> storage_service::register_protocol_server(protocol_server& server, bool start_instantly) {
_protocol_servers.push_back(&server);
if (start_instantly) {

View File

@@ -840,12 +840,8 @@ private:
topology_change_kind upgrade_state_to_topology_op_kind(topology::upgrade_state_type upgrade_state) const;
public:
bool raft_topology_change_enabled() const {
return _topology_change_kind_enabled == topology_change_kind::raft;
}
bool legacy_topology_change_enabled() const {
return _topology_change_kind_enabled == topology_change_kind::legacy;
}
bool raft_topology_change_enabled() const;
bool legacy_topology_change_enabled() const;
private:
future<> _raft_state_monitor = make_ready_future<>();

View File

@@ -512,6 +512,67 @@ static std::unique_ptr<sstable_streamer> make_sstable_streamer(bool uses_tablets
return std::make_unique<sstable_streamer>(std::forward<Args>(args)...);
}
<<<<<<< HEAD
||||||| parent of 7f34366b9d (sstables_loader: Don't bypass synchronization with busy topology)
future<locator::effective_replication_map_ptr> sstables_loader::await_topology_quiesced_and_get_erm(::table_id table_id) {
// By waiting for topology to quiesce, we guarantee load-and-stream will not start in the middle
// of a topology operation that changes the token range boundaries, e.g. split or merge.
// Split, for example, first executes the barrier and then splits the tablets.
// So it can happen a sstable is generated between those steps and will incorrectly span two
// tablets. We want to serialize load-and-stream and split finalization (a topology op).
locator::effective_replication_map_ptr erm;
while (true) {
auto& t = _db.local().find_column_family(table_id);
erm = t.get_effective_replication_map();
auto expected_topology_version = erm->get_token_metadata().get_version();
auto& ss = _ss.local();
// optimistically attempt to grab an erm on quiesced topology
// The awaiting is only needed with tablet over raft, so we're bypassing the check
// when raft is disabled.
if (!ss.raft_topology_change_enabled() || co_await ss.verify_topology_quiesced(expected_topology_version)) {
break;
}
erm = nullptr;
co_await _ss.local().await_topology_quiesced();
}
co_return std::move(erm);
}
=======
future<locator::effective_replication_map_ptr> sstables_loader::await_topology_quiesced_and_get_erm(::table_id table_id) {
// By waiting for topology to quiesce, we guarantee load-and-stream will not start in the middle
// of a topology operation that changes the token range boundaries, e.g. split or merge.
// Split, for example, first executes the barrier and then splits the tablets.
// So it can happen a sstable is generated between those steps and will incorrectly span two
// tablets. We want to serialize load-and-stream and split finalization (a topology op).
locator::effective_replication_map_ptr erm;
while (true) {
auto& t = _db.local().find_column_family(table_id);
erm = t.get_effective_replication_map();
auto expected_topology_version = erm->get_token_metadata().get_version();
auto& ss = _ss.local();
// The awaiting only works with raft enabled, and we only need it with tablets,
// so let's bypass the awaiting when tablet is disabled.
if (!t.uses_tablets()) {
break;
}
// optimistically attempt to grab an erm on quiesced topology
if (co_await ss.verify_topology_quiesced(expected_topology_version)) {
break;
}
erm = nullptr;
co_await _ss.local().await_topology_quiesced();
}
co_return std::move(erm);
}
>>>>>>> 7f34366b9d (sstables_loader: Don't bypass synchronization with busy topology)
future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
::table_id table_id, std::vector<sstables::shared_sstable> sstables, bool primary, bool unlink, stream_scope scope,
shared_ptr<stream_progress> progress) {

View File

@@ -1974,3 +1974,188 @@ async def test_split_correctness_on_tablet_count_change(manager: ManagerClient):
await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
await asyncio.sleep(.1)
await manager.api.message_injection(server.ip_addr, "merge_completion_fiber")
<<<<<<< HEAD
||||||| parent of 7f34366b9d (sstables_loader: Don't bypass synchronization with busy topology)
# Reproducer for https://github.com/scylladb/scylladb/issues/26041.
@pytest.mark.parametrize("primary_replica_only", [False, True])
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_load_and_stream_and_split_synchronization(manager: ManagerClient, primary_replica_only):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'table=debug',
'--smp', '1',
]
servers = [await manager.server_add(config={
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
server = servers[0]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
initial_tablets = 1
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
keys = range(100)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
async def check(ks_name: str):
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async(f"SELECT * FROM {ks_name}.test BYPASS CACHE;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
await check(ks)
node_workdir = await manager.server_get_workdir(servers[0].server_id)
cql = await safe_server_stop_gracefully(manager, servers[0].server_id)
table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
logger.info(f"Table dir: {table_dir}")
def move_sstables_to_upload(table_dir: str):
logger.info("Moving sstables to upload dir")
table_upload_dir = os.path.join(table_dir, "upload")
for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")):
dst_path = os.path.join(table_upload_dir, os.path.basename(src_path))
logger.info(f"Moving sstable file {src_path} to {dst_path}")
os.rename(src_path, dst_path)
move_sstables_to_upload(table_dir)
await manager.server_start(servers[0].server_id)
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
assert len(rows) == 0
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
await manager.api.enable_injection(servers[0].ip_addr, "tablet_resize_finalization_post_barrier", one_shot=True)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {initial_tablets * 2}}}")
await s1_log.wait_for(f"tablet_resize_finalization_post_barrier: waiting", from_mark=s1_mark)
await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments", one_shot=True)
load_and_stream_task = asyncio.create_task(manager.api.load_new_sstables(servers[0].ip_addr, ks, "test", primary_replica_only))
await s1_log.wait_for(f"Loading new SSTables for keyspace", from_mark=s1_mark)
await manager.api.message_injection(server.ip_addr, "tablet_resize_finalization_post_barrier")
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
await s1_log.wait_for(f"stream_mutation_fragments: waiting", from_mark=s1_mark)
await manager.api.message_injection(server.ip_addr, "stream_mutation_fragments")
await load_and_stream_task
await check(ks)
=======
# Reproducer for https://github.com/scylladb/scylladb/issues/26041.
@pytest.mark.parametrize("primary_replica_only", [False, True])
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_load_and_stream_and_split_synchronization(manager: ManagerClient, primary_replica_only):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'table=debug',
]
servers = [await manager.server_add(config={
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
server = servers[0]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
initial_tablets = 1
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
keys = range(100)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
async def check(ks_name: str):
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async(f"SELECT * FROM {ks_name}.test BYPASS CACHE;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
await check(ks)
node_workdir = await manager.server_get_workdir(servers[0].server_id)
cql = await safe_server_stop_gracefully(manager, servers[0].server_id)
table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
logger.info(f"Table dir: {table_dir}")
def move_sstables_to_upload(table_dir: str):
logger.info("Moving sstables to upload dir")
table_upload_dir = os.path.join(table_dir, "upload")
for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")):
dst_path = os.path.join(table_upload_dir, os.path.basename(src_path))
logger.info(f"Moving sstable file {src_path} to {dst_path}")
os.rename(src_path, dst_path)
move_sstables_to_upload(table_dir)
await manager.server_start(servers[0].server_id)
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
assert len(rows) == 0
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
await manager.api.enable_injection(servers[0].ip_addr, "tablet_resize_finalization_post_barrier", one_shot=True)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {initial_tablets * 2}}}")
await s1_log.wait_for(f"tablet_resize_finalization_post_barrier: waiting", from_mark=s1_mark)
await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments", one_shot=True)
load_and_stream_task = asyncio.create_task(manager.api.load_new_sstables(servers[0].ip_addr, ks, "test", primary_replica_only))
await s1_log.wait_for(f"Loading new SSTables for keyspace", from_mark=s1_mark)
await manager.api.message_injection(server.ip_addr, "tablet_resize_finalization_post_barrier")
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
await s1_log.wait_for(f"stream_mutation_fragments: waiting", from_mark=s1_mark)
await manager.api.message_injection(server.ip_addr, "stream_mutation_fragments")
await load_and_stream_task
await check(ks)
>>>>>>> 7f34366b9d (sstables_loader: Don't bypass synchronization with busy topology)