Merge 'Synchronize tablet split and load-and-stream' from Raphael Raph Carvalho

Load-and-stream is broken when it runs concurrently with the finalization step of tablet split.

Consider this:
1) split starts
2) split finalization executes barrier and succeed
3) load-and-stream runs now, starts writing sstable (pre-split)
4) split finalization publishes changes to tablet metadata
5) load-and-stream finishes writing sstable
6) sstable cannot be loaded since it spans two tablets

There are two possible fixes (we may need both):

1) load-and-stream waits for the topology to quiesce
2) perform split compaction on sstable that spans both sibling tablets

This patch implements fix (1). By waiting for the topology to quiesce,
we guarantee that load-and-stream only starts when there is no
chance that the coordinator is handling some topology operation, such as
split finalization.

Fixes https://github.com/scylladb/scylladb/issues/26455.

Closes scylladb/scylladb#26456

* github.com:scylladb/scylladb:
  test: Add reproducer for l-a-s and split synchronization issue
  sstables_loader: Synchronize tablet split and load-and-stream
This commit is contained in:
Botond Dénes
2025-10-21 09:43:38 +03:00
7 changed files with 160 additions and 6 deletions

View File

@@ -2133,7 +2133,7 @@ sharded<locator::shared_token_metadata> token_metadata;
});
checkpoint(stop_signal, "starting sstables loader");
sst_loader.start(std::ref(db), std::ref(messaging), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(sstm), maintenance_scheduling_group).get();
sst_loader.start(std::ref(db), std::ref(ss), std::ref(messaging), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(sstm), maintenance_scheduling_group).get();
auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] {
sst_loader.stop().get();
});

View File

@@ -7224,6 +7224,20 @@ future<> storage_service::await_topology_quiesced() {
co_await _topology_state_machine.await_not_busy();
}
// Verifies, on shard 0, that the topology is quiesced and still at the version
// the caller observed. Performs a group0 read barrier first, so the check is
// made against the latest committed topology state, then confirms the topology
// version equals expected_version and no topology operation is in flight.
// Returns false when the topology has moved on or is busy; callers are
// expected to retry (see sstables_loader::await_topology_quiesced_and_get_erm).
future<bool> storage_service::verify_topology_quiesced(token_metadata::version_t expected_version) {
// Keep the service alive for the duration of this coroutine.
auto holder = _async_gate.hold();
if (this_shard_id() != 0) {
// group0 is only set on shard 0.
// Capturing by reference is safe: this coroutine frame outlives the
// awaited invoke_on() call.
co_return co_await container().invoke_on(0, [&] (auto& ss) {
return ss.verify_topology_quiesced(expected_version);
});
}
// Read barrier: make sure we observe the most recent group0 (topology) state
// before inspecting the topology state machine below.
co_await _group0->group0_server().read_barrier(&_group0_as);
co_return _topology_state_machine._topology.version == expected_version && !_topology_state_machine._topology.is_busy();
}
future<join_node_request_result> storage_service::join_node_request_handler(join_node_request_params params) {
join_node_request_result result;
rtlogger.info("received request to join from host_id: {}", params.host_id);

View File

@@ -1046,7 +1046,11 @@ public:
future<> add_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
future<> del_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
future<> set_tablet_balancing_enabled(bool);
future<> await_topology_quiesced();
// Verifies topology is not busy, and also that topology version hasn't changed since the one provided
// by the caller.
future<bool> verify_topology_quiesced(token_metadata::version_t expected_version);
// In the maintenance mode, other nodes won't be available thus we disabled joining
// the token ring and the token metadata won't be populated with the local node's endpoint.

View File

@@ -1848,6 +1848,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// of token metadata will complete before we update topology.
auto guard = co_await global_tablet_token_metadata_barrier(std::move(g));
co_await utils::get_local_injector().inject("tablet_resize_finalization_post_barrier", utils::wait_for_message(std::chrono::minutes(2)));
auto tm = get_token_metadata_ptr();
auto plan = co_await _tablet_allocator.balance_tablets(tm, {}, get_dead_nodes());

View File

@@ -27,6 +27,7 @@
#include "readers/mutation_fragment_v1_stream.hh"
#include "locator/abstract_replication_strategy.hh"
#include "message/messaging_service.hh"
#include "service/storage_service.hh"
#include <cfloat>
#include <algorithm>
@@ -142,11 +143,12 @@ protected:
const unlink_sstables _unlink_sstables;
const stream_scope _stream_scope;
public:
sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, unlink_sstables unlink, stream_scope scope)
sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, locator::effective_replication_map_ptr erm,
std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, unlink_sstables unlink, stream_scope scope)
: _ms(ms)
, _db(db)
, _table(db.find_column_family(table_id))
, _erm(_table.get_effective_replication_map())
, _erm(std::move(erm))
, _sstables(std::move(sstables))
, _primary_replica_only(primary)
, _unlink_sstables(unlink)
@@ -181,8 +183,9 @@ private:
class tablet_sstable_streamer : public sstable_streamer {
const locator::tablet_map& _tablet_map;
public:
tablet_sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, unlink_sstables unlink, stream_scope scope)
: sstable_streamer(ms, db, table_id, std::move(sstables), primary, unlink, scope)
tablet_sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, locator::effective_replication_map_ptr erm,
std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, unlink_sstables unlink, stream_scope scope)
: sstable_streamer(ms, db, table_id, std::move(erm), std::move(sstables), primary, unlink, scope)
, _tablet_map(_erm->get_token_metadata().tablets().get_tablet_map(table_id)) {
}
@@ -526,13 +529,42 @@ static std::unique_ptr<sstable_streamer> make_sstable_streamer(bool uses_tablets
return std::make_unique<sstable_streamer>(std::forward<Args>(args)...);
}
// Returns an effective_replication_map for the given table that is guaranteed
// to have been taken while the topology was quiesced. Loops: optimistically
// grabs the table's current erm, then asks storage_service to verify (behind a
// group0 read barrier) that the topology is idle and still at the same token
// metadata version the erm was built from; on failure, drops the erm, waits
// for the topology to quiesce, and retries.
future<locator::effective_replication_map_ptr> sstables_loader::await_topology_quiesced_and_get_erm(::table_id table_id) {
// By waiting for topology to quiesce, we guarantee load-and-stream will not start in the middle
// of a topology operation that changes the token range boundaries, e.g. split or merge.
// Split, for example, first executes the barrier and then splits the tablets.
// So it can happen a sstable is generated between those steps and will incorrectly span two
// tablets. We want to serialize load-and-stream and split finalization (a topology op).
locator::effective_replication_map_ptr erm;
while (true) {
auto& t = _db.local().find_column_family(table_id);
erm = t.get_effective_replication_map();
// Remember the token metadata version this erm was built from; the verification
// below must see the topology at this exact version, otherwise the erm is stale.
auto expected_topology_version = erm->get_token_metadata().get_version();
auto& ss = _ss.local();
// optimistically attempt to grab an erm on quiesced topology
// The awaiting is only needed with tablet over raft, so we're bypassing the check
// when raft is disabled.
if (!ss.raft_topology_change_enabled() || co_await ss.verify_topology_quiesced(expected_topology_version)) {
break;
}
// NOTE(review): presumably the erm is released here so we don't pin a stale
// replication map (which could delay topology barriers) while we wait — confirm.
erm = nullptr;
co_await _ss.local().await_topology_quiesced();
}
co_return std::move(erm);
}
future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
::table_id table_id, std::vector<sstables::shared_sstable> sstables, bool primary, bool unlink, stream_scope scope,
shared_ptr<stream_progress> progress) {
// streamer guarantees topology stability, for correctness, by holding effective_replication_map
// throughout its lifetime.
auto erm = co_await await_topology_quiesced_and_get_erm(table_id);
auto streamer = make_sstable_streamer(_db.local().find_column_family(table_id).uses_tablets(),
_messaging, _db.local(), table_id, std::move(sstables),
_messaging, _db.local(), table_id, std::move(erm), std::move(sstables),
primary_replica_only(primary), unlink_sstables(unlink), scope);
co_await streamer->stream(progress);
@@ -749,6 +781,7 @@ future<> sstables_loader::download_task_impl::run() {
}
sstables_loader::sstables_loader(sharded<replica::database>& db,
sharded<service::storage_service>& ss,
netw::messaging_service& messaging,
sharded<db::view::view_builder>& vb,
sharded<db::view::view_building_worker>& vbw,
@@ -756,6 +789,7 @@ sstables_loader::sstables_loader(sharded<replica::database>& db,
sstables::storage_manager& sstm,
seastar::scheduling_group sg)
: _db(db)
, _ss(ss)
, _messaging(messaging)
, _view_builder(vb)
, _view_building_worker(vbw)

View File

@@ -29,6 +29,12 @@ class view_builder;
class view_building_worker;
}
}
namespace service {
class storage_service;
}
namespace locator {
class effective_replication_map;
}
struct stream_progress {
float total = 0.;
@@ -66,6 +72,7 @@ public:
private:
sharded<replica::database>& _db;
sharded<service::storage_service>& _ss;
netw::messaging_service& _messaging;
sharded<db::view::view_builder>& _view_builder;
sharded<db::view::view_building_worker>& _view_building_worker;
@@ -86,8 +93,10 @@ private:
bool primary_replica_only, bool unlink_sstables, stream_scope scope,
shared_ptr<stream_progress> progress);
future<seastar::shared_ptr<const locator::effective_replication_map>> await_topology_quiesced_and_get_erm(table_id table_id);
public:
sstables_loader(sharded<replica::database>& db,
sharded<service::storage_service>& ss,
netw::messaging_service& messaging,
sharded<db::view::view_builder>& vb,
sharded<db::view::view_building_worker>& vbw,

View File

@@ -1703,3 +1703,94 @@ async def test_split_correctness_on_tablet_count_change(manager: ManagerClient):
await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
await asyncio.sleep(.1)
await manager.api.message_injection(server.ip_addr, "merge_completion_fiber")
# Reproducer for the load-and-stream vs tablet split synchronization issue:
# https://github.com/scylladb/scylladb/issues/26455 (see also #26041).
@pytest.mark.parametrize("primary_replica_only", [False, True])
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_load_and_stream_and_split_synchronization(manager: ManagerClient, primary_replica_only):
# Exercises the race between load-and-stream and tablet split finalization:
# split finalization is parked right after its barrier (via error injection),
# load-and-stream is started, and only then is finalization allowed to publish
# the new tablet metadata. All loaded rows must still be readable afterwards.
logger.info("Bootstrapping cluster")
# Single-shard node keeps all tablet activity on one shard; debug logging is
# needed for the log lines the test waits on below.
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'table=debug',
'--smp', '1',
]
servers = [await manager.server_add(config={
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
server = servers[0]
# Disable balancing so a split is not triggered before the test is ready.
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
initial_tablets = 1
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
keys = range(100)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
# Asserts that every inserted row is present with its original value.
async def check(ks_name: str):
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async(f"SELECT * FROM {ks_name}.test BYPASS CACHE;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
# Flush so the data exists as sstables on disk before they are moved away.
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
await check(ks)
node_workdir = await manager.server_get_workdir(servers[0].server_id)
cql = await safe_server_stop_gracefully(manager, servers[0].server_id)
table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
logger.info(f"Table dir: {table_dir}")
# Moves every component of each sstable into the table's upload/ directory,
# so the data can be re-ingested via load-and-stream after restart.
def move_sstables_to_upload(table_dir: str):
logger.info("Moving sstables to upload dir")
table_upload_dir = os.path.join(table_dir, "upload")
for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")):
dst_path = os.path.join(table_upload_dir, os.path.basename(src_path))
logger.info(f"Moving sstable file {src_path} to {dst_path}")
os.rename(src_path, dst_path)
move_sstables_to_upload(table_dir)
await manager.server_start(servers[0].server_id)
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
# The table must be empty now: all of its sstables were moved to upload/.
rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
assert len(rows) == 0
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
# Park split finalization right after the tablet metadata barrier — the exact
# window in which load-and-stream used to produce an sstable spanning tablets.
await manager.api.enable_injection(servers[0].ip_addr, "tablet_resize_finalization_post_barrier", one_shot=True)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
# Trigger a split by doubling the tablet count, then wait until finalization
# reaches the injection point (post-barrier, pre-metadata-publication).
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {initial_tablets * 2}}}")
await s1_log.wait_for(f"tablet_resize_finalization_post_barrier: waiting", from_mark=s1_mark)
# Pause streaming as well, so the interleaving below is fully controlled.
await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments", one_shot=True)
# Start load-and-stream concurrently with the parked split finalization.
load_and_stream_task = asyncio.create_task(manager.api.load_new_sstables(servers[0].ip_addr, ks, "test", primary_replica_only))
await s1_log.wait_for(f"Loading new SSTables for keyspace", from_mark=s1_mark)
# Let finalization publish the new tablet metadata while the load is pending.
await manager.api.message_injection(server.ip_addr, "tablet_resize_finalization_post_barrier")
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
# Unblock streaming; the load must complete against the post-split topology.
await s1_log.wait_for(f"stream_mutation_fragments: waiting", from_mark=s1_mark)
await manager.api.message_injection(server.ip_addr, "stream_mutation_fragments")
await load_and_stream_task
# All rows must be present and correct after the concurrent split + load.
await check(ks)