Merge 'Synchronize tablet split and load-and-stream' from Raphael Raph Carvalho
Load-and-stream is broken when running concurrently with the finalization step of tablet split. Consider this: 1) split starts 2) split finalization executes the barrier and succeeds 3) load-and-stream runs now, starts writing an sstable (pre-split) 4) split finalization publishes changes to tablet metadata 5) load-and-stream finishes writing the sstable 6) the sstable cannot be loaded since it spans two tablets. Two possible fixes (maybe both): 1) load-and-stream awaits topology quiescence 2) perform split compaction on any sstable that spans both sibling tablets. This patch implements #1. By awaiting topology quiescence, we guarantee that load-and-stream only starts when there's no chance the coordinator is handling some topology operation like split finalization. Fixes https://github.com/scylladb/scylladb/issues/26455. Closes scylladb/scylladb#26456 * github.com:scylladb/scylladb: test: Add reproducer for l-a-s and split synchronization issue sstables_loader: Synchronize tablet split and load-and-stream
This commit is contained in:
2
main.cc
2
main.cc
@@ -2133,7 +2133,7 @@ sharded<locator::shared_token_metadata> token_metadata;
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "starting sstables loader");
|
||||
sst_loader.start(std::ref(db), std::ref(messaging), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(sstm), maintenance_scheduling_group).get();
|
||||
sst_loader.start(std::ref(db), std::ref(ss), std::ref(messaging), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(sstm), maintenance_scheduling_group).get();
|
||||
auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] {
|
||||
sst_loader.stop().get();
|
||||
});
|
||||
|
||||
@@ -7224,6 +7224,20 @@ future<> storage_service::await_topology_quiesced() {
|
||||
co_await _topology_state_machine.await_not_busy();
|
||||
}
|
||||
|
||||
future<bool> storage_service::verify_topology_quiesced(token_metadata::version_t expected_version) {
|
||||
auto holder = _async_gate.hold();
|
||||
|
||||
if (this_shard_id() != 0) {
|
||||
// group0 is only set on shard 0.
|
||||
co_return co_await container().invoke_on(0, [&] (auto& ss) {
|
||||
return ss.verify_topology_quiesced(expected_version);
|
||||
});
|
||||
}
|
||||
|
||||
co_await _group0->group0_server().read_barrier(&_group0_as);
|
||||
co_return _topology_state_machine._topology.version == expected_version && !_topology_state_machine._topology.is_busy();
|
||||
}
|
||||
|
||||
future<join_node_request_result> storage_service::join_node_request_handler(join_node_request_params params) {
|
||||
join_node_request_result result;
|
||||
rtlogger.info("received request to join from host_id: {}", params.host_id);
|
||||
|
||||
@@ -1046,7 +1046,11 @@ public:
|
||||
future<> add_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
|
||||
future<> del_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
|
||||
future<> set_tablet_balancing_enabled(bool);
|
||||
|
||||
future<> await_topology_quiesced();
|
||||
// Verifies topology is not busy, and also that topology version hasn't changed since the one provided
|
||||
// by the caller.
|
||||
future<bool> verify_topology_quiesced(token_metadata::version_t expected_version);
|
||||
|
||||
// In the maintenance mode, other nodes won't be available thus we disabled joining
|
||||
// the token ring and the token metadata won't be populated with the local node's endpoint.
|
||||
|
||||
@@ -1848,6 +1848,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
// of token metadata will complete before we update topology.
|
||||
auto guard = co_await global_tablet_token_metadata_barrier(std::move(g));
|
||||
|
||||
co_await utils::get_local_injector().inject("tablet_resize_finalization_post_barrier", utils::wait_for_message(std::chrono::minutes(2)));
|
||||
|
||||
auto tm = get_token_metadata_ptr();
|
||||
auto plan = co_await _tablet_allocator.balance_tablets(tm, {}, get_dead_nodes());
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "readers/mutation_fragment_v1_stream.hh"
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "service/storage_service.hh"
|
||||
|
||||
#include <cfloat>
|
||||
#include <algorithm>
|
||||
@@ -142,11 +143,12 @@ protected:
|
||||
const unlink_sstables _unlink_sstables;
|
||||
const stream_scope _stream_scope;
|
||||
public:
|
||||
sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, unlink_sstables unlink, stream_scope scope)
|
||||
sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, locator::effective_replication_map_ptr erm,
|
||||
std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, unlink_sstables unlink, stream_scope scope)
|
||||
: _ms(ms)
|
||||
, _db(db)
|
||||
, _table(db.find_column_family(table_id))
|
||||
, _erm(_table.get_effective_replication_map())
|
||||
, _erm(std::move(erm))
|
||||
, _sstables(std::move(sstables))
|
||||
, _primary_replica_only(primary)
|
||||
, _unlink_sstables(unlink)
|
||||
@@ -181,8 +183,9 @@ private:
|
||||
class tablet_sstable_streamer : public sstable_streamer {
|
||||
const locator::tablet_map& _tablet_map;
|
||||
public:
|
||||
tablet_sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, unlink_sstables unlink, stream_scope scope)
|
||||
: sstable_streamer(ms, db, table_id, std::move(sstables), primary, unlink, scope)
|
||||
tablet_sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, locator::effective_replication_map_ptr erm,
|
||||
std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, unlink_sstables unlink, stream_scope scope)
|
||||
: sstable_streamer(ms, db, table_id, std::move(erm), std::move(sstables), primary, unlink, scope)
|
||||
, _tablet_map(_erm->get_token_metadata().tablets().get_tablet_map(table_id)) {
|
||||
}
|
||||
|
||||
@@ -526,13 +529,42 @@ static std::unique_ptr<sstable_streamer> make_sstable_streamer(bool uses_tablets
|
||||
return std::make_unique<sstable_streamer>(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
future<locator::effective_replication_map_ptr> sstables_loader::await_topology_quiesced_and_get_erm(::table_id table_id) {
|
||||
// By waiting for topology to quiesce, we guarantee load-and-stream will not start in the middle
|
||||
// of a topology operation that changes the token range boundaries, e.g. split or merge.
|
||||
// Split, for example, first executes the barrier and then splits the tablets.
|
||||
// So it can happen a sstable is generated between those steps and will incorrectly span two
|
||||
// tablets. We want to serialize load-and-stream and split finalization (a topology op).
|
||||
|
||||
locator::effective_replication_map_ptr erm;
|
||||
while (true) {
|
||||
auto& t = _db.local().find_column_family(table_id);
|
||||
erm = t.get_effective_replication_map();
|
||||
auto expected_topology_version = erm->get_token_metadata().get_version();
|
||||
auto& ss = _ss.local();
|
||||
|
||||
// optimistically attempt to grab an erm on quiesced topology
|
||||
// The awaiting is only needed with tablet over raft, so we're bypassing the check
|
||||
// when raft is disabled.
|
||||
if (!ss.raft_topology_change_enabled() || co_await ss.verify_topology_quiesced(expected_topology_version)) {
|
||||
break;
|
||||
}
|
||||
erm = nullptr;
|
||||
co_await _ss.local().await_topology_quiesced();
|
||||
}
|
||||
|
||||
co_return std::move(erm);
|
||||
}
|
||||
|
||||
future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
|
||||
::table_id table_id, std::vector<sstables::shared_sstable> sstables, bool primary, bool unlink, stream_scope scope,
|
||||
shared_ptr<stream_progress> progress) {
|
||||
// streamer guarantees topology stability, for correctness, by holding effective_replication_map
|
||||
// throughout its lifetime.
|
||||
auto erm = co_await await_topology_quiesced_and_get_erm(table_id);
|
||||
|
||||
auto streamer = make_sstable_streamer(_db.local().find_column_family(table_id).uses_tablets(),
|
||||
_messaging, _db.local(), table_id, std::move(sstables),
|
||||
_messaging, _db.local(), table_id, std::move(erm), std::move(sstables),
|
||||
primary_replica_only(primary), unlink_sstables(unlink), scope);
|
||||
|
||||
co_await streamer->stream(progress);
|
||||
@@ -749,6 +781,7 @@ future<> sstables_loader::download_task_impl::run() {
|
||||
}
|
||||
|
||||
sstables_loader::sstables_loader(sharded<replica::database>& db,
|
||||
sharded<service::storage_service>& ss,
|
||||
netw::messaging_service& messaging,
|
||||
sharded<db::view::view_builder>& vb,
|
||||
sharded<db::view::view_building_worker>& vbw,
|
||||
@@ -756,6 +789,7 @@ sstables_loader::sstables_loader(sharded<replica::database>& db,
|
||||
sstables::storage_manager& sstm,
|
||||
seastar::scheduling_group sg)
|
||||
: _db(db)
|
||||
, _ss(ss)
|
||||
, _messaging(messaging)
|
||||
, _view_builder(vb)
|
||||
, _view_building_worker(vbw)
|
||||
|
||||
@@ -29,6 +29,12 @@ class view_builder;
|
||||
class view_building_worker;
|
||||
}
|
||||
}
|
||||
namespace service {
|
||||
class storage_service;
|
||||
}
|
||||
namespace locator {
|
||||
class effective_replication_map;
|
||||
}
|
||||
|
||||
struct stream_progress {
|
||||
float total = 0.;
|
||||
@@ -66,6 +72,7 @@ public:
|
||||
|
||||
private:
|
||||
sharded<replica::database>& _db;
|
||||
sharded<service::storage_service>& _ss;
|
||||
netw::messaging_service& _messaging;
|
||||
sharded<db::view::view_builder>& _view_builder;
|
||||
sharded<db::view::view_building_worker>& _view_building_worker;
|
||||
@@ -86,8 +93,10 @@ private:
|
||||
bool primary_replica_only, bool unlink_sstables, stream_scope scope,
|
||||
shared_ptr<stream_progress> progress);
|
||||
|
||||
future<seastar::shared_ptr<const locator::effective_replication_map>> await_topology_quiesced_and_get_erm(table_id table_id);
|
||||
public:
|
||||
sstables_loader(sharded<replica::database>& db,
|
||||
sharded<service::storage_service>& ss,
|
||||
netw::messaging_service& messaging,
|
||||
sharded<db::view::view_builder>& vb,
|
||||
sharded<db::view::view_building_worker>& vbw,
|
||||
|
||||
@@ -1703,3 +1703,94 @@ async def test_split_correctness_on_tablet_count_change(manager: ManagerClient):
|
||||
await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
|
||||
await asyncio.sleep(.1)
|
||||
await manager.api.message_injection(server.ip_addr, "merge_completion_fiber")
|
||||
|
||||
# Reproducer for https://github.com/scylladb/scylladb/issues/26041.
|
||||
@pytest.mark.parametrize("primary_replica_only", [False, True])
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_tablet_load_and_stream_and_split_synchronization(manager: ManagerClient, primary_replica_only):
|
||||
logger.info("Bootstrapping cluster")
|
||||
cmdline = [
|
||||
'--logger-log-level', 'storage_service=debug',
|
||||
'--logger-log-level', 'table=debug',
|
||||
'--smp', '1',
|
||||
]
|
||||
servers = [await manager.server_add(config={
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
}, cmdline=cmdline)]
|
||||
server = servers[0]
|
||||
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
initial_tablets = 1
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
|
||||
|
||||
keys = range(100)
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
|
||||
async def check(ks_name: str):
|
||||
logger.info("Checking table")
|
||||
cql = manager.get_cql()
|
||||
rows = await cql.run_async(f"SELECT * FROM {ks_name}.test BYPASS CACHE;")
|
||||
assert len(rows) == len(keys)
|
||||
for r in rows:
|
||||
assert r.c == r.pk
|
||||
|
||||
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
|
||||
await check(ks)
|
||||
|
||||
node_workdir = await manager.server_get_workdir(servers[0].server_id)
|
||||
|
||||
cql = await safe_server_stop_gracefully(manager, servers[0].server_id)
|
||||
|
||||
table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
|
||||
logger.info(f"Table dir: {table_dir}")
|
||||
|
||||
def move_sstables_to_upload(table_dir: str):
|
||||
logger.info("Moving sstables to upload dir")
|
||||
table_upload_dir = os.path.join(table_dir, "upload")
|
||||
for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
|
||||
for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")):
|
||||
dst_path = os.path.join(table_upload_dir, os.path.basename(src_path))
|
||||
logger.info(f"Moving sstable file {src_path} to {dst_path}")
|
||||
os.rename(src_path, dst_path)
|
||||
|
||||
move_sstables_to_upload(table_dir)
|
||||
|
||||
await manager.server_start(servers[0].server_id)
|
||||
cql = manager.get_cql()
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
|
||||
assert len(rows) == 0
|
||||
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "tablet_resize_finalization_post_barrier", one_shot=True)
|
||||
|
||||
s1_log = await manager.server_open_log(servers[0].server_id)
|
||||
s1_mark = await s1_log.mark()
|
||||
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {initial_tablets * 2}}}")
|
||||
|
||||
await s1_log.wait_for(f"tablet_resize_finalization_post_barrier: waiting", from_mark=s1_mark)
|
||||
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments", one_shot=True)
|
||||
|
||||
load_and_stream_task = asyncio.create_task(manager.api.load_new_sstables(servers[0].ip_addr, ks, "test", primary_replica_only))
|
||||
await s1_log.wait_for(f"Loading new SSTables for keyspace", from_mark=s1_mark)
|
||||
|
||||
await manager.api.message_injection(server.ip_addr, "tablet_resize_finalization_post_barrier")
|
||||
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
|
||||
|
||||
await s1_log.wait_for(f"stream_mutation_fragments: waiting", from_mark=s1_mark)
|
||||
await manager.api.message_injection(server.ip_addr, "stream_mutation_fragments")
|
||||
|
||||
await load_and_stream_task
|
||||
|
||||
await check(ks)
|
||||
|
||||
Reference in New Issue
Block a user