tablets: Fix race between repair and split

Consider the following:

T
0   split prepare starts
1                               repair starts
2   split prepare finishes
3                               repair adds unsplit sstables
4                               repair ends
5   split executes

If repair produces sstable after split prepare phase, the replica
will not split that sstable later, as prepare phase is considered
completed already. That causes split execution to fail as replicas
weren't really prepared. This also can be triggered with
load-and-stream which shares the same write (consumer) path.

The approach to fix this is the same employed to prevent a race
between split and migration. If migration happens during prepare
phase, it can happen source misses the split request, but the
tablet will still be split on the destination (if needed).
Similarly, the repair writer becomes responsible for splitting
the data if underlying table is in split mode. That's implemented
in replica::table for correctness, so if node crashes, the new
sstable missing split is still split before added to the set.

Fixes #19378.
Fixes #19416.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 74612ad358)
This commit is contained in:
Raphael S. Carvalho
2024-06-21 15:22:54 -03:00
parent b8099b9e83
commit c67967b65a
5 changed files with 119 additions and 6 deletions

View File

@@ -321,6 +321,7 @@ public:
virtual bool all_storage_groups_split() = 0;
virtual future<> split_all_storage_groups() = 0;
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;
virtual future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) = 0;
virtual lw_shared_ptr<sstables::sstable_set> make_sstable_set() const = 0;
};

View File

@@ -580,6 +580,11 @@ public:
// be split once it returns.
future<> maybe_split_compaction_group_of(locator::tablet_id);
private:
// If SSTable doesn't need split, the same input SSTable is returned as output.
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
// NOTE: it must only be used on new SSTables that weren't added to the set yet.
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable&);
// Called when coordinator executes tablet splitting, i.e. commit the new tablet map with
// each tablet split into two, so this replica will remap all of its compaction groups
// that were previously split.
@@ -619,6 +624,7 @@ private:
return _config.enable_cache && _schema->caching_options().enabled();
}
void update_stats_for_new_sstable(const sstables::shared_sstable& sst) noexcept;
future<> do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy, bool trigger_compaction);
future<> do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction);
// Helpers which add sstable on behalf of a compaction group and refreshes compound set.
void add_sstable(compaction_group& cg, sstables::shared_sstable sstable);

View File

@@ -675,6 +675,9 @@ public:
bool all_storage_groups_split() override { return true; }
future<> split_all_storage_groups() override { return make_ready_future(); }
future<> maybe_split_compaction_group_of(size_t idx) override { return make_ready_future(); }
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override {
return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>{sst});
}
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
return get_compaction_group().make_sstable_set();
@@ -774,6 +777,7 @@ public:
bool all_storage_groups_split() override;
future<> split_all_storage_groups() override;
future<> maybe_split_compaction_group_of(size_t idx) override;
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override;
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
// FIXME: avoid recreation of compound_set for groups which had no change. usually, only one group will be changed at a time.
@@ -856,8 +860,14 @@ future<> storage_group::split(sstables::compaction_type_options::split opt) {
co_return;
}
if (_main_cg->empty()) {
co_return;
}
auto holder = _main_cg->async_gate().hold();
co_await _main_cg->flush();
co_await _main_cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt));
// Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets.
co_await _main_cg->get_compaction_manager().perform_offstrategy(_main_cg->as_table_state(), tasks::task_info{});
co_await _main_cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt), tasks::task_info{});
}
lw_shared_ptr<const sstables::sstable_set> storage_group::make_sstable_set() const {
@@ -933,11 +943,27 @@ future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t id
return sg->split(split_compaction_options());
}
future<std::vector<sstables::shared_sstable>>
tablet_storage_group_manager::maybe_split_sstable(const sstables::shared_sstable& sst) {
if (!tablet_map().needs_split()) {
co_return std::vector<sstables::shared_sstable>{sst};
}
auto& cg = compaction_group_for_sstable(sst);
auto holder = cg.async_gate().hold();
co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, cg.as_table_state(), split_compaction_options());
}
future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
auto holder = async_gate().hold();
co_await _sg_manager->maybe_split_compaction_group_of(tablet_id.value());
}
future<std::vector<sstables::shared_sstable>> table::maybe_split_new_sstable(const sstables::shared_sstable& sst) {
auto holder = async_gate().hold();
co_return co_await _sg_manager->maybe_split_sstable(sst);
}
std::unique_ptr<storage_group_manager> table::make_storage_group_manager() {
std::unique_ptr<storage_group_manager> ret;
if (uses_tablets()) {
@@ -1125,11 +1151,8 @@ void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) no
}
future<>
table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction) {
compaction_group& cg = compaction_group_for_sstable(sst);
// Hold gate to make share compaction group is alive.
auto holder = cg.async_gate().hold();
table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy offstrategy,
bool trigger_compaction) {
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
@@ -1146,6 +1169,16 @@ table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::o
}), dht::partition_range::make({sst->get_first_decorated_key(), true}, {sst->get_last_decorated_key(), true}));
}
future<>
table::do_add_sstable_and_update_cache(sstables::shared_sstable new_sst, sstables::offstrategy offstrategy, bool trigger_compaction) {
for (auto sst : co_await maybe_split_new_sstable(new_sst)) {
auto& cg = compaction_group_for_sstable(sst);
// Hold gate to make share compaction group is alive.
auto holder = cg.async_gate().hold();
co_await do_add_sstable_and_update_cache(cg, std::move(sst), offstrategy, trigger_compaction);
}
}
future<>
table::add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) {
bool do_trigger_compaction = offstrategy == sstables::offstrategy::no;

View File

@@ -2466,6 +2466,9 @@ future<bool> topology_coordinator::maybe_start_tablet_split_finalization(group0_
if (plan.finalize_resize.empty()) {
co_return false;
}
if (utils::get_local_injector().enter("tablet_split_finalization_postpone")) {
co_return false;
}
std::vector<canonical_mutation> updates;

View File

@@ -447,6 +447,76 @@ async def test_tablet_repair(manager: ManagerClient):
for r in rows:
assert r.c == repair_cycles - 1
# Reproducer for race between split and repair: https://github.com/scylladb/scylladb/issues/19378
# Verifies repair will not complete with sstables that still require split, causing split
# execution to fail.
@pytest.mark.repair
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_concurrent_tablet_repair_and_split(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'raft_topology=debug',
'--target-tablet-size-in-bytes', '1024',
]
servers = await manager.servers_add(3, cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
})
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', "
"'replication_factor': 2} AND tablets = {'initial': 32};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
logger.info("Populating table")
keys = range(5000) # Enough keys to trigger repair digest mismatch with a high chance.
stmt = cql.prepare("INSERT INTO test.test (pk, c) VALUES (?, ?)")
stmt.consistency_level = ConsistencyLevel.ONE
await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)
s0_log = await manager.server_open_log(servers[0].server_id)
s0_mark = await s0_log.mark()
await asyncio.gather(*[cql.run_async(stmt, [k, -1]) for k in keys])
# split decision is sstable size based, so data must be flushed first
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, "test")
await manager.api.enable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone", False)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
logger.info("Waiting for split prepare...")
await s0_log.wait_for('Setting split ready sequence number to', from_mark=s0_mark)
s0_mark = await s0_log.mark()
logger.info("Waited for split prepare")
# Balancer is re-enabled later for split execution
await asyncio.create_task(manager.api.disable_tablet_balancing(servers[0].ip_addr))
# Write concurrently with repair to increase the chance of repair having some discrepancy to resolve and send writes.
inserts_future = asyncio.gather(*[cql.run_async(stmt, [k, 1]) for k in keys])
await repair_on_node(manager, servers[0], servers)
await inserts_future
logger.info("Waiting for split execute...")
await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone")
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await s0_log.wait_for('Detected tablet split for table', from_mark=s0_mark)
await inject_error_one_shot_on(manager, "tablet_split_finalization_postpone", servers)
logger.info("Waited for split execute...")
key_count = len(keys)
stmt = cql.prepare("SELECT * FROM test.test;")
stmt.consistency_level = ConsistencyLevel.ALL
rows = await cql.run_async(stmt)
assert len(rows) == key_count
@pytest.mark.repair
@pytest.mark.asyncio